More pool refactoring

Refactored all the functions to work better with map().
Initial testing appears to work for threaded pools.
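The pattern behind the refactor: Pool.map()/map_async() call their function with a single argument (the mapped item), so each worker function now takes the item first and the shared context after it, and the new IterUnwrap helper binds that context ahead of time. A minimal sketch of how the pieces fit together — IterUnwrap matches the class added in the diff below, while process_file here is a simplified stand-in and the data is made up:

from multiprocessing import Pool

class IterUnwrap(object):
    """Bind trailing args/kwargs; the mapped item is prepended on each call."""
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self, item):
        return self.func(*((item,) + self.args), **self.kwargs)

def process_file(_file, app, objects):
    # item-first signature, matching the refactored worker functions
    return "%s handled by %s" % (_file, app)

if __name__ == "__main__":
    workers = Pool(2)
    handler = IterUnwrap(process_file, "myapp", {})  # bind app and objects once
    result = workers.map_async(handler, ["a.txt", "b.txt"])
    workers.close()
    workers.join()
    print(result.get())  # ['a.txt handled by myapp', 'b.txt handled by myapp']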
commit cb7634c4305212a77ba275ad830d62e9fa1ef8d1 (parent: d8940ab)
Kevin Landreth authored
Showing 1 changed file with 61 additions and 29 deletions.

slbackup.py
@@ -21,6 +21,7 @@
 from copy import copy
 from hashlib import md5
 from multiprocessing import Manager, Pool, cpu_count
+from itertools import repeat
 import Queue
 try:
@@ -324,14 +325,14 @@ def serial_processor(app, files, directories, remote_objects, uploads, deletes):
     l.info("Processing directories (%d)", directories.qsize())
     for job in queue_iter(directories):
-        create_directory(app, job, remote_objects)
+        create_directory(job, app, remote_objects)
 
     for job in queue_iter(files):
-        process_file(app, job, remote_objects, uploads)
+        process_file(job, app, remote_objects, uploads)
 
     uploads.put(None)
     for item in queue_iter(uploads):
-        upload_file(app, item, uploads)
+        upload_file(item, app, uploads)
 
     l.info("%d objects scheduled for deletion", len(remote_objects))
     for d in remote_objects.values():
@@ -339,32 +340,52 @@ def serial_processor(app, files, directories, remote_objects, uploads, deletes):
     deletes.put(None)
     for item in queue_iter(deletes):
-        delete_file(app, item)
+        delete_file(item, app)
-def threaded_processor(app, files, remote_objects, uploads, deletes):
+def threaded_processor(app, files, directories, remote_objects, uploads, deletes):
+    l = logging.getLogger("threaded_processor")
     workers = Pool(app.threads)
 
     # For each scanner, create a backlog manager: we don't want the HTTP
     # backlog to grow too long, or uploading/deleting will take 2-4x as long
-    for p in xrange(app.threads):
-        pool.append(Process(target=process_files,
-            args=(copy(app), remote_objects, files, uploads)))
-        if (p % 2) == 0:
-            workers.append(Process(target=upload_files,
-                args=(copy(app), uploads)))
-        else:
-            workers.append(Process(target=delete_files,
-                args=(copy(app), deletes,)))
-    workers.map_async(
-
-    logging.info("Processing %d files (%d reads/%d writers)",
-        files.qsize(), len(pool), len(workers))
-
-    for s in (pool + workers):
-        s.start()
-
+    l.info("Creating directories")
+    process_directories = IterUnwrap(create_directory,
+                                     copy(app), remote_objects)
+    r = workers.map_async(process_directories, queue_iter(directories),
+                          directories.qsize())
+
+    l.info("Processing files")
+    process_files = IterUnwrap(process_file,
+                               copy(app), remote_objects, uploads)
+    r = workers.map_async(process_files, queue_iter(files), files.qsize(),
+                          lambda x: uploads.put(None))
+
+    l.info("Starting uploader")
+    process_uploads = IterUnwrap(upload_file, copy(app), uploads)
+    r = workers.map_async(process_uploads, queue_iter(uploads), 1)
+
+    workers.close()
+    workers.join()
+
+#    for p in xrange(app.threads):
+#        pool.append(Process(target=process_files,
+#            args=(copy(app), remote_objects, files, uploads)))
+#        if (p % 2) == 0:
+#            workers.append(Process(target=upload_files,
+#                args=(copy(app), uploads)))
+#        else:
+#            workers.append(Process(target=delete_files,
+#                args=(copy(app), deletes,)))
+#    workers.map_async(unwrap, zip((process_file, copy(app), remote_objects, uploads)))
+#
+#    logging.info("Processing %d files (%d reads/%d writers)",
+#        files.qsize(), len(pool), len(workers))
+#
+#    for s in (pool + workers):
+#        s.start()
+#
     logging.info("Waiting for files to empty")
     # wait for the queue to empty
     while not files.empty():
@@ -374,8 +395,8 @@ def threaded_processor(app, files, remote_objects, uploads, deletes):
     # join the readers after the queue is empty
     # so as not to prematurely delete any files
     # that have pending operations
-    for s in pool:
-        s.join()
+    #for s in pool:
+    #    s.join()
 
     # After the readers have all exited, we know that remote_objects
     # contains the remaining files that should be deleted from
@@ -403,7 +424,7 @@ def encode_filename(string):
     return uc.encode('ascii', 'replace')
 
 
-def create_directory(app, directory, remote_objects):
+def create_directory(directory, app, remote_objects):
     container = get_container(app)
     _dir = directory
@@ -424,7 +445,7 @@ def create_directory(app, directory, remote_objects):
         del remote_objects[safe_dir]
 
 
-def delete_file(app, obj):
+def delete_file(obj, app):
     logging.info("Deleting %s", obj['name'])
@@ -436,7 +457,7 @@ def delete_file(app, obj):
     rm.delete()
 
 
-def process_file(app, _file, objects, backlog):
+def process_file(_file, app, objects, backlog):
     l = logging.getLogger('process_file')
     safe_filename = encode_filename(_file)
@@ -532,7 +553,7 @@ def new_revision(app, _from, marker):
     delete_later(app, rev)
 
 
-def delete_later(app, obj):
+def delete_later(obj, app):
     """ lacking this in the bindings currently, work around it.
     Deletes a file after the specified number of days
     """
@@ -546,7 +567,7 @@ def delete_later(app, obj):
     obj.make_request('POST', headers=headers)
 
 
-def upload_file(app, job, jobs):
+def upload_file(job, app, jobs):
     l = logging.getLogger('upload_file')
     container = get_container(app)
@@ -629,6 +650,17 @@ def queue_iter(queue):
         yield item
 
 
+class IterUnwrap(object):
+    def __init__(self, func, *args, **kwargs):
+        self.func = func
+        self.args = args
+        self.kwargs = kwargs
+
+    def __call__(self, item):
+        a = (item,) + self.args
+        return self.func(*a, **self.kwargs)
+
+
 if __name__ == "__main__":
     import optparse
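A note on how the map_async stages in threaded_processor terminate: queue_iter yields items until it pulls a None sentinel off its queue, and the completion callback on the file stage (lambda x: uploads.put(None)) is what ends the upload stage. A sentinel-terminated queue_iter can be written with the two-argument form of iter() — a sketch, since the commit's own implementation is only partially shown above:

def queue_iter(queue):
    # iter(callable, sentinel): keep calling queue.get() until it returns None
    for item in iter(queue.get, None):
        yield item

One caveat: in CPython, Pool.map_async converts a generator to a list before dispatching any work, so each stage only begins once its input queue has been drained down to the sentinel.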