# Home sensors

## Scenario

We have decided to set up several sensors in our home to monitor the quality of life there.
There are multiple devices, equipped with different sensors.
The devices send out data once per hour, and the sensors take measurements more frequently.
We have collected the data in a single `.txt` file.


## Data model
The data has the following format:

```json
{
  "id": "id of the device",
  "timestamp_ms": "unix timestamp in milliseconds of the hour the measurements were collected",
  "data": [
    {
      "comment": "sensor specific data, e.g. temperature. The data is an array because the device can have more than a single sensor",
      "device_id": "id of the device",
      "type": "sensor type: temperature, light, contact, humidity, air quality, motion",
      "measurement": "<sensor measurement>",
      "unit": "unit of the measurement, e.g. Celsius degrees",
      "timestamp_ms": "unix timestamp in milliseconds when the measurement was collected"
    }
  ]
}
```

It is stored in newline-delimited `JSON` format; you can inspect it [here](./data/sensor_data.txt).
Moreover, we have an additional `.csv` file containing the location of each device, available [here](./data/device_locations.csv).
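
As a minimal sketch of what one record looks like once parsed, the snippet below decodes a single newline-delimited line with the standard `json` module. The sample line is a shortened version of a `temp1` record from the data file; the variable names are only illustrative.

```python
import json

# One line of the newline-delimited file, shortened to two measurements
# (values taken from a temp1 record in sensor_data.txt).
line = (
    '{"id": "temp1", "data": ['
    '{"device_id": "temp1", "unit": "celsius", "type": "temperature", '
    '"measurement": 22.671717271141567, "timestamp_ms": 1598922000.0}, '
    '{"device_id": "temp1", "unit": "celsius", "type": "temperature", '
    '"measurement": 22.427013031545382, "timestamp_ms": 1598922900.0}'
    '], "timestamp_ms": 1598922000.0}'
)

record = json.loads(line)          # each line is a complete JSON document
device_id = record["id"]           # device that emitted the record
measurements = record["data"]      # list with one entry per measurement
sensor_types = {m["type"] for m in measurements}

print(device_id, len(measurements), sensor_types)
```

The top-level object describes the hourly emission, while each entry of `data` describes one measurement taken within that hour.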

## Task

Organize the data and extract information from it to estimate the quality of life in our home.

Extra task: draw the ground plan of the flat/house the devices are in.


In [1]:
# some obligatory stuff: creating a SparkContext, importing some utils to pretty display the dataframes
import json
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession, DataFrame
from IPython.display import display

conf = SparkConf().setAppName("HomeSensors").setMaster("local[1]")

sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
ss = SparkSession.builder.getOrCreate()

# enables pretty print with display()
ss.conf.set("spark.sql.repl.eagerEval.enabled", True)

First of all, we need to read the data. Reading the file gives us an RDD with one element per line of the source file.

In [2]:
data = sc.textFile("./data/sensor_data.txt")

count = data.cache().count()
print(f"Data count: {count}")

Data count: 250


In [3]:
lines = data.collect()
lines = '\n'.join(lines[:3])
print(f"A couple of lines of data:\n{lines}")

A couple of lines of data:
{"id": "temp1", "data": [{"device_id": "temp1", "unit": "celsius", "type": "temperature", "measurement": 22.671717271141567, "timestamp_ms": 1598922000.0}, {"device_id": "temp1", "unit": "celsius", "type": "temperature", "measurement": 22.427013031545382, "timestamp_ms": 1598922900.0}, {"device_id": "temp1", "unit": "celsius", "type": "temperature", "measurement": 23.21266136616038, "timestamp_ms": 1598923800.0}, {"device_id": "temp1", "unit": "celsius", "type": "temperature", "measurement": 22.280158977203943, "timestamp_ms": 1598924700.0}], "timestamp_ms": 1598922000.0}
{"id": "temp2", "data": [{"device_id": "temp2", "unit": "celsius", "type": "temperature", "measurement": 14.348216499635889, "timestamp_ms": 1598922000.0}, {"device_id": "temp2", "unit": "celsius", "type": "temperature", "measurement": 14.968794892912364, "timestamp_ms": 1598922900.0}, {"device_id": "temp2", "unit": "celsius", "type": "temperature", "measurement": 14.271602819920133, "timest

So far, we have only been working with string data.
Since each line is a valid `JSON` document, let's parse it into Python objects:


In [4]:
json_data = data.map(lambda x: json.loads(x))
json_data.cache().count()

250

In [5]:
json_lines = json_data.take(1)
print(f"Type of the data: {type(json_lines[0])}")
print(f"JSON data: {json_lines[0]}")

Type of the data: <class 'dict'>
JSON data: {'id': 'temp1', 'data': [{'device_id': 'temp1', 'unit': 'celsius', 'type': 'temperature', 'measurement': 22.671717271141567, 'timestamp_ms': 1598922000.0}, {'device_id': 'temp1', 'unit': 'celsius', 'type': 'temperature', 'measurement': 22.427013031545382, 'timestamp_ms': 1598922900.0}, {'device_id': 'temp1', 'unit': 'celsius', 'type': 'temperature', 'measurement': 23.21266136616038, 'timestamp_ms': 1598923800.0}, {'device_id': 'temp1', 'unit': 'celsius', 'type': 'temperature', 'measurement': 22.280158977203943, 'timestamp_ms': 1598924700.0}], 'timestamp_ms': 1598922000.0}


OK, now that we have parsed `JSON` objects, we can continue with our extraction.

We would like to extract all measurements from each data emission,
so that we can inspect the data more closely.

NOTE: The measurements are nested inside each emission, so we need to "unnest" them.
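
Before doing this with Spark, the idea can be sketched in plain Python. The emissions below are hypothetical stand-ins with rounded values; `itertools.chain.from_iterable` plays the role that `flatMap` plays on an RDD.

```python
from itertools import chain

# Two hypothetical emissions, each carrying a nested list of measurements.
emissions = [
    {"id": "temp1", "data": [{"measurement": 22.67}, {"measurement": 22.43}]},
    {"id": "temp2", "data": [{"measurement": 14.35}]},
]

# A plain map keeps one list per emission: [[m1, m2], [m3]]
nested = [e["data"] for e in emissions]

# Flattening concatenates those lists into one sequence: [m1, m2, m3]
flat = list(chain.from_iterable(nested))

print(len(nested), len(flat))
```

The flattened result has one element per measurement rather than one per emission, which is why the count grows after this step.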

In [6]:
sensor_data_json = json_data.flatMap(lambda x: x["data"]) # [[1, 2, 3], [4, 5, 6], [7, 8, 9]] -> [1, 2, 3, 4, 5, 6, 7, 8, 9]
sensor_measurements = sensor_data_json.cache().count()

print(f"Number of sensor measurements: {sensor_measurements}")

Number of sensor measurements: 717


In [7]:
sample_data = "\n".join([json.dumps(x) for x in sensor_data_json.take(3)])
print(f"Sample data:\n{sample_data}")

Sample data:
{"device_id": "temp1", "unit": "celsius", "type": "temperature", "measurement": 22.671717271141567, "timestamp_ms": 1598922000.0}
{"device_id": "temp1", "unit": "celsius", "type": "temperature", "measurement": 22.427013031545382, "timestamp_ms": 1598922900.0}
{"device_id": "temp1", "unit": "celsius", "type": "temperature", "measurement": 23.21266136616038, "timestamp_ms": 1598923800.0}


Now that we have the JSON data, let's transform it into a data frame
so that we can perform some queries on it:

In [8]:
# additional required dependencies
from pyspark.sql.types import Row, TimestampType

# Row(**dict(l)) => automagically creates a row
sensor_data_df = sensor_data_json.map(lambda l: Row(**dict(l))).toDF()

display(sensor_data_df)

device_id,unit,type,measurement,timestamp_ms
temp1,celsius,temperature,22.671717271141567,1598922000.0
temp1,celsius,temperature,22.427013031545386,1598922900.0
temp1,celsius,temperature,23.21266136616038,1598923800.0
temp1,celsius,temperature,22.280158977203943,1598924700.0
temp2,celsius,temperature,14.348216499635887,1598922000.0
temp2,celsius,temperature,14.968794892912364,1598922900.0
temp2,celsius,temperature,14.271602819920131,1598923800.0
temp2,celsius,temperature,13.578581273189716,1598924700.0
temp3,celsius,temperature,15.4336063013519,1598922000.0
temp3,celsius,temperature,15.73280028139962,1598922900.0


Notice that `timestamp_ms` is stored as a float holding a Unix epoch value,
which is hard to read at a glance. Despite the `_ms` suffix, the sample values are in seconds.

Let's convert it into a readable timestamp:

In [9]:
from pyspark.sql.functions import from_unixtime

def to_timestamp(df: DataFrame) -> DataFrame:
    # add a `timestamp` column by converting the epoch seconds in `timestamp_ms`,
    # then drop the original `timestamp_ms` column
    df = df.withColumn(
        "timestamp",
        from_unixtime("timestamp_ms").cast(TimestampType()),
    )
    return df.drop("timestamp_ms")

In [10]:
sensor_data = to_timestamp(sensor_data_df)

# we need to register the data frame, so SparkSQL will know how to refer to it in the SQL below
sqlContext.registerDataFrameAsTable(sensor_data, "sensorData")

display(sensor_data)

device_id,unit,type,measurement,timestamp
temp1,celsius,temperature,22.671717271141567,2020-09-01 03:00:00
temp1,celsius,temperature,22.427013031545386,2020-09-01 03:15:00
temp1,celsius,temperature,23.21266136616038,2020-09-01 03:30:00
temp1,celsius,temperature,22.280158977203943,2020-09-01 03:45:00
temp2,celsius,temperature,14.348216499635887,2020-09-01 03:00:00
temp2,celsius,temperature,14.968794892912364,2020-09-01 03:15:00
temp2,celsius,temperature,14.271602819920131,2020-09-01 03:30:00
temp2,celsius,temperature,13.578581273189716,2020-09-01 03:45:00
temp3,celsius,temperature,15.4336063013519,2020-09-01 03:00:00
temp3,celsius,temperature,15.73280028139962,2020-09-01 03:15:00
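
The conversion can be cross-checked without Spark. The sketch below uses only the standard library and interprets the first sample value both as seconds and as milliseconds; only the seconds reading lands in 2020. The Spark output above shows 03:00 rather than 01:00, presumably because the session renders timestamps in a local time zone two hours ahead of UTC (an assumption, since the notebook does not state its time zone).

```python
from datetime import datetime, timezone

raw = 1598922000.0  # first `timestamp_ms` value in the sample data

# Interpreted as seconds since the epoch, the value lands in September 2020.
as_seconds = datetime.fromtimestamp(raw, tz=timezone.utc)

# Interpreted as milliseconds, it would land in January 1970 instead.
as_millis = datetime.fromtimestamp(raw / 1000, tz=timezone.utc)

print(as_seconds.isoformat())  # 2020-09-01T01:00:00+00:00
print(as_millis.year)          # 1970
```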


So far we have flattened the measurements, but along the way we have lost the information about when each emission occurred.
To recover it, we drop the nested measurements from the original `JSON` objects and keep the remaining fields:

In [11]:
def remove_property(data: dict, to_delete: str = "data") -> dict:
    """
    Return a copy of `data` without the `to_delete` property.
    The input dictionary is left unchanged.
    """
    return {key: value for key, value in data.items() if key != to_delete}

In [12]:
emission_data_df = json_data.map(remove_property).map(lambda l: Row(**dict(l))).toDF()
emission_data_df = to_timestamp(emission_data_df)

display(emission_data_df)

sqlContext.registerDataFrameAsTable(emission_data_df, "emissionData")


id,timestamp
temp1,2020-09-01 03:00:00
temp2,2020-09-01 03:00:00
temp3,2020-09-01 03:00:00
weather1,2020-09-01 03:00:00
weather2,2020-09-01 03:00:00
weather3,2020-09-01 03:00:00
contact1,2020-09-01 03:00:00
contact2,2020-09-01 03:00:00
air1,2020-09-01 03:00:00
air2,2020-09-01 03:00:00


In addition to the measurement data, we also have the device location data.
Let's load it as well:

In [13]:
from pyspark.sql.types import StructType, StructField, StringType

# one way to load a csv file in Spark:
device_locations_df = ss.read.csv(
    "data/device_locations.csv",
    header=True,
    mode="DROPMALFORMED",
    # we need to provide the schema of the csv file:
    schema=StructType([
        StructField("device_id", StringType(), True),
        StructField("location", StringType(), True)
    ])
)
sqlContext.registerDataFrameAsTable(device_locations_df, "deviceLocations")
display(device_locations_df)

device_id,location
temp1,living_room
temp2,master_bedroom
temp3,hallway
weather1,living_room
weather2,bathroom
weather3,master_bedroom
contact1,balcony
contact2,hallway
air1,master_bedroom
air2,living_room


With all the data loaded, we can now start exploring it.

Let's start by extracting all the humidity measurements from the `weather1` device.
Order the results by their `timestamp` in ascending order.

In [14]:
humidity_df = sqlContext.sql("""
    SELECT *
    FROM sensorData
    -- add the constraints:
    WHERE type = 'humidity' AND device_id = 'weather1'
    ORDER BY timestamp
    LIMIT 10
""")

display(humidity_df)

device_id,unit,type,measurement,timestamp
weather1,%,humidity,99.19263396239432,2020-09-01 03:00:00
weather1,%,humidity,99.19263396239432,2020-09-01 04:00:00
weather1,%,humidity,98.61409519508864,2020-09-01 05:00:00
weather1,%,humidity,98.61409519508864,2020-09-01 06:00:00
weather1,%,humidity,97.76089772702514,2020-09-01 07:00:00
weather1,%,humidity,98.37805736392949,2020-09-01 08:00:00
weather1,%,humidity,98.29061885744647,2020-09-01 09:00:00
weather1,%,humidity,98.27012097648674,2020-09-01 10:00:00
weather1,%,humidity,98.27012097648674,2020-09-01 11:00:00
weather1,%,humidity,98.10923661723136,2020-09-01 12:00:00


Let's try to join two data frames.

We would like to know where the `weather1` device from the previous query is located.

For that purpose we need to join the `sensorData` and `deviceLocations` data frames.

In [15]:
joinDFs = sqlContext.sql("""
    SELECT *
    FROM sensorData
    -- perform the join:
    INNER JOIN deviceLocations ON sensorData.device_id = deviceLocations.device_id
    WHERE type = 'humidity' AND sensorData.device_id = 'weather1'
    -- order explicitly so that LIMIT returns a deterministic set of rows
    ORDER BY sensorData.timestamp
    LIMIT 10
""")

display(joinDFs)

device_id,unit,type,measurement,timestamp,device_id.1,location
weather1,%,humidity,99.19263396239432,2020-09-01 03:00:00,weather1,living_room
weather1,%,humidity,99.19263396239432,2020-09-01 04:00:00,weather1,living_room
weather1,%,humidity,98.61409519508864,2020-09-01 05:00:00,weather1,living_room
weather1,%,humidity,98.61409519508864,2020-09-01 06:00:00,weather1,living_room
weather1,%,humidity,97.76089772702514,2020-09-01 07:00:00,weather1,living_room
weather1,%,humidity,98.37805736392949,2020-09-01 08:00:00,weather1,living_room
weather1,%,humidity,98.29061885744647,2020-09-01 09:00:00,weather1,living_room
weather1,%,humidity,98.27012097648674,2020-09-01 10:00:00,weather1,living_room
weather1,%,humidity,98.27012097648674,2020-09-01 11:00:00,weather1,living_room
weather1,%,humidity,98.10923661723136,2020-09-01 12:00:00,weather1,living_room
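
To make the semantics of the inner join concrete, here is a plain-Python sketch over hypothetical in-memory rows (the values are rounded, and `unknown9` is an invented device with no location). It is an illustration of what the query does, not a replacement for it.

```python
# Hypothetical in-memory stand-ins for the two registered tables.
sensor_rows = [
    {"device_id": "weather1", "type": "humidity", "measurement": 99.19},
    {"device_id": "temp1", "type": "temperature", "measurement": 22.67},
    {"device_id": "unknown9", "type": "humidity", "measurement": 50.0},
]
locations = {"weather1": "living_room", "temp1": "living_room"}

# Inner join: keep only rows whose device_id has a match in `locations`,
# and attach the matching location to each kept row.
joined = [
    {**row, "location": locations[row["device_id"]]}
    for row in sensor_rows
    if row["device_id"] in locations
]

# Same filter as the WHERE clause in the query above.
humidity_weather1 = [
    r for r in joined
    if r["type"] == "humidity" and r["device_id"] == "weather1"
]
print(humidity_weather1)
```

Rows without a matching location, such as the invented `unknown9`, are dropped by an inner join; a left join would keep them with an empty location.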


We would like to know the number of measurements per device, with the following output:

```
<device_id>, <number of measurements>
```

In [16]:
groupByExample = sqlContext.sql("""
    SELECT
        device_id DeviceId,
        COUNT(*) NumberOfMeasurements
    FROM sensorData
    GROUP BY device_id
    ORDER BY DeviceId
""")

display(groupByExample)

DeviceId,NumberOfMeasurements
air1,25
air2,25
contact1,72
contact2,70
temp1,100
temp2,100
temp3,100
weather1,75
weather2,75
weather3,75
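
The same grouping can be sketched in plain Python with `collections.Counter`. The list of device ids below is hypothetical and much shorter than the real data.

```python
from collections import Counter

# Hypothetical device ids, one per measurement row.
device_ids = ["air1", "temp1", "temp1", "air1", "temp1", "contact1"]

# GROUP BY device_id + COUNT(*): tally the rows per device.
counts = Counter(device_ids)

# ORDER BY DeviceId: sort the (device, count) pairs by device id.
for device_id, n in sorted(counts.items()):
    print(device_id, n)
```

As a sanity check on the real output above, the per-device counts add up to 717, which matches the number of flattened measurements computed earlier.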


Try to figure out the answers to the following questions:
1. What are the minimum, maximum, and mean temperatures per room in our home?
2. What is the longest time a door has stayed open?
3. How many sensors does each device have?
4. Which location has the best air quality?
5. Does the data make any sense to you? Why?
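
As a possible starting point for the first question, the sketch below groups a handful of temperature readings by room and reduces each group to its minimum, maximum, and mean. The readings are rounded values copied from the sample output and cover only the `temp*` devices; whether other devices also report temperature is left for you to check. A Spark solution would express the same idea as a join followed by a `GROUP BY`.

```python
from collections import defaultdict
from statistics import mean

# A few temperature readings copied from the sample output (rounded),
# and the matching rows of device_locations.csv.
readings = [
    ("temp1", 22.67), ("temp1", 22.43),
    ("temp2", 14.35), ("temp2", 14.97),
    ("temp3", 15.43),
]
locations = {"temp1": "living_room", "temp2": "master_bedroom", "temp3": "hallway"}

# Group the measurements by room rather than by device.
by_room = defaultdict(list)
for device_id, value in readings:
    by_room[locations[device_id]].append(value)

# Reduce each group to (min, max, mean).
stats = {room: (min(v), max(v), mean(v)) for room, v in by_room.items()}
for room, (lo, hi, avg) in sorted(stats.items()):
    print(f"{room}: min={lo} max={hi} mean={avg:.2f}")
```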