Skip to content

Commit

Permalink
Process backlog before running live log loop
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Easterbrook <jim@jim-easterbrook.me.uk>
  • Loading branch information
jim-easterbrook committed May 22, 2018
1 parent 3d5e644 commit 945844e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
4 changes: 2 additions & 2 deletions src/pywws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '18.5.0'
_release = '1536'
_commit = '1cfbb6a'
_release = '1537'
_commit = '3d5e644'
10 changes: 6 additions & 4 deletions src/pywws/livelog.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ def live_log(data_dir):
datalogger = pywws.logdata.DataLogger(context)
# create a RegularTasks object
tasks = pywws.regulartasks.RegularTasks(context)
# clear any processing backlog
if context.raw_data.before(datetime.max):
pywws.process.process_data(context)
# get live data
try:
# catchup with any logged data
if datalogger.catchup_needed():
datalogger.log_data(sync=1)
pywws.process.process_data(context)
tasks.do_tasks()
# get live data
for data, logged in datalogger.live_data(
logged_only=(not tasks.has_live_tasks())):
if logged:
Expand Down
38 changes: 25 additions & 13 deletions src/pywws/logdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ def check_fixed_block(self):
self.status.set('fixed', 'fixed block', pprint.pformat(fixed_block))
return fixed_block

def catchup_needed(self):
# predict time stamp of next logged data
next_stored = self.raw_data.before(datetime.max)
if not next_stored:
return True
fixed_block = self.ws.get_fixed_block(unbuffered=True)
next_stored += timedelta(minutes=fixed_block['read_period'])
# nothing to fetch if it's far enough into the future
return (next_stored - datetime.utcnow()).total_seconds() < 48.0

def catchup(self, last_date, last_ptr):
fixed_block = self.ws.get_fixed_block(unbuffered=True)
# get time to go back to
Expand Down Expand Up @@ -232,25 +242,27 @@ def log_data(self, sync=None, clear=False):
self.ws.write_data([(ptr, 1), (ptr+1, 0)])

def live_data(self, logged_only=False):
next_ptr = None
next_hour = datetime.utcnow(
).replace(minute=0, second=0, microsecond=0) + HOUR
next_ptr = None
if self.catchup_needed():
for data, ptr, logged in self.ws.live_data(logged_only=True):
self.catchup(data['idx'], ptr)
next_ptr = self.ws.inc_ptr(ptr)
break
for data, ptr, logged in self.ws.live_data(logged_only=logged_only):
now = data['idx']
if logged:
now = data['idx']
if ptr == next_ptr:
# data is contiguous with last logged value
self.raw_data[now] = data
if now >= next_hour:
next_hour += HOUR
self.check_fixed_block()
self.status.set(
'data', 'ptr', '%06x,%s' % (ptr, now.isoformat(' ')))
else:
# catch up missing data
self.catchup(now, ptr)
if next_ptr:
assert(ptr == next_ptr)
self.raw_data[now] = data
self.status.set(
'data', 'ptr', '%06x,%s' % (ptr, now.isoformat(' ')))
next_ptr = self.ws.inc_ptr(ptr)
yield data, logged
if now >= next_hour:
next_hour += HOUR
self.check_fixed_block()


def main(argv=None):
Expand Down

0 comments on commit 945844e

Please sign in to comment.