diff --git a/S3/Config.py b/S3/Config.py
index ed9b3c04d..2ea988697 100644
--- a/S3/Config.py
+++ b/S3/Config.py
@@ -55,6 +55,7 @@ class Config(object):
     delete_removed = False
     delete_after = False
     _doc['delete_removed'] = "[sync] Remove remote S3 objects when local file has been deleted"
+    delay_updates = False
     gpg_passphrase = ""
     gpg_command = ""
     gpg_encrypt = "%(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s"
diff --git a/S3/FileLists.py b/S3/FileLists.py
index 6756d5758..ea2878201 100644
--- a/S3/FileLists.py
+++ b/S3/FileLists.py
@@ -323,7 +323,7 @@ def _get_filelist_remote(remote_uri, recursive = True):
         remote_list[key] = remote_item
     return remote_list
 
-def compare_filelists(src_list, dst_list, src_remote, dst_remote):
+def compare_filelists(src_list, dst_list, src_remote, dst_remote, delay_updates = False):
     def __direction_str(is_remote):
         return is_remote and "remote" or "local"
 
@@ -333,6 +333,7 @@ def __direction_str(is_remote):
     info(u"Verifying attributes...")
     cfg = Config()
     exists_list = SortedDict(ignore_case = False)
+    update_list = SortedDict(ignore_case = False)
 
     debug("Comparing filelists (direction: %s -> %s)" % (__direction_str(src_remote), __direction_str(dst_remote)))
     debug("src_list.keys: %s" % src_list.keys())
@@ -392,10 +393,17 @@ def __direction_str(is_remote):
                 debug(u"IGNR: %s (transfer not needed)" % file)
                 exists_list[file] = src_list[file]
                 del(src_list[file])
+            else:
+                if delay_updates:
+                    ## Remove from source-list, all that is left there will be transferred
+                    ## Add to update-list to transfer last
+                    debug(u"XFER UPDATE: %s" % file)
+                    update_list[file] = src_list[file]
+                    del(src_list[file])
 
             ## Remove from destination-list, all that is left there will be deleted
             del(dst_list[file])
 
-    return src_list, dst_list, exists_list
+    return src_list, dst_list, exists_list, update_list
 
 # vim:et:ts=4:sts=4:ai
diff --git a/s3cmd b/s3cmd
index e22b79ec8..74260127d 100755
--- a/s3cmd
+++ b/s3cmd
@@ -608,9 +608,10 @@ def cmd_sync_remote2remote(args):
 
     src_list, exclude_list = filter_exclude_include(src_list)
 
-    src_list, dst_list, existing_list = compare_filelists(src_list, dst_list, src_remote = True, dst_remote = True)
+    src_list, dst_list, existing_list, update_list = compare_filelists(src_list, dst_list, src_remote = True, dst_remote = True, delay_updates = cfg.delay_updates)
 
     src_count = len(src_list)
+    update_count = len(update_list)
     dst_count = len(dst_list)
 
     print(u"Summary: %d source files to copy, %d files at destination to delete" % (src_count, dst_count))
@@ -635,23 +636,29 @@
     if cfg.delete_removed and not cfg.delete_after:
         _do_deletes(s3, dst_list)
 
+    def _upload(src_list, seq, src_count):
+        file_list = src_list.keys()
+        file_list.sort()
+        for file in file_list:
+            seq += 1
+            item = src_list[file]
+            src_uri = S3Uri(item['object_uri_str'])
+            dst_uri = S3Uri(item['target_uri'])
+            seq_label = "[%d of %d]" % (seq, src_count)
+            extra_headers = copy(cfg.extra_headers)
+            try:
+                response = s3.object_copy(src_uri, dst_uri, extra_headers)
+                output("File %(src)s copied to %(dst)s" % { "src" : src_uri, "dst" : dst_uri })
+            except S3Error, e:
+                error("File %(src)s could not be copied: %(e)s" % { "src" : src_uri, "e" : e })
+        return seq
+
     # Perform the synchronization of files
     timestamp_start = time.time()
     seq = 0
-    file_list = src_list.keys()
-    file_list.sort()
-    for file in file_list:
-        seq += 1
-        item = src_list[file]
-        src_uri = S3Uri(item['object_uri_str'])
-        dst_uri = S3Uri(item['target_uri'])
-        seq_label = "[%d of %d]" % (seq, src_count)
-        extra_headers = copy(cfg.extra_headers)
-        try:
-            response = s3.object_copy(src_uri, dst_uri, extra_headers)
-            output("File %(src)s copied to %(dst)s" % { "src" : src_uri, "dst" : dst_uri })
-        except S3Error, e:
-            error("File %(src)s could not be copied: %(e)s" % { "src" : src_uri, "e" : e })
+    seq = _upload(src_list, seq, src_count + update_count)
+    seq = _upload(update_list, seq, src_count + update_count)
+
     total_elapsed = time.time() - timestamp_start
     outstr = "Done. Copied %d files in %0.1f seconds, %0.2f files/s" % (seq, total_elapsed, seq/total_elapsed)
     if seq > 0:
@@ -689,27 +696,32 @@ def cmd_sync_remote2local(args):
 
     remote_list, exclude_list = filter_exclude_include(remote_list)
 
-    remote_list, local_list, existing_list = compare_filelists(remote_list, local_list, src_remote = True, dst_remote = False)
+    remote_list, local_list, existing_list, update_list = compare_filelists(remote_list, local_list, src_remote = True, dst_remote = False, delay_updates = cfg.delay_updates)
 
     local_count = len(local_list)
     remote_count = len(remote_list)
+    update_count = len(update_list)
 
-    info(u"Summary: %d remote files to download, %d local files to delete" % (remote_count, local_count))
-
-    if not os.path.isdir(destination_base):
-        ## We were either given a file name (existing or not) or want STDOUT
-        if remote_count > 1:
-            raise ParameterError("Destination must be a directory when downloading multiple sources.")
-        remote_list[remote_list.keys()[0]]['local_filename'] = deunicodise(destination_base)
-    else:
-        if destination_base[-1] != os.path.sep:
-            destination_base += os.path.sep
-        for key in remote_list:
-            local_filename = destination_base + key
-            if os.path.sep != "/":
-                local_filename = os.path.sep.join(local_filename.split("/"))
-            remote_list[key]['local_filename'] = deunicodise(local_filename)
-
+    info(u"Summary: %d remote files to download, %d local files to delete" % (remote_count + update_count, local_count))
+
+    def _set_local_filename(remote_list, destination_base):
+        if not os.path.isdir(destination_base):
+            ## We were either given a file name (existing or not) or want STDOUT
+            if len(remote_list) > 1:
+                raise ParameterError("Destination must be a directory when downloading multiple sources.")
+            remote_list[remote_list.keys()[0]]['local_filename'] = deunicodise(destination_base)
+        else:
+            if destination_base[-1] != os.path.sep:
+                destination_base += os.path.sep
+            for key in remote_list:
+                local_filename = destination_base + key
+                if os.path.sep != "/":
+                    local_filename = os.path.sep.join(local_filename.split("/"))
+                remote_list[key]['local_filename'] = deunicodise(local_filename)
+
+    _set_local_filename(remote_list, destination_base)
+    _set_local_filename(update_list, destination_base)
+
     if cfg.dry_run:
         for key in exclude_list:
             output(u"exclude: %s" % unicodise(key))
@@ -718,6 +730,8 @@
             output(u"delete: %s" % local_list[key]['full_name_unicode'])
         for key in remote_list:
             output(u"download: %s -> %s" % (remote_list[key]['object_uri_str'], remote_list[key]['local_filename']))
+        for key in update_list:
+            output(u"download: %s -> %s" % (update_list[key]['object_uri_str'], update_list[key]['local_filename']))
 
         warning(u"Exitting now because of --dry-run")
         return
@@ -725,85 +739,90 @@
 
     if cfg.delete_removed and not cfg.delete_after:
         _do_deletes(local_list)
 
-    total_size = 0
-    total_elapsed = 0.0
-    timestamp_start = time.time()
-    seq = 0
-    dir_cache = {}
-    file_list = remote_list.keys()
-    file_list.sort()
-    for file in file_list:
-        seq += 1
-        item = remote_list[file]
-        uri = S3Uri(item['object_uri_str'])
-        dst_file = item['local_filename']
-        seq_label = "[%d of %d]" % (seq, remote_count)
-        try:
-            dst_dir = os.path.dirname(dst_file)
-            if not dir_cache.has_key(dst_dir):
-                dir_cache[dst_dir] = Utils.mkdir_with_parents(dst_dir)
-            if dir_cache[dst_dir] == False:
-                warning(u"%s: destination directory not writable: %s" % (file, dst_dir))
-                continue
-            try:
-                open_flags = os.O_CREAT
-                open_flags |= os.O_TRUNC
-                # open_flags |= os.O_EXCL
-
-                debug(u"dst_file=%s" % unicodise(dst_file))
-                # This will have failed should the file exist
-                os.close(os.open(dst_file, open_flags))
-                # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
-                dst_stream = open(dst_file, "wb")
-                response = s3.object_get(uri, dst_stream, extra_label = seq_label)
-                dst_stream.close()
-                if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
-                    attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
-                    if attrs.has_key('mode'):
-                        os.chmod(dst_file, int(attrs['mode']))
-                    if attrs.has_key('mtime') or attrs.has_key('atime'):
-                        mtime = attrs.has_key('mtime') and int(attrs['mtime']) or int(time.time())
-                        atime = attrs.has_key('atime') and int(attrs['atime']) or int(time.time())
-                        os.utime(dst_file, (atime, mtime))
-                ## FIXME: uid/gid / uname/gname handling comes here! TODO
-            except OSError, e:
-                try: dst_stream.close()
-                except: pass
-                if e.errno == errno.EEXIST:
-                    warning(u"%s exists - not overwriting" % (dst_file))
-                    continue
-                if e.errno in (errno.EPERM, errno.EACCES):
-                    warning(u"%s not writable: %s" % (dst_file, e.strerror))
-                    continue
-                if e.errno == errno.EISDIR:
-                    warning(u"%s is a directory - skipping over" % dst_file)
-                    continue
-                raise e
-            except KeyboardInterrupt:
-                try: dst_stream.close()
-                except: pass
-                warning(u"Exiting after keyboard interrupt")
-                return
-            except Exception, e:
-                try: dst_stream.close()
-                except: pass
-                error(u"%s: %s" % (file, e))
-                continue
-            # We have to keep repeating this call because
-            # Python 2.4 doesn't support try/except/finally
-            # construction :-(
-            try: dst_stream.close()
-            except: pass
-        except S3DownloadError, e:
-            error(u"%s: download failed too many times. Skipping that file." % file)
-            continue
-        speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
-        if not Config().progress_meter:
-            output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
-                (uri, unicodise(dst_file), response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
-                seq_label))
-        total_size += response["size"]
+    def _download(remote_list, seq, total, total_size, dir_cache):
+        file_list = remote_list.keys()
+        file_list.sort()
+        for file in file_list:
+            seq += 1
+            item = remote_list[file]
+            uri = S3Uri(item['object_uri_str'])
+            dst_file = item['local_filename']
+            seq_label = "[%d of %d]" % (seq, total)
+            try:
+                dst_dir = os.path.dirname(dst_file)
+                if not dir_cache.has_key(dst_dir):
+                    dir_cache[dst_dir] = Utils.mkdir_with_parents(dst_dir)
+                if dir_cache[dst_dir] == False:
+                    warning(u"%s: destination directory not writable: %s" % (file, dst_dir))
+                    continue
+                try:
+                    open_flags = os.O_CREAT
+                    open_flags |= os.O_TRUNC
+                    # open_flags |= os.O_EXCL
+
+                    debug(u"dst_file=%s" % unicodise(dst_file))
+                    # This will have failed should the file exist
+                    os.close(os.open(dst_file, open_flags))
+                    # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
+                    dst_stream = open(dst_file, "wb")
+                    response = s3.object_get(uri, dst_stream, extra_label = seq_label)
+                    dst_stream.close()
+                    if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
+                        attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
+                        if attrs.has_key('mode'):
+                            os.chmod(dst_file, int(attrs['mode']))
+                        if attrs.has_key('mtime') or attrs.has_key('atime'):
+                            mtime = attrs.has_key('mtime') and int(attrs['mtime']) or int(time.time())
+                            atime = attrs.has_key('atime') and int(attrs['atime']) or int(time.time())
+                            os.utime(dst_file, (atime, mtime))
+                    ## FIXME: uid/gid / uname/gname handling comes here! TODO
+                except OSError, e:
+                    try: dst_stream.close()
+                    except: pass
+                    if e.errno == errno.EEXIST:
+                        warning(u"%s exists - not overwriting" % (dst_file))
+                        continue
+                    if e.errno in (errno.EPERM, errno.EACCES):
+                        warning(u"%s not writable: %s" % (dst_file, e.strerror))
+                        continue
+                    if e.errno == errno.EISDIR:
+                        warning(u"%s is a directory - skipping over" % dst_file)
+                        continue
+                    raise e
+                except KeyboardInterrupt:
+                    try: dst_stream.close()
+                    except: pass
+                    warning(u"Exiting after keyboard interrupt")
+                    return seq, total_size
+                except Exception, e:
+                    try: dst_stream.close()
+                    except: pass
+                    error(u"%s: %s" % (file, e))
+                    continue
+                # We have to keep repeating this call because
+                # Python 2.4 doesn't support try/except/finally
+                # construction :-(
+                try: dst_stream.close()
+                except: pass
+            except S3DownloadError, e:
+                error(u"%s: download failed too many times. Skipping that file." % file)
+                continue
+            speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+            if not Config().progress_meter:
+                output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+                    (uri, unicodise(dst_file), response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
+                    seq_label))
+            total_size += response["size"]
+        return seq, total_size
+
+    total_size = 0
+    total_elapsed = 0.0
+    timestamp_start = time.time()
+    dir_cache = {}
+    seq = 0
+    seq, total_size = _download(remote_list, seq, remote_count + update_count, total_size, dir_cache)
+    seq, total_size = _download(update_list, seq, remote_count + update_count, total_size, dir_cache)
 
     total_elapsed = time.time() - timestamp_start
     speed_fmt = formatSize(total_size/total_elapsed, human_readable = True, floating_point = True)
@@ -878,30 +897,37 @@ def cmd_sync_local2remote(args):
 
     info(u"Found %d local files, %d remote files" % (local_count, remote_count))
 
     local_list, exclude_list = filter_exclude_include(local_list)
-
+
     if single_file_local and len(local_list) == 1 and len(remote_list) == 1:
         ## Make remote_key same as local_key for comparison if we're dealing with only one file
         remote_list_entry = remote_list[remote_list.keys()[0]]
         # Flush remote_list, by the way
         remote_list = { local_list.keys()[0] : remote_list_entry }
 
-    local_list, remote_list, existing_list = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True)
+    local_list, remote_list, existing_list, update_list = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True, delay_updates = cfg.delay_updates)
+
+    local_count = len(local_list)
+    update_count = len(update_list)
     remote_count = len(remote_list)
 
-    info(u"Summary: %d local files to upload, %d remote files to delete" % (local_count, remote_count))
+    info(u"Summary: %d local files to upload, %d remote files to delete" % (local_count + update_count, remote_count))
 
-    if local_count > 0:
-        ## Populate 'remote_uri' only if we've got something to upload
-        if not destination_base.endswith("/"):
-            if not single_file_local:
-                raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
-            local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
-        else:
-            for key in local_list:
-                local_list[key]['remote_uri'] = unicodise(destination_base + key)
+    def _set_remote_uri(local_list, destination_base, single_file_local):
+        if len(local_list) > 0:
+            ## Populate 'remote_uri' only if we've got something to upload
+            if not destination_base.endswith("/"):
+                if not single_file_local:
+                    raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
+                local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
+            else:
+                for key in local_list:
+                    local_list[key]['remote_uri'] = unicodise(destination_base + key)
+
+    _set_remote_uri(local_list, destination_base, single_file_local)
+    _set_remote_uri(update_list, destination_base, single_file_local)
+
     if cfg.dry_run:
         for key in exclude_list:
             output(u"exclude: %s" % unicodise(key))
@@ -910,6 +936,8 @@
             output(u"delete: %s" % remote_list[key]['object_uri_str'])
         for key in local_list:
             output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
+        for key in update_list:
+            output(u"upload: %s -> %s" % (update_list[key]['full_name_unicode'], update_list[key]['remote_uri']))
 
         warning(u"Exitting now because of --dry-run")
         return
@@ -921,36 +949,40 @@
     total_size = 0
     total_elapsed = 0.0
     timestamp_start = time.time()
-    seq = 0
-    file_list = local_list.keys()
-    file_list.sort()
-    for file in file_list:
-        seq += 1
-        item = local_list[file]
-        src = item['full_name']
-        uri = S3Uri(item['remote_uri'])
-        seq_label = "[%d of %d]" % (seq, local_count)
-        extra_headers = copy(cfg.extra_headers)
-        try:
-            if cfg.preserve_attrs:
-                attr_header = _build_attr_header(src)
-                debug(u"attr_header: %s" % attr_header)
-                extra_headers.update(attr_header)
-            response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
-        except InvalidFileError, e:
-            warning(u"File can not be uploaded: %s" % e)
-            continue
-        except S3UploadError, e:
-            error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
-            continue
-        speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
-        if not cfg.progress_meter:
-            output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
-                (item['full_name_unicode'], uri, response["size"], response["elapsed"],
-                speed_fmt[0], speed_fmt[1], seq_label))
-        total_size += response["size"]
-        uploaded_objects_list.append(uri.object())
+    def _upload(local_list, seq, total, total_size):
+        file_list = local_list.keys()
+        file_list.sort()
+        for file in file_list:
+            seq += 1
+            item = local_list[file]
+            src = item['full_name']
+            uri = S3Uri(item['remote_uri'])
+            seq_label = "[%d of %d]" % (seq, total)
+            extra_headers = copy(cfg.extra_headers)
+            try:
+                if cfg.preserve_attrs:
+                    attr_header = _build_attr_header(src)
+                    debug(u"attr_header: %s" % attr_header)
+                    extra_headers.update(attr_header)
+                response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
+            except InvalidFileError, e:
+                warning(u"File can not be uploaded: %s" % e)
+                continue
+            except S3UploadError, e:
+                error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
+                continue
+            speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+            if not cfg.progress_meter:
+                output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+                    (item['full_name_unicode'], uri, response["size"], response["elapsed"],
+                    speed_fmt[0], speed_fmt[1], seq_label))
+            total_size += response["size"]
+            uploaded_objects_list.append(uri.object())
+        return seq, total_size
+
+    n, total_size = _upload(local_list, 0, local_count + update_count, total_size)
+    n, total_size = _upload(update_list, n, local_count + update_count, total_size)
 
     total_elapsed = time.time() - timestamp_start
     total_speed = total_elapsed and total_size/total_elapsed or 0.0
     speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)
@@ -1520,6 +1552,7 @@ def main():
     optparser.add_option(      "--delete-removed", dest="delete_removed", action="store_true", help="Delete remote objects with no corresponding local file [sync]")
     optparser.add_option(      "--no-delete-removed", dest="delete_removed", action="store_false", help="Don't delete remote objects.")
     optparser.add_option(      "--delete-after", dest="delete_after", action="store_true", help="Perform deletes after new uploads [sync]")
+    optparser.add_option(      "--delay-updates", dest="delay_updates", action="store_true", help="Put all updated files into place at end [sync]")
     optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.")
    optparser.add_option(      "--no-preserve", dest="preserve_attrs", action="store_false", help="Don't store FS attributes")
    optparser.add_option(      "--exclude", dest="exclude", action="append", metavar="GLOB", help="Filenames and paths matching GLOB will be excluded from sync")
diff --git a/s3cmd.1 b/s3cmd.1
index db4302687..8a73a6ab2 100644
--- a/s3cmd.1
+++ b/s3cmd.1
@@ -189,6 +189,9 @@ Don't delete remote objects.
 \fB\-\-delete\-after\fR
 Perform deletes after new uploads [sync].
 .TP
+\fB\-\-delay\-updates\fR
+Put all updated files into place at end [sync]
+.TP
 \fB\-p\fR, \fB\-\-preserve\fR
 Preserve filesystem attributes (mode, ownership, timestamps). Default for
 [sync] command.
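
Summary of the behaviour this patch introduces: with delay_updates set, compare_filelists() returns a fourth list, update_list, holding files that already exist at the destination but differ, while src_list keeps only files the destination lacks. Each sync path then transfers src_list first and update_list last, so overwrites of existing objects are grouped at the end of the run (e.g. s3cmd sync --delay-updates ./local/dir/ s3://bucket/prefix/). The snippet below is a minimal, self-contained sketch of that two-phase ordering, not s3cmd code: compare() and transfer() are hypothetical stand-ins for compare_filelists() and the _upload()/_download() helpers here, and integer checksums stand in for the attribute checks the real code performs.

    # Illustrative sketch of the --delay-updates ordering; not s3cmd code.
    def compare(src, dst, delay_updates=False):
        src_list = {}     # files missing at the destination: transfer first
        update_list = {}  # files present at the destination but different: transfer last
        for name, checksum in src.items():
            if name not in dst:
                src_list[name] = checksum
            elif dst[name] != checksum:
                # Without delay_updates, changed files go out with the new ones.
                (update_list if delay_updates else src_list)[name] = checksum
        return src_list, update_list

    def transfer(files, seq, total):
        # Mirrors the refactored helpers: one shared counter so the
        # "[n of m]" labels keep counting across both phases.
        for name in sorted(files):
            seq += 1
            print("[%d of %d] %s" % (seq, total, name))
        return seq

    local = {"a.txt": 1, "b.txt": 2, "c.txt": 3}
    remote = {"b.txt": 9}  # b.txt exists remotely, but its content differs
    new, updates = compare(local, remote, delay_updates=True)
    total = len(new) + len(updates)
    seq = transfer(new, 0, total)        # phase 1: brand-new objects
    seq = transfer(updates, seq, total)  # phase 2: overwrites, delayed to the end

Running the sketch prints a.txt and c.txt before b.txt, which is the ordering the flag requests: files already at the destination are rewritten only once everything new has been transferred.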