Skip to content

Commit

Permalink
Eliminated file copying/moving in file services
Browse files Browse the repository at this point in the history
Instead of using a directory of files to upload, a list of pending file
paths is stored in status.ini. This allows failed uploads to be resumed
next time pywws is run.

Signed-off-by: Jim Easterbrook <jim@jim-easterbrook.me.uk>
  • Loading branch information
jim-easterbrook committed Aug 21, 2018
1 parent 5054dc7 commit 4eef439
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 41 deletions.
4 changes: 2 additions & 2 deletions src/pywws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '18.8.0'
_release = '1565'
_commit = '7f1e07e'
_release = '1566'
_commit = '5054dc7'
5 changes: 3 additions & 2 deletions src/pywws/regulartasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,10 @@ def _do_common(self, now, sections, live_data=None):
continue
self.do_template(template, data=live_data, local='L' in flags)
# do service tasks
for name in self.services:
self.services[name].do_catchup()
for name, option in service_tasks:
if name in self.services:
self.services[name].upload(live_data=live_data, option=option)
self.services[name].upload(live_data=live_data, option=option)
# upload non local files
upload_files = []
for name in os.listdir(self.uploads_directory):
Expand Down
75 changes: 38 additions & 37 deletions src/pywws/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

from collections import deque
from datetime import datetime, timedelta
import os
import shutil
import sys
import threading

Expand Down Expand Up @@ -188,6 +186,9 @@ def __init__(self, context):
self.last_update = self.context.status.get_datetime(
'last update', self.service_name)

def do_catchup(self):
pass

def upload(self, catchup=True, live_data=None, test_mode=False, option=''):
OK = True
count = 0
Expand Down Expand Up @@ -276,50 +277,50 @@ def upload_batch(self):


class FileService(ServiceBase):
def __init__(self, context):
super(FileService, self).__init__(context)
# create directory for files to upload
work_dir = context.params.get('paths', 'work', '/tmp/pywws')
self.uploads_directory = os.path.join(work_dir, self.service_name)
if not os.path.isdir(self.uploads_directory):
os.makedirs(self.uploads_directory)
# queue any pending files
for name in os.listdir(self.uploads_directory):
path = os.path.join(self.uploads_directory, name)
self.upload(option=path, delete=True)

def upload(self, live_data=None, option='', delete=True):
self.queue.append((option, delete))
def do_catchup(self):
pending = eval(self.context.status.get(
'pending', self.service_name, '[]'))
if pending:
self.upload(option='CATCHUP')

def upload(self, live_data=None, option=''):
self.queue.append(option)
# start upload thread
if self.queue and not self.is_alive():
self.start()

def upload_batch(self):
# make list of files to upload
pending = eval(self.context.status.get(
'pending', self.service_name, '[]'))
files = []
while self.queue and not self.context.shutdown.is_set():
upload = self.queue[0]
if upload is None:
break
if upload == 'CATCHUP':
for path in pending:
if path not in files:
files.append(path)
else:
if upload not in files:
files.append(upload)
if upload not in pending:
pending.append(upload)
self.queue.popleft()
# upload files
OK = True
with self.session() as session:
while self.queue and not self.context.shutdown.is_set():
# send upload without taking it off queue
upload = self.queue[0]
if upload is None:
OK = False
break
path, delete = upload
if delete:
src_dir, base_name = os.path.split(path)
if src_dir != self.uploads_directory:
target = os.path.join(self.uploads_directory, base_name)
if os.path.exists(target):
os.unlink(target)
shutil.move(path, target)
path = target
for path in files:
OK, message = self.upload_file(session, path)
self.log(message)
if not OK:
if OK:
pending.remove(path)
else:
break
if delete:
os.unlink(path)
# finally remove upload from queue
self.queue.popleft()
self.context.status.set('pending', self.service_name, repr(pending))
if self.queue and self.queue[0] is None:
OK = False
return OK


Expand Down Expand Up @@ -353,7 +354,7 @@ def main(class_, argv=None):
return 0
if issubclass(class_, FileService):
for file in args.file:
uploader.upload(option=file, delete=False)
uploader.upload(option=file)
else:
uploader.upload(catchup=args.catchup, test_mode=not args.catchup)
uploader.stop()
Expand Down

0 comments on commit 4eef439

Please sign in to comment.