Skip to content

Commit

Permalink
Add FileService base class
Browse files Browse the repository at this point in the history
and merge UploadThread class into base class.

This should allow easier definition of "services" that upload a file
rather than data. The UploadThread class has not been removed as it's
used by the current web site uploader and Twitter modules.

Signed-off-by: Jim Easterbrook <jim@jim-easterbrook.me.uk>
  • Loading branch information
jim-easterbrook committed Aug 20, 2018
1 parent 46de5de commit 5bfc528
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/pywws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '18.8.0'
_release = '1562'
_commit = '3b73c4d'
_release = '1563'
_commit = '46de5de'
162 changes: 150 additions & 12 deletions src/pywws/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from collections import deque
from datetime import datetime, timedelta
import os
import shutil
import sys
import threading

Expand Down Expand Up @@ -119,11 +120,58 @@ def log(self, message):
self.old_message = message


class DataService(object):
log_count = True
class ServiceBase(threading.Thread):
interval = timedelta(seconds=40)

def __init__(self, context):
super(ServiceBase, self).__init__()
self.context = context
self.queue = deque()

def run(self):
self.logger.debug('thread started ' + self.name)
self.old_message = ''
if self.context.live_logging:
polling_interval = self.interval.total_seconds() / 20
polling_interval = min(max(polling_interval, 4.0), 40.0)
else:
polling_interval = 4.0
while not self.context.shutdown.is_set():
OK = True
if self.queue:
try:
OK = self.upload_batch()
except Exception as ex:
self.log(str(ex))
OK = False
if OK:
pause = polling_interval
elif self.context.live_logging:
# upload failed, wait before trying again
pause = 40.0
else:
# upload failed or nothing more to do
break
self.context.shutdown.wait(pause)

def stop(self):
if self.is_alive():
self.logger.debug('stopping thread ' + self.name)
self.queue.append(None)

def log(self, message):
if message == self.old_message:
self.logger.debug(message)
else:
self.logger.error(message)
self.old_message = message


class DataService(ServiceBase):
catchup = 7

def __init__(self, context):
super(DataService, self).__init__(context)
# check config
template = context.params.get(self.service_name, 'template')
if template == 'default':
Expand All @@ -139,27 +187,24 @@ def __init__(self, context):
# get time stamp of last uploaded data
self.last_update = self.context.status.get_datetime(
'last update', self.service_name)
# create upload thread
self.upload_thread = UploadThread(self, context)
self.stop = self.upload_thread.stop

def upload(self, catchup=True, live_data=None, test_mode=False, option=''):
OK = True
count = 0
for data, live in self.next_data(catchup and not test_mode, live_data):
if count >= 30 or len(self.upload_thread.queue) >= 60:
if count >= 30 or len(self.queue) >= 60:
break
timestamp = data['idx']
if test_mode:
timestamp = None
prepared_data = self.prepare_data(data)
prepared_data.update(self.fixed_data)
self.upload_thread.queue.append(
self.queue.append(
(timestamp, {'prepared_data': prepared_data, 'live': live}))
count += 1
# start upload thread
if self.upload_thread.queue and not self.upload_thread.is_alive():
self.upload_thread.start()
if self.queue and not self.is_alive():
self.start()

def prepare_data(self, data):
data_str = self.templater.make_text(self.template_file, data)
Expand Down Expand Up @@ -192,10 +237,96 @@ def next_data(self, catchup, live_data):
def valid_data(self, data):
return True

def upload_batch(self):
OK = True
count = 0
with self.session() as session:
while self.queue and not self.context.shutdown.is_set():
if self.catchup == 0:
# "live only" service, so ignore old records
drop = len(self.queue) - 1
if self.queue[-1] is None:
drop -= 1
if drop > 0:
for i in range(drop):
self.queue.popleft()
self.logger.warning(
'{:d} record(s) dropped'.format(drop))
# send upload without taking it off queue
upload = self.queue[0]
if upload is None:
OK = False
break
timestamp, kwds = upload
OK, message = self.upload_data(session, **kwds)
self.log(message)
if not OK:
break
count += 1
if timestamp:
self.context.status.set(
'last update', self.service_name, str(timestamp))
# finally remove upload from queue
self.queue.popleft()
if count > 1:
self.logger.warning('{:d} records sent'.format(count))
elif count:
self.logger.info('1 record sent')
return OK


class FileService(ServiceBase):
def __init__(self, context):
super(FileService, self).__init__(context)
# create directory for files to upload
work_dir = context.params.get('paths', 'work', '/tmp/pywws')
self.uploads_directory = os.path.join(work_dir, self.service_name)
if not os.path.isdir(self.uploads_directory):
os.makedirs(self.uploads_directory)
# queue any pending files
for name in os.listdir(self.uploads_directory):
path = os.path.join(self.uploads_directory, name)
self.upload(option=path, delete=True)

def upload(self, live_data=None, option='', delete=True):
self.queue.append((option, delete))
# start upload thread
if self.queue and not self.is_alive():
self.start()

def upload_batch(self):
OK = True
with self.session() as session:
while self.queue and not self.context.shutdown.is_set():
# send upload without taking it off queue
upload = self.queue[0]
if upload is None:
OK = False
break
path, delete = upload
if delete:
src_dir, base_name = os.path.split(path)
if src_dir != self.uploads_directory:
target = os.path.join(self.uploads_directory, base_name)
if os.path.exists(target):
os.unlink(target)
shutil.move(path, target)
path = target
OK, message = self.upload_file(session, path)
self.log(message)
if not OK:
break
if delete:
os.unlink(path)
# finally remove upload from queue
self.queue.popleft()
return OK


def main(class_, argv=None):
import argparse
import inspect

if argv is None:
argv = sys.argv
docstring = inspect.getdoc(sys.modules[class_.__module__]).split('\n\n')
Expand All @@ -204,11 +335,14 @@ def main(class_, argv=None):
if hasattr(class_, 'register'):
parser.add_argument('-r', '--register', action='store_true',
help='register (or update) with service')
parser.add_argument('-c', '--catchup', action='store_true',
help='upload all data since last upload')
if issubclass(class_, DataService):
parser.add_argument('-c', '--catchup', action='store_true',
help='upload all data since last upload')
parser.add_argument('-v', '--verbose', action='count',
help='increase amount of reassuring messages')
parser.add_argument('data_dir', help='root directory of the weather data')
if issubclass(class_, FileService):
parser.add_argument('file', nargs='*', help='file to be uploaded')
args = parser.parse_args(argv[1:])
pywws.logger.setup_handler(args.verbose or 0)
with pywws.storage.pywws_context(args.data_dir) as context:
Expand All @@ -217,6 +351,10 @@ def main(class_, argv=None):
uploader.register()
context.flush()
return 0
uploader.upload(catchup=args.catchup, test_mode=not args.catchup)
if issubclass(class_, FileService):
for file in args.file:
uploader.upload(option=file, delete=False)
else:
uploader.upload(catchup=args.catchup, test_mode=not args.catchup)
uploader.stop()
return 0

0 comments on commit 5bfc528

Please sign in to comment.