Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread exception reporting, thread restarting, restart counting... #36

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions src/emonhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import signal
import argparse
import pprint

try:
import pymodbus
pymodbus_found = True
Expand Down Expand Up @@ -124,6 +125,11 @@ def run(self):
# Set signal handler to catch SIGINT and shutdown gracefully
signal.signal(signal.SIGINT, self._sigint_handler)

# Initialise thread restart counters
restart_count={}
for I in self._interfacers.itervalues():
restart_count[I.name]=0

# Until asked to stop
while not self._exit:

Expand All @@ -133,11 +139,24 @@ def run(self):
self._update_settings(self._setup.settings)

# For all Interfacers
kill_list=[]
for I in self._interfacers.itervalues():
# Check thread is still running
# Check threads are still running
if not I.isAlive():
#I.start()
self._log.warning(I.name + " thread is dead") # had to be restarted")
kill_list.append(I.name) # <-avoid modification of iterable within loop

# ->avoid modification of iterable within loop
for name in kill_list:
self._log.warning(name + " thread is dead.")

# The following should trigger a restart ... unless the
# interfacer is also removed from the settings table.
del(self._interfacers[name])

# Trigger restart by calling update settings
self._log.warning("Attempting to restart thread "+name+" (thread has been restarted "+str(restart_count[name])+" times...")
restart_count[name]+=1
self._update_settings(self._setup.settings)

# Sleep until next iteration
time.sleep(0.2)
Expand Down Expand Up @@ -208,7 +227,7 @@ def _update_settings(self, settings):
if I['Type'] in ('EmonModbusTcpInterfacer','EmonFroniusModbusTcpInterfacer') and not pymodbus_found :
self._log.error("Python module pymodbus not installed. unable to load modbus interfacer")
# This gets the class from the 'Type' string
interfacer = getattr(ehi, I['Type'])(name, **I['init_settings'])
interfacer = getattr(ehi, I['Type'])(name,**I['init_settings'])
interfacer.set(**I['runtimesettings'])
interfacer.init_settings = I['init_settings']
interfacer.start()
Expand Down
58 changes: 29 additions & 29 deletions src/interfacers/EmonHubMqttInterfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,23 @@ class EmonHubMqttInterfacer(EmonHubInterfacer):
def __init__(self, name, mqtt_user=" ", mqtt_passwd=" ", mqtt_host="127.0.0.1", mqtt_port=1883):
# Initialization
super(EmonHubMqttInterfacer, self).__init__(name)

self._log.info(str(name)+" Init mqtt_host="+str(mqtt_host)+" mqtt_port="+str(mqtt_port)+ " mqtt_user="+str(mqtt_user))
self._name = name
self._host = mqtt_host
self._port = mqtt_port
self._user = mqtt_user
self._passwd = mqtt_passwd
self._connected = False

self._settings = {
'subchannels':['ch1'],
'pubchannels':['ch2'],

# emonhub/rx/10/values format - default emoncms nodes module
'node_format_enable': 1,
'node_format_basetopic': 'emonhub/',

# nodes/emontx/power1 format
'nodevar_format_enable': 0,
'nodevar_format_basetopic': "nodes/"
Expand All @@ -39,7 +39,7 @@ def __init__(self, name, mqtt_user=" ", mqtt_passwd=" ", mqtt_host="127.0.0.1",
self._mqttc.on_disconnect = self.on_disconnect
self._mqttc.on_message = self.on_message
self._mqttc.on_subscribe = self.on_subscribe


def action(self):
if not self._connected:
Expand All @@ -51,9 +51,9 @@ def action(self):
self._log.info("Could not connect...")
time.sleep(1.0)
self._mqttc.loop(0)

def on_connect(self, client, userdata, flags, rc):

connack_string = {0:'Connection successful',
1:'Connection refused - incorrect protocol version',
2:'Connection refused - invalid client identifier',
Expand All @@ -68,25 +68,25 @@ def on_connect(self, client, userdata, flags, rc):
self._connected = True
# Subscribe to MQTT topics
self._mqttc.subscribe(str(self._settings["node_format_basetopic"])+"tx/#")

self._log.debug("CONACK => Return code: "+str(rc))

def on_disconnect(self, client, userdata, rc):
if rc != 0:
self._log.info("Unexpected disconnection")
self._connected = False

def on_subscribe(self, mqttc, obj, mid, granted_qos):
self._log.info("on_subscribe")

def on_message(self, client, userdata, msg):
topic_parts = msg.topic.split("/")

if topic_parts[0] == self._settings["node_format_basetopic"][:-1]:
if topic_parts[1] == "tx":
if topic_parts[3] == "values":
nodeid = int(topic_parts[2])

payload = msg.payload
realdata = payload.split(",")
self._log.debug("Nodeid: "+str(nodeid)+" values: "+msg.payload)
Expand All @@ -107,11 +107,11 @@ def receiver(self, cargo):
# General MQTT format: emonhub/rx/emonpi/power1 ... 100
# ----------------------------------------------------------
if int(self._settings["nodevar_format_enable"])==1:

# Node id or nodename if given
nodestr = str(cargo.nodeid)
if cargo.nodename!=False: nodestr = str(cargo.nodename)

varid = 1
for value in cargo.realdata:
# Variable id or variable name if given
Expand All @@ -121,53 +121,53 @@ def receiver(self, cargo):
# Construct topic
topic = self._settings["nodevar_format_basetopic"]+nodestr+"/"+varstr
payload = str(value)

self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

if result[0]==4:
self._log.info("Publishing error? returned 4")

varid += 1

# RSSI
topic = self._settings["nodevar_format_basetopic"]+nodestr+"/rssi"
payload = str(cargo.rssi)
self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)
# ----------------------------------------------------------

# ----------------------------------------------------------
# Emoncms nodes module format: emonhub/rx/10/values ... 100,200,300
# ----------------------------------------------------------
if int(self._settings["node_format_enable"])==1:

topic = self._settings["node_format_basetopic"]+"rx/"+str(cargo.nodeid)+"/values"
payload = ",".join(map(str,cargo.realdata))

self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

if result[0]==4:
self._log.info("Publishing error? returned 4")

# RSSI
topic = self._settings["node_format_basetopic"]+"rx/"+str(cargo.nodeid)+"/rssi"
payload = str(cargo.rssi)

self._log.info("Publishing: "+topic+" "+payload)
result =self._mqttc.publish(topic, payload=payload, qos=2, retain=False)

if result[0]==4:
self._log.info("Publishing error? returned 4")


def set(self, **kwargs):
for key,setting in self._settings.iteritems():
if key in kwargs.keys():
# replace default
self._settings[key] = kwargs[key]
# Subscribe to internal channels

# Subscribe to internal channels
for channel in self._settings["subchannels"]:
dispatcher.connect(self.receiver, channel)
self._log.debug(self._name+" Subscribed to channel' : " + str(channel))
12 changes: 12 additions & 0 deletions src/interfacers/emonhub_interfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import urllib2
import json
import uuid
import traceback

import paho.mqtt.client as mqtt

Expand All @@ -32,6 +33,15 @@
their data source.

"""
def log_exceptions_from_class_method(f):
def wrapper(*args):
self=args[0]
try:
return f(*args)
except:
self._log.warning("Exception caught in "+self.name+" thread. "+traceback.format_exc())
return
return wrapper

class EmonHubInterfacer(threading.Thread):

Expand Down Expand Up @@ -61,6 +71,7 @@ def __init__(self, name):
# create a stop
self.stop = False

@log_exceptions_from_class_method
def run(self):
"""
Run the interfacer.
Expand All @@ -69,6 +80,7 @@ def run(self):
"""

while not self.stop:

# Read the input and process data if available
rxc = self.read()
# if 'pause' in self._settings and \
Expand Down