
Performance and threading enhancements

Greatly reduced authentication requests (thanks sudorandom)
Authentication request reduction increased the upload rate
Threading can now be completely turned off by setting threads to zero (-t 0 ; threads = 0)
Properly setting the user-agent
1 parent ac4e500 · commit 45f0648f7b0e5f240dd9ad249ef2e47ed08184ab · Kevin Landreth committed Oct 25, 2012
Showing with 89 additions and 23 deletions.
  1. +89 −23 slbackup.py
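
For context, the threads-off behavior described in the commit message maps onto the new "if app.threads:" checks added in the diff below. The following is a minimal sketch only; the -t/--threads option definition is not part of this patch, so the add_option call here is an assumption modeled on the -d/--datacenter option visible near the end of the diff.

    # Sketch only (not part of the patch): setting threads to zero routes
    # execution into the serial code paths this commit adds. The option
    # definition below is an assumption.
    from optparse import OptionParser

    oargs = OptionParser()
    oargs.add_option('-t', '--threads', nargs=1, type='int', default=4,
                     help="Worker processes to spawn; 0 disables threading.",
                     metavar='4')

    # Inside upload_directory(app), a zero (falsy) thread count selects the
    # serial variants instead of spawning multiprocessing workers:
    if app.threads:
        threaded_harvestor(app, files, directories, remote_objects)
    else:
        serial_harvestor(app, files, directories, remote_objects)
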
@@ -1,16 +1,16 @@
#!/usr/bin/python
-
""" SoftLayer object storage backup """
__author__ = "Kevin Landreth"
__copyright__ = "Copyright 2012, SoftLayer"
__credits__ = ["Kevin Landreth", "Kevin McDonald", "Chris Evans"]
__license__ = "MIT"
-__version__ = "1.2"
+__version__ = "1.3"
__maintainer__ = "Kevin Landreth"
__email__ = "klandreth@softlayer.com"
__status__ = "Production"
+__agent__ = 'softlayer/slbackup-%s' % __version__
import os
import sys
@@ -117,6 +117,8 @@ def __init__(self, options):
self.excludes = []
self.source = options.get('source')
self.container = options.get('container')
+ self.token = None
+ self.url = None
# CLI overrides config file
if options.get('datacenter', None) is not None:
@@ -152,16 +154,37 @@ def __init__(self, options):
logging.info("Excluding: %s", self.excludes)
+ def authenticate(self):
+ use_network = 'private' if self.use_private else 'public'
+
+ object_storage.consts.USER_AGENT = __agent__
+ client = object_storage.get_client(
+ self.username,
+ self.apikey,
+ datacenter=self.dc,
+ network=use_network)
+
+ logging.info("Logging in as %s in %s",
+ self.username, self.dc)
+ client.conn.auth.authenticate()
+
+ self.url = client.get_url()
+ self.token = copy(client.conn.auth.auth_token)
+ del client
+
def get_container(app, name=None):
if name is None:
name = app.container
- use_network = 'private' if app.use_private else 'public'
- logging.info("Logging in as %s in %s and getting container %s",
- app.username, app.dc, name)
- obj = object_storage.get_client(
- app.username, app.apikey, datacenter=app.dc, network=use_network)
- return obj[name]
+
+ object_storage.consts.USER_AGENT = __agent__
+ client = object_storage.get_client(
+ app.username,
+ app.apikey,
+ auth_token=app.token)
+ client.set_storage_url(app.url)
+
+ return client[name]
def catalog_directory(app, directory, files, directories):
@@ -217,6 +240,36 @@ def upload_directory(app):
uploads = manager.Queue()
deletes = manager.Queue()
+ app.authenticate()
+
+ logging.warn("%s %s", app.token, app.url)
+
+ if app.threads:
+ threaded_harvestor(app, files, directories, remote_objects)
+ else:
+ serial_harvestor(app, files, directories, remote_objects)
+
+ # haven't needed to thread this out yet, but it's ready if it needs to be
+ logging.warn("Processing directories (%d)", directories.qsize())
+ create_directories(app, directories, remote_objects)
+ del directories
+
+ args = (app, files, remote_objects, uploads, deletes,)
+
+ if app.threads:
+ threaded_processor(*args)
+ else:
+ serial_processor(*args)
+
+ logging.warn("Done backing up %s to %s", app.source, app.container)
+
+
+def serial_harvestor(app, files, directories, remote_objects):
+ catalog_directory(copy(app), app.source, files, directories)
+ catalog_remote(copy(app), remote_objects)
+
+
+def threaded_harvestor(app, files, directories, remote_objects):
harvest = list()
harvest.append(Process(target=catalog_directory,
args=(copy(app), app.source, files, directories,)))
@@ -233,11 +286,21 @@ def upload_directory(app):
del harvest
- # haven't needed to thread this out yet, but it's ready if it needs to
- logging.warn("Processing directories (%d)", directories.qsize())
- create_directories(app, directories, remote_objects)
- del directories
+def serial_processor(app, files, remote_objects, uploads, deletes):
+ process_files(app, remote_objects, files, uploads)
+ uploads.put(None)
+ upload_files(app, uploads)
+
+ logging.info("%d objects scheduled for deletion", len(remote_objects))
+ for d in remote_objects.values():
+ deletes.put((app.container, d))
+ deletes.put(None)
+
+ delete_files(app, deletes)
+
+
+def threaded_processor(app, files, remote_objects, uploads, deletes):
pool = list()
workers = list()
@@ -248,10 +311,10 @@ def upload_directory(app):
args=(copy(app), remote_objects, files, uploads)))
if (p % 2) == 0:
workers.append(Process(target=upload_files,
- args=(app.container, uploads)))
+ args=(copy(app), uploads)))
else:
workers.append(Process(target=delete_files,
- args=(app, deletes,)))
+ args=(copy(app), deletes,)))
logging.warn("Processing %d files (%d reads/%d writers)",
files.qsize(), len(pool), len(workers))
@@ -290,8 +353,6 @@ def upload_directory(app):
for s in workers:
s.join()
- logging.warn("Done backing up %s to %s", app.source, app.container)
-
def encode_filename(string):
string = str(string)
@@ -325,6 +386,8 @@ def create_directories(app, directories, remote_objects):
def delete_files(app, objects):
+ l = logging.getLogger("delete_files")
+ l.warn("Starting")
while True:
try:
_container, obj = objects.get()
@@ -345,6 +408,7 @@ def delete_files(app, objects):
def process_files(app, objects, files, backlog):
l = logging.getLogger('process_files')
+ l.warn("Starting")
while True:
try:
_file = files.get_nowait()
@@ -411,7 +475,7 @@ def new_revision(app, _from, marker):
# in a separate container will lead to an ever growing
# list slowing down the backups
- _rev_container = app.container + "-revisions"
+ _rev_container = "%s-revisions" % app.container
safe_filename = encode_filename(_from)
fs = os.path.splitext(safe_filename)
@@ -447,10 +511,11 @@ def delete_later(app, obj):
obj.make_request('POST', headers=headers)
-def upload_files(_container, jobs):
- container = get_container(app, name=_container)
-
+def upload_files(app, jobs):
l = logging.getLogger('upload_files')
+ l.warn("Starting uploader")
+ container = get_container(app)
+
while True:
try:
_file, target = jobs.get()
@@ -464,11 +529,12 @@ def upload_files(_container, jobs):
l.warn("Uploading file %s", obj.name)
chunk_upload(obj, _file)
l.warn("Finished file %s ", obj.name)
- except Exception, e:
+ except Exception:
l.error("Failed to upload %s, requeueing", _file)
jobs.put((_file, target,))
# in case we got disconnected, reset the container
- container = get_container(app, name=_container)
+ app.authenticate()
+ container = get_container(app)
def chunk_upload(obj, filename, headers=None):
@@ -575,7 +641,7 @@ def asblocks(_f, buflen=_DEFAULT_OS_BUFLEN):
oargs.add_option('-d', '--datacenter', nargs=1, type='str',
help="Datacenter of the container. "
- "A contiainer will be created if it doesn't exist. "
+ "A container will be created if it doesn't exist. "
"(default: %s)" % Application._DEFAULT_DC,
metavar=Application._DEFAULT_DC)
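
The net effect of the changes above is that authentication happens once up front and every worker process reuses the cached token and storage URL instead of logging in again. Below is a condensed sketch of that pattern, using the same softlayer-object-storage calls that appear in the diff; error handling and the Application class are omitted, so treat it as an illustration rather than the script itself.

    # Condensed sketch of the authenticate-once / token-reuse flow introduced
    # by this commit; names and calls are taken from the diff above.
    from copy import copy
    import object_storage

    __agent__ = 'softlayer/slbackup-1.3'

    def authenticate_once(username, apikey, datacenter, network='public'):
        # One authentication round trip; afterwards only the storage URL and
        # auth token are kept.
        object_storage.consts.USER_AGENT = __agent__
        client = object_storage.get_client(
            username, apikey, datacenter=datacenter, network=network)
        client.conn.auth.authenticate()
        return client.get_url(), copy(client.conn.auth.auth_token)

    def client_from_cache(username, apikey, url, token):
        # Worker processes rebuild a client from the cached credentials
        # instead of re-authenticating, which is what cuts the request count.
        object_storage.consts.USER_AGENT = __agent__
        client = object_storage.get_client(username, apikey, auth_token=token)
        client.set_storage_url(url)
        return client
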
