
Logging and async improvements

Almost have the deadlock issue fixed
Adjusted log levels to be less verbose at the INFO level
commit 95e099785f8d62bd9c1065b9bd71d3f3ac8565f2 (1 parent: 07d72a8) · Kevin Landreth committed Nov 28, 2012
Showing 1 changed file with 98 additions and 58 deletions.
  1. +98 −58 slbackup.py
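The commit reorganizes the pipeline around manager queues terminated by None sentinels: each stage drains its input queue, and the sentinel tells the downstream consumer to stop, which is the mechanism behind the deadlock work. The hunks below lean on a queue_iter() helper defined elsewhere in slbackup.py; a minimal sketch of its assumed shape, inferred from call sites like map(delete_files, queue_iter(deletes)):

    def queue_iter(queue):
        # Assumed helper (not shown in this diff): yield items from a
        # Queue until the None sentinel arrives, then stop.
        while True:
            item = queue.get()
            if item is None:
                return
            yield item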
@@ -216,10 +216,10 @@ def catalog_directory(app, directory, files, directories):
b, p = os.path.split(a)
if p in dirnames:
if len(b) < 1:
- logging.info("Pruning %s", a)
+ logging.debug("Pruning %s", a)
dirnames.remove(p)
elif root.find('./' + b) == 0:
- logging.info("Pruning %s", a)
+ logging.debug("Pruning %s", a)
dirnames.remove(p)
for _dir in dirnames:
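These pruning messages drop from INFO to DEBUG, so a default run no longer narrates every skipped path. Plain stdlib logging applies if you want them back while debugging:

    import logging

    # DEBUG re-enables the demoted "Pruning ..." messages; the
    # default INFO level now keeps them quiet.
    logging.basicConfig(level=logging.DEBUG)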
@@ -262,17 +262,18 @@ def upload_directory(app):
remote_objects = manager.dict()
uploads = manager.Queue()
deletes = manager.Queue()
+ mkdirs = manager.Queue()
app.authenticate()
- logging.info("%s %s", app.token, app.url)
+ logging.debug("%s %s", app.token, app.url)
if app.threads:
threaded_harvestor(app, files, directories, remote_objects)
else:
serial_harvestor(app, files, directories, remote_objects)
- args = (app, files, directories, remote_objects, uploads, deletes,)
+ args = (app, files, directories, remote_objects, uploads, deletes, mkdirs,)
if app.threads:
threaded_processor(*args)
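The new mkdirs queue joins uploads and deletes as a Manager-backed queue, which is what lets Pool workers in the threaded path share it: a plain multiprocessing.Queue cannot be passed as an argument to pool workers, but a manager proxy can. A stripped-down sketch of the pattern (standard library only, names illustrative):

    from multiprocessing import Manager, Pool

    def consume(q):
        # Drain the shared queue until the None sentinel arrives.
        while True:
            if q.get() is None:
                break

    if __name__ == '__main__':
        manager = Manager()
        mkdirs = manager.Queue()   # proxy object, safe to hand to workers
        mkdirs.put('example/dir')
        mkdirs.put(None)           # sentinel closes the queue
        pool = Pool(1)
        pool.apply_async(consume, (mkdirs,))
        pool.close()
        pool.join()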
@@ -320,13 +321,18 @@ def threaded_harvestor(app, files, directories, remote_objects):
def serial_processor(app, files, directories, remote_objects, uploads,
- deletes):
+ deletes, mkdirs):
l = logging.getLogger('serial_processor')
l.info("Processing directories (%d)", directories.qsize())
- process_directories = IterUnwrap(create_directory,
- copy(app), remote_objects)
+ process_directories = IterUnwrap(process_directory,
+ copy(app), remote_objects, mkdirs)
map(process_directories, queue_iter(directories))
+ mkdirs.put(None)
+
+ l.info("Creating Directories")
+ create_dir = IterUnwrap(create_directory, copy(app))
+ map(create_dir, queue_iter(mkdirs))
process_files = IterUnwrap(process_file,
copy(app), remote_objects, uploads)
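serial_processor now stages directory handling: process_directory classifies entries onto mkdirs, a None sentinel closes that queue, and create_directory drains it. The IterUnwrap helper used throughout is defined outside these hunks; its call sites, such as IterUnwrap(process_directory, copy(app), remote_objects, mkdirs), imply it binds trailing arguments so map() can pass each queue item as the sole argument. An assumed sketch:

    class IterUnwrap(object):
        # Assumed shape (the real helper is not shown in this diff):
        # adapt f(item, *extra) to the one-argument callable that
        # map()/map_async() expect.
        def __init__(self, func, *args):
            self.func = func
            self.args = args

        def __call__(self, item):
            return self.func(item, *self.args)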
@@ -342,52 +348,73 @@ def serial_processor(app, files, directories, remote_objects, uploads,
deletes.put(d)
deletes.put(None)
- delete_files = IterUnwrap(delete_file, copy(app))
+ delete_files = IterUnwrap(delete_file, copy(app), deletes)
map(delete_files, queue_iter(deletes))
+def threaded_done_marker(results, queue):
+ if isinstance(results, Exception):
+ logging.exception(results)
+ queue.put(None)
+
+
def threaded_processor(app, files, directories, remote_objects, uploads,
- deletes):
+ deletes, mkdirs):
l = logging.getLogger("threaded_processor")
workers = Pool(app.threads)
- l.info("Creating directories")
- process_directories = IterUnwrap(create_directory,
- copy(app), remote_objects)
- mkdir = workers.map_async(process_directories, queue_iter(directories),
- directories.qsize())
-
- l.info("Processing files")
- process_files = IterUnwrap(process_file,
- copy(app), remote_objects, uploads)
- touch = workers.map_async(process_files, queue_iter(files), files.qsize(),
- lambda x: uploads.put(None))
-
- l.info("Starting uploader")
- process_uploads = IterUnwrap(upload_file, copy(app), uploads)
- uploader = workers.map_async(process_uploads, queue_iter(uploads), 1)
+ if directories.qsize() > 1:
+ l.info("Processing %d directories", directories.qsize())
+ dir_done = IterUnwrap(threaded_done_marker, mkdirs)
+ process_directories = IterUnwrap(process_directory,
+ copy(app), remote_objects, mkdirs)
+ workers.map_async(process_directories, queue_iter(directories),
+ 1, dir_done)
+
+ l.info("Creating Directories")
+ mkdir = IterUnwrap(create_directory, copy(app))
+ workers.map_async(mkdir, queue_iter(mkdirs))
+ else:
+ directories.get_nowait()
+
+ if files.qsize() > 1:
+ l.info("Processing files")
+ file_done = IterUnwrap(threaded_done_marker, uploads)
+ process_files = IterUnwrap(process_file,
+ copy(app), remote_objects, uploads)
+ workers.map_async(process_files, queue_iter(files), None, file_done)
+
+ l.info("Starting uploader")
+ process_uploads = IterUnwrap(upload_file, copy(app), uploads)
+ workers.map_async(process_uploads, queue_iter(uploads))
+ else:
+ files.get_nowait()
- logging.info("Waiting for files to empty")
- # wait for the queue to empty
- while not touch.ready() and not mkdir.ready():
- time.sleep(0.2)
+ l.info("Waiting to process files")
+ #TODO wait for processing to finish
# After the readers have all exited, we know that remote_objects
# contains the remaining files that should be deleted from
# the backups. Dump these into a Queue for the writers to take
# care of.
- logging.info("%d objects scheduled for deletion", len(remote_objects))
for d in remote_objects.values():
deletes.put(d)
deletes.put(None)
- delete_files = IterUnwrap(delete_file, copy(app))
+ logging.info("%d objects scheduled for deletion", deletes.qsize() - 1)
+ delete_files = IterUnwrap(delete_file, copy(app), deletes)
workers.map_async(delete_files, queue_iter(deletes))
- if not uploader.ready():
- l.info("Still uploading")
-
- l.info("Cleaning up, joining workers")
workers.close()
+
+ while (uploads.qsize() + mkdirs.qsize() + deletes.qsize()) > 0:
+ l.info("Actions remaining:- uploading:%d mkdir:%d deletes:%d",
+ directories.qsize(),
+ files.qsize(),
+ deletes.qsize(),
+ )
+ time.sleep(1)
+ l.info("Cleaning up, letting pending items finish %d",
+ (uploads.qsize() + mkdirs.qsize() + deletes.qsize()))
workers.join()
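The threaded path chains stages the same way: each map_async() receives a completion callback (threaded_done_marker wrapped by IterUnwrap) that drops a None sentinel on the next stage's queue, so the downstream queue_iter() consumer terminates instead of blocking forever. One caveat, consistent with the "almost fixed" note in the commit message: Pool.map_async() materializes any iterable lacking __len__ before dispatching, so handing it a queue_iter() generator still blocks the submitting thread until that queue's sentinel shows up. A stripped-down sketch of the callback chaining (names illustrative):

    from multiprocessing import Manager, Pool

    def stage_one(item):
        return item   # stands in for process_file/process_directory

    if __name__ == '__main__':
        manager = Manager()
        next_queue = manager.Queue()

        def done(results):
            # Completion callback, run in the parent process: close
            # the downstream queue so its consumer can finish.
            next_queue.put(None)

        pool = Pool(2)
        pool.map_async(stage_one, range(4), 1, done)
        pool.close()
        pool.join()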
@@ -397,37 +424,48 @@ def encode_filename(string):
return uc.encode('ascii', 'replace')
-def create_directory(directory, app, remote_objects):
- container = get_container(app)
-
+def process_directory(directory, app, remote_objects, mkdirs):
_dir = directory
-
safe_dir = encode_filename(_dir)
+
if safe_dir in remote_objects and \
remote_objects[safe_dir].get('content_type', None) == \
'application/directory':
del remote_objects[safe_dir]
return
- logging.info("Creating directory %s", safe_dir)
+ if safe_dir in remote_objects:
+ del remote_objects[safe_dir]
+
+ mkdirs.put(safe_dir)
+
+def create_directory(safe_dir, app):
+ l = logging.getLogger("create_directory")
+ l.info("Creating %s", safe_dir)
+
+ container = get_container(app)
obj = container.storage_object(safe_dir)
obj.content_type = 'application/directory'
obj.create()
- if safe_dir in remote_objects:
- del remote_objects[safe_dir]
-def delete_file(obj, app):
+def delete_file(obj, app, jobs):
+ l = logging.getLogger("delete_file")
+ l.info("Deleting %s", obj['name'])
- logging.info("Deleting %s", obj['name'])
-
- # Copy the file out of the way
- new_revision(app, obj['name'], obj.get('hash', 'deleted'))
+ try:
+ # Copy the file out of the way
+ new_revision(app, obj['name'], obj.get('hash', 'deleted'))
- # then delete it as it no longer exists.
- rm = get_container(app).storage_object(obj['name'])
- rm.delete()
+ # then delete it as it no longer exists.
+ rm = get_container(app).storage_object(obj['name'])
+ rm.delete()
+ except Exception, e:
+ l.error("Failed to upload %s, requeueing. Error: %s", obj['name'], e)
+ jobs.put(obj)
+ # in case we got disconnected, reset the container
+ app.authenticate()
def process_file(_file, app, objects, backlog):
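delete_file now requeues an object onto its own input queue when the delete fails, and re-authenticates in case the session dropped. Since queue_iter() only stops on the None sentinel, an object that fails permanently would circulate forever; a hypothetical bounded-retry variant (not in this commit, the retries key is invented) might look like:

    def delete_file_bounded(obj, app, jobs, max_retries=3):
        # Sketch only: same requeue-on-failure flow as delete_file
        # above, plus a retry budget. Reuses the module's
        # new_revision/get_container helpers.
        l = logging.getLogger("delete_file")
        try:
            new_revision(app, obj['name'], obj.get('hash', 'deleted'))
            get_container(app).storage_object(obj['name']).delete()
        except Exception, e:
            obj['retries'] = obj.get('retries', 0) + 1
            if obj['retries'] <= max_retries:
                l.error("Failed to delete %s, requeueing. Error: %s",
                        obj['name'], e)
                jobs.put(obj)
            else:
                l.error("Giving up on %s after %d attempts",
                        obj['name'], max_retries)
            app.authenticate()   # reset in case we got disconnected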
@@ -437,7 +475,7 @@ def process_file(_file, app, objects, backlog):
# don't bother with checksums for new files
if safe_filename not in objects:
- l.info("Queued missing %s", safe_filename)
+ l.debug("Queued missing %s", safe_filename)
backlog.put((_file, safe_filename,))
return
@@ -459,7 +497,7 @@ def process_file(_file, app, objects, backlog):
oldtime = time.mktime(time.strptime(oldtime,
'%Y-%m-%dT%H:%M:%S.%f'))
except ValueError:
- l.info("Failed to figure out the time format, skipping %s",
+ l.warn("Failed to figure out the time format, skipping %s",
_file)
return
else:
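For context on the ValueError guard: Swift reports last_modified as an ISO-style timestamp with microseconds, which is what the '%Y-%m-%dT%H:%M:%S.%f' format targets; anything else falls through to the warning and the file is skipped. A worked example of the parse (timestamp value illustrative):

    import time

    stamp = '2012-11-28T10:02:33.123456'   # typical Swift last_modified
    parsed = time.strptime(stamp, '%Y-%m-%dT%H:%M:%S.%f')
    oldtime = time.mktime(parsed)          # seconds since the epoch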
@@ -470,7 +508,7 @@ def process_file(_file, app, objects, backlog):
del objects[safe_filename]
return
elif app.checkhash:
- l.info("Checksumming %s", _file)
+ l.debug("Checksumming %s", _file)
try:
newhash = swifthash(_file)
except (OSError, IOError), e:
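swifthash() is defined outside these hunks. Since a Swift object's ETag is the MD5 hex digest of its content, the local equivalent presumably chunks through the file and hexdigests it; an assumed sketch, not the author's exact code:

    import hashlib

    def swifthash(path, blocksize=65536):
        # MD5 over the full content, read in chunks to keep memory
        # flat; matches how Swift computes an object's ETag.
        md5 = hashlib.md5()
        with open(path, 'rb') as f:
            chunk = f.read(blocksize)
            while chunk:
                md5.update(chunk)
                chunk = f.read(blocksize)
        return md5.hexdigest()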
@@ -483,10 +521,10 @@ def process_file(_file, app, objects, backlog):
del objects[safe_filename]
return
else:
- l.info("Revised: HASH:%s:%s FILE:%s",
+ l.debug("Revised: HASH:%s:%s FILE:%s",
oldhash, newhash, safe_filename)
else:
- l.info("Revised: SIZE:%s:%s DATE:%s:%s FILE:%s",
+ l.debug("Revised: SIZE:%s:%s DATE:%s:%s FILE:%s",
oldsize, cursize, oldtime, curdate, safe_filename)
del objects[safe_filename]
@@ -495,8 +533,9 @@ def process_file(_file, app, objects, backlog):
def new_revision(app, _from, marker):
+ l = logging.getLogger("new_revision")
if app.retention < 1:
- logging.info("Retention disabled for %s", _from)
+ l.warn("Retention disabled for %s", _from)
return None
# copy the file to the -revisions container so we don't
@@ -518,7 +557,7 @@ def new_revision(app, _from, marker):
rev = revcontainer.storage_object(new_file)
if obj.exists():
- logging.info("Copying %s to %s", obj.name, rev.name)
+ l.debug("Copying %s to %s", obj.name, rev.name)
rev.create()
@@ -530,9 +569,10 @@ def delete_later(obj, app):
""" lacking this in the bindings currently, work around it.
Deletes a file after the specified number of days
"""
+ l = logging.getLogger("delete_later")
delta = int(app.retention) * 24 * 60 * 60
when = int(time.time()) + delta
- logging.info("Setting retention(%d) on %s", when, obj.name)
+ l.debug("Setting retention(%d) on %s", when, obj.name)
headers = {
'X-Delete-At': str(when),
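Spelled out, the retention math above uses Swift's native expiring-objects header, X-Delete-At, which takes an absolute Unix timestamp. With a 7-day retention, for example:

    import time

    retention_days = 7
    delta = retention_days * 24 * 60 * 60   # 604800 seconds
    when = int(time.time()) + delta         # absolute expiry time
    headers = {'X-Delete-At': str(when)}    # Swift removes the object then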
@@ -551,7 +591,7 @@ def upload_file(job, app, jobs):
obj = container.storage_object(target)
l.info("Uploading file %s", obj.name)
chunk_upload(obj, _file)
- l.info("Finished file %s ", obj.name)
+ l.debug("Finished file %s ", obj.name)
except (OSError, IOError), e:
# For some reason we couldn't read the file, skip it but log it
l.error("Failed to upload %s. %s", _file, e)
