Added separate writers

commit 07a1a6c1299789090e95134cac2119bc46ba48a1 (parent 5892381)
Kevin Landreth authored
Showing with 33 additions and 13 deletions.
+33 −13 slbackup.py
--- a/slbackup.py
+++ b/slbackup.py
@@ -362,20 +362,23 @@ def threaded_processor(app, files, directories, remote_objects, uploads,
                        deletes, mkdirs):
     l = logging.getLogger("threaded_processor")
     workers = Pool(app.threads)
+    writers = Pool(app.threads)

     file_proc = None
     dir_proc = None
+    upload_proc = None

     if directories.qsize() > 1:
+
         l.info("Processing %d directories", directories.qsize())
         dir_done = IterUnwrap(threaded_done_marker, mkdirs)
         process_directories = IterUnwrap(process_directory,
                                          copy(app), remote_objects, mkdirs)
         dir_proc = workers.map_async(process_directories,
-                                     queue_iter(directories), 1, dir_done)
+                                     queue_iter(directories), app.threads, dir_done)

         l.info("Creating Directories")
         mkdir = IterUnwrap(create_directory, copy(app))
-        workers.map_async(mkdir, queue_iter(mkdirs))
+        writers.map_async(mkdir, queue_iter(mkdirs), app.threads)
     else:
         directories.get_nowait()
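The hunk above is the heart of the change: read-side work (scanning directories against remote_objects) and write-side work (creating directories, and later uploads) now run on two separate multiprocessing pools, each fed through map_async. A minimal, self-contained sketch of that two-pool split; scan(), write(), the pool size, and the inputs are illustrative stand-ins, not code from slbackup.py:

# Sketch of the reader/writer pool split used above; scan() and write()
# stand in for the real process_directory()/create_directory() workers.
from multiprocessing import Pool

def scan(item):
    return item * 2        # reader side: inspect an object

def write(item):
    return item + 1        # writer side: perform the resulting action

if __name__ == '__main__':
    workers = Pool(4)                                  # readers
    writers = Pool(4)                                  # writers
    scan_res = workers.map_async(scan, range(10), 4)   # chunksize=4
    write_res = writers.map_async(write, range(10), 4)
    for pool in (workers, writers):
        pool.close()
        pool.join()
    print(scan_res.get())
    print(write_res.get())

Keeping the writers in their own pool means a slow writer can no longer occupy the worker slots the readers need, which is presumably the motivation for splitting the pools here.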
@@ -385,39 +388,54 @@ def threaded_processor(app, files, directories, remote_objects, uploads,
         process_files = IterUnwrap(process_file,
                                    copy(app), remote_objects, uploads)
         file_proc = workers.map_async(process_files,
-                                      queue_iter(files), None, file_done)
+                                      queue_iter(files), 2, file_done)

-        l.info("Starting uploader")
-        process_uploads = IterUnwrap(upload_file, copy(app), uploads)
-        workers.map_async(process_uploads, queue_iter(uploads))
     else:
         directories.get_nowait()

     l.info("Waiting to process files")
     #TODO wait for processing to finish
-    while file_proc and dir_proc:
+    while file_proc or dir_proc or upload_proc:
+        l.info("Waiting for processing")
         if file_proc:
             try:
-                file_res = file_proc.get(1)
-            except TimeoutError:
-                pass
+                file_res = file_proc.wait(1)
+                file_proc.successful()
+            except (TimeoutError, AssertionError):
+                l.info("Still processing files")
             else:
+                l.info("Done processing files")
                 file_proc = None
                 if isinstance(file_res, Exception):
                     raise

         if dir_proc:
             try:
-                dir_res = dir_proc.get(1)
+                dir_res = dir_proc.wait(1)
             except TimeoutError:
-                pass
+                l.info("Still processing directories")
             else:
+                l.info("Done processing directories")
                 dir_proc = None
                 if isinstance(dir_res, Exception):
                     raise
-        l.info("Waiting for processing")

+        if upload_proc:
+            try:
+                upload_proc.wait(1)
+                upload_proc.successful()
+            except (TimeoutError, AssertionError):
+                l.info("Still processing uploads")
+            else:
+                l.info("Done processing uploads")
+                upload_proc = None
+        elif uploads.qsize() > 0:
+            process_uploads = IterUnwrap(upload_file, copy(app), uploads)
+            l.info("Starting uploader")
+            upload_proc = writers.map_async(process_uploads,
+                                            queue_iter(uploads), app.threads)
+            l.info("This didn't run right away")

     # After the readers have all exited, we know that remote_objects
     # contains the remaining files that should be deleted from
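The rewritten wait loop above polls each outstanding map_async result with wait(1) followed by successful(). In the Python 2 multiprocessing this code targets, AsyncResult.wait(timeout) returns None instead of raising on timeout, and successful() raises AssertionError until the result is ready, so catching (TimeoutError, AssertionError) effectively means "not finished yet, keep looping". (Python 3.7+ raises ValueError from successful() instead.) A standalone sketch of that readiness probe, with slow() as a stand-in task:

# Sketch of the wait()/successful() readiness probe from the loop above.
import time
from multiprocessing import Pool, TimeoutError

def slow(x):
    time.sleep(0.5)
    return x

if __name__ == '__main__':
    pool = Pool(2)
    res = pool.map_async(slow, range(4))
    while res is not None:
        try:
            res.wait(1)       # returns None; never raises on timeout
            res.successful()  # raises until ready: AssertionError on
                              # Python 2, ValueError on Python 3.7+
        except (TimeoutError, AssertionError, ValueError):
            print("still processing")
        else:
            print(res.get())  # ready; get() re-raises worker failures
            res = None
    pool.close()
    pool.join()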
@@ -431,6 +449,7 @@ def threaded_processor(app, files, directories, remote_objects, uploads,
     workers.map_async(delete_files, queue_iter(deletes))

     workers.close()
+    writers.close()

     while (uploads.qsize() + mkdirs.qsize() + deletes.qsize()) > 0:
         l.info("Actions remaining:- uploading:%d mkdir:%d deletes:%d",
@@ -442,6 +461,7 @@ def threaded_processor(app, files, directories, remote_objects, uploads,
     l.info("Cleaning up, letting pending items finish %d",
            (uploads.qsize() + mkdirs.qsize() + deletes.qsize()))
     workers.join()
+    writers.join()


 def encode_filename(string):
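The diff leans on two helpers defined elsewhere in slbackup.py, queue_iter and IterUnwrap, whose definitions are not shown here. Judging purely from the call sites, queue_iter looks like a generator that drains a Queue until a done marker arrives (the threaded_done_marker callback presumably enqueues it), and IterUnwrap looks like a picklable callable that pre-binds extra arguments so map_async can pass each queue item to a multi-argument function. A guessed reconstruction; the sentinel protocol and the argument order in __call__ are assumptions:

# Guessed shapes for the queue_iter/IterUnwrap helpers used above; the
# real definitions live elsewhere in slbackup.py and may differ.
try:
    from Queue import Empty   # Python 2
except ImportError:
    from queue import Empty   # Python 3

class Done(object):
    """Stand-in done marker; the real sentinel may look different."""

def queue_iter(q, timeout=1):
    # Yield items until a Done marker arrives, polling the queue so an
    # empty moment does not end the iteration prematurely.
    while True:
        try:
            item = q.get(True, timeout)
        except Empty:
            continue
        if isinstance(item, Done):
            return
        yield item

class IterUnwrap(object):
    # Pre-bind extra arguments so pool.map_async(f, items) ends up
    # calling func(item, *extra); a top-level class keeps it picklable.
    def __init__(self, func, *extra):
        self.func = func
        self.extra = extra

    def __call__(self, item):
        return self.func(item, *self.extra)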