<img src="http://drive.google.com/uc?export=view&id=1tpOCamr9aWz817atPnyXus8w5gJ3mIts" width=500px>

Proprietary content. © Great Learning. All Rights Reserved. Unauthorized use or distribution prohibited.



## <font color='blue'> Table Of Contents </font> 

<font color='blue'>
    
- ### Introduction to Lab 3
- ### Modifying data collection
- ### Flattening the objects
- ### Devices List
- ### Calculating the aggregate
- ### Creating alerts
</font>

## <font color='blue'> Introduction to Lab 3</font>


**Overview**


In this lab, you will restructure and analyze the data acquired and stored in the previous labs.

**Objectives**

After completing this lab, you will be able to:
- Architect the data generation and storage process to perform analytics
- Develop a program that runs periodically to detect and store the list of devices 
- Perform basic analysis on the previous day's data and store the summary
- Create alerts based on the defined rules

**Duration**

This lab requires approximately **60 minutes** to complete.

---

## <font color='blue'>Modifying data collection</font>

To analyze the large and steadily growing volume of data generated by IoT devices, it helps to batch-process the latest data periodically, for example by storing the aggregate value of each sensor type in a given location for the previous day.

This requires storing every datapoint in a form that can be filtered by date and time, so that we can analyze just the data generated on the previous day.

In Lab 2, the `subscribe.py` program collected the messages from the MQTT broker and inserted them into a database collection. Each document in this collection currently has only three fields:
* _id
* topic
* payload

Once the collection holds several weeks of data, it becomes difficult to run any analytics based on when a datapoint was generated. We therefore need an additional field holding the date and time at which the datapoint was generated.

The payload field of every datapoint already contains this date and time information:

```json
{
    '_id': ObjectId('60066dbd11fd0a6a7d6d5d4e'), 
    'topic': 'devices/temp', 
    'payload': b'{"timestamp": "2021-01-19 10:57:24", 
                    "device_id": "temperature_1", 
                    "device_type": "temperature", 
                    "value": 23.24}'
}
```


However, we need it as a separate top-level field so that we can filter on date and time and run analytics for a given day or any other time period.

To achieve this, we modify the `on_message` callback function in Lab 2's `subscribe.py` program as follows:

In [None]:
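# json is needed to parse the payload (subscribe.py may already import it)
import json

#Callback function - executed whenever a message is published to the topics that
#this program is subscribed to. `dbt` is the collection handle created earlier in subscribe.py
def on_message(client, userdata, msg):
    # Lift the timestamp out of the JSON payload into its own top-level field
    timestamp = json.loads(msg.payload)["timestamp"]
    item = {"topic": msg.topic, "payload": msg.payload, "timestamp": timestamp}
    dbt.insert_one(item)
    print("Received a message on " + msg.topic + " and inserted it into the DB")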
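As written, a message whose payload is not valid JSON, or has no `timestamp` key, raises an exception inside the callback. A minimal sketch of one defensive option, assuming you want to skip such messages rather than stop: build the document in a separate helper (the name `make_item` is hypothetical, not part of paho-mqtt or PyMongo) and call it from `on_message`.

```python
import json


def make_item(topic, payload):
    """Build the document to insert, or return None if the payload has no usable timestamp."""
    try:
        timestamp = json.loads(payload)["timestamp"]
    except (ValueError, KeyError, TypeError):
        # ValueError covers invalid JSON and bad UTF-8; KeyError a missing field;
        # TypeError a payload that decodes to something other than a JSON object
        return None
    return {"topic": topic, "payload": payload, "timestamp": timestamp}


good = make_item("devices/temp", b'{"timestamp": "2021-01-21T11:01:35Z", "value": 22.94}')
bad = make_item("devices/temp", b"not json")
print(good["timestamp"], bad)
```

Inside `on_message`, you would then call `item = make_item(msg.topic, msg.payload)` and only call `dbt.insert_one(item)` when `item` is not `None`.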

Since the existing collection contains entries without a timestamp field, we insert the timestamped entries into a new collection, `iot-sensors-data-timestamped`.

You can make this change in Lab 2's `config.json` file by modifying the `db_collection` field:


```json
    ...
    "db_collection":"iot-sensors-data-timestamped"
    ...

```

We also need a consistent timestamp format. Because the timestamp is stored as a string, MongoDB range operators such as `$gte` and `$lt` compare it with the query values as strings, so the stored values and the query bounds must share the same format. We use the ISO 8601 format **YYYY-MM-DDTHH:MM:SSZ**, which also sorts chronologically; our publish simulator, however, omits the characters **T** and **Z** from its timestamps.

So we change the timestamp format in Lab 1's simulator program `publish.py` as follows:

```python
    ...
    message["timestamp"] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
    ...
```
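To see why the format matters, the snippet below mimics a `{"$gte": start, "$lt": end}` filter in plain Python. It assumes MongoDB's default behaviour of comparing strings by their bytes (no collation), which for these ASCII timestamps matches Python's string ordering.

```python
# Bounds of a one-day window, in the same ISO 8601 format as the query filters
start = "2021-01-21T00:00:00Z"
end = "2021-01-22T00:00:00Z"


def in_window(ts):
    """Mimic the {"$gte": start, "$lt": end} string comparison in plain Python."""
    return start <= ts < end


print(in_window("2021-01-21T12:35:24Z"))  # True: new format, inside the window
print(in_window("2021-01-21 12:35:24"))   # False: " " sorts before "T", so it looks earlier than start
```

The second reading was taken on the same day, but because its timestamp uses the old format it falls outside the window. Mixing the two formats in one collection would therefore silently drop data from range queries.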

With these changes, each entry in the database collection looks like this:

```json
{
    '_id': ObjectId('600911b7156c72bf358e299d'), 
    'topic': 'devices/temp', 
    'payload': b'{"timestamp": "2021-01-21T11:01:35Z", 
                    "device_id": "temperature_1", 
                    "device_type": "temperature", 
                    "value": 22.94}', 
    'timestamp': '2021-01-21T11:01:35Z'}
```

These changes are reflected in the corresponding section of the companion Source-Code notebook.

Once these changes are in place, we can filter the data by timestamp by passing a range condition on the `timestamp` field to `find()`. The query program incorporates this change as follows:

In [None]:
import pymongo
import json

#Reading the configuration file
with open("config.json") as f:
    config = json.load(f)

#Initializing connection to the database
dbclient = pymongo.MongoClient(config["db_host"], config["db_port"])
db = dbclient[config["db_name"]]
dbt = db[config["db_collection"]]

#Querying for the messages that were published to the `devices/temp` topic on 21 Jan 2021
entries = dbt.find({"topic": "devices/temp",
                    "timestamp": {"$gte": "2021-01-21T00:00:00Z",
                                  "$lt": "2021-01-22T00:00:00Z"}})

#Print the entries
for entry in entries:
    print(entry)

This produces output similar to the following:

```bash
{'_id': ObjectId('600927b4d3411bee51a7bcae'), 'topic': 'devices/temp', 'payload': b'{"timestamp": "2021-01-21T12:35:24Z", "device_id": "temperature_0", "device_type": "temperature", "value": 25.33}', 'timestamp': '2021-01-21T12:35:24Z'}
{'_id': ObjectId('600927b4d3411bee51a7bcaf'), 'topic': 'devices/temp', 'payload': b'{"timestamp": "2021-01-21T12:35:24Z", "device_id": "temperature_1", "device_type": "temperature", "value": 21.4}', 'timestamp': '2021-01-21T12:35:24Z'}
```

Adjust the timestamp filter conditions to match the dates of your own data.
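Rather than editing the date strings by hand, you could generate the filter for any day. A minimal sketch; `day_filter` is a hypothetical helper, and it assumes the timestamps use the `%Y-%m-%dT%H:%M:%SZ` format set up above.

```python
import datetime


def day_filter(day, topic=None):
    """Return a find() filter selecting documents whose timestamp falls on `day`."""
    fmt = "%Y-%m-%dT00:00:00Z"
    start = day.strftime(fmt)
    end = (day + datetime.timedelta(days=1)).strftime(fmt)  # exclusive upper bound
    query = {"timestamp": {"$gte": start, "$lt": end}}
    if topic is not None:
        query["topic"] = topic
    return query


print(day_filter(datetime.date(2021, 1, 21), topic="devices/temp"))
```

`dbt.find(day_filter(datetime.date(2021, 1, 21), topic="devices/temp"))` is then equivalent to the query above, and `timedelta` takes care of month and year boundaries.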

---

## <font color='blue'>Flattening the objects</font>

After the changes above, the `iot-sensors-data-timestamped` collection holds the raw messages published to the broker by every device, across all dates.

In this section, we develop a program that selects only the previous day's entries and stores them in a new collection, so that the batch processing runs only on the previous day's data.

Here is a walkthrough of a Python program that does this.

**Step 1:** Obtain the timestamps for the start of today and of the previous day

In [None]:
import datetime

today = datetime.date.today() 
# datetime.date(2021, 1, 15)

prevday = today-datetime.timedelta(days=1)
#datetime.date(2021, 1, 14)

prevday = prevday.strftime("%Y-%m-%dT00:00:00Z")
#'2021-01-14T00:00:00Z'

today = today.strftime("%Y-%m-%dT00:00:00Z")
#'2021-01-15T00:00:00Z'
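In ISO 8601 the trailing **Z** means UTC, but `datetime.date.today()` here and `datetime.datetime.now()` in `publish.py` both use the machine's local time. This is harmless as long as the publisher and this batch job use the same clock; if you want the **Z** to be accurate, one option is to compute both sides in UTC. A sketch under that assumption (`previous_day_bounds` is a hypothetical name):

```python
import datetime


def previous_day_bounds(now=None):
    """Return (start of previous day, start of today) as ISO 8601 UTC strings."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    # Convert to UTC before taking the date, so other time zones are handled too
    today = now.astimezone(datetime.timezone.utc).date()
    prevday = today - datetime.timedelta(days=1)
    fmt = "%Y-%m-%dT00:00:00Z"
    return prevday.strftime(fmt), today.strftime(fmt)


# On the publisher side, the matching change would stamp messages in UTC:
stamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

prevday, today = previous_day_bounds(
    datetime.datetime(2021, 1, 15, 8, 30, tzinfo=datetime.timezone.utc))
print(prevday, today)
```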

**Step 2:** Reuse the query from the previous section to fetch the previous day's objects from the `iot-sensors-data-timestamped` collection

In [None]:
import pymongo
import json

#Initializing connection to the database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
dbt = db["iot-sensors-data-timestamped"]

#Querying for the messages that were published the previous day
prevday_entries = dbt.find({"timestamp":  {"$gte": prevday, "$lt" : today}})

**Step 3:** To analyze the individual fields of every object, it is easier to work with flattened objects. Only the **_payload_** field holds nested data (a JSON-encoded string), so we parse it, copy its fields to the top level of the datapoint, and delete the payload field.

Then we insert the flattened datapoint into the `iot-sensors-data-flattened` collection.

In [None]:
# Define the collection to store previous day entries
pde_flattened = db["iot-sensors-data-flattened"]

# Iterate through every datapoint
for entry in prevday_entries:
    # Load the value of payload field as json
    payload = json.loads(entry["payload"])
    
    #Iterate through every field in the payload and add it to the datapoint
    for field in payload:
        entry[field] = payload[field]
    
    # Delete the payload field since it has been flattened
    del entry["payload"]
    
    # Insert the flattened datapoint to collection
    pde_flattened.insert_one(entry)

This program converts the datapoints from

```json
{
    '_id': ObjectId('600927b4d3411bee51a7bcaf'), 
     'topic': 'devices/temp', 
     'payload': b'{"timestamp": "2021-01-21T12:35:24Z", 
                    "device_id": "temperature_1", 
                    "device_type": "temperature", 
                    "value": 21.4}', 
     'timestamp': '2021-01-21T12:35:24Z'}

```

to

```json
{
    '_id': ObjectId('600927b4d3411bee51a7bcaf'), 
    'topic': 'devices/temp', 
    'timestamp': '2021-01-21T12:35:24Z', 
    'device_id': 'temperature_1', 
    'device_type': 'temperature', 
    'value': 21.4}
```

and stores them in the `iot-sensors-data-flattened` collection.
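The flattening logic in Step 3 can also be written as a pure function, which makes it easy to check without a database. A minimal sketch; `flatten` is a hypothetical helper, and the `_id` below is a plain string standing in for an `ObjectId`.

```python
import json


def flatten(entry):
    """Return a copy of `entry` with the JSON fields of its payload lifted to the top level."""
    flat = {key: value for key, value in entry.items() if key != "payload"}
    # json.loads accepts both str and UTF-8 bytes payloads
    flat.update(json.loads(entry["payload"]))
    return flat


raw = {
    "_id": "600927b4d3411bee51a7bcaf",  # plain string stand-in for an ObjectId
    "topic": "devices/temp",
    "payload": b'{"timestamp": "2021-01-21T12:35:24Z", "device_id": "temperature_1", '
               b'"device_type": "temperature", "value": 21.4}',
    "timestamp": "2021-01-21T12:35:24Z",
}
flat = flatten(raw)
print(flat)
```

Because the flattened document keeps the original `_id`, running Step 3 twice over the same day would make `insert_one` raise `pymongo.errors.DuplicateKeyError`. If you re-run the job, one option is to empty the collection first with `pde_flattened.delete_many({})`, which also keeps the collection limited to a single day's data, as the later sections assume.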

## <font color='blue'>Devices List collection</font>

In practice, device registration is the first step of onboarding a device to an IoT system. Here we assume there is no onboarding process; however, we still want to keep a record of all the devices that have published messages to the MQTT broker.

To achieve this, we develop a Python program, scheduled to run once a day, that scans all objects in the `iot-sensors-data-flattened` collection, finds the unique set of **device_id** values, and stores them in the `iot-devices-list` collection.

**Step 1:** Load the objects from `iot-sensors-data-flattened` collection


In [None]:
import pymongo
import json

#Initializing connection to the database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
pde_flattened = db["iot-sensors-data-flattened"]

entries = pde_flattened.find()

**Step 2:** Load the list of existing devices from the `iot-devices-list` collection

In [None]:
db_devices_list = db["iot-devices-list"]

#Loading the devices list
devices = db_devices_list.find()

#Extracting the device_ids in a list
existing_devices = []
for device in devices:
    existing_devices.append(device["device_id"])

**Step 3:** Iterate through the previous day's objects and add every `device_id` that is not yet in the list, together with its device type

In [None]:
for entry in entries:
    if entry["device_id"] not in existing_devices:
        db_devices_list.insert_one({"device_id":entry["device_id"], "device_type":entry["device_type"]})
        existing_devices.append(entry["device_id"])

Running this program adds any new devices to the `iot-devices-list` collection. In this example, the collection then contains the following objects:

```json
{ "_id" : ObjectId("600a1d4f715dd224c2d2dfd6"), "device_id" : "temperature_0", "device_type" : "temperature" }
{ "_id" : ObjectId("600a1d4f715dd224c2d2dfd7"), "device_id" : "temperature_1", "device_type" : "temperature" }
{ "_id" : ObjectId("600a1d4f715dd224c2d2dfd8"), "device_id" : "co2_0", "device_type" : "co2" }
{ "_id" : ObjectId("600a1d4f715dd224c2d2dfd9"), "device_id" : "humidity_0", "device_type" : "humidity" }
{ "_id" : ObjectId("600a1d4f715dd224c2d2dfda"), "device_id" : "humidity_1", "device_type" : "humidity" }
```

To list the devices in the `iot-devices-list` collection, run the following program:

In [None]:
import pymongo
import json

#Initializing connection to the database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
db_devices_list = db["iot-devices-list"]

#Loading the devices list
entries = db_devices_list.find()

for entry in entries:
    print("Device ID: " + entry["device_id"] + " | Device Type: " + entry["device_type"])

Response:

```
Device ID: temperature_0 | Device Type: temperature
Device ID: temperature_1 | Device Type: temperature
Device ID: co2_0 | Device Type: co2
Device ID: humidity_0 | Device Type: humidity
Device ID: humidity_1 | Device Type: humidity
```

## <font color='blue'>Calculating the aggregate</font>

Calculating the minimum, maximum, count and aggregate (mean) value of every device is one of the basic analyses we can perform on the previous day's data. We store the result in a collection named `daily-summary`.

**Step 1:** Fetch the devices list from the `iot-devices-list` collection

In [None]:
import pymongo
import json

#Initializing connection to the database and fetching the list of devices
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
db_devices_list = db["iot-devices-list"]
devices = db_devices_list.find()

**Step 2:** Initialize a dictionary to store the values of every IoT device in the devices list

In [None]:
# Initializing a *values* dictionary, keyed by device_id, to capture data from previous day's entries
values= {}
for device in devices:
    values[device["device_id"]] = {}
    values[device["device_id"]]["values"] = []

**Step 3:** Fetch the datapoints of all devices from the `iot-sensors-data-flattened` collection, which stores the previous day's data, and append each value to the matching device's list

In [None]:
# Fetching the previous day iot-devices data
pde_flattened = db["iot-sensors-data-flattened"]
entries = pde_flattened.find()

# Adding the iot devices data to the appropriate device_id list
for entry in entries:
    values[entry["device_id"]]["values"].append(entry["value"])

**Step 4:** Iterate through the devices list and calculate the minimum, maximum, count and aggregate value of each IoT device

In [None]:
# Fetch the list of devices
devices = db_devices_list.find()

# Iterate through devices list and calculate the min, max, count and aggregate (mean)
for device in devices:
    device_id = device["device_id"]
    readings = values[device_id]["values"]
    # Drop devices that sent no data from the summary (min() and max() fail on an empty list)
    if not readings:
        del values[device_id]
        continue
    values[device_id]["min_value"] = min(readings)
    values[device_id]["max_value"] = max(readings)
    values[device_id]["count"] = len(readings)
    values[device_id]["agg_value"] = round(sum(readings) / len(readings), 2)
    values[device_id]["device_type"] = device["device_type"]
    # Delete the raw values as we don't need them in the daily summary
    del values[device_id]["values"]

**Step 5:** Calculate the previous day's date

In [None]:
# Calculate the previous day's date in YYYY-MM-DD format
import datetime
previousday = datetime.date.today() - datetime.timedelta(days=1)
previousday = previousday.strftime("%Y-%m-%d")

**Step 6:** Insert the summary with respect to previous day's date in the `daily-summary` collection

In [None]:
# Insert the summary into the *daily-summary* collection
dailysummary = db["daily-summary"]
dailysummary.insert_one({"date": previousday, "devices": values})

Running these snippets inserts an object similar to the following into the `daily-summary` collection:

```json
{ 
    "_id" : ObjectId("600a53ebb145b12ce7f34377"), 
    "date" : "2021-01-22", 
    "devices" : { 
        "temperature_0" : { "min_value" : 23.46, "max_value" : 26.76, "count" : 3, "agg_value" : 25.27, "device_type" : "temperature" }, 
        "temperature_1" : { "min_value" : 25.24, "max_value" : 30.13, "count" : 3, "agg_value" : 27.1, "device_type" : "temperature" }, 
        "co2_0" : { "min_value" : 16.58, "max_value" : 21.82, "count" : 3, "agg_value" : 19.93, "device_type" : "co2" }, 
        "humidity_0" : { "min_value" : 38.3, "max_value" : 38.3, "count" : 1, "agg_value" : 38.3, "device_type" : "humidity" }, 
        "humidity_1" : { "min_value" : 42.2, "max_value" : 42.2, "count" : 1, "agg_value" : 42.2, "device_type" : "humidity" } 
    } 
}
```

To view the daily summaries, you can use the following program:

In [None]:
import pymongo
import json

#Initializing connection to the database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
dailysummary = db["daily-summary"]

# Fetch the daily Summary and print it
summaries = dailysummary.find()
for day in summaries:
    print("Date: " + day["date"])
    print ("Device\t\t Device Type\t Min Val\t Max Val\t Count\t Aggregate")
    for device in day["devices"]:
        print(device, "\t", \
            day["devices"][device]["device_type"], "\t", \
            day["devices"][device]["min_value"], "\t", \
            day["devices"][device]["max_value"], "\t", \
            day["devices"][device]["count"], "\t", \
            day["devices"][device]["agg_value"]
        )

Output:

```
Date: 2021-01-14
Device           Device Type     Min Val         Max Val         Count   Aggregate
temperature_0    temperature     23.46   26.76   3       25.27
temperature_1    temperature     25.24   30.13   3       27.1
co2_0    co2     16.58   21.82   3       19.93
humidity_0       humidity        38.3    38.3    1       38.3
humidity_1       humidity        42.2    42.2    1       42.2

Date: 2021-01-15
Device           Device Type     Min Val         Max Val         Count   Aggregate
temperature_0    temperature     23.46   26.76   3       25.27
temperature_1    temperature     25.24   30.13   3       27.1
co2_0    co2     16.58   21.82   3       19.93
humidity_0       humidity        38.3    38.3    1       38.3
humidity_1       humidity        42.2    42.2    1       42.2

```
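Tab characters do not line up when a device ID is shorter or longer than a tab stop, as the `co2_0` rows above show. A minimal sketch of a fixed-width alternative using format specifications; `format_summary` is a hypothetical helper and the column widths are arbitrary choices.

```python
def format_summary(day):
    """Render one daily-summary document as a fixed-width text table."""
    lines = ["Date: " + day["date"],
             f"{'Device':<16}{'Device Type':<14}{'Min Val':>9}{'Max Val':>9}"
             f"{'Count':>7}{'Aggregate':>11}"]
    for device_id, stats in day["devices"].items():
        lines.append(f"{device_id:<16}{stats['device_type']:<14}"
                     f"{stats['min_value']:>9}{stats['max_value']:>9}"
                     f"{stats['count']:>7}{stats['agg_value']:>11}")
    return "\n".join(lines)


day = {"date": "2021-01-14", "devices": {
    "temperature_0": {"min_value": 23.46, "max_value": 26.76, "count": 3,
                      "agg_value": 25.27, "device_type": "temperature"},
    "co2_0": {"min_value": 16.58, "max_value": 21.82, "count": 3,
              "agg_value": 19.93, "device_type": "co2"},
}}
out = format_summary(day)
print(out)
```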

## <font color='blue'>Creating alerts</font>

There are many ways to create and process alerts in an IoT system. In this section, we develop a simple mechanism in which the alert rules are defined in an `alerts_config.json` file.

Each rule in this JSON file defines a threshold value for a device type:

`alerts_config.json`
```json
{
    "temperature": 28,
    "humidity": 40,
    "co2": 20
}
```

With these rules in place, we now develop a program that prints an alert whenever a device's data from the previous day goes beyond the threshold defined for its device type.

In an actual use case, we could create alerts when the maximum value of a device exceeds a maximum threshold defined for its device type, when the minimum value drops below a minimum threshold, and so forth. Here, we work only with the first case. Such rules are also often applied to each datapoint as soon as it is published to the broker, but here we work with the summary data from the `daily-summary` collection.
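The minimum-threshold case mentioned above could be handled alongside the maximum case. A minimal sketch, assuming a hypothetical rule format in which each device type maps to optional `max` and `min` thresholds (this differs from the single-number format of `alerts_config.json` shown above); `check_alerts` is likewise a hypothetical name. It returns the alert messages instead of printing them, which makes the rules easier to test.

```python
def check_alerts(devices, rules, date):
    """Return alert messages for devices whose daily max/min crosses their type's thresholds."""
    alerts = []
    for device_id, stats in devices.items():
        rule = rules.get(stats["device_type"])
        if rule is None:  # no rule defined for this device type
            continue
        if "max" in rule and stats["max_value"] > rule["max"]:
            alerts.append(f"{device_id} has its max value beyond the threshold value on {date}")
        if "min" in rule and stats["min_value"] < rule["min"]:
            alerts.append(f"{device_id} has its min value below the threshold value on {date}")
    return alerts


# Hypothetical rules: max and min thresholds per device type
rules = {"temperature": {"max": 28, "min": 24}, "co2": {"max": 20}}
devices = {
    "temperature_0": {"min_value": 23.46, "max_value": 26.76, "device_type": "temperature"},
    "temperature_1": {"min_value": 25.24, "max_value": 30.13, "device_type": "temperature"},
    "humidity_0": {"min_value": 38.3, "max_value": 38.3, "device_type": "humidity"},
}
for alert in check_alerts(devices, rules, "2021-01-14"):
    print(alert)
```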



**Step 1:** Initialize the database connection to `daily-summary` collection

In [None]:
import pymongo

#Initializing connection to the database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
dailysummary = db["daily-summary"]

**Step 2:** Obtain the previous day's date, fetch that day's summary from the collection, and extract the devices summary from it

In [None]:
import datetime
previousday = datetime.date.today() - datetime.timedelta(days=1)
previousday = previousday.strftime("%Y-%m-%d")

#Querying for the summary of the previous day
summary = dailysummary.find_one({"date": previousday})
if summary is None:
    raise SystemExit("No daily summary found for " + previousday)

# Extract the devices summary from the object
devices = summary["devices"]

At this stage, the `devices` variable contains the following value:
```json
{
    'temperature_0': {'min_value': 23.46, 'max_value': 26.76, 'count': 3, 'agg_value': 25.27, 'device_type': 'temperature'}, 
    'temperature_1': {'min_value': 25.24, 'max_value': 30.13, 'count': 3, 'agg_value': 27.1, 'device_type': 'temperature'}, 
    'co2_0': {'min_value': 16.58, 'max_value': 21.82, 'count': 3, 'agg_value': 19.93, 'device_type': 'co2'}, 
    'humidity_0': {'min_value': 38.3, 'max_value': 38.3, 'count': 1, 'agg_value': 38.3, 'device_type': 'humidity'}, 
    'humidity_1': {'min_value': 42.2, 'max_value': 42.2, 'count': 1, 'agg_value': 42.2, 'device_type': 'humidity'}
}
```

**Step 3:** To process this further, we read the alert rules from the `alerts_config.json` file

In [None]:
import json

alerts_config_file=open('alerts_config.json')
alerts_config = json.loads(alerts_config_file.read())
alerts_config_file.close()

**Step 4:** We then iterate through each alert rule and each device, and print an alert whenever a device of the rule's type has a max value beyond the threshold defined in the rule

In [None]:
# Iterating through each rule and printing an alert when devices value is over the threshold
for rule in alerts_config:
    for device in devices:
        if(devices[device]["device_type"]==rule):
            if(devices[device]["max_value"]>alerts_config[rule]):
                print(device + " has its max value beyond the threshold value on " + previousday)

Output:
```
temperature_1 has its max value beyond the threshold value on 2021-01-14
humidity_1 has its max value beyond the threshold value on 2021-01-14
co2_0 has its max value beyond the threshold value on 2021-01-14
```