Skip to content

Commit

Permalink
Add CatchupDataService for services with catchup
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Easterbrook <jim@jim-easterbrook.me.uk>
  • Loading branch information
jim-easterbrook committed Aug 22, 2018
1 parent bff108c commit 52eb439
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 100 deletions.
2 changes: 1 addition & 1 deletion src/pywws/examples/modules/aws_api_gw_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
logger = logging.getLogger(__name__)


class ToService(pywws.service.DataService):
class ToService(pywws.service.CatchupDataService):
catchup = 100
fixed_data = {}
interval = timedelta(seconds=300)
Expand Down
144 changes: 56 additions & 88 deletions src/pywws/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from StringIO import StringIO

import pywws
from pywws.constants import SECOND
import pywws.logger
import pywws.storage
import pywws.template
Expand Down Expand Up @@ -167,6 +168,8 @@ def log(self, message):


class DataServiceBase(ServiceBase):
catchup = 7

def __init__(self, context):
super(DataServiceBase, self).__init__(context)
# check config
Expand All @@ -184,78 +187,31 @@ def __init__(self, context):
# get time stamp of last uploaded data
self.last_update = self.context.status.get_datetime(
'last update', self.service_name)
if not self.last_update:
self.last_update = datetime.utcnow() - timedelta(days=self.catchup)

def do_catchup(self):
pass

def prepare_data(self, data):
def queue_data(self, timestamp, data, live):
if timestamp and timestamp < self.last_update + self.interval:
return
if not self.valid_data(data):
return
data_str = self.templater.make_text(self.template_file, data)
self.template_file.seek(0)
return eval('{' + data_str + '}')
prepared_data = eval('{' + data_str + '}')
prepared_data.update(self.fixed_data)
self.queue.append(
(timestamp, {'prepared_data': prepared_data, 'live': live}))
if timestamp:
self.last_update = timestamp

def valid_data(self, data):
return True


class DataService(DataServiceBase):
catchup = 7

def upload(self, catchup=True, live_data=None, test_mode=False, option=''):
OK = True
count = 0
for data, live in self.next_data(catchup and not test_mode, live_data):
if count >= 30 or len(self.queue) >= 60:
break
timestamp = data['idx']
if test_mode:
timestamp = None
prepared_data = self.prepare_data(data)
prepared_data.update(self.fixed_data)
self.queue.append(
(timestamp, {'prepared_data': prepared_data, 'live': live}))
count += 1
# start upload thread
if self.queue and not self.is_alive():
self.start()

def next_data(self, catchup, live_data):
if not catchup:
start = self.context.calib_data.before(datetime.max)
elif self.last_update:
start = self.last_update + self.interval
else:
start = datetime.utcnow() - max(
timedelta(days=self.catchup), self.interval)
if live_data:
stop = live_data['idx'] - self.interval
else:
stop = None
next_update = start or datetime.min
for data in self.context.calib_data[start:stop]:
if data['idx'] >= next_update and self.valid_data(data):
yield data, False
self.last_update = data['idx']
next_update = self.last_update + self.interval
if (live_data and live_data['idx'] >= next_update and
self.valid_data(live_data)):
yield live_data, True
self.last_update = live_data['idx']

def upload_batch(self):
OK = True
count = 0
with self.session() as session:
while self.queue and not self.context.shutdown.is_set():
if self.catchup == 0:
# "live only" service, so ignore old records
drop = len(self.queue) - 1
if self.queue[-1] is None:
drop -= 1
if drop > 0:
for i in range(drop):
self.queue.popleft()
self.logger.warning(
'{:d} record(s) dropped'.format(drop))
# send upload without taking it off queue
upload = self.queue[0]
if upload is None:
Expand All @@ -279,7 +235,40 @@ def upload_batch(self):
return OK


class CatchupDataService(DataServiceBase):
def do_catchup(self):
start = self.last_update + self.interval
for data in self.context.calib_data[start:]:
if len(self.queue) >= 60:
break
self.queue_data(data['idx'], data, False)
# start upload thread
if self.queue and not self.is_alive():
self.start()

def upload(self, live_data=None, test_mode=False, option=''):
if test_mode:
idx = self.context.calib_data.before(datetime.max)
else:
idx = self.context.calib_data.after(self.last_update + self.interval)
if idx:
data = self.context.calib_data[idx]
timestamp = data['idx']
if test_mode:
timestamp = None
self.queue_data(timestamp, data, False)
idx = self.context.calib_data.after(data['idx'] + SECOND)
if live_data and not idx:
self.queue_data(live_data['idx'], live_data, True)
# start upload thread
if self.queue and not self.is_alive():
self.start()


class LiveDataService(DataServiceBase):
def do_catchup(self):
pass

def upload(self, live_data=None, test_mode=False, option=''):
if live_data:
data = live_data
Expand All @@ -291,15 +280,7 @@ def upload(self, live_data=None, test_mode=False, option=''):
timestamp = data['idx']
if test_mode:
timestamp = None
elif self.last_update and timestamp < self.last_update + self.interval:
return
if not self.valid_data(data):
return
self.last_update = data['idx']
prepared_data = self.prepare_data(data)
prepared_data.update(self.fixed_data)
self.queue.append((timestamp, {'prepared_data': prepared_data,
'live': bool(live_data)}))
self.queue_data(timestamp, data, bool(live_data))
# start upload thread
if self.queue and not self.is_alive():
self.start()
Expand All @@ -313,21 +294,8 @@ def upload_batch(self):
for i in range(drop):
self.queue.popleft()
self.logger.warning('{:d} record(s) dropped'.format(drop))
# send upload without taking it off queue
upload = self.queue[0]
if upload is None:
return False
timestamp, kwds = upload
with self.session() as session:
OK, message = self.upload_data(session, **kwds)
self.log(message)
if OK:
if timestamp:
self.context.status.set(
'last update', self.service_name, str(timestamp))
# finally remove upload from queue
self.queue.popleft()
return OK
# send most recent data
return super(LiveDataService, self).upload_batch()


class FileService(ServiceBase):
Expand Down Expand Up @@ -397,7 +365,7 @@ def main(class_, argv=None):
if hasattr(class_, 'register'):
parser.add_argument('-r', '--register', action='store_true',
help='register (or update) with service')
if issubclass(class_, DataService):
if not issubclass(class_, CatchupDataService):
parser.add_argument('-c', '--catchup', action='store_true',
help='upload all data since last upload')
parser.add_argument('-v', '--verbose', action='count',
Expand All @@ -416,9 +384,9 @@ def main(class_, argv=None):
if issubclass(class_, FileService):
for file in args.file:
uploader.upload(option=os.path.abspath(file))
elif issubclass(class_, LiveDataService):
uploader.upload(test_mode=True)
elif issubclass(class_, CatchupDataService) and args.catchup:
uploader.do_catchup()
else:
uploader.upload(catchup=args.catchup, test_mode=not args.catchup)
uploader.upload(test_mode=True)
uploader.stop()
return 0
3 changes: 1 addition & 2 deletions src/pywws/service/metoffice.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@
logger = logging.getLogger(__name__)


class ToService(pywws.service.DataService):
catchup = 7
class ToService(pywws.service.CatchupDataService):
fixed_data = {'softwaretype': 'pywws-' + pywws.__version__}
interval = timedelta(seconds=300)
logger = logger
Expand Down
3 changes: 1 addition & 2 deletions src/pywws/service/openweathermap.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@
logger = logging.getLogger(__name__)


class ToService(pywws.service.DataService):
catchup = 7
class ToService(pywws.service.CatchupDataService):
fixed_data = {}
interval = timedelta(seconds=40)
logger = logger
Expand Down
3 changes: 1 addition & 2 deletions src/pywws/service/pwsweather.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@
logger = logging.getLogger(__name__)


class ToService(pywws.service.DataService):
catchup = 7
class ToService(pywws.service.CatchupDataService):
fixed_data = {'action': 'updateraw', 'softwaretype': 'pywws'}
interval = timedelta(seconds=40)
logger = logger
Expand Down
4 changes: 1 addition & 3 deletions src/pywws/service/underground.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@
logger = logging.getLogger(__name__)


class ToService(pywws.service.DataService):
catchup = 7
class ToService(pywws.service.CatchupDataService):
fixed_data = {'action': 'updateraw', 'softwaretype': 'pywws'}
interval = timedelta(seconds=40)
logger = logger
service_name = service_name
template = """
Expand Down
3 changes: 1 addition & 2 deletions src/pywws/service/wetterarchivde.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@
logger = logging.getLogger(__name__)


class ToService(pywws.service.DataService):
catchup = 7
class ToService(pywws.service.CatchupDataService):
fixed_data = {'sid': 'pywws'}
interval = timedelta(seconds=290)
logger = logger
Expand Down

0 comments on commit 52eb439

Please sign in to comment.