Skip to content

Commit

Permalink
Add LiveDataService for services without catchup
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Easterbrook <jim@jim-easterbrook.me.uk>
  • Loading branch information
jim-easterbrook committed Aug 21, 2018
1 parent 4e3889c commit b8bedb4
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 20 deletions.
4 changes: 2 additions & 2 deletions src/pywws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '18.8.0'
_release = '1569'
_commit = '1a5bac2'
_release = '1570'
_commit = '4e3889c'
79 changes: 67 additions & 12 deletions src/pywws/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,9 @@ def log(self, message):
self.old_message = message


class DataService(ServiceBase):
catchup = 7

class DataServiceBase(ServiceBase):
def __init__(self, context):
super(DataService, self).__init__(context)
super(DataServiceBase, self).__init__(context)
# check config
template = context.params.get(self.service_name, 'template')
if template == 'default':
Expand All @@ -190,6 +188,18 @@ def __init__(self, context):
def do_catchup(self):
pass

def prepare_data(self, data):
data_str = self.templater.make_text(self.template_file, data)
self.template_file.seek(0)
return eval('{' + data_str + '}')

def valid_data(self, data):
return True


class DataService(DataServiceBase):
catchup = 7

def upload(self, catchup=True, live_data=None, test_mode=False, option=''):
OK = True
count = 0
Expand All @@ -208,11 +218,6 @@ def upload(self, catchup=True, live_data=None, test_mode=False, option=''):
if self.queue and not self.is_alive():
self.start()

def prepare_data(self, data):
data_str = self.templater.make_text(self.template_file, data)
self.template_file.seek(0)
return eval('{' + data_str + '}')

def next_data(self, catchup, live_data):
if not catchup:
start = self.context.calib_data.before(datetime.max)
Expand All @@ -236,9 +241,6 @@ def next_data(self, catchup, live_data):
yield live_data, True
self.last_update = live_data['idx']

def valid_data(self, data):
return True

def upload_batch(self):
OK = True
count = 0
Expand Down Expand Up @@ -277,6 +279,57 @@ def upload_batch(self):
return OK


class LiveDataService(DataServiceBase):
def upload(self, live_data=None, test_mode=False, option=''):
if live_data:
data = live_data
else:
idx = self.context.calib_data.before(datetime.max)
if not idx:
return
data = self.context.calib_data[idx]
timestamp = data['idx']
if test_mode:
timestamp = None
elif self.last_update and timestamp < self.last_update + self.interval:
return
if not self.valid_data(data):
return
self.last_update = data['idx']
prepared_data = self.prepare_data(data)
prepared_data.update(self.fixed_data)
self.queue.append((timestamp, {'prepared_data': prepared_data,
'live': bool(live_data)}))
# start upload thread
if self.queue and not self.is_alive():
self.start()

def upload_batch(self):
# remove stale uploads from queue
drop = len(self.queue) - 1
if self.queue[-1] is None:
drop -= 1
if drop > 0:
for i in range(drop):
self.queue.popleft()
self.logger.warning('{:d} record(s) dropped'.format(drop))
# send upload without taking it off queue
upload = self.queue[0]
if upload is None:
return False
timestamp, kwds = upload
with self.session() as session:
OK, message = self.upload_data(session, **kwds)
self.log(message)
if OK:
if timestamp:
self.context.status.set(
'last update', self.service_name, str(timestamp))
# finally remove upload from queue
self.queue.popleft()
return OK


class FileService(ServiceBase):
def __init__(self, context):
super(FileService, self).__init__(context)
Expand Down Expand Up @@ -363,6 +416,8 @@ def main(class_, argv=None):
if issubclass(class_, FileService):
for file in args.file:
uploader.upload(option=os.path.abspath(file))
elif issubclass(class_, LiveDataService):
uploader.upload(test_mode=True)
else:
uploader.upload(catchup=args.catchup, test_mode=not args.catchup)
uploader.stop()
Expand Down
3 changes: 1 addition & 2 deletions src/pywws/service/cwop.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@
logger = logging.getLogger(__name__)


class ToService(pywws.service.DataService):
catchup = 0
class ToService(pywws.service.LiveDataService):
fixed_data = {'version': pywws.__version__}
interval = timedelta(seconds=290)
logger = logger
Expand Down
3 changes: 1 addition & 2 deletions src/pywws/service/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@
logger = logging.getLogger(__name__)


class ToService(pywws.service.DataService):
catchup = 0
class ToService(pywws.service.LiveDataService):
fixed_data = {}
interval = timedelta(seconds=40)
logger = logger
Expand Down
3 changes: 1 addition & 2 deletions src/pywws/service/temperaturnu.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@
logger = logging.getLogger(__name__)


class ToService(pywws.service.DataService):
catchup = 0
class ToService(pywws.service.LiveDataService):
fixed_data = {}
interval = timedelta(seconds=40)
logger = logger
Expand Down

0 comments on commit b8bedb4

Please sign in to comment.