Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emon pi develop #38

Merged
merged 18 commits into from Aug 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 47 additions & 6 deletions src/emonhub.py
Expand Up @@ -16,6 +16,7 @@
import signal
import argparse
import pprint

try:
import pymodbus
pymodbus_found = True
Expand Down Expand Up @@ -85,7 +86,7 @@

class EmonHub(object):

__version__ = "emonHub 'emon-pi' variant v1.2"
__version__ = "emonHub emon-pi variant v2.0.0"

def __init__(self, setup):
"""Setup an OpenEnergyMonitor emonHub.
Expand Down Expand Up @@ -124,6 +125,11 @@ def run(self):
# Set signal handler to catch SIGINT and shutdown gracefully
signal.signal(signal.SIGINT, self._sigint_handler)

# Initialise thread restart counters
restart_count={}
for I in self._interfacers.itervalues():
restart_count[I.name]=0

# Until asked to stop
while not self._exit:

Expand All @@ -133,12 +139,47 @@ def run(self):
self._update_settings(self._setup.settings)

# For all Interfacers
kill_list=[]
for I in self._interfacers.itervalues():
# Check thread is still running
# Check threads are still running
if not I.isAlive():
#I.start()
self._log.warning(I.name + " thread is dead") # had to be restarted")

kill_list.append(I.name) # <-avoid modification of iterable within loop

# Read each interfacers pub channels
for pub_channel in I._settings['pubchannels']:

if pub_channel in I._pub_channels:
if len(I._pub_channels[pub_channel])>0:

# POP cargo item (one at a time)
cargo = I._pub_channels[pub_channel].pop(0)

# Post to each subscriber interface
for sub_interfacer in self._interfacers.itervalues():
# For each subsciber channel
for sub_channel in sub_interfacer._settings['subchannels']:
# If channel names match
if sub_channel==pub_channel:
# init if empty
if not sub_channel in sub_interfacer._sub_channels:
sub_interfacer._sub_channels[sub_channel] = []

# APPEND cargo item
sub_interfacer._sub_channels[sub_channel].append(cargo)

# ->avoid modification of iterable within loop
for name in kill_list:
self._log.warning(name + " thread is dead.")

# The following should trigger a restart ... unless the
# interfacer is also removed from the settings table.
del(self._interfacers[name])

# Trigger restart by calling update settings
self._log.warning("Attempting to restart thread "+name+" (thread has been restarted "+str(restart_count[name])+" times...")
restart_count[name]+=1
self._update_settings(self._setup.settings)

# Sleep until next iteration
time.sleep(0.2)

Expand Down Expand Up @@ -208,7 +249,7 @@ def _update_settings(self, settings):
if I['Type'] in ('EmonModbusTcpInterfacer','EmonFroniusModbusTcpInterfacer') and not pymodbus_found :
self._log.error("Python module pymodbus not installed. unable to load modbus interfacer")
# This gets the class from the 'Type' string
interfacer = getattr(ehi, I['Type'])(name, **I['init_settings'])
interfacer = getattr(ehi, I['Type'])(name,**I['init_settings'])
interfacer.set(**I['runtimesettings'])
interfacer.init_settings = I['init_settings']
interfacer.start()
Expand Down
2 changes: 0 additions & 2 deletions src/interfacers/EmonHubBMWInterfacer.py
Expand Up @@ -20,8 +20,6 @@
import requests
import os.path

from pydispatch import dispatcher

from emonhub_interfacer import EmonHubInterfacer

"""class EmonHubBMWInterfacer
Expand Down
80 changes: 50 additions & 30 deletions src/interfacers/EmonHubEmoncmsHTTPInterfacer.py
Expand Up @@ -4,7 +4,6 @@
import json
import urllib2
import httplib
from pydispatch import dispatcher
from emonhub_interfacer import EmonHubInterfacer

class EmonHubEmoncmsHTTPInterfacer(EmonHubInterfacer):
Expand All @@ -26,38 +25,64 @@ def __init__(self, name):
'sendinterval': 30
}

self.buffer = []
# Initialize message queue
self._pub_channels = {}
self._sub_channels = {}

self.lastsent = time.time()
self.lastsentstatus = time.time()

def receiver(self, cargo):

# Create a frame of data in "emonCMS format"
f = []
try:
f.append(float(cargo.timestamp))
f.append(cargo.nodeid)
for i in cargo.realdata:
f.append(i)
if cargo.rssi:
f.append(cargo.rssi)
self._log.debug(str(cargo.uri) + " adding frame to buffer => "+ str(f))
except:
self._log.warning("Failed to create emonCMS frame " + str(f))

# Append to bulk post buffer
self.buffer.append(f)

def action(self):

now = time.time()

if (now-self.lastsent) > (int(self._settings['sendinterval'])):
self.lastsent = now
# print json.dumps(self.buffer)

if int(self._settings['senddata']):
self.bulkpost(self.buffer)
self.buffer = []
# It might be better here to combine the output from all sub channels
# into a single bulk post, most of the time there is only one sub channel
for channel in self._settings["subchannels"]:
if channel in self._sub_channels:

# only try to prepare and send data if there is any
if len(self._sub_channels[channel])>0:

bulkdata = []

for cargo in self._sub_channels[channel]:
# Create a frame of data in "emonCMS format"
f = []
try:
f.append(float(cargo.timestamp))
f.append(cargo.nodeid)
for i in cargo.realdata:
f.append(i)
if cargo.rssi:
f.append(cargo.rssi)
self._log.debug(str(cargo.uri) + " adding frame to buffer => "+ str(f))
except:
self._log.warning("Failed to create emonCMS frame " + str(f))

bulkdata.append(f)

# Get the length of the data to be sent
bulkdata_length = len(bulkdata)
self._log.debug("Sending bulkdata, length: "+str(bulkdata_length))

# Attempt to send the data
success = self.bulkpost(bulkdata)

self._log.debug("Sending bulkdata, success: "+str(success))

# if bulk post is successful delete the range posted
if success:
for i in range(0,bulkdata_length):
self._sub_channels[channel].pop(0)
self._log.debug("Deleted sent data from queue")

self._log.debug("New queue length: "+str(len(self._sub_channels[channel])))


if (now-self.lastsentstatus)> (int(self._settings['sendinterval'])):
self.lastsentstatus = now
Expand All @@ -68,7 +93,7 @@ def bulkpost(self,databuffer):

if not 'apikey' in self._settings.keys() or str.__len__(str(self._settings['apikey'])) != 32 \
or str.lower(str(self._settings['apikey'])) == 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx':
return
return False

data_string = json.dumps(databuffer, separators=(',', ':'))

Expand Down Expand Up @@ -100,6 +125,7 @@ def bulkpost(self,databuffer):
return True
else:
self._log.warning("send failure: wanted 'ok' but got '" +reply+ "'")
return False

def _send_post(self, post_url, post_body=None):
"""
Expand Down Expand Up @@ -157,9 +183,3 @@ def set(self, **kwargs):
if key in kwargs.keys():
# replace default
self._settings[key] = kwargs[key]

# Subscribe to internal channels
for channel in self._settings["subchannels"]:
dispatcher.connect(self.receiver, channel)
self._log.debug(self._name+" Subscribed to channel' : " + str(channel))

28 changes: 2 additions & 26 deletions src/interfacers/EmonHubJeeInterfacer.py
@@ -1,7 +1,5 @@

import time
from pydispatch import dispatcher

import datetime
import Cargo
import EmonHubSerialInterfacer as ehi
Expand Down Expand Up @@ -49,6 +47,7 @@ def __init__(self, name, com_port='/dev/ttyAMA0', com_baud=0):
else:
self._log.warning("Device communication error - check settings")
self._rx_buf=""

self._ser.flushInput()

# Initialize settings
Expand All @@ -66,7 +65,7 @@ def __init__(self, name, com_port='/dev/ttyAMA0', com_baud=0):
# Pre-load Jee settings only if info string available for checks
if all(i in self.info[1] for i in (" i", " g", " @ ", " MHz")):
self._settings.update(self._jee_settings)

def read(self):
"""Read data from serial port and process if complete line received.

Expand Down Expand Up @@ -146,12 +145,6 @@ def read(self):

return c

# # unix timestamp
# t = round(time.time(), 2)
#
# # Process data frame
# self._r xq.put(self._process_rx(f, t))

def set(self, **kwargs):
"""Send configuration parameters to the "Jee" type device through COM port

Expand Down Expand Up @@ -221,26 +214,10 @@ def action(self):
self._ser.write("00,%02d,%02d,00,s" % (now.hour, now.minute))

def send (self, cargo):
"""
"""
#self._process_tx(self._txq.get())
#self._rxq.put( self._process_rx(f, t))
#dest = f[1]
#packet = f[2:-1]
#self.send_packet(packet, dest)
# TODO amalgamate into 1 send

#def send_packet(self, packet, id=0, cmd="s"):
"""

"""
f = cargo
cmd = "s"

# # If the use of acks gets implemented
# ack = False
# if ack:
# cmd = "a"
if self.getName() in f.encoded:
data = f.encoded[self.getName()]
else:
Expand All @@ -257,4 +234,3 @@ def send (self, cargo):

self._log.debug(str(f.uri) + " sent TX packet: " + payload)
self._ser.write(payload)