Fixed a race condition between the local files queue emptying

and deletes firing off while readers had pending file operations.
On short file lists, this would lead to pending uploads being
canceled as the writers had already deleted the file from the backup
and exiting, leaving pending upload items in the queue.

Fixed by blocking on joining the reader threads before processing
deleted files and closing the upload queues.  This was the orignally
intended behavior so this was purely a bugfix.
commit 0263cbc13f2b64e74abbb5dccff4a06e7ae33493 1 parent 2ed3d3a
Kevin Landreth authored
Showing with 24 additions and 9 deletions.
  1. +24 −9
@@ -138,6 +138,7 @@ def upload_directory(_container, directory):
del directories
pool = list()
+ workers = list()
# For each scanner, create an backlog manager as we don't want the http
# backlog to grow too long and uploading/deleting will take 2-4x as long
@@ -145,14 +146,16 @@ def upload_directory(_container, directory):
args=(_container, remote_objects, files, uploads)))
if (p % 2) == 0:
- pool.append(Process(target=upload_files,
+ workers.append(Process(target=upload_files,
args=(_container, uploads)))
- pool.append(Process(target=delete_files,
+ workers.append(Process(target=delete_files,
- logging.warn("Processing %d files (%d threads)", files.qsize(), len(pool))
- for s in pool:
+ logging.warn("Processing %d files (%d reads/%d writers)",
+ files.qsize(), len(pool), len(workers))
+ for s in (pool + workers):
s.start()"Waiting for files to empty")
@@ -160,19 +163,31 @@ def upload_directory(_container, directory):
while not files.empty():
+"Queue empty, joining readers")
+ # join the readers after the queue in empty
+ # as to not prematurely delete any files
+ # that have pending operations
+ for s in pool:
+ s.join()
+ # After the readers have all exited, we know that remote_objects
+ # contains the remaining files that should be deleted from
+ # the backups. Dump these into a Queue for the writers to take
+ # care of."%d objects scheduled for deletion", len(remote_objects))
for d in remote_objects.values():
deletes.put((_container, d))"Stopping uploaders")
# tell the uploaders they are done
- for x in xrange(len(pool)/2):
+ for x in xrange(len(workers)/2):
# join the last of the threads
-"Waiting on file deletes and uploads")
- for s in pool:
+"Joining writers")
+ for s in workers:
logging.warn("Done backing up %s to %s", directory, _container)
@@ -281,9 +296,9 @@ def process_files(_container, objects, files, backlog):"Revised: SIZE:%s:%s DATE:%s:%s FILE:%s",
oldsize, cursize, oldtime, curdate, safe_filename)
- new_revision(_container, _file, oldhash)
- backlog.put((_file, safe_filename))
del objects[safe_filename]
+ new_revision(_container, _file, oldhash)
+ backlog.put((_file, safe_filename,))
def new_revision(_container, _from, marker):
