sync: refactor parent/child and single process code

os.fork() and os.wait() don't exist on Windows, and the
multiprocessing module doesn't exist until Python 2.6.  So instead, we
call os.fork() conditionally, depending on whether it exists and on
there being more than one destination.

Also rearranges the code so that the subfunctions within
local2remote are defined at the top of their respective functions, making
the main execution path of the function easier to read.
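A minimal sketch of the dispatch pattern this commit describes (the helper names here are illustrative, not the actual s3cmd functions, and it tests hasattr(os, 'fork') where the commit itself checks os.__all__): fall back to a single process when fork() is unavailable or there is only one destination, otherwise fork one child per destination and have the parent reap them.

```python
# Illustrative sketch only; upload_to() stands in for the real per-destination work.
import os

def upload_to(destination):
    # Placeholder for the real child work (fetch remote list, compare, upload).
    print("syncing to %s in pid %d" % (destination, os.getpid()))

def sync_all(destinations):
    # Single-process path: fork() missing (e.g. Windows) or only one destination.
    if not hasattr(os, 'fork') or len(destinations) < 2:
        for dest in destinations:
            upload_to(dest)
        return

    child_pids = []
    for dest in destinations:
        pid = os.fork()
        if pid == 0:             # child: handle one destination, then exit
            upload_to(dest)
            os._exit(0)
        child_pids.append(pid)   # parent: remember the child

    while child_pids:            # parent: wait for every child before returning
        pid, status = os.wait()
        child_pids.remove(pid)
```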
commit fb6441ca6753e79a1944616de25af5a7cbb1e694 (1 parent: 9a5d3ef)
@mdomsch authored committed
Showing with 156 additions and 144 deletions.
  1. +156 −144 s3cmd
s3cmd
@@ -907,175 +907,187 @@ def cmd_sync_local2remote(args):
s3.object_delete(uri)
output(u"deleted: '%s'" % uri)
- s3 = S3(cfg)
-
- if cfg.encrypt:
- error(u"S3cmd 'sync' doesn't yet support GPG encryption, sorry.")
- error(u"Either use unconditional 's3cmd put --recursive'")
- error(u"or disable encryption with --no-encrypt parameter.")
- sys.exit(1)
-
- local_list, single_file_local = fetch_local_list(args[:-1], recursive = True)
-
- # Now that we've done all the disk I/O to look at the local file system and
- # calculate the md5 for each file, fork for each destination to upload to them separately
- # and in parallel
- child_pids = []
- destinations = [args[-1]]
- if cfg.additional_destinations:
- destinations = destinations + cfg.additional_destinations
-
- for dest in destinations:
- ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
- destination_base_uri = S3Uri(dest)
- if destination_base_uri.type != 's3':
- raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
- destination_base = str(destination_base_uri)
- child_pid = os.fork()
- if child_pid == 0:
- is_parent = False
- break
- else:
- is_parent = True
- child_pids.append(child_pid)
+ def _single_process(local_list):
+ for dest in destinations:
+ ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
+ destination_base_uri = S3Uri(dest)
+ if destination_base_uri.type != 's3':
+ raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
+ destination_base = str(destination_base_uri)
+ _child(destination_base, local_list)
+
+ def _parent():
+ # Now that we've done all the disk I/O to look at the local file system and
+ # calculate the md5 for each file, fork for each destination to upload to them separately
+ # and in parallel
+ child_pids = []
+
+ for dest in destinations:
+ ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
+ destination_base_uri = S3Uri(dest)
+ if destination_base_uri.type != 's3':
+ raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
+ destination_base = str(destination_base_uri)
+ child_pid = os.fork()
+ if child_pid == 0:
+ _child(destination_base, local_list)
+ os._exit(0)
+ else:
+ child_pids.append(child_pid)
- if is_parent:
while len(child_pids):
(pid, status) = os.wait()
child_pids.remove(pid)
return
- # This is all executed in the child thread
- # remember to leave here with os._exit() so control doesn't resume elsewhere
- remote_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)
+ def _child(destination_base, local_list):
+ def _set_remote_uri(local_list, destination_base, single_file_local):
+ if len(local_list) > 0:
+ ## Populate 'remote_uri' only if we've got something to upload
+ if not destination_base.endswith("/"):
+ if not single_file_local:
+ raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
+ local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
+ else:
+ for key in local_list:
+ local_list[key]['remote_uri'] = unicodise(destination_base + key)
+
+ def _upload(local_list, seq, total, total_size):
+ file_list = local_list.keys()
+ file_list.sort()
+ for file in file_list:
+ seq += 1
+ item = local_list[file]
+ src = item['full_name']
+ uri = S3Uri(item['remote_uri'])
+ seq_label = "[%d of %d]" % (seq, total)
+ extra_headers = copy(cfg.extra_headers)
+ try:
+ if cfg.preserve_attrs:
+ attr_header = _build_attr_header(local_list, file)
+ debug(u"attr_header: %s" % attr_header)
+ extra_headers.update(attr_header)
+ response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
+ except InvalidFileError, e:
+ warning(u"File can not be uploaded: %s" % e)
+ continue
+ except S3UploadError, e:
+ error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
+ continue
+ speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+ if not cfg.progress_meter:
+ output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+ (item['full_name_unicode'], uri, response["size"], response["elapsed"],
+ speed_fmt[0], speed_fmt[1], seq_label))
+ total_size += response["size"]
+ uploaded_objects_list.append(uri.object())
+ return seq, total_size
- local_count = len(local_list)
- remote_count = len(remote_list)
+ remote_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)
- info(u"Found %d local files, %d remote files" % (local_count, remote_count))
+ local_count = len(local_list)
+ remote_count = len(remote_list)
- local_list, exclude_list = filter_exclude_include(local_list)
-
- if single_file_local and len(local_list) == 1 and len(remote_list) == 1:
- ## Make remote_key same as local_key for comparison if we're dealing with only one file
- remote_list_entry = remote_list[remote_list.keys()[0]]
- # Flush remote_list, by the way
- remote_list = { local_list.keys()[0] : remote_list_entry }
+ info(u"Found %d local files, %d remote files" % (local_count, remote_count))
- local_list, remote_list, update_list, copy_pairs = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True, delay_updates = cfg.delay_updates)
+ local_list, exclude_list = filter_exclude_include(local_list)
+ if single_file_local and len(local_list) == 1 and len(remote_list) == 1:
+ ## Make remote_key same as local_key for comparison if we're dealing with only one file
+ remote_list_entry = remote_list[remote_list.keys()[0]]
+ # Flush remote_list, by the way
+ remote_list = { local_list.keys()[0] : remote_list_entry }
- local_count = len(local_list)
- update_count = len(update_list)
- copy_count = len(copy_pairs)
- remote_count = len(remote_list)
+ local_list, remote_list, update_list, copy_pairs = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True, delay_updates = cfg.delay_updates)
- info(u"Summary: %d local files to upload, %d files to remote copy, %d remote files to delete" % (local_count + update_count, copy_count, remote_count))
+ local_count = len(local_list)
+ update_count = len(update_list)
+ copy_count = len(copy_pairs)
+ remote_count = len(remote_list)
- def _set_remote_uri(local_list, destination_base, single_file_local):
- if len(local_list) > 0:
- ## Populate 'remote_uri' only if we've got something to upload
- if not destination_base.endswith("/"):
- if not single_file_local:
- raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
- local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
- else:
- for key in local_list:
- local_list[key]['remote_uri'] = unicodise(destination_base + key)
+ info(u"Summary: %d local files to upload, %d files to remote copy, %d remote files to delete" % (local_count + update_count, copy_count, remote_count))
- _set_remote_uri(local_list, destination_base, single_file_local)
- _set_remote_uri(update_list, destination_base, single_file_local)
-
- if cfg.dry_run:
- for key in exclude_list:
- output(u"exclude: %s" % unicodise(key))
- for key in local_list:
- output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
- for key in update_list:
- output(u"upload: %s -> %s" % (update_list[key]['full_name_unicode'], update_list[key]['remote_uri']))
- for (dst1, dst2) in copy_pairs:
- output(u"remote copy: %s -> %s" % (dst1['object_key'], remote_list[dst2]['object_key']))
- if cfg.delete_removed:
- for key in remote_list:
- output(u"delete: %s" % remote_list[key]['object_uri_str'])
+ _set_remote_uri(local_list, destination_base, single_file_local)
+ _set_remote_uri(update_list, destination_base, single_file_local)
- warning(u"Exitting now because of --dry-run")
- os._exit(0)
+ if cfg.dry_run:
+ for key in exclude_list:
+ output(u"exclude: %s" % unicodise(key))
+ for key in local_list:
+ output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
+ for key in update_list:
+ output(u"upload: %s -> %s" % (update_list[key]['full_name_unicode'], update_list[key]['remote_uri']))
+ for (dst1, dst2) in copy_pairs:
+ output(u"remote copy: %s -> %s" % (dst1['object_key'], remote_list[dst2]['object_key']))
+ if cfg.delete_removed:
+ for key in remote_list:
+ output(u"delete: %s" % remote_list[key]['object_uri_str'])
+
+ warning(u"Exitting now because of --dry-run")
+ return
- # if there are copy pairs, we can't do delete_before, on the chance
- # we need one of the to-be-deleted files as a copy source.
- if len(copy_pairs) > 0:
- cfg.delete_after = True
+ # if there are copy pairs, we can't do delete_before, on the chance
+ # we need one of the to-be-deleted files as a copy source.
+ if len(copy_pairs) > 0:
+ cfg.delete_after = True
+
+ if cfg.delete_removed and not cfg.delete_after:
+ _do_deletes(s3, remote_list)
+
+ uploaded_objects_list = []
+ total_size = 0
+ total_elapsed = 0.0
+ timestamp_start = time.time()
+ n, total_size = _upload(local_list, 0, local_count, total_size)
+ n, total_size = _upload(update_list, n, local_count, total_size)
+ n_copies, saved_bytes = remote_copy(s3, copy_pairs, destination_base)
+ if cfg.delete_removed and cfg.delete_after:
+ _do_deletes(s3, remote_list)
+ total_elapsed = time.time() - timestamp_start
+ total_speed = total_elapsed and total_size/total_elapsed or 0.0
+ speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)
+
+ # Only print out the result if any work has been done or
+ # if the user asked for verbose output
+ outstr = "Done. Uploaded %d bytes in %0.1f seconds, %0.2f %sB/s. Copied %d files saving %d bytes transfer." % (total_size, total_elapsed, speed_fmt[0], speed_fmt[1], n_copies, saved_bytes)
+ if total_size + saved_bytes > 0:
+ output(outstr)
+ else:
+ info(outstr)
- if cfg.delete_removed and not cfg.delete_after:
- _do_deletes(s3, remote_list)
+ if cfg.invalidate_on_cf:
+ if len(uploaded_objects_list) == 0:
+ info("Nothing to invalidate in CloudFront")
+ else:
+ # 'uri' from the last iteration is still valid at this point
+ cf = CloudFront(cfg)
+ result = cf.InvalidateObjects(uri, uploaded_objects_list)
+ if result['status'] == 201:
+ output("Created invalidation request for %d paths" % len(uploaded_objects_list))
+ output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
- uploaded_objects_list = []
- total_size = 0
- total_elapsed = 0.0
- timestamp_start = time.time()
+ return
- def _upload(local_list, seq, total, total_size):
- file_list = local_list.keys()
- file_list.sort()
- for file in file_list:
- seq += 1
- item = local_list[file]
- src = item['full_name']
- uri = S3Uri(item['remote_uri'])
- seq_label = "[%d of %d]" % (seq, total)
- extra_headers = copy(cfg.extra_headers)
- try:
- if cfg.preserve_attrs:
- attr_header = _build_attr_header(local_list, file)
- debug(u"attr_header: %s" % attr_header)
- extra_headers.update(attr_header)
- response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
- except InvalidFileError, e:
- warning(u"File can not be uploaded: %s" % e)
- continue
- except S3UploadError, e:
- error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
- continue
- speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
- if not cfg.progress_meter:
- output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
- (item['full_name_unicode'], uri, response["size"], response["elapsed"],
- speed_fmt[0], speed_fmt[1], seq_label))
- total_size += response["size"]
- uploaded_objects_list.append(uri.object())
- return seq, total_size
+ # main execution
+ s3 = S3(cfg)
+ if cfg.encrypt:
+ error(u"S3cmd 'sync' doesn't yet support GPG encryption, sorry.")
+ error(u"Either use unconditional 's3cmd put --recursive'")
+ error(u"or disable encryption with --no-encrypt parameter.")
+ sys.exit(1)
- n, total_size = _upload(local_list, 0, local_count, total_size)
- n, total_size = _upload(update_list, n, local_count, total_size)
- n_copies, saved_bytes = remote_copy(s3, copy_pairs, destination_base)
- if cfg.delete_removed and cfg.delete_after:
- _do_deletes(s3, remote_list)
- total_elapsed = time.time() - timestamp_start
- total_speed = total_elapsed and total_size/total_elapsed or 0.0
- speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)
+ local_list, single_file_local = fetch_local_list(args[:-1], recursive = True)
- # Only print out the result if any work has been done or
- # if the user asked for verbose output
- outstr = "Done. Uploaded %d bytes in %0.1f seconds, %0.2f %sB/s. Copied %d files saving %d bytes transfer." % (total_size, total_elapsed, speed_fmt[0], speed_fmt[1], n_copies, saved_bytes)
- if total_size + saved_bytes > 0:
- output(outstr)
+ destinations = [args[-1]]
+ if cfg.additional_destinations:
+ destinations = destinations + cfg.additional_destinations
+
+ if 'fork' not in os.__all__ or len(destinations) < 2:
+ _single_process(local_list)
else:
- info(outstr)
+ _parent()
- if cfg.invalidate_on_cf:
- if len(uploaded_objects_list) == 0:
- info("Nothing to invalidate in CloudFront")
- else:
- # 'uri' from the last iteration is still valid at this point
- cf = CloudFront(cfg)
- result = cf.InvalidateObjects(uri, uploaded_objects_list)
- if result['status'] == 201:
- output("Created invalidation request for %d paths" % len(uploaded_objects_list))
- output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
-
- return os._exit(0)
def cmd_sync(args):
if (len(args) < 2):