## MQTT Publisher

This is an example for our hypothetical small city publisher, which sends data:
```
- temperature
    - celcius
    - fahrenheit
- air pollution
    - CO level
    - SO2 level
- traffic speed
    - km/h
```
Each of our clients have been set up at certain point for a street of a city, at this time it has sensors which measures the given list of data from environment. 

These clients will be our publishers to send this data.

Our topic structure is as shown:

```
<system_name>/<city_id>/<street_id>/<sensor_data>/<sub_data>
```

Therefore this client will publish data with following topics:
```
system_name/smart_city/city_01/street_01/temperature/celcius
system_name/smart_city/city_01/street_01/temperature/fahrenheit
system_name/smart_city/city_01/street_01/air_pollution/no2_level
system_name/smart_city/city_01/street_01/air_pollution/so2_level
system_name/smart_city/city_01/street_01/traffic_speed/kmh
```

We will use Eclipse Paho (https://www.eclipse.org/paho/clients/python/) as our library to implement MQTT client's publish requirements.

In [None]:
# let's check if paho-mqtt has been installed
!pip install paho-mqtt

In [1]:
# import paho client, first
import paho.mqtt.client as mqtt
import numpy as np
import asyncio
import datetime
import ssl

In [2]:
# Definitions for remote connection, below constant can also be a configuration file stored in your client which 
# you can update with OTA in anychange.

# unique system name
system_name = "hypo_city"

# city id and street name
city_name = "city_01"
street_name = "street_01"

# uniqie client id
client_id = city_name + "_" + street_name

# define the server IP and port to be used
mqtt_server_ip = "127.0.0.1"
server_port = 8883

Callback methods are used to inform system about responds from server accordingly. In a publisher, we would recommend on_connect, on_disconnect and on_publish

In [3]:
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

def on_disconnect(client, userdata, rc):
    print("Disconnected with result code " + str(rc))    
    
def on_publish(client, userdata, result):
    print("Message Published {}".format(result))

In [4]:
# Create Client Object and assign publisher callback methods 
client = mqtt.Client(client_id=client_id)

# assign callback methods
client.on_connect = on_connect
client.on_publish = on_publish
client.on_disconnect = on_disconnect

client.tls_set(ca_certs='ca.crt', tls_version=ssl.PROTOCOL_TLSv1, cert_reqs=ssl.CERT_REQUIRED)

username = 'street1'
password = 'simple_password'

# set username password
client.username_pw_set(username, password=password)

# Connect to MQTT Broker
client.connect(mqtt_server_ip, server_port, keepalive=60)

0

Now, we can proceed to implement client to read data from sensors and publish to broker itself. Since we are desigining a hypothetical city with some streets, we will generate pseuda data to replicate a street for installed sensors.

we will be having an infinite loop to send received sensor data and send them. 

In [5]:
async def publish_celcius_temperature(mqtt_client  = mqtt.Client(), topic_prefix = '', qos = 0, retain = False, mean=21.5, std_dev=3.5):
    while True:
        celcius = np.random.normal(loc=mean, scale=std_dev)
        topic = topic_prefix + "/temperature/celcius"
        
        utc_time = datetime.datetime.utcnow()
        
        #message structure 
        # timestampt \t data
        
        message = '{}\t{}'.format(utc_time, celcius)
        
        publish_info = client.publish(topic, payload=message, qos=qos, retain=retain)

        if publish_info.is_published:
            print("Topic:{}, Message:{}".format(topic, message))
        else:
            print("Message Not Published")

        await asyncio.sleep(10)
    
async def publish_fahrenheit_temperature(mqtt_client  = mqtt.Client(), topic_prefix = '', qos = 0, retain = False, mean=60, std_dev=30):
    while True:
        fahrenheit = np.random.normal(loc=mean, scale=std_dev)
        topic = topic_prefix + "/temperature/fahrenheit"
        
        utc_time = datetime.datetime.utcnow()
        
        #message structure 
        # timestampt \t data
        
        message = '{}\t{}'.format(utc_time, fahrenheit)
        
        publish_info = client.publish(topic, payload=message, qos=qos, retain=retain)

        if publish_info.is_published:
            print("Topic:{}, Message:{}".format(topic, message))
        else:
            print("Message Not Published")

        await asyncio.sleep(10)


async def publish_air_pollution_no2(mqtt_client  = mqtt.Client(), topic_prefix = '', qos = 0, retain = False, mean=5.3, std_dev=4.1):
    #ug/m3
    while True:
        no2_level =np.random.normal(loc=mean, scale=std_dev)
        topic = topic_prefix + "/air_pollution/no2_level"
        
        utc_time = datetime.datetime.utcnow()
        
        #message structure 
        # timestampt \t data
        
        message = '{}\t{}'.format(utc_time, no2_level)
        
        publish_info = client.publish(topic, payload=message, qos=qos, retain=retain)
        

        if publish_info.is_published:
            print("Topic:{}, Message:{}".format(topic, message))
        else:
            print("Message Not Published")

        await asyncio.sleep(10)

async def publish_air_pollution_so2(mqtt_client = mqtt.Client(), topic_prefix = '', qos = 0, retain = False, mean=7.2, std_dev=2.5):
    #ug/m3
    while True:
        so2_level = np.random.normal(loc=mean, scale=std_dev)
        topic = topic_prefix + "/air_pollution/so2_level"
        
        utc_time = datetime.datetime.utcnow()
        
        #message structure 
        # timestampt \t data
        
        message = '{}\t{}'.format(utc_time, so2_level)
        
        publish_info = client.publish(topic, payload=message, qos=qos, retain=retain)

        if publish_info.is_published:
            print("Topic:{}, Message:{}".format(topic, message))
        else:
            print("Message Not Published")

        await asyncio.sleep(10)

async def publish_traffic_speed(mqtt_client = mqtt.Client(), topic_prefix = '', qos = 0, retain = False, mean=10, std_dev=10):
    #km/h
    while True:
        traffic_speed = np.random.normal(loc=mean, scale=std_dev)
        topic = topic_prefix + "/traffic_speed"
        
        utc_time = datetime.datetime.utcnow()
        
        #message structure 
        # timestampt \t data
        
        message = '{}\t{}'.format(utc_time, traffic_speed)
        
        publish_info = client.publish(topic, payload=message, qos=qos, retain=retain)

        if publish_info.is_published:
            print("Topic:{}, Message:{}".format(topic, message))
        else:
            print("Message Not Published")

        await asyncio.sleep(10)

In [6]:
# A prefix for topics for this client
topic_prefix = system_name + "/" + city_name + "/" + street_name

# start sensor readings
loop = asyncio.get_event_loop()

loop.create_task(publish_celcius_temperature(mqtt_client=client, topic_prefix=topic_prefix))
loop.create_task(publish_fahrenheit_temperature(mqtt_client=client, topic_prefix=topic_prefix))
loop.create_task(publish_air_pollution_no2(mqtt_client=client, topic_prefix=topic_prefix))
loop.create_task(publish_air_pollution_so2(mqtt_client=client, topic_prefix=topic_prefix))
loop.create_task(publish_traffic_speed(mqtt_client=client, topic_prefix=topic_prefix))

# Start All tasks forever
if not loop.is_running:
    loop.run_forever()

Message Published 1
Topic:hypo_city/city_01/street_01/temperature/celcius, Message:2018-08-05 16:14:35.083874	26.646256929762124
Message Published 2
Topic:hypo_city/city_01/street_01/temperature/fahrenheit, Message:2018-08-05 16:14:35.083874	73.96154328042306
Message Published 3
Topic:hypo_city/city_01/street_01/air_pollution/no2_level, Message:2018-08-05 16:14:35.083874	2.0339192241085446
Message Published 4
Topic:hypo_city/city_01/street_01/air_pollution/so2_level, Message:2018-08-05 16:14:35.084873	8.671623234113893
Message Published 5
Topic:hypo_city/city_01/street_01/traffic_speed, Message:2018-08-05 16:14:35.084873	24.88326575867103
