Permalink
Browse files

sync: add --add-destination, parallelize uploads to multiple destinations

This adds the --add-destination <foo> command line option, which is
currently meaningful only in the sync local->remote(s) case.  For the
last arg (the traditional destination) and for each destination specified
via --add-destination, a child process is forked to do the upload once the
initial walk of the local file system has completed (including all the
disk I/O needed to calculate md5 values for each file).

This keeps us from pounding the file system doing (the same) disk I/O
for each possible destination, and allows full use of our bandwidth to
upload in parallel.
  • Loading branch information...
1 parent ddb5ef9 commit 07c9e2de983df3f4ee352b7ac2cdaa643eb6dda8 @mdomsch committed Jun 18, 2012
Showing with 39 additions and 7 deletions.
  1. +1 −0 S3/Config.py
  2. +38 −7 s3cmd
View
@@ -86,6 +86,7 @@ class Config(object):
website_index = "index.html"
website_error = ""
website_endpoint = "http://%(bucket)s.s3-website-%(location)s.amazonaws.com/"
+ additional_destinations = []
## Creating a singleton
def __new__(self, configfile = None):
View
45 s3cmd
@@ -915,13 +915,38 @@ def cmd_sync_local2remote(args):
error(u"or disable encryption with --no-encrypt parameter.")
sys.exit(1)
- ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
- destination_base_uri = S3Uri(args[-1])
- if destination_base_uri.type != 's3':
- raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
- destination_base = str(destination_base_uri)
-
local_list, single_file_local = fetch_local_list(args[:-1], recursive = True)
+
+ # Now that we've done all the disk I/O to look at the local file system and
+ # calculate the md5 for each file, fork for each destination to upload to them separately
+ # and in parallel
+ child_pids = []
+ destinations = [args[-1]]
+ if cfg.additional_destinations:
+ destinations = destinations + cfg.additional_destinations
+
+ for dest in destinations:
+ ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
+ destination_base_uri = S3Uri(dest)
+ if destination_base_uri.type != 's3':
+ raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
+ destination_base = str(destination_base_uri)
+ child_pid = os.fork()
+ if child_pid == 0:
+ is_parent = False
+ break
+ else:
+ is_parent = True
+ child_pids.append(child_pid)
+
+ if is_parent:
+ while len(child_pids):
+ (pid, status) = os.wait()
+ child_pids.remove(pid)
+ return
+
+ # This is all executed in the child process
+ # remember to leave here with os._exit() so control doesn't resume elsewhere
remote_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)
local_count = len(local_list)
@@ -975,7 +1000,7 @@ def cmd_sync_local2remote(args):
output(u"delete: %s" % remote_list[key]['object_uri_str'])
warning(u"Exitting now because of --dry-run")
- return
+ os._exit(0)
# if there are copy pairs, we can't do delete_before, on the chance
# we need one of the to-be-deleted files as a copy source.
@@ -1050,6 +1075,8 @@ def cmd_sync_local2remote(args):
output("Created invalidation request for %d paths" % len(uploaded_objects_list))
output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
+ return os._exit(0)
+
def cmd_sync(args):
if (len(args) < 2):
raise ParameterError("Too few parameters! Expected: %s" % commands['sync']['param'])
@@ -1594,6 +1621,7 @@ def main():
optparser.add_option( "--no-delete-removed", dest="delete_removed", action="store_false", help="Don't delete remote objects.")
optparser.add_option( "--delete-after", dest="delete_after", action="store_true", help="Perform deletes after new uploads [sync]")
optparser.add_option( "--delay-updates", dest="delay_updates", action="store_true", help="Put all updated files into place at end [sync]")
+ optparser.add_option( "--add-destination", dest="additional_destinations", action="append", help="Additional destination for parallel uploads, in addition to last arg. May be repeated.")
optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.")
optparser.add_option( "--no-preserve", dest="preserve_attrs", action="store_false", help="Don't store FS attributes")
optparser.add_option( "--exclude", dest="exclude", action="append", metavar="GLOB", help="Filenames and paths matching GLOB will be excluded from sync")
@@ -1769,6 +1797,9 @@ def main():
## Some CloudFront.Cmd.Options() options are not settable from command line
pass
+ if options.additional_destinations:
+ cfg.additional_destinations = options.additional_destinations
+
## Set output and filesystem encoding for printing out filenames.
sys.stdout = codecs.getwriter(cfg.encoding)(sys.stdout, "replace")
sys.stderr = codecs.getwriter(cfg.encoding)(sys.stderr, "replace")

0 comments on commit 07c9e2d

Please sign in to comment.