Skip to content

Commit

Permalink
Revert Emoncms HTTP Interfacer to use emonhub buffer approach develop…
Browse files Browse the repository at this point in the history
…ed by @pb66
  • Loading branch information
glynhudson committed Jan 31, 2018
1 parent d7b17d5 commit 65278b2
Show file tree
Hide file tree
Showing 3 changed files with 264 additions and 122 deletions.
115 changes: 115 additions & 0 deletions src/emonhub_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""
This code is released under the GNU Affero General Public License.
OpenEnergyMonitor project:
http://openenergymonitor.org
"""

import logging

"""class AbstractBuffer
Represents the actual buffer being used.
"""


class AbstractBuffer():

def storeItem(self, data):
raise NotImplementedError

def retrieveItems(self, number):
raise NotImplementedError

def retrieveItem(self):
raise NotImplementedError

def discardLastRetrievedItem(self):
raise NotImplementedError

def discardLastRetrievedItems(self, number):
raise NotImplementedError

def hasItems(self):
raise NotImplementedError

"""
This implementation of the AbstractBuffer just uses an in-memory data structure.
It's basically identical to the previous (inline) buffer.
"""


class InMemoryBuffer(AbstractBuffer):

def __init__(self, bufferName, buffer_size):
self._bufferName = str(bufferName)
self._buffer_type = "memory"
self._maximumEntriesInBuffer = int(buffer_size)
self._data_buffer = []
self._log = logging.getLogger("EmonHub")

def hasItems(self):
return self.size() > 0

def isFull(self):
return self.size() >= self._maximumEntriesInBuffer

def getMaxEntrySliceIndex(self):
return max(0,
self.size() - self._maximumEntriesInBuffer - 1)

def discardOldestItems(self):
self._data_buffer = self._data_buffer[self.getMaxEntrySliceIndex():]

def discardOldestItemsIfFull(self):
if self.isFull():
self._log.warning(
"In-memory buffer (%s) reached limit of %d items, deleting oldest"
% (self._bufferName, self._maximumEntriesInBuffer))
self.discardOldestItems()

def storeItem(self, data):
self.discardOldestItemsIfFull()
self._data_buffer.append(data)

def retrieveItem(self):
return self._data_buffer[0]

def retrieveItems(self, number):
blen = len(self._data_buffer)
if number > blen:
number = blen
return self._data_buffer[:number]

def discardLastRetrievedItem(self):
del self._data_buffer[0]

def discardLastRetrievedItems(self, number):
blen = len(self._data_buffer)
if number > blen:
number = blen
self._data_buffer = self._data_buffer[number:]

def size(self):
return len(self._data_buffer)


"""
The getBuffer function returns the buffer class corresponding to a
buffering method passed as argument.
"""
bufferMethodMap = {
'memory': InMemoryBuffer
}


def getBuffer(method):
"""Returns the buffer class corresponding to the method
method (string): buffering method
"""
return bufferMethodMap[method]

125 changes: 16 additions & 109 deletions src/interfacers/EmonHubEmoncmsHTTPInterfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import urllib2
import httplib
from emonhub_interfacer import EmonHubInterfacer
import emonhub_buffer as ehb

class EmonHubEmoncmsHTTPInterfacer(EmonHubInterfacer):

Expand All @@ -13,88 +14,30 @@ def __init__(self, name):
super(EmonHubEmoncmsHTTPInterfacer, self).__init__(name)

self._name = name


# add or alter any default settings for this reporter
self._defaults.update({'batchsize': 100,'interval': 30})
self._settings = {
'subchannels':['ch1'],
'pubchannels':['ch2'],

'apikey': "",
'url': "http://emoncms.org",
'senddata': 1,
'sendstatus': 0,
'sendinterval': 30
'interval': 30
}

# Initialize message queue
self._pub_channels = {}
self._sub_channels = {}
# This line will stop the default values printing to logfile at start-up
self._settings.update(self._defaults)

self.lastsent = time.time()
self.lastsentstatus = time.time()
# set an absolute upper limit for number of items to process per post
self._item_limit = 250

def _process_post(self, databuffer):
"""Send data to server."""

# databuffer is of format:
# [[timestamp, nodeid, datavalues][timestamp, nodeid, datavalues]]
# [[1399980731, 10, 150, 250 ...]]

def action(self):

now = time.time()

if (now-self.lastsent) > (int(self._settings['sendinterval'])):
self.lastsent = now

# It might be better here to combine the output from all sub channels
# into a single bulk post, most of the time there is only one sub channel
for channel in self._settings["subchannels"]:
if channel in self._sub_channels:

# only try to prepare and send data if there is any
if len(self._sub_channels[channel])>0:

bulkdata = []

for cargo in self._sub_channels[channel]:
# Create a frame of data in "emonCMS format"
f = []
try:
f.append(float(cargo.timestamp))
f.append(cargo.nodeid)
for i in cargo.realdata:
f.append(i)
if cargo.rssi:
f.append(cargo.rssi)
#self._log.debug(str(cargo.uri) + " adding frame to buffer => "+ str(f))
except:
self._log.warning("Failed to create emonCMS frame " + str(f))

bulkdata.append(f)

# Get the length of the data to be sent
bulkdata_length = len(bulkdata)

if int(self._settings['senddata']):
self._log.debug("Sending bulkdata, length: "+str(bulkdata_length))
# Attempt to send the data
success = self.bulkpost(bulkdata)
self._log.debug("Sending bulkdata, success: "+str(success))
else:
success = True


# if bulk post is successful delete the range posted
if success:
for i in range(0,bulkdata_length):
self._sub_channels[channel].pop(0)
#self._log.debug("Deleted sent data from queue")


if int(self._settings['senddata']):
self._log.debug("Current queue length: "+str(len(self._sub_channels[channel])))


if (now-self.lastsentstatus)> (int(self._settings['sendinterval'])):
self.lastsentstatus = now
if int(self._settings['sendstatus']):
self.sendstatus()

def bulkpost(self,databuffer):

if not 'apikey' in self._settings.keys() or str.__len__(str(self._settings['apikey'])) != 32 \
or str.lower(str(self._settings['apikey'])) == 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx':
return False
Expand Down Expand Up @@ -131,42 +74,6 @@ def bulkpost(self,databuffer):
self._log.warning("send failure: wanted 'ok' but got '" +reply+ "'")
return False

def _send_post(self, post_url, post_body=None):
"""
:param post_url:
:param post_body:
:return: the received reply if request is successful
"""
"""Send data to server.
data (list): node and values (eg: '[node,val1,val2,...]')
time (int): timestamp, time when sample was recorded
return True if data sent correctly
"""

reply = ""
request = urllib2.Request(post_url, post_body)
try:
response = urllib2.urlopen(request, timeout=60)
except urllib2.HTTPError as e:
self._log.warning(self.name + " couldn't send to server, HTTPError: " +
str(e.code))
except urllib2.URLError as e:
self._log.warning(self.name + " couldn't send to server, URLError: " +
str(e.reason))
except httplib.HTTPException:
self._log.warning(self.name + " couldn't send to server, HTTPException")
except Exception:
import traceback
self._log.warning(self.name + " couldn't send to server, Exception: " +
traceback.format_exc())
else:
reply = response.read()
finally:
return reply

def sendstatus(self):
if not 'apikey' in self._settings.keys() or str.__len__(str(self._settings['apikey'])) != 32 \
Expand Down
Loading

0 comments on commit 65278b2

Please sign in to comment.