From d8940ab45c52e0e5eb3b7965d339145c840d2eed Mon Sep 17 00:00:00 2001
From: Kevin Landreth <klandreth@softlayer.com>
Date: Tue, 27 Nov 2012 12:29:43 -0600
Subject: [PATCH] Slowly moving to iterators

The single-threaded processor now consumes the work queues through the
new queue_iter() generator; each producer marks the end of its queue by
putting a None sentinel. The threaded processor is still broken: it
keeps the old Process-list logic and its signature does not yet accept
the directories queue. It will be ported to the Pool in a follow-up.

---
 slbackup.py | 315 +++++++++++++++++++++++++++-------------------------
 1 file changed, 166 insertions(+), 149 deletions(-)

diff --git a/slbackup.py b/slbackup.py
index f0651f4..24361d8 100755
--- a/slbackup.py
+++ b/slbackup.py
@@ -6,7 +6,7 @@
 __copyright__ = "Copyright 2012, SoftLayer"
 __credits__ = ["Kevin Landreth", "Kevin McDonald", "Chris Evans"]
 __license__ = "MIT"
-__version__ = "1.3"
+__version__ = "2.0"
 __maintainer__ = "Kevin Landreth"
 __email__ = "klandreth@softlayer.com"
 __status__ = "Production"
@@ -20,7 +20,8 @@
 import ConfigParser
 from copy import copy
 from hashlib import md5
-from multiprocessing import Manager, Process, cpu_count
+from multiprocessing import Manager, Pool, Process, cpu_count
+import Queue
 
 try:
     import object_storage
@@ -208,7 +209,7 @@ def get_container(app, name=None):
 
 
 def catalog_directory(app, directory, files, directories):
-    logging.warn("Gathering local files")
+    logging.info("Gathering local files")
     for root, dirnames, filenames in os.walk('.'):
         # Prune all excluded directories from the list
         for a in app.excludes:
@@ -227,11 +228,13 @@
         for _file in filenames:
             files.put(os.path.relpath(os.path.join(root, _file)))
 
-    logging.warn("Done gathering local files")
+    logging.info("Done gathering local files")
+    files.put(None)
+    directories.put(None)
 
 
 def catalog_remote(app, objects):
-    logging.warn("Grabbing remote objects")
+    logging.info("Grabbing remote objects")
     container = get_container(app)
     container.create()
     f = container.objects()
@@ -248,7 +251,7 @@
         except:
             break
 
-    logging.warn("Objects %d", len(objects))
+    logging.info("Objects %d", len(objects))
 
 
 def upload_directory(app):
@@ -262,26 +265,21 @@
 
     app.authenticate()
 
-    logging.warn("%s %s", app.token, app.url)
+    logging.info("%s %s", app.token, app.url)
 
     if app.threads:
         threaded_harvestor(app, files, directories, remote_objects)
     else:
         serial_harvestor(app, files, directories, remote_objects)
 
-    # haven't needed to thread this out yet, but it's ready if it needs to
-    logging.warn("Processing directories (%d)", directories.qsize())
-    create_directories(app, directories, remote_objects)
-    del directories
-
-    args = (app, files, remote_objects, uploads, deletes,)
+    args = (app, files, directories, remote_objects, uploads, deletes,)
 
     if app.threads:
         threaded_processor(*args)
     else:
         serial_processor(*args)
 
-    logging.warn("Done backing up %s to %s", app.source, app.container)
+    logging.info("Done backing up %s to %s", app.source, app.container)
 
 
 def serial_harvestor(app, files, directories, remote_objects):
@@ -290,42 +288,70 @@
 
 
 def threaded_harvestor(app, files, directories, remote_objects):
-    harvest = list()
-    harvest.append(Process(target=catalog_directory,
-        args=(copy(app), app.source, files, directories,)))
-    harvest.append(Process(target=catalog_remote,
-        args=(copy(app), remote_objects,)))
+    pool = Pool(app.threads)
 
     logging.info("Starting harvesters")
-    for harvester in harvest:
-        harvester.start()
+
+    local = pool.apply_async(catalog_directory,
+        (copy(app), app.source, files, directories,))
+    remote = pool.apply_async(catalog_remote,
+        (copy(app), remote_objects,))
+
+    pool.close()
 
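+    # close() stops the pool from accepting new work; join() below will
+    # then block until both catalog tasks have run to completion.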
logging.info("Waiting for harvest") - for harvester in harvest: - harvester.join() + pool.join() + + if not local.successful(): + logging.error("Local processing encountered an error") + try: + local.get() + except Exception, e: + logging.exception(e) + raise e + + if not remote.successful(): + logging.error("Remote processing encountered an error") + try: + remote.get() + except Exception, e: + logging.exception(e) + raise e + - del harvest +def serial_processor(app, files, directories, remote_objects, uploads, deletes): + l = logging.getLogger('serial_processor') + l.info("Processing directories (%d)", directories.qsize()) + for job in queue_iter(directories): + create_directory(app, job, remote_objects) -def serial_processor(app, files, remote_objects, uploads, deletes): - process_files(app, remote_objects, files, uploads) + for job in queue_iter(files): + process_file(app, job, remote_objects, uploads) uploads.put(None) - upload_files(app, uploads) - logging.info("%d objects scheduled for deletion", len(remote_objects)) + for item in queue_iter(uploads): + upload_file(app, item, uploads) + + l.info("%d objects scheduled for deletion", len(remote_objects)) for d in remote_objects.values(): - deletes.put((app.container, d)) + deletes.put(d) deletes.put(None) - delete_files(app, deletes) + for item in queue_iter(deletes): + delete_file(app, item) def threaded_processor(app, files, remote_objects, uploads, deletes): - pool = list() - workers = list() + workers = Pool(app.threads) # For each scanner, create an backlog manager as we don't want the http # backlog to grow too long and uploading/deleting will take 2-4x as long + for p in xrange(app.threads): pool.append(Process(target=process_files, args=(copy(app), remote_objects, files, uploads))) @@ -335,8 +357,9 @@ def threaded_processor(app, files, remote_objects, uploads, deletes): else: workers.append(Process(target=delete_files, args=(copy(app), deletes,))) + workers.map_async( - logging.warn("Processing %d files (%d reads/%d writers)", + logging.info("Processing %d files (%d reads/%d writers)", files.qsize(), len(pool), len(workers)) for s in (pool + workers): @@ -380,120 +403,101 @@ def encode_filename(string): return uc.encode('ascii', 'replace') -def create_directories(app, directories, remote_objects): - logging.info("Creating directories") +def create_directory(app, directory, remote_objects): container = get_container(app) - while True: - try: - _dir = directories.get_nowait() - except: - break - safe_dir = encode_filename(_dir) - if safe_dir in remote_objects and \ - remote_objects[safe_dir].get('content_type', None) == \ - 'application/directory': - del remote_objects[safe_dir] - continue + _dir = directory - logging.warn("Creating directory %s", safe_dir) + safe_dir = encode_filename(_dir) + if safe_dir in remote_objects and \ + remote_objects[safe_dir].get('content_type', None) == \ + 'application/directory': + del remote_objects[safe_dir] + return - obj = container.storage_object(safe_dir) - obj.content_type = 'application/directory' - obj.create() - if safe_dir in remote_objects: - del remote_objects[safe_dir] + logging.info("Creating directory %s", safe_dir) + obj = container.storage_object(safe_dir) + obj.content_type = 'application/directory' + obj.create() + if safe_dir in remote_objects: + del remote_objects[safe_dir] -def delete_files(app, objects): - l = logging.getLogger("delete_files") - l.warn("Starting") - while True: - try: - _container, obj = objects.get() - logging.info("Deleting %s", obj['name']) +def 
+    logging.info("Deleting %s", obj['name'])
 
-            # Copy the file out of the way
-            new_revision(app, obj['name'], obj.get('hash', 'deleted'))
+    # Copy the file out of the way
+    new_revision(app, obj['name'], obj.get('hash', 'deleted'))
 
-            # then delete it as it no longer exists.
-            rm = get_container(app, name=_container)\
-                .storage_object(obj['name'])
-            rm.delete()
-        except:
-            break
+    # then delete it as it no longer exists.
+    rm = get_container(app).storage_object(obj['name'])
+    rm.delete()
 
 
-def process_files(app, objects, files, backlog):
-    l = logging.getLogger('process_files')
-    l.warn("Starting")
-    while True:
-        try:
-            _file = files.get_nowait()
-        except:
-            l.info("Queue empty, exiting file processor")
-            break
-
-        safe_filename = encode_filename(_file)
-
-        # don't bother with checksums for new files
-        if safe_filename not in objects:
-            l.warn("Queued missing %s", safe_filename)
-            backlog.put((_file, safe_filename,))
-            continue
-
-        try:
-            oldhash = objects[safe_filename].get('hash', None)
-
-            oldsize = int(objects[safe_filename].get('size'))
-            cursize = int(get_filesize(_file))
-            curdate = int(os.path.getmtime(_file))
-            oldtime = objects[safe_filename].get('last_modified')
-        except (OSError, IOError), e:
-            l.error("Couldn't read file size skipping, %s: %s", _file, e)
-            del objects[safe_filename]
-            continue
-
-        # there are a few formats, try to figure out which one safely
-        for timeformat in ['%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S']:
-            try:
-                oldtime = time.mktime(time.strptime(oldtime,
-                    '%Y-%m-%dT%H:%M:%S.%f'))
-            except ValueError:
-                l.warn("Failed to figure out the time format, skipping %s",
-                    _file)
-                continue
-            else:
-                break
-
-        if cursize == oldsize and oldtime >= curdate and not app.checkhash:
-            l.debug("No change in filesize/date: %s", _file)
-            del objects[safe_filename]
-            continue
-        elif app.checkhash:
-            l.info("Checksumming %s", _file)
-            try:
-                newhash = swifthash(_file)
-            except (OSError, IOError), e:
-                l.error("Couldn't hash skipping, %s: %s", _file, e)
-                del objects[safe_filename]
-                continue
-
-            if oldhash == newhash:
-                l.debug("No change in checksum: %s", _file)
-                del objects[safe_filename]
-                continue
-            else:
-                l.info("Revised: HASH:%s:%s FILE:%s",
-                    oldhash, newhash, safe_filename)
-        else:
-            l.info("Revised: SIZE:%s:%s DATE:%s:%s FILE:%s",
-                oldsize, cursize, oldtime, curdate, safe_filename)
-
-            del objects[safe_filename]
-            new_revision(app, _file, oldhash)
-            backlog.put((_file, safe_filename,))
+def process_file(app, _file, objects, backlog):
+    l = logging.getLogger('process_file')
+
+    safe_filename = encode_filename(_file)
+
+    # don't bother with checksums for new files
+    if safe_filename not in objects:
+        l.info("Queued missing %s", safe_filename)
+        backlog.put((_file, safe_filename,))
+        return
+
+    try:
+        oldhash = objects[safe_filename].get('hash', None)
+
+        oldsize = int(objects[safe_filename].get('size'))
+        cursize = int(get_filesize(_file))
+        curdate = int(os.path.getmtime(_file))
+        oldtime = objects[safe_filename].get('last_modified')
+    except (OSError, IOError), e:
+        l.error("Couldn't read file size skipping, %s: %s", _file, e)
+        del objects[safe_filename]
+        return
+
+    # there are a few timestamp formats, try each until one parses
+    for timeformat in ['%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S']:
+        try:
+            oldtime = time.mktime(time.strptime(oldtime, timeformat))
+        except ValueError:
+            continue
+        else:
+            break
+    else:
+        # the for/else fires when no format parsed the timestamp
+        l.info("Failed to figure out the time format, skipping %s",
+            _file)
+        return
+
+    if cursize == oldsize and oldtime >= curdate and not app.checkhash:
+        l.debug("No change in filesize/date: %s", _file)
+        del objects[safe_filename]
+        return
+    elif app.checkhash:
+        l.info("Checksumming %s", _file)
+        try:
+            newhash = swifthash(_file)
+        except (OSError, IOError), e:
+            l.error("Couldn't hash skipping, %s: %s", _file, e)
+            del objects[safe_filename]
+            return
+
+        if oldhash == newhash:
+            l.debug("No change in checksum: %s", _file)
+            del objects[safe_filename]
+            return
+        else:
+            l.info("Revised: HASH:%s:%s FILE:%s",
+                oldhash, newhash, safe_filename)
+    else:
+        l.info("Revised: SIZE:%s:%s DATE:%s:%s FILE:%s",
+            oldsize, cursize, oldtime, curdate, safe_filename)
+
+    del objects[safe_filename]
+    new_revision(app, _file, oldhash)
+    backlog.put((_file, safe_filename,))
 
 
 def new_revision(app, _from, marker):
@@ -520,7 +524,7 @@ def new_revision(app, _from, marker):
     rev = revcontainer.storage_object(new_file)
 
     if obj.exists():
-        logging.warn("Copying %s to %s", obj.name, rev.name)
+        logging.info("Copying %s to %s", obj.name, rev.name)
 
         rev.create()
 
@@ -542,32 +546,26 @@ def delete_later(app, obj):
     obj.make_request('POST', headers=headers)
 
 
-def upload_files(app, jobs):
-    l = logging.getLogger('upload_files')
-    l.warn("Staring uploader")
+def upload_file(app, job, jobs):
+    l = logging.getLogger('upload_file')
 
     container = get_container(app)
 
-    while True:
-        try:
-            _file, target = jobs.get()
-        except:
-            logging.info("Uploader exiting")
-            break
+    # job is a (local path, remote object name) tuple
+    _file, target = job
 
-        try:
-            obj = container.storage_object(target)
-            l.warn("Uploading file %s", obj.name)
-            chunk_upload(obj, _file)
-            l.warn("Finished file %s ", obj.name)
-        except (OSError, IOError), e:
-            # For some reason we couldn't read the file, skip it but log it
-            l.error("Failed to upload %s. %s", _file, e)
-        except Exception, e:
-            l.error("Failed to upload %s, requeueing. Error: %s", _file, e)
-            jobs.put((_file, target,))
-            # in case we got disconnected, reset the container
-            app.authenticate()
-            container = get_container(app)
+    try:
+        obj = container.storage_object(target)
+        l.info("Uploading file %s", obj.name)
+        chunk_upload(obj, _file)
+        l.info("Finished file %s", obj.name)
+    except (OSError, IOError), e:
+        # For some reason we couldn't read the file, skip it but log it
+        l.error("Failed to upload %s. %s", _file, e)
+    except Exception, e:
+        l.error("Failed to upload %s, requeueing. Error: %s", _file, e)
+        jobs.put((_file, target,))
+        # in case we got disconnected, re-authenticate for the next call
+        app.authenticate()
 
 
 def chunk_upload(obj, filename, headers=None):
@@ -617,6 +616,20 @@ def asblocks(_f, buflen=_DEFAULT_OS_BUFLEN):
         raise e
 
 
+def queue_iter(queue):
+    """Yield queue items until the producer's None sentinel arrives."""
+    while True:
+        try:
+            item = queue.get()
+        except Queue.Empty:
+            break
+
+        if item is None:
+            break
+
+        yield item
+
+
 if __name__ == "__main__":
     import optparse
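
--
The iterator contract this patch introduces: every producer puts exactly
one None sentinel on its queue when it finishes, and queue_iter() stops
at that sentinel, so consumers become plain for loops. A minimal
standalone sketch of the pattern (Python 2, using Manager().Queue() as
slbackup does; the names below are illustrative only):

    from multiprocessing import Manager

    def queue_iter(queue):
        # Yield items until the producer's None sentinel is seen.
        while True:
            item = queue.get()
            if item is None:
                break
            yield item

    manager = Manager()
    q = manager.Queue()
    for n in (1, 2, 3):
        q.put(n)
    q.put(None)  # end-of-work sentinel

    print [n * 2 for n in queue_iter(q)]  # prints [2, 4, 6]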