add --delay-updates option #34

Merged
merged 1 commit into from

2 participants

@mdomsch
Owner

This code adds a --delay-updates option to the sync command. The source list is split into two parts: files that are new at the destination, and files that already exist there and are being updated. With --delay-updates, the new files are transferred first, and the updated files are transferred afterwards.

This keeps a yum or deb repo consistent for a longer period of time, swapping out the updated files (the metadata files) only at the last possible moment.
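In other words, the sync runs in two phases. A minimal sketch of the idea (illustrative only, not the patch itself; the hypothetical `transfer` callback stands in for the copy/download/upload loops refactored below):

    # Hypothetical sketch of the --delay-updates transfer ordering.
    def transfer_with_delay_updates(src_list, dst_list, transfer):
        # Files absent from the destination are new; files present in
        # both lists are updates to objects that already exist.
        new_files = dict((k, v) for k, v in src_list.items() if k not in dst_list)
        updates = dict((k, v) for k, v in src_list.items() if k in dst_list)
        for key in sorted(new_files):   # phase 1: brand-new files first
            transfer(new_files[key])
        for key in sorted(updates):     # phase 2: replace existing files last
            transfer(updates[key])

For a repository mirror, an invocation like `s3cmd sync --delay-updates ./repo/ s3://bucket/repo/` would then upload new packages first and swap in the updated repo metadata at the end.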

@mludvig merged commit c42c3f2 into from
Commits on Mar 1, 2012
  1. @mdomsch

    add --delay-updates option

    mdomsch authored
Showing with 191 additions and 146 deletions.
  1. +1 −0  S3/Config.py
  2. +10 −2 S3/FileLists.py
  3. +177 −144 s3cmd
  4. +3 −0  s3cmd.1
1  S3/Config.py
@@ -54,6 +54,7 @@ class Config(object):
     ]
     delete_removed = False
     _doc['delete_removed'] = "[sync] Remove remote S3 objects when local file has been deleted"
+    delay_updates = False
     gpg_passphrase = ""
     gpg_command = ""
     gpg_encrypt = "%(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s"
12 S3/FileLists.py
@@ -262,7 +262,7 @@ def _get_filelist_remote(remote_uri, recursive = True):
         remote_list[key] = remote_item
     return remote_list

-def compare_filelists(src_list, dst_list, src_remote, dst_remote):
+def compare_filelists(src_list, dst_list, src_remote, dst_remote, delay_updates = False):
     def __direction_str(is_remote):
         return is_remote and "remote" or "local"

@@ -272,6 +272,7 @@ def __direction_str(is_remote):
     info(u"Verifying attributes...")
     cfg = Config()
     exists_list = SortedDict(ignore_case = False)
+    update_list = SortedDict(ignore_case = False)

     debug("Comparing filelists (direction: %s -> %s)" % (__direction_str(src_remote), __direction_str(dst_remote)))
     debug("src_list.keys: %s" % src_list.keys())
@@ -331,10 +332,17 @@ def __direction_str(is_remote):
                 debug(u"IGNR: %s (transfer not needed)" % file)
                 exists_list[file] = src_list[file]
                 del(src_list[file])
+            else:
+                if delay_updates:
+                    ## Remove from source-list, all that is left there will be transferred
+                    ## Add to update-list to transfer last
+                    debug(u"XFER UPDATE: %s" % file)
+                    update_list[file] = src_list[file]
+                    del(src_list[file])

             ## Remove from destination-list, all that is left there will be deleted
             del(dst_list[file])

-    return src_list, dst_list, exists_list
+    return src_list, dst_list, exists_list, update_list

 # vim:et:ts=4:sts=4:ai
321 s3cmd
@@ -597,9 +597,10 @@ def cmd_sync_remote2remote(args):

     src_list, exclude_list = filter_exclude_include(src_list)

-    src_list, dst_list, existing_list = compare_filelists(src_list, dst_list, src_remote = True, dst_remote = True)
+    src_list, dst_list, existing_list, update_list = compare_filelists(src_list, dst_list, src_remote = True, dst_remote = True, delay_updates = cfg.delay_updates)

     src_count = len(src_list)
+    update_count = len(update_list)
     dst_count = len(dst_list)

     print(u"Summary: %d source files to copy, %d files at destination to delete" % (src_count, dst_count))
@@ -631,23 +632,29 @@ def cmd_sync_remote2remote(args):
                 s3.object_delete(uri)
                 output(u"deleted: '%s'" % uri)

+    def _upload(src_list, seq, src_count):
+        file_list = src_list.keys()
+        file_list.sort()
+        for file in file_list:
+            seq += 1
+            item = src_list[file]
+            src_uri = S3Uri(item['object_uri_str'])
+            dst_uri = S3Uri(item['target_uri'])
+            seq_label = "[%d of %d]" % (seq, src_count)
+            extra_headers = copy(cfg.extra_headers)
+            try:
+                response = s3.object_copy(src_uri, dst_uri, extra_headers)
+                output("File %(src)s copied to %(dst)s" % { "src" : src_uri, "dst" : dst_uri })
+            except S3Error, e:
+                error("File %(src)s could not be copied: %(e)s" % { "src" : src_uri, "e" : e })
+        return seq
+
     # Perform the synchronization of files
     timestamp_start = time.time()
     seq = 0
-    file_list = src_list.keys()
-    file_list.sort()
-    for file in file_list:
-        seq += 1
-        item = src_list[file]
-        src_uri = S3Uri(item['object_uri_str'])
-        dst_uri = S3Uri(item['target_uri'])
-        seq_label = "[%d of %d]" % (seq, src_count)
-        extra_headers = copy(cfg.extra_headers)
-        try:
-            response = s3.object_copy(src_uri, dst_uri, extra_headers)
-            output("File %(src)s copied to %(dst)s" % { "src" : src_uri, "dst" : dst_uri })
-        except S3Error, e:
-            error("File %(src)s could not be copied: %(e)s" % { "src" : src_uri, "e" : e })
+    seq = _upload(src_list, seq, src_count + update_count)
+    seq = _upload(update_list, seq, src_count + update_count)
+
     total_elapsed = time.time() - timestamp_start
     outstr = "Done. Copied %d files in %0.1f seconds, %0.2f files/s" % (seq, total_elapsed, seq/total_elapsed)
     if seq > 0:
@@ -676,27 +683,32 @@ def cmd_sync_remote2local(args):

     remote_list, exclude_list = filter_exclude_include(remote_list)

-    remote_list, local_list, existing_list = compare_filelists(remote_list, local_list, src_remote = True, dst_remote = False)
+    remote_list, local_list, existing_list, update_list = compare_filelists(remote_list, local_list, src_remote = True, dst_remote = False, delay_updates = cfg.delay_updates)

     local_count = len(local_list)
     remote_count = len(remote_list)
+    update_count = len(update_list)

-    info(u"Summary: %d remote files to download, %d local files to delete" % (remote_count, local_count))
-
-    if not os.path.isdir(destination_base):
-        ## We were either given a file name (existing or not) or want STDOUT
-        if remote_count > 1:
-            raise ParameterError("Destination must be a directory when downloading multiple sources.")
-        remote_list[remote_list.keys()[0]]['local_filename'] = deunicodise(destination_base)
-    else:
-        if destination_base[-1] != os.path.sep:
-            destination_base += os.path.sep
-        for key in remote_list:
-            local_filename = destination_base + key
-            if os.path.sep != "/":
-                local_filename = os.path.sep.join(local_filename.split("/"))
-            remote_list[key]['local_filename'] = deunicodise(local_filename)
+    info(u"Summary: %d remote files to download, %d local files to delete" % (remote_count + update_count, local_count))

+    def _set_local_filename(remote_list, destination_base):
+        if not os.path.isdir(destination_base):
+            ## We were either given a file name (existing or not) or want STDOUT
+            if len(remote_list) > 1:
+                raise ParameterError("Destination must be a directory when downloading multiple sources.")
+            remote_list[remote_list.keys()[0]]['local_filename'] = deunicodise(destination_base)
+        else:
+            if destination_base[-1] != os.path.sep:
+                destination_base += os.path.sep
+            for key in remote_list:
+                local_filename = destination_base + key
+                if os.path.sep != "/":
+                    local_filename = os.path.sep.join(local_filename.split("/"))
+                remote_list[key]['local_filename'] = deunicodise(local_filename)
+
+    _set_local_filename(remote_list, destination_base)
+    _set_local_filename(update_list, destination_base)
+
     if cfg.dry_run:
         for key in exclude_list:
             output(u"exclude: %s" % unicodise(key))
@@ -705,6 +717,8 @@ def cmd_sync_remote2local(args):
             output(u"delete: %s" % local_list[key]['full_name_unicode'])
         for key in remote_list:
             output(u"download: %s -> %s" % (remote_list[key]['object_uri_str'], remote_list[key]['local_filename']))
+        for key in update_list:
+            output(u"download: %s -> %s" % (update_list[key]['object_uri_str'], update_list[key]['local_filename']))
         warning(u"Exitting now because of --dry-run")
         return
@@ -714,85 +728,90 @@ def cmd_sync_remote2local(args):
             os.unlink(local_list[key]['full_name'])
             output(u"deleted: %s" % local_list[key]['full_name_unicode'])

-    total_size = 0
-    total_elapsed = 0.0
-    timestamp_start = time.time()
-    seq = 0
-    dir_cache = {}
-    file_list = remote_list.keys()
-    file_list.sort()
-    for file in file_list:
-        seq += 1
-        item = remote_list[file]
-        uri = S3Uri(item['object_uri_str'])
-        dst_file = item['local_filename']
-        seq_label = "[%d of %d]" % (seq, remote_count)
-        try:
-            dst_dir = os.path.dirname(dst_file)
-            if not dir_cache.has_key(dst_dir):
-                dir_cache[dst_dir] = Utils.mkdir_with_parents(dst_dir)
-            if dir_cache[dst_dir] == False:
-                warning(u"%s: destination directory not writable: %s" % (file, dst_dir))
-                continue
+    def _download(remote_list, seq, total, total_size, dir_cache):
+        file_list = remote_list.keys()
+        file_list.sort()
+        for file in file_list:
+            seq += 1
+            item = remote_list[file]
+            uri = S3Uri(item['object_uri_str'])
+            dst_file = item['local_filename']
+            seq_label = "[%d of %d]" % (seq, total)
             try:
-                open_flags = os.O_CREAT
-                open_flags |= os.O_TRUNC
-                # open_flags |= os.O_EXCL
-
-                debug(u"dst_file=%s" % unicodise(dst_file))
-                # This will have failed should the file exist
-                os.close(os.open(dst_file, open_flags))
-                # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
-                dst_stream = open(dst_file, "wb")
-                response = s3.object_get(uri, dst_stream, extra_label = seq_label)
-                dst_stream.close()
-                if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
-                    attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
-                    if attrs.has_key('mode'):
-                        os.chmod(dst_file, int(attrs['mode']))
-                    if attrs.has_key('mtime') or attrs.has_key('atime'):
-                        mtime = attrs.has_key('mtime') and int(attrs['mtime']) or int(time.time())
-                        atime = attrs.has_key('atime') and int(attrs['atime']) or int(time.time())
-                        os.utime(dst_file, (atime, mtime))
-                ## FIXME: uid/gid / uname/gname handling comes here! TODO
-            except OSError, e:
-                try: dst_stream.close()
-                except: pass
-                if e.errno == errno.EEXIST:
-                    warning(u"%s exists - not overwriting" % (dst_file))
-                    continue
-                if e.errno in (errno.EPERM, errno.EACCES):
-                    warning(u"%s not writable: %s" % (dst_file, e.strerror))
+                dst_dir = os.path.dirname(dst_file)
+                if not dir_cache.has_key(dst_dir):
+                    dir_cache[dst_dir] = Utils.mkdir_with_parents(dst_dir)
+                if dir_cache[dst_dir] == False:
+                    warning(u"%s: destination directory not writable: %s" % (file, dst_dir))
                     continue
-                if e.errno == errno.EISDIR:
-                    warning(u"%s is a directory - skipping over" % dst_file)
+                try:
+                    open_flags = os.O_CREAT
+                    open_flags |= os.O_TRUNC
+                    # open_flags |= os.O_EXCL
+
+                    debug(u"dst_file=%s" % unicodise(dst_file))
+                    # This will have failed should the file exist
+                    os.close(os.open(dst_file, open_flags))
+                    # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
+                    dst_stream = open(dst_file, "wb")
+                    response = s3.object_get(uri, dst_stream, extra_label = seq_label)
+                    dst_stream.close()
+                    if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
+                        attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
+                        if attrs.has_key('mode'):
+                            os.chmod(dst_file, int(attrs['mode']))
+                        if attrs.has_key('mtime') or attrs.has_key('atime'):
+                            mtime = attrs.has_key('mtime') and int(attrs['mtime']) or int(time.time())
+                            atime = attrs.has_key('atime') and int(attrs['atime']) or int(time.time())
+                            os.utime(dst_file, (atime, mtime))
+                    ## FIXME: uid/gid / uname/gname handling comes here! TODO
+                except OSError, e:
+                    try: dst_stream.close()
+                    except: pass
+                    if e.errno == errno.EEXIST:
+                        warning(u"%s exists - not overwriting" % (dst_file))
+                        continue
+                    if e.errno in (errno.EPERM, errno.EACCES):
+                        warning(u"%s not writable: %s" % (dst_file, e.strerror))
+                        continue
+                    if e.errno == errno.EISDIR:
+                        warning(u"%s is a directory - skipping over" % dst_file)
+                        continue
+                    raise e
+                except KeyboardInterrupt:
+                    try: dst_stream.close()
+                    except: pass
+                    warning(u"Exiting after keyboard interrupt")
+                    return
+                except Exception, e:
+                    try: dst_stream.close()
+                    except: pass
+                    error(u"%s: %s" % (file, e))
                     continue
-                raise e
-            except KeyboardInterrupt:
+                # We have to keep repeating this call because
+                # Python 2.4 doesn't support try/except/finally
+                # construction :-(
                 try: dst_stream.close()
                 except: pass
-                warning(u"Exiting after keyboard interrupt")
-                return
-            except Exception, e:
-                try: dst_stream.close()
-                except: pass
-                error(u"%s: %s" % (file, e))
+            except S3DownloadError, e:
+                error(u"%s: download failed too many times. Skipping that file." % file)
                 continue
-            # We have to keep repeating this call because
-            # Python 2.4 doesn't support try/except/finally
-            # construction :-(
-            try: dst_stream.close()
-            except: pass
-        except S3DownloadError, e:
-            error(u"%s: download failed too many times. Skipping that file." % file)
-            continue
-        speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
-        if not Config().progress_meter:
-            output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
-                (uri, unicodise(dst_file), response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
-                seq_label))
-        total_size += response["size"]
+            speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+            if not Config().progress_meter:
+                output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+                    (uri, unicodise(dst_file), response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
+                    seq_label))
+            total_size += response["size"]
+        return seq, total_size
+
+    total_size = 0
+    total_elapsed = 0.0
+    timestamp_start = time.time()
+    dir_cache = {}
+    seq = 0
+    seq, total_size = _download(remote_list, seq, remote_count + update_count, total_size, dir_cache)
+    seq, total_size = _download(update_list, seq, remote_count + update_count, total_size, dir_cache)
+
     total_elapsed = time.time() - timestamp_start
     speed_fmt = formatSize(total_size/total_elapsed, human_readable = True, floating_point = True)
@@ -858,30 +877,37 @@ def cmd_sync_local2remote(args):
     info(u"Found %d local files, %d remote files" % (local_count, remote_count))

     local_list, exclude_list = filter_exclude_include(local_list)
-
+
     if single_file_local and len(local_list) == 1 and len(remote_list) == 1:
         ## Make remote_key same as local_key for comparison if we're dealing with only one file
         remote_list_entry = remote_list[remote_list.keys()[0]]
         # Flush remote_list, by the way
         remote_list = { local_list.keys()[0] : remote_list_entry }

-    local_list, remote_list, existing_list = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True)
+    local_list, remote_list, existing_list, update_list = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True, delay_updates = cfg.delay_updates)
+
     local_count = len(local_list)
+    update_count = len(update_list)
     remote_count = len(remote_list)

-    info(u"Summary: %d local files to upload, %d remote files to delete" % (local_count, remote_count))
+    info(u"Summary: %d local files to upload, %d remote files to delete" % (local_count + update_count, remote_count))

-    if local_count > 0:
-        ## Populate 'remote_uri' only if we've got something to upload
-        if not destination_base.endswith("/"):
-            if not single_file_local:
-                raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
-            local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
-        else:
-            for key in local_list:
-                local_list[key]['remote_uri'] = unicodise(destination_base + key)
+    def _set_remote_uri(local_list, destination_base, single_file_local):
+        if len(local_list) > 0:
+            ## Populate 'remote_uri' only if we've got something to upload
+            if not destination_base.endswith("/"):
+                if not single_file_local:
+                    raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
+                local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
+            else:
+                for key in local_list:
+                    local_list[key]['remote_uri'] = unicodise(destination_base + key)
+
+    _set_remote_uri(local_list, destination_base, single_file_local)
+    _set_remote_uri(update_list, destination_base, single_file_local)
+
     if cfg.dry_run:
         for key in exclude_list:
             output(u"exclude: %s" % unicodise(key))
@@ -890,6 +916,8 @@ def cmd_sync_local2remote(args):
             output(u"delete: %s" % remote_list[key]['object_uri_str'])
         for key in local_list:
             output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
+        for key in update_list:
+            output(u"upload: %s -> %s" % (update_list[key]['full_name_unicode'], update_list[key]['remote_uri']))
         warning(u"Exitting now because of --dry-run")
         return
@@ -904,36 +932,40 @@ def cmd_sync_local2remote(args):
     total_size = 0
     total_elapsed = 0.0
     timestamp_start = time.time()
-    seq = 0
-    file_list = local_list.keys()
-    file_list.sort()
-    for file in file_list:
-        seq += 1
-        item = local_list[file]
-        src = item['full_name']
-        uri = S3Uri(item['remote_uri'])
-        seq_label = "[%d of %d]" % (seq, local_count)
-        extra_headers = copy(cfg.extra_headers)
-        try:
-            if cfg.preserve_attrs:
-                attr_header = _build_attr_header(src)
-                debug(u"attr_header: %s" % attr_header)
-                extra_headers.update(attr_header)
-            response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
-        except InvalidFileError, e:
-            warning(u"File can not be uploaded: %s" % e)
-            continue
-        except S3UploadError, e:
-            error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
-            continue
-        speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
-        if not cfg.progress_meter:
-            output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
-                (item['full_name_unicode'], uri, response["size"], response["elapsed"],
-                speed_fmt[0], speed_fmt[1], seq_label))
-        total_size += response["size"]
-        uploaded_objects_list.append(uri.object())
+    def _upload(local_list, seq, total, total_size):
+        file_list = local_list.keys()
+        file_list.sort()
+        for file in file_list:
+            seq += 1
+            item = local_list[file]
+            src = item['full_name']
+            uri = S3Uri(item['remote_uri'])
+            seq_label = "[%d of %d]" % (seq, total)
+            extra_headers = copy(cfg.extra_headers)
+            try:
+                if cfg.preserve_attrs:
+                    attr_header = _build_attr_header(src)
+                    debug(u"attr_header: %s" % attr_header)
+                    extra_headers.update(attr_header)
+                response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
+            except InvalidFileError, e:
+                warning(u"File can not be uploaded: %s" % e)
+                continue
+            except S3UploadError, e:
+                error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
+                continue
+            speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+            if not cfg.progress_meter:
+                output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+                    (item['full_name_unicode'], uri, response["size"], response["elapsed"],
+                    speed_fmt[0], speed_fmt[1], seq_label))
+            total_size += response["size"]
+            uploaded_objects_list.append(uri.object())
+        return seq, total_size
+
+    n, total_size = _upload(local_list, 0, local_count, total_size)
+    n, total_size = _upload(update_list, n, local_count, total_size)
     total_elapsed = time.time() - timestamp_start
     total_speed = total_elapsed and total_size/total_elapsed or 0.0
     speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)
@@ -1499,6 +1531,7 @@ def main():
     optparser.add_option(      "--delete-removed", dest="delete_removed", action="store_true", help="Delete remote objects with no corresponding local file [sync]")
     optparser.add_option(      "--no-delete-removed", dest="delete_removed", action="store_false", help="Don't delete remote objects.")
+    optparser.add_option(      "--delay-updates", dest="delay_updates", action="store_true", help="Put all updated files into place at end [sync]")
     optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.")
     optparser.add_option(      "--no-preserve", dest="preserve_attrs", action="store_false", help="Don't store FS attributes")
     optparser.add_option(      "--exclude", dest="exclude", action="append", metavar="GLOB", help="Filenames and paths matching GLOB will be excluded from sync")
3  s3cmd.1
@@ -186,6 +186,9 @@ Delete remote objects with no corresponding local file
 \fB\-\-no\-delete\-removed\fR
 Don't delete remote objects.
 .TP
+\fB\-\-delay\-updates\fR
+Put all updated files into place at end [sync]
+.TP
 \fB\-p\fR, \fB\-\-preserve\fR
 Preserve filesystem attributes (mode, ownership,
 timestamps). Default for [sync] command.