<img src="http://drive.google.com/uc?export=view&id=1tpOCamr9aWz817atPnyXus8w5gJ3mIts" width=500px>

Proprietary content. © Great Learning. All Rights Reserved. Unauthorized use or distribution prohibited.

## <font color='blue'> Modifying data collection </font>

Config file `config_sub.json` for subscriber

In [None]:
{
    "broker_host": "localhost",
    "broker_port": 1883,

    "db_host": "localhost",
    "db_port": 27017,
    "db_name": "iot-db",
    "db_collection":"iot-sensors-data-timestamped"
}

Source code `subscribe.py`

In [None]:
import paho.mqtt.client as mqtt
import json
import pymongo

# Callback function - executed when the program successfully connects to the broker
def on_connect(client, userdata, flags, rc):
    """Report the connection result code, then subscribe to all topics under ``devices/``."""
    status_line = "Connected with result code {}".format(rc)
    print(status_line)
    # "#" is the multi-level wildcard, so every topic below devices/ is received
    client.subscribe("devices/#")

#Callback function - executed when the program gracefully disconnects from the broker
def on_disconnect(client, userdata, rc):
    """Print the result code handed to the disconnect callback."""
    status_line = "Disconnected with result code {}".format(rc)
    print(status_line)

#Callback function - executed whenever a message is published to the topics that 
#this program is subscribed to
def on_message(client, userdata, msg):
    """Store a received MQTT message in the MongoDB collection ``dbt``.

    The payload is expected to be a JSON object containing a "timestamp"
    field. That value is copied to the top level of the stored document so
    the collection can be queried by timestamp without parsing the payload.

    Messages whose payload cannot be decoded, is not valid JSON, or has no
    "timestamp" field are reported and skipped, so that one bad message does
    not raise out of the callback.
    """
    try:
        # Decode once so the database holds readable text rather than raw bytes
        # (paho delivers the payload as bytes on Python 3)
        payload = msg.payload.decode("utf-8")
        timestamp = json.loads(payload)["timestamp"]
    except (ValueError, KeyError, TypeError) as err:
        # ValueError covers both invalid UTF-8 and invalid JSON;
        # KeyError/TypeError cover a payload without a "timestamp" member
        print("Ignored a message on " + msg.topic + ": " + str(err))
        return

    item = {"topic": msg.topic, "payload": payload, "timestamp": timestamp}
    dbt.insert_one(item)
    print("Received a messsage on " + msg.topic + " and inserted it to the DB")

# Imported here because only this main section needs it (used for the keep-alive loop)
import time

#Defining an MQTT client object
client = mqtt.Client()

#Setting callback functions for various client operations
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

#Reading the configuration file; the context manager closes it even if parsing fails
with open("config_sub.json") as f:
    config = json.load(f)

#Initializing connection to the database
# `dbt` is the collection that on_message inserts into, so it must exist
# before the network loop starts delivering messages
dbclient = pymongo.MongoClient(config["db_host"], config["db_port"])
db = dbclient[config["db_name"]]
dbt = db[config["db_collection"]]

#Connecting to broker
client.connect(host=config["broker_host"], port=config["broker_port"], keepalive=60)

# Start the MQTT client non-blocking loop to listen the broker for messages
# in subscribed topics and other operations for which the callback functions
# are defined
client.loop_start()

# Keep the main thread alive while the background loop handles messages.
# Sleeping between iterations avoids the busy-wait of an empty loop, which
# would keep a CPU core fully occupied for no work.
try:
    while True:
        time.sleep(1)
#Disconnect the client from MQTT broker and stop the loop gracefully at
# Keyboard interrupt (Ctrl+C)
except KeyboardInterrupt:
    client.disconnect()
    client.loop_stop()

Sample response:
    
```bash
Received a messsage on devices/temp and inserted it to the DB
Received a messsage on devices/temp and inserted it to the DB
Received a messsage on devices/hum and inserted it to the DB
Received a messsage on devices/hum and inserted it to the DB
Received a messsage on devices/co2 and inserted it to the DB
Received a messsage on devices/temp and inserted it to the DB
Received a messsage on devices/temp and inserted it to the DB
Received a messsage on devices/co2 and inserted it to the DB
Received a messsage on devices/temp and inserted it to the DB
Received a messsage on devices/temp and inserted it to the DB
Received a messsage on devices/hum and inserted it to the DB
```

Config file `config_pub.json` for publisher

In [None]:
{
    "broker_host": "localhost",
    "broker_port": 1883,
    "devices": [
        {
            "type": "temperature",
            "publish_frequency": 5,
            "publish_topic": "devices/temp",
            "std_val": 25,
            "device_count": 2
        },
        {
            "type": "humidity",
            "publish_frequency": 10,
            "publish_topic": "devices/hum",
            "std_val": 40,
            "device_count": 2
        },
        {
            "type": "co2",
            "publish_frequency": 5,
            "publish_topic": "devices/co2",
            "std_val": 20,
            "device_count": 1
        }
    ]
}

Source code `publish.py`

In [None]:
import paho.mqtt.client as mqtt
import json
import time
import datetime
import numpy as np

#Instantiating the MQTT client object that this script uses to connect and publish to the broker
client = mqtt.Client()

# Callback function - executed when the program successfully connects to the broker
def on_connect(client, userdata, flags, rc):
    """Report the connection result code, then subscribe to the ``test`` topic."""
    status_line = "Connected with result code {}".format(rc)
    print(status_line)
    client.subscribe("test")

#Callback function - executed when the program gracefully disconnects from the broker
def on_disconnect(client, userdata, rc):
    """Print the result code handed to the disconnect callback."""
    status_line = "Disconnected with result code {}".format(rc)
    print(status_line)

#Callback function - executed whenever a message is published to the topics that 
#this program is subscribed to
def on_message(client, userdata, msg):
    """Print the topic, payload, retain flag, QoS and user data of a received message."""
    fields = (msg.topic, str(msg.payload), "retain", msg.retain, "qos", msg.qos, str(userdata))
    # Joined with single spaces, matching print's default separator
    print(" ".join(str(field) for field in fields))


#Setting callback functions for various client actions
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# Reading the configuration file; the context manager closes it even if parsing fails
with open("config_pub.json") as f:
    config = json.load(f)

# Expanding every entry of config['devices'] into `device_count` simulated devices,
# each with its own device_id of the form "<type>_<n>"
device_config = [
    {
        'device_id': spec['type'] + "_" + str(n),
        'device_type': spec['type'],
        'publish_frequency': spec['publish_frequency'],
        'std_val': spec['std_val'],
        'publish_topic': spec['publish_topic'],
    }
    for spec in config['devices']
    for n in range(spec['device_count'])
]

#Connecting to broker
client.connect(host=config["broker_host"], port=config["broker_port"], keepalive=60)

# Start the MQTT client non-blocking loop to listen the broker for messages
# in subscribed topics and other operations for which the callback functions
# are defined
client.loop_start()

# `clock` counts one-second ticks; a device publishes on every tick that is a
# multiple of its publish_frequency
clock = 0
try:
    while True:
        time.sleep(1)
        clock = clock + 1
        for device in device_config:
            if clock % device['publish_frequency'] != 0:
                continue

            # Timestamp of this moment on the previous day, formatted as YYYY-MM-DDTHH:MM:SSZ
            # NOTE(review): datetime.now() is local time although a "Z" suffix
            # conventionally denotes UTC - confirm which one is intended
            prevday_timestamp = datetime.datetime.now() - datetime.timedelta(days=1)

            # Dictionary to be sent as the publish message
            message = {
                "timestamp": prevday_timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
                "device_id": device["device_id"],
                "device_type": device["device_type"],
                # Simulated reading: normal distribution centred on std_val with scale 2
                "value": round(np.random.normal(device["std_val"], 2), 2),
            }

            #Publish the message
            client.publish(device["publish_topic"], json.dumps(message))
            # Report the topic that was actually published to (not "devices/<device_type>")
            print("Published to " + device["publish_topic"])

#Disconnect the client from MQTT broker and stop the loop gracefully at Keyboard interrupt (Ctrl+C)
except KeyboardInterrupt:
    client.disconnect()
    client.loop_stop()


Source code to check the DB collection: `viewdb.py`

In [None]:
import pymongo
import json

#Reading the configuration file; the context manager closes it even if parsing fails
with open("config_sub.json") as f:
    config = json.load(f)

#Initializing connection to the database
dbclient = pymongo.MongoClient(config["db_host"], config["db_port"])
db = dbclient[config["db_name"]]
dbt = db[config["db_collection"]]

#Querying for the messages that were published to the `devices/temp` topic, on 01 Jan 2021
# The bounds are strings, so the range is a string comparison against the
# stored "timestamp" field: start of 01 Jan inclusive, start of 02 Jan exclusive
query = {
    "topic": "devices/temp",
    "timestamp": {
        "$gte": "2021-01-01T00:00:00.000Z",
        "$lt": "2021-01-02T00:00:00.000Z",
    },
}
entries = dbt.find(query)

#Print the entries
for entry in entries:
    print(entry)

Sample response:

``` json
{'_id': ObjectId('60017cdfcb163953b3e92b45'), 'topic': 'devices/temp', 'payload': '{"timestamp": "2021-01-15 17:00:39", "value": 23.69, "device_type": "temperature", "device_id": "temperature_0"}'}
{'_id': ObjectId('60017cdfcb163953b3e92b46'), 'topic': 'devices/temp', 'payload': '{"timestamp": "2021-01-15 17:00:39", "value": 26.7, "device_type": "temperature", "device_id": "temperature_1"}'}
{'_id': ObjectId('60017ce4cb163953b3e92b4a'), 'topic': 'devices/temp', 'payload': '{"timestamp": "2021-01-15 17:00:44", "value": 25.35, "device_type": "temperature", "device_id": "temperature_0"}'}
{'_id': ObjectId('60017ce4cb163953b3e92b4b'), 'topic': 'devices/temp', 'payload': '{"timestamp": "2021-01-15 17:00:44", "value": 25.48, "device_type": "temperature", "device_id": "temperature_1"}'}
{'_id': ObjectId('60017ce9cb163953b3e92b4d'), 'topic': 'devices/temp', 'payload': '{"timestamp": "2021-01-15 17:00:49", "value": 26.76, "device_type": "temperature", "device_id": "temperature_0"}'}
{'_id': ObjectId('60017ce9cb163953b3e92b4e'), 'topic': 'devices/temp', 'payload': '{"timestamp": "2021-01-15 17:00:49", "value": 26.93, "device_type": "temperature", "device_id": "temperature_1"}'}
```

## <font color='blue'>Flattening the objects</font>

Source code `flatten.py`

In [None]:
import datetime
import pymongo
import json

# Previous day's window [prevday, today) as strings of the form
# YYYY-MM-DDT00:00:00Z, used as bounds on the stored "timestamp" field
today = datetime.date.today()                        # e.g. datetime.date(2021, 1, 15)
prevday = today - datetime.timedelta(days=1)         # e.g. datetime.date(2021, 1, 14)
prevday = prevday.strftime("%Y-%m-%dT00:00:00Z")     # e.g. '2021-01-14T00:00:00Z'
today = today.strftime("%Y-%m-%dT00:00:00Z")         # e.g. '2021-01-15T00:00:00Z'

#Initializing connection to the database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
dbt = db["iot-sensors-data-timestamped"]

#Querying for the messages that were published the previous day
prevday_entries = dbt.find({"timestamp": {"$gte": prevday, "$lt": today}})

# Define the collection to store previous day entries
pde_flattened = db["iot-sensors-data-flattened"]

# Iterate through every datapoint
for entry in prevday_entries:
    # Load the value of payload field as json
    payload = json.loads(entry["payload"])

    # Copy every field of the payload to the top level of the datapoint
    entry.update(payload)

    # Delete the payload field since it has been flattened
    del entry["payload"]

    # The datapoint keeps the _id of its source document. Upserting on that _id
    # makes the script safe to run again for the same day: an existing document
    # is replaced instead of the run aborting on a duplicate key.
    pde_flattened.replace_one({"_id": entry["_id"]}, entry, upsert=True)

## <font color='blue'>Devices list</font>

Source code `add_devices.py`

In [None]:
import pymongo
import json

# Initializing connection to the database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
pde_flattened = db["iot-sensors-data-flattened"]
db_devices_list = db["iot-devices-list"]

# device_ids already present in the devices list, held in a set so that each
# membership test below is a constant-time lookup instead of a list scan
existing_devices = {device["device_id"] for device in db_devices_list.find()}

# Iterate through every flattened entry and register each device_id that is
# not in the devices list yet
for entry in pde_flattened.find():
    device_id = entry["device_id"]
    if device_id in existing_devices:
        continue
    db_devices_list.insert_one({"device_id": device_id, "device_type": entry["device_type"]})
    # Remember the id so later entries of the same device are not inserted again
    existing_devices.add(device_id)

Source code `list_devices.py`

In [None]:
import pymongo
import json

#Opening the devices-list collection of the local database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
db_devices_list = db["iot-devices-list"]

#Printing one line per registered device
for entry in db_devices_list.find():
    line_parts = ["Device ID: ", entry["device_id"], " | Device Type: ", entry["device_type"]]
    print("".join(line_parts))

Output:

```
Device ID: temperature_0 | Device Type: temperature
Device ID: temperature_1 | Device Type: temperature
Device ID: co2_0 | Device Type: co2
Device ID: humidity_0 | Device Type: humidity
Device ID: humidity_1 | Device Type: humidity
```

## <font color='blue'>Calculating the aggregate</font>

Source code `summarize.py`

In [None]:
import pymongo
import json

import datetime

#Initializing connection to the database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
db_devices_list = db["iot-devices-list"]

# Previous day's window [day_start, day_end) in the YYYY-MM-DDT00:00:00Z string
# form, used as bounds on the "timestamp" field of the flattened entries
today = datetime.date.today()
previousday_date = today - datetime.timedelta(days=1)
day_start = previousday_date.strftime("%Y-%m-%dT00:00:00Z")
day_end = today.strftime("%Y-%m-%dT00:00:00Z")

# Fetching only the previous day's data. The flattened collection keeps the
# entries of earlier runs as well, so an unfiltered read would mix several
# days into a summary that is labelled with a single date.
pde_flattened = db["iot-sensors-data-flattened"]
entries = pde_flattened.find({"timestamp": {"$gte": day_start, "$lt": day_end}})

# Grouping the readings by device_id
readings = {}
for entry in entries:
    readings.setdefault(entry["device_id"], []).append(entry["value"])

# Iterate through devices list and calculate the min, max, count and mean
# ("agg_value", rounded to 2 decimals) of each device's readings
values = {}
for device in db_devices_list.find():
    device_values = readings.get(device["device_id"])
    if not device_values:
        # No readings for this device on the previous day: min()/max() would
        # raise on an empty list and the mean would divide by zero, so skip it
        continue
    count = len(device_values)
    values[device["device_id"]] = {
        "min_value": min(device_values),
        "max_value": max(device_values),
        "count": count,
        "agg_value": round(sum(device_values) / count, 2),
        "device_type": device["device_type"],
    }

# Previous day's date in YYYY-MM-DD format
previousday = previousday_date.strftime("%Y-%m-%d")

# Insert the summary into the *daily-summary* collection
dailysummary = db["daily-summary"]
dailysummary.insert_one({"date": previousday, "devices": values})

Source code `daily_summary.py`

In [None]:
import pymongo
import json

#Opening the daily-summary collection of the local database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
dailysummary = db["daily-summary"]

# Print one table per stored daily summary: a date line, a header line and
# one row per device
table_header = "Device\t\t Device Type\t Min Val\t Max Val\t Count\t Aggregate"
for day in dailysummary.find():
    print("Date: " + day["date"])
    print (table_header)
    for device, stats in day["devices"].items():
        # Columns are separated by space-tab-space
        print(
            device,
            stats["device_type"],
            stats["min_value"],
            stats["max_value"],
            stats["count"],
            stats["agg_value"],
            sep=" \t ",
        )

Output:

```
Date: 2021-01-14
Device           Device Type     Min Val         Max Val         Count   Aggregate
temperature_0    temperature     23.46   26.76   3       25.27
temperature_1    temperature     25.24   30.13   3       27.1
co2_0    co2     16.58   21.82   3       19.93
humidity_0       humidity        38.3    38.3    1       38.3
humidity_1       humidity        42.2    42.2    1       42.2

Date: 2021-01-15
Device           Device Type     Min Val         Max Val         Count   Aggregate
temperature_0    temperature     23.46   26.76   3       25.27
temperature_1    temperature     25.24   30.13   3       27.1
co2_0    co2     16.58   21.82   3       19.93
humidity_0       humidity        38.3    38.3    1       38.3
humidity_1       humidity        42.2    42.2    1       42.2

```

## <font color='blue'>Creating alerts</font>

Config file `alerts_config.json`

In [None]:
{
    "temperature": 28,
    "humidity": 40,
    "co2": 20
}

Source file `alerts.py`

In [None]:
import pymongo
import datetime
import json

# Initializing connection to the database
dbclient = pymongo.MongoClient("localhost", 27017)
db = dbclient["iot-db"]
dailysummary = db["daily-summary"]

# Previous day's date in YYYY-MM-DD format, matched against the "date" field below
previousday = datetime.date.today() - datetime.timedelta(days=1)
previousday = previousday.strftime("%Y-%m-%d")

# Read the alert rules (device type -> threshold); the context manager closes
# the file even if parsing fails
with open('alerts_config.json') as alerts_config_file:
    alerts_config = json.load(alerts_config_file)

# Fetch the summary stored for the previous day. find_one returns None when
# there is no match, whereas indexing an empty cursor would raise IndexError.
summary = dailysummary.find_one({"date": previousday})

if summary is None:
    print("No daily summary found for " + previousday)
else:
    # Extract the devices summary from the object
    devices = summary["devices"]

    # Iterating through each rule and printing an alert when a device of that
    # type has a max value over the rule's threshold
    for rule in alerts_config:
        threshold = alerts_config[rule]
        for device in devices:
            stats = devices[device]
            if stats["device_type"] == rule and stats["max_value"] > threshold:
                print(device + " has its max value beyond the threshold value on " + previousday)

Output:
```
temperature_1 has its max value beyond the threshold value on 2021-01-14
humidity_1 has its max value beyond the threshold value on 2021-01-14
co2_0 has its max value beyond the threshold value on 2021-01-14
```