Skip to content
This repository has been archived by the owner on Aug 20, 2022. It is now read-only.

Commit

Permalink
mimicking blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
CrackerJackMack committed Nov 29, 2012
1 parent 95e0997 commit 5892381
Showing 1 changed file with 30 additions and 4 deletions.
34 changes: 30 additions & 4 deletions slbackup.py
Expand Up @@ -20,7 +20,7 @@
import ConfigParser import ConfigParser
from copy import copy from copy import copy
from hashlib import md5 from hashlib import md5
from multiprocessing import Manager, Pool, cpu_count from multiprocessing import Manager, Pool, cpu_count, TimeoutError
import Queue import Queue


try: try:
Expand Down Expand Up @@ -362,14 +362,16 @@ def threaded_processor(app, files, directories, remote_objects, uploads,
deletes, mkdirs): deletes, mkdirs):
l = logging.getLogger("threaded_processor") l = logging.getLogger("threaded_processor")
workers = Pool(app.threads) workers = Pool(app.threads)
file_proc = None
dir_proc = None


if directories.qsize() > 1: if directories.qsize() > 1:
l.info("Processing %d directories", directories.qsize()) l.info("Processing %d directories", directories.qsize())
dir_done = IterUnwrap(threaded_done_marker, mkdirs) dir_done = IterUnwrap(threaded_done_marker, mkdirs)
process_directories = IterUnwrap(process_directory, process_directories = IterUnwrap(process_directory,
copy(app), remote_objects, mkdirs) copy(app), remote_objects, mkdirs)
workers.map_async(process_directories, queue_iter(directories), dir_proc = workers.map_async(process_directories,
1, dir_done) queue_iter(directories), 1, dir_done)


l.info("Creating Directories") l.info("Creating Directories")
mkdir = IterUnwrap(create_directory, copy(app)) mkdir = IterUnwrap(create_directory, copy(app))
Expand All @@ -382,7 +384,8 @@ def threaded_processor(app, files, directories, remote_objects, uploads,
file_done = IterUnwrap(threaded_done_marker, uploads) file_done = IterUnwrap(threaded_done_marker, uploads)
process_files = IterUnwrap(process_file, process_files = IterUnwrap(process_file,
copy(app), remote_objects, uploads) copy(app), remote_objects, uploads)
workers.map_async(process_files, queue_iter(files), None, file_done) file_proc = workers.map_async(process_files,
queue_iter(files), None, file_done)


l.info("Starting uploader") l.info("Starting uploader")
process_uploads = IterUnwrap(upload_file, copy(app), uploads) process_uploads = IterUnwrap(upload_file, copy(app), uploads)
Expand All @@ -391,7 +394,30 @@ def threaded_processor(app, files, directories, remote_objects, uploads,
directories.get_nowait() directories.get_nowait()


l.info("Waiting to process files") l.info("Waiting to process files")

#TODO wait for processing to finish #TODO wait for processing to finish
while file_proc and dir_proc:
if file_proc:
try:
file_res = file_proc.get(1)
except TimeoutError:
pass
else:
file_proc = None
if isinstance(file_res, Exception):
raise

if dir_proc:
try:
dir_res = dir_proc.get(1)
except TimeoutError:
pass
else:
dir_proc = None
if isinstance(dir_res, Exception):
raise

l.info("Waiting for processing")


# After the readers have all exited, we know that remote_objects # After the readers have all exited, we know that remote_objects
# contains the remaining files that should be deleted from # contains the remaining files that should be deleted from
Expand Down

0 comments on commit 5892381

Please sign in to comment.