Skip to content
This repository has been archived by the owner on Mar 15, 2021. It is now read-only.

Commit

Permalink
MQTT Transport Implementation (#48)
Browse files Browse the repository at this point in the history
* MQTT Transport Implementation : (#12)

* MQTT Transport Implementation :
- TLS Implementation
- Username/Password based authentication
- QoS 0, 1, 2 support
- Adding gateway identity layer for encapsulating gateway details

* Removing typo loop_forever and adding check for qos_details

* Adding default parameters

* MQTT Transport Example
- Example for streaming data from device to IoTCC data centre component using MQTT channel
- Adding device comm for MQTT
- Updates in sampleProp.conf for MqttChannel which needs to be subscribed

* Updated MQTT Example
- Print statements removed

* - Modifying gateway name as edge system
- Description for Paho network loop method in MQTT example

* Mqtt

- made suggested changes

* Mqtt (#22)

- made suggested changes

* Mqtt (#25)

- Signature change `subscribe(topic, qos, callback)` 
- Added `loop_start` and `loop_stop` in transports
- Username/Password based authentication
- QoS 0, 1, 2 support
- Adding gateway identity layer for encapsulating gateway details

* Removing typo loop_forever and adding check for qos_details

* Adding default parameters

* MQTT Transport Example
- Example for streaming data from device to IoTCC data centre component using MQTT channel
- Adding device comm for MQTT
- Updates in sampleProp.conf for MqttChannel which needs to be subscribed

* Updated MQTT Example
- Print statements removed

* - Modifying gateway name as edge system
- Description for Paho network loop method in MQTT example

* Changes in example code:
- Created function for MQTT connection setup
- Seperate conf file for MQTT example
-  Code refactoring

* Change in method name

* - Renamed MQTT example
- Added comments
- Change in configuration file

* Updated MQTT example configuration file

* - Adding network loop methods inside MQTT transport
- Subscribe method change : Adding callback as a parameter
- Changes in MQTT example script according to new changes
- Optimised import statements

* Rolling back mistakenly placed extra line

* added 'mess_attr' to send(self, message)

* MqttMessagingAttributes and other changes

* has_attr() check for 'mess_attr'

* few changes in device_comms

* aws_iot dcc and examples

* added Connect-Disconnect-Timeout mechanism

* added and updated docs

* updated Exception handling

* updated example and prop files

* removed blank spaces

* added newline

* updated comment

* updated conf

* bugfix in disconnect timeout

* removing AWSIoT related.

* updated setup.py

* made transport configurable
mess_attr -> msg_attr

* updated

* bug fix

* removed sys.exit(), updated comments, added on_unsubscribe()

* added paho to requirements

* made changes as per review

* updated comment

* updated comment

* renamed DccIdentity > RemoteSystemIdentity

* updated comment

* updated log
  • Loading branch information
Venkat2811 authored and KohliDev committed Jan 16, 2017
1 parent 22bcdc5 commit 944eb15
Show file tree
Hide file tree
Showing 15 changed files with 1,107 additions and 6 deletions.
212 changes: 212 additions & 0 deletions examples/mqtt/iot_cc/iotcc_simulated_mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------#
# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. #
# #
# Licensed under the BSD 2-Clause License (the “License”); you may not use #
# this file except in compliance with the License. #
# #
# The BSD 2-Clause License #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met:#
# #
# - Redistributions of source code must retain the above copyright notice, #
# this list of conditions and the following disclaimer. #
# #
# - Redistributions in binary form must reproduce the above copyright #
# notice, this list of conditions and the following disclaimer in the #
# documentation and/or other materials provided with the distribution. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"#
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE #
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE #
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE #
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR #
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF #
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS #
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN #
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) #
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF #
# THE POSSIBILITY OF SUCH DAMAGE. #
# ----------------------------------------------------------------------------#

import Queue

import pint

from liota.dcc_comms.websocket_dcc_comms import WebSocketDccComms
from liota.dccs.dcc import RegistrationFailure
from liota.dccs.iotcc import IotControlCenter
from liota.device_comms.mqtt_device_comms import MqttDeviceComms
from liota.entities.devices.simulated_device import SimulatedDevice
from liota.entities.edge_systems.dk300_edge_system import Dk300EdgeSystem
from liota.entities.metrics.metric import Metric
from liota.lib.identity.identity import RemoteSystemIdentity
from liota.lib.identity.identity import EdgeSystemIdentity
from liota.lib.identity.tls_conf import TLSConf
from liota.lib.transports.mqtt import QoSDetails

# getting values from conf file
config = {}
execfile('samplePropMqtt.conf', config)

# Create unit registry
ureg = pint.UnitRegistry()

# Store temperature values in Queue
kitchen_temperature_data = Queue.Queue()
living_room_temperature_data = Queue.Queue()


# Callback functions
# To put corresponding values in queue
def callback_kitchen_temp(client, userdata, message):
kitchen_temperature_data.put(float(message.payload))


def callback_living_room_temp(client, userdata, message):
living_room_temperature_data.put(float(message.payload))


# Extract data from Queue
def get_value(queue):
return queue.get(block=True)


# ---------------------------------------------------------------------------------
# In this example, we demonstrate how data from two different Mqtt Channels (topics)
# can be collected and sent to IoTCC Dcc using Liota.
#
#
# IoTCC DCC
# /|\
# |
# |
# | WebSocket
# |
# |
# Dell5kEdgeSystem
# /|\ /|\
# | |
# mqtt subscribe | | mqtt subscribe
# (temperature/kitchen)| | (temperature/living-room)
# | |
# --------------------------
# | |
# | MQTT Broker |
# --------------------------
# /|\ /|\
# | |
# mqtt publish | | mqtt publish
# (temperature/kitchen) | | (temperature/living-room)
# | |
# Temperature Sensor Temperature Sensor
# at Kitchen at Living room
#
#
# Data streaming can be done from MQTT channel to IoTCC using LIOTA setting
# sampling_interval_sec to zero.
#
# Temperature values from sensor will be collected using MQTT channel and redirected
# to IoTCC data center component
# ------------------------------------------------------------------------------------


# MQTT connection setup to record kitchen and living room temperature values
def mqtt_subscribe(edge_system_object):
# RemoteSystemIdentity Object to connect with broker used in DeviceComms
remote_system_identity = RemoteSystemIdentity(root_ca_cert=config['broker_root_ca_cert'],
username=config['broker_username'], password=['broker_password'])

# Create Edge System identity object with all required certificate details
edge_system_identity = EdgeSystemIdentity(edge_system=edge_system_object, cert_file=config['edge_system_cert_file'],
key_file=config['edge_system_key_file'])

# Encapsulate TLS parameters
tls_conf = TLSConf(cert_required=config['cert_required'], tls_version=config['tls_version'],
cipher=config['cipher'])

# Encapsulate QoS related parameters
qos_details = QoSDetails(in_flight=config['in_flight'], queue_size=config['queue_size'], retry=config['retry'])

# Create MQTT connection object with required params
mqtt_conn = MqttDeviceComms(remote_system_identity=remote_system_identity, edge_system_identity=edge_system_identity,
tls_details=tls_conf, qos_details=None, url=config['BrokerIP'], clean_session=True,
port=config['BrokerPort'], keep_alive=config['keep_alive'], enable_authentication=True)

# Subscribe to channels : "temperature/kitchen" and "temperature/living-room" with preferred QoS level 0, 1 or 2
# Provide callback function as a parameter for corresponding channel
mqtt_conn.subscribe(config['MqttChannel1'], 1, callback_kitchen_temp)
mqtt_conn.subscribe(config['MqttChannel2'], 1, callback_living_room_temp)


if __name__ == "__main__":

# Create DCC object IoTCC using websocket transport
# with UID and PASS
iotcc = IotControlCenter(config['IotCCUID'], config['IotCCPassword'],
WebSocketDccComms(url=config['WebSocketUrl']))

try:

# Create an Edge System Dk300
edge_system = Dk300EdgeSystem(config['EdgeSystemName'])

# Get kitchen and living room temperature values using MQTT channel
mqtt_subscribe(edge_system)

# Register Edge System with IoT control center
reg_edge_system = iotcc.register(edge_system)

# these call set properties on the Resource representing the IoT System
# properties are a key:value store
reg_edge_system.set_properties(config['SystemPropList'])

# Create kitchen device object and register it on IoTCC
# Add two device names in the configurations as DeviceName1 and DeviceName2
kitchen_temperature_device = SimulatedDevice(name=config['DeviceName1'])
reg_kitchen_temperature_device = iotcc.register(kitchen_temperature_device)

iotcc.create_relationship(reg_edge_system, reg_kitchen_temperature_device)
reg_kitchen_temperature_device.set_properties(config['DevicePropList'])

# Metric Name
metric_name_kitchen_temperature = "temperature.kitchen"

# Create metric for kitchen temperature
kitchen_temperature = Metric(
name=metric_name_kitchen_temperature,
unit=ureg.degC,
interval=0,
sampling_function=lambda: get_value(kitchen_temperature_data)
)

reg_kitchen_temp = iotcc.register(kitchen_temperature)
iotcc.create_relationship(reg_kitchen_temperature_device, reg_kitchen_temp)
reg_kitchen_temp.start_collecting()

# Create living room device object and register it on IoTCC
living_room_temperature_device = SimulatedDevice(name=config['DeviceName2'])
reg_living_room_temperature_device = iotcc.register(living_room_temperature_device)

iotcc.create_relationship(reg_edge_system, reg_living_room_temperature_device)
reg_living_room_temperature_device.set_properties(config['DevicePropList'])

# Metric Name
metric_name_living_room_temperature = "temperature.living"

# Create metric for living room temperature
living_room_temperature = Metric(
name=metric_name_living_room_temperature,
unit=ureg.degC,
interval=0,
sampling_function=lambda:get_value(living_room_temperature_data)
)

reg_living_room_temp = iotcc.register(living_room_temperature)
iotcc.create_relationship(reg_living_room_temperature_device, reg_living_room_temp)
reg_living_room_temp.start_collecting()

except RegistrationFailure:
print "Registration to IOTCC failed"

40 changes: 40 additions & 0 deletions examples/mqtt/iot_cc/samplePropMqtt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#### [DEFAULT] ####

EdgeSystemName = "EdgeSystem-Name"
DeviceName1 = "Device-Name1"
DeviceName2 = "Device-Name2"
MetricName = "Metric-Name"
MetricWithOwnTsName = "Metric-With-Own-Ts-Name"
BulkCollectedMetricName = "Bulk-Collected-Metric-Name"

SystemPropList = {"Country":"USA-G", "State":"California", "City":"Palo Alto", "Location":"VMware HQ", "Building":"Promontory H Lab", "Floor":"First Floor"}
DevicePropList = {"Country":"USA-G", "State":"California", "City":"Palo Alto", "Location":"VMware HQ", "Building":"Promontory H Lab", "Floor":"First Floor"}

#### [IOTCC] ####

WebSocketUrl = "Websocket-address-url"
IotCCUID = "Username"
IotCCPassword = "Password"

#### [MQTT] ####

MqttChannel1 = "mqtt-channel1"
MqttChannel2 = "mqtt-channel2"
BrokerIP = "mqtt-broker"
BrokerPort = "mqtt-broker-port"
keep_alive = 60
ConnectDisconnectTimeout = 10
cert_required = "CERT_REQUIRED"
tls_version = "PROTOCOL_TLSv1"
broker_root_ca_cert = "/etc/liota/mqtt/conf/ca.crt"
broker_username = "Username"
broker_password = "Password"
edge_system_cert_file = "/etc/liota/mqtt/conf/client.crt"
edge_system_key_file = "/etc/liota/mqtt/conf/client.key"
userdata = None
protocol = "MQTTv311"
transport = "tcp"
cipher = None
in_flight = 20
queue_size = 0
retry = 5
1 change: 0 additions & 1 deletion examples/sampleProp.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,3 @@ IotCCPassword = "Password"

GraphiteIP = "Graphite-IP"
GraphitePort = None

2 changes: 1 addition & 1 deletion liota/dcc_comms/dcc_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _disconnect(self):
pass

@abstractmethod
def send(self, message):
def send(self, message, msg_attr):
pass

@abstractmethod
Expand Down
Loading

0 comments on commit 944eb15

Please sign in to comment.