Skip to content

Commit

Permalink
adaptations to MQTT interfacer to work with abstracted emonhub_interf…
Browse files Browse the repository at this point in the history
…acer class

change to databuffer to store full cargo object rather than cut down emoncms bulk format
  • Loading branch information
glynhudson committed Feb 1, 2018
1 parent 65278b2 commit a1328be
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 82 deletions.
15 changes: 14 additions & 1 deletion src/interfacers/EmonHubEmoncmsHTTPInterfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, name):
# set an absolute upper limit for number of items to process per post
self._item_limit = 250

def _process_post(self, databuffer):
def _process_post(self, cargodatabuffer):
"""Send data to server."""

# databuffer is of format:
Expand All @@ -41,6 +41,19 @@ def _process_post(self, databuffer):
if not 'apikey' in self._settings.keys() or str.__len__(str(self._settings['apikey'])) != 32 \
or str.lower(str(self._settings['apikey'])) == 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx':
return False

# Convert cargo based databuffer into emoncms format
databuffer = []
for c in range(0,len(cargodatabuffer)):
cargo = cargodatabuffer[c]
f = []
f.append(cargo.timestamp)
f.append(cargo.nodeid)
for i in cargo.realdata:
f.append(i)
if cargo.rssi:
f.append(cargo.rssi)
databuffer.append(f)

data_string = json.dumps(databuffer, separators=(',', ':'))

Expand Down
149 changes: 81 additions & 68 deletions src/interfacers/EmonHubMqttInterfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ def __init__(self, name, mqtt_user=" ", mqtt_passwd=" ", mqtt_host="127.0.0.1",
self._user = mqtt_user
self._passwd = mqtt_passwd
self._connected = False


self._defaults.update({'batchsize': 1,'interval': 0})
self._settings = {
'pubchannels':[],
'subchannels':[],
# emonhub/rx/10/values format - default emoncms nodes module
'node_format_enable': 1,
'node_format_basetopic': 'emonhub/',
Expand All @@ -31,15 +30,15 @@ def __init__(self, name, mqtt_user=" ", mqtt_passwd=" ", mqtt_host="127.0.0.1",
'nodevar_format_enable': 0,
'nodevar_format_basetopic': "nodes/"
}
self._settings.update(self._defaults)

self._mqttc = mqtt.Client()
self._mqttc.on_connect = self.on_connect
self._mqttc.on_disconnect = self.on_disconnect
self._mqttc.on_message = self.on_message
self._mqttc.on_subscribe = self.on_subscribe

# The action method is called from emonhub_interfacer.py method run
def action(self):
def _process_post(self, databuffer):
if not self._connected:
self._log.info("Connecting to MQTT Server")
try:
Expand All @@ -49,72 +48,86 @@ def action(self):
self._log.info("Could not connect...")
time.sleep(1.0)
else:
# Itterate through sub channels
for channel in self._settings["subchannels"]:
# Check for data in sub channel buffer
cargo = databuffer[0]

# ----------------------------------------------------------
# General MQTT format: emonhub/rx/emonpi/power1 ... 100
# ----------------------------------------------------------
if int(self._settings["nodevar_format_enable"])==1:

# Node id or nodename if given
nodestr = str(cargo.nodeid)
if cargo.nodename!=False: nodestr = str(cargo.nodename)

if channel in self._sub_channels:
while len(self._sub_channels[channel])>0:
cargo = self._sub_channels[channel].pop(0)

# ----------------------------------------------------------
# General MQTT format: emonhub/rx/emonpi/power1 ... 100
# ----------------------------------------------------------
if int(self._settings["nodevar_format_enable"])==1:

# Node id or nodename if given
nodestr = str(cargo.nodeid)
if cargo.nodename!=False: nodestr = str(cargo.nodename)

varid = 1
for value in cargo.realdata:
# Variable id or variable name if given
varstr = str(varid)
if (varid-1)<len(cargo.names):
varstr = str(cargo.names[varid-1])
# Construct topic
topic = self._settings["nodevar_format_basetopic"]+nodestr+"/"+varstr
payload = str(value)

self._log.debug("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

if result[0]==4:
self._log.info("Publishing error? returned 4")

varid += 1

# RSSI
topic = self._settings["nodevar_format_basetopic"]+nodestr+"/rssi"
payload = str(cargo.rssi)
self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

# ----------------------------------------------------------
# Emoncms nodes module format: emonhub/rx/10/values ... 100,200,300
# ----------------------------------------------------------
if int(self._settings["node_format_enable"])==1:

topic = self._settings["node_format_basetopic"]+"rx/"+str(cargo.nodeid)+"/values"
payload = ",".join(map(str,cargo.realdata))

self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

if result[0]==4:
self._log.info("Publishing error? returned 4")

# RSSI
topic = self._settings["node_format_basetopic"]+"rx/"+str(cargo.nodeid)+"/rssi"
payload = str(cargo.rssi)

self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

if result[0]==4:
self._log.info("Publishing error? returned 4")
varid = 1
for value in cargo.realdata:
# Variable id or variable name if given
varstr = str(varid)
if (varid-1)<len(cargo.names):
varstr = str(cargo.names[varid-1])
# Construct topic
topic = self._settings["nodevar_format_basetopic"]+nodestr+"/"+varstr
payload = str(value)

self._log.debug("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

if result[0]==4:
self._log.info("Publishing error? returned 4")

varid += 1

# RSSI
topic = self._settings["nodevar_format_basetopic"]+nodestr+"/rssi"
payload = str(cargo.rssi)
self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

# ----------------------------------------------------------
# Emoncms nodes module format: emonhub/rx/10/values ... 100,200,300
# ----------------------------------------------------------
if int(self._settings["node_format_enable"])==1:

topic = self._settings["node_format_basetopic"]+"rx/"+str(cargo.nodeid)+"/values"
payload = ",".join(map(str,cargo.realdata))

self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

if result[0]==4:
self._log.info("Publishing error? returned 4")

# RSSI
topic = self._settings["node_format_basetopic"]+"rx/"+str(cargo.nodeid)+"/rssi"
payload = str(cargo.rssi)

self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

if result[0]==4:
self._log.info("Publishing error? returned 4")

return True

def action(self):
"""
:return:
"""
self._mqttc.loop(0)

# pause output if 'pause' set to 'all' or 'out'
if 'pause' in self._settings \
and str(self._settings['pause']).lower() in ['all', 'out']:
return

# If an interval is set, check if that time has passed since last post
if int(self._settings['interval']) \
and time.time() - self._interval_timestamp < int(self._settings['interval']):
return
else:
# Then attempt to flush the buffer
self.flush()

def on_connect(self, client, userdata, flags, rc):

Expand Down
29 changes: 16 additions & 13 deletions src/interfacers/emonhub_interfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,19 @@ def add(self, cargo):
"""

# Create a frame of data in "emonCMS format"
f = []
try:
f.append(cargo.timestamp)
f.append(cargo.nodeid)
for i in cargo.realdata:
f.append(i)
if cargo.rssi:
f.append(cargo.rssi)

self._log.debug(str(cargo.uri) + " adding frame to buffer => "+ str(f))
# f = []
# try:
# f.append(cargo.timestamp)
# f.append(cargo.nodeid)
# for i in cargo.realdata:
# f.append(i)
# if cargo.rssi:
# f.append(cargo.rssi)
#
# self._log.debug(str(cargo.uri) + " adding frame to buffer => "+ str(f))

except:
self._log.warning("Failed to create emonCMS frame " + str(f))
# except:
# self._log.warning("Failed to create emonCMS frame " + str(f))

# self._log.debug(str(carg.ref) + " added to buffer =>"
# + " time: " + str(carg.timestamp)
Expand All @@ -155,7 +155,10 @@ def add(self, cargo):
# databuffer is of format:
# [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]]
# [[1399980731, 10, 150, 3450 ...]]
self.buffer.storeItem(f)

# Pass full cargo item:
# names required for MQTT interfacer and potentially future HTTP interfacer
self.buffer.storeItem(cargo)

def read(self):
"""Read raw data from interface and pass for processing.
Expand Down

0 comments on commit a1328be

Please sign in to comment.