Parallel destinations #59

Closed
wants to merge 17 commits

1 participant

@mdomsch
Owner

(This builds on top of my previous pull requests, specifically the hardlink handling request. This also uses os.fork() and os.wait(), which don't exist on Windows. If Windows compatibility is still necessary, that will need to be checked for and handled accordingly.)

sync: add --add-destination, parallelize uploads to multiple destinations

Only meaningful at present in the sync local->remote(s) case, this
adds the --add-destination <foo> command-line option.  For the last
arg (the traditional destination), and for each destination specified
via --add-destination, we fork and upload after the initial walk of the
local file system has completed (and after doing all the disk I/O to
calculate md5 values for each file).

This keeps us from pounding the file system with the same disk I/O
for each possible destination, and lets us use our full bandwidth by
uploading in parallel.
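
For illustration, a usage sketch (the bucket names and local path are hypothetical, not taken from this pull request); a single local walk and md5 pass, then parallel uploads to both destinations:

    s3cmd sync --add-destination s3://second-bucket/path/ ./localdir/ s3://first-bucket/path/

The last positional argument stays the traditional destination, and --add-destination may be repeated to add further parallel targets.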
mdomsch added some commits
@mdomsch mdomsch Apply excludes/includes at local os.walk() time 2e4769e
@mdomsch mdomsch add --delete-after option for sync 3b3727d
@mdomsch mdomsch add more --delete-after to sync variations 5ca02bd
@mdomsch mdomsch Merge remote-tracking branch 'origin/master' into merge b40aa2a
@mdomsch mdomsch Merge branch 'delete-after' into merge 598402b
@mdomsch mdomsch add Config.delete_after b62ce58
@mdomsch mdomsch Merge branch 'delete-after' into merge e1fe732
@mdomsch mdomsch fix os.walk() exclusions for new upstream code 1eaad64
@mdomsch mdomsch Merge branch 'master' into merge ad1f8cc
@mdomsch mdomsch add --delay-updates option c42c3f2
@mdomsch mdomsch finish merge 2dfe4a6
@mdomsch mdomsch Handle hardlinks and duplicate files
Minimize uploads in sync local->remote by looking for identical files
that already exist elsewhere in the remote destination and issuing an
S3 COPY command instead of uploading the file again.

We now store the (locally generated) md5 of the file in the
x-amz-meta-s3cmd-attrs metadata, because we can't count on the ETag
being correct due to multipart uploads.  Use this value if it's
available.

This also reduces the number of local stat() calls by recording more
useful information during the initial os.walk().  This cuts the number
of stat()s in half.
264ef82
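
A minimal standalone sketch of the bookkeeping this introduces (the index names here are illustrative; the patch itself keeps them on SortedDict):

    import os, hashlib

    by_inode = {}   # (st_dev, st_ino) -> md5, so each hardlink set is hashed only once
    by_md5 = {}     # md5 -> set of relative paths; a second path can become an S3 COPY

    def file_md5(path, blocksize=65536):
        # hash the file contents in chunks
        h = hashlib.md5()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(blocksize), b''):
                h.update(chunk)
        return h.hexdigest()

    def record(relative_path, full_path):
        sr = os.stat(full_path)
        key = (sr.st_dev, sr.st_ino)
        if key not in by_inode:
            by_inode[key] = file_md5(full_path)
        md5 = by_inode[key]
        by_md5.setdefault(md5, set()).add(relative_path)
        return md5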
@mdomsch mdomsch hardlink/copy fix
If the remote doesn't have any copy of the file, we transfer one
instance first, then copy thereafter.  But we were dereferencing the
destination list improperly in this case, causing a crash.  This patch
fixes the crash cleanly.
a6e43c4
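
The upload-once-then-copy decision being fixed looks roughly like this (a sketch; upload_list and copy_pairs are illustrative names, see compare_filelists() in the diff for the real logic):

    upload_list = []
    copy_pairs = []      # (existing destination key, new destination key)

    def plan(relative_file, md5, dst_by_md5):
        # If the destination already holds an object with this md5, schedule a
        # server-side COPY from it; otherwise upload it and remember the md5 so
        # any later duplicate can be copied from this first upload.
        existing = dst_by_md5.get(md5)
        if existing:
            copy_pairs.append((next(iter(existing)), relative_file))
        else:
            upload_list.append(relative_file)
            dst_by_md5.setdefault(md5, set()).add(relative_file)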
@mdomsch mdomsch remote_copy() doesn't need to know dst_list anymore cdf25f9
@mdomsch mdomsch handle remote->local transfers with local hardlink/copy if possible
Reworked some of the hardlink / same-file detection code to be a
little more general purpose.  Now it can be used to detect duplicate
files on either the remote or the local side.

When transferring remote->local, if we already have a local copy (same
md5sum) of a file we would otherwise transfer, don't transfer it but
hardlink it instead.  Should hardlinking not be available (e.g. on
Windows), use shutil.copy2() instead.  This lets us avoid the second
download completely.

_get_filelist_local() grew an initial list argument.  This lets us
avoid copying / merging / updating a bunch of different lists back
into one - it starts as one list and grows.  Much cleaner (and the
fact these were separate lists cost me several hours of debugging to
track down why something like the by_md5 hash would get set, only to
be empty shortly thereafter).
f881b16
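
A minimal sketch of the hardlink-or-copy fallback described above (the patch's own version is local_hardlink() in s3cmd):

    import os
    import shutil

    def link_or_copy(src, dst):
        # Prefer a hardlink; fall back to a full copy where hardlinks aren't
        # supported (e.g. os.link missing on Windows) or fail (cross-device, permissions).
        try:
            os.link(src, dst)
        except (AttributeError, OSError):
            shutil.copy2(src, dst)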
@mdomsch mdomsch sync: add --add-destination, parallelize uploads to multiple destinations

Only meaningful at present in the sync local->remote(s) case, this
adds the --add-destination <foo> command-line option.  For the last
arg (the traditional destination), and for each destination specified
via --add-destination, we fork and upload after the initial walk of the
local file system has completed (and after doing all the disk I/O to
calculate md5 values for each file).

This keeps us from pounding the file system with the same disk I/O
for each possible destination, and lets us use our full bandwidth by
uploading in parallel.
7de0789
@mdomsch mdomsch sync: refactor parent/child and single process code
os.fork() and os.wait() don't exist on Windows, and the
multiprocessing module doesn't exist until Python 2.6.  So instead, we
conditionalize calling os.fork() depending on its existence, and on
there being > 1 destination.

Also simply rearranges the code so that subfunctions within
local2remote are defined at the top of their respective functions, for
better readability through the main execution of the function.
0277256
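
A condensed sketch of that parent/child split (assuming a per-destination upload callable; details differ from the actual patch, which checks os.__all__):

    import os

    def sync_all(destinations, upload_to):
        # POSIX-only fork path; plain loop on Windows or with a single destination
        if not hasattr(os, 'fork') or len(destinations) < 2:
            for dest in destinations:
                upload_to(dest)
            return
        children = []
        for dest in destinations:
            pid = os.fork()
            if pid == 0:              # child: handle one destination, then exit
                upload_to(dest)
                os._exit(0)
            children.append(pid)      # parent: remember the child pid
        while children:               # parent: reap all children before returning
            pid, _status = os.wait()
            children.remove(pid)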
@mdomsch
Owner

This was merged in 1.5.0-alpha1.

@mdomsch mdomsch closed this
Commits on Feb 24, 2012
  1. Apply excludes/includes at local os.walk() time
Commits on Feb 27, 2012
  1. add --delete-after option for sync
  2. add more --delete-after to sync variations
  3. Merge remote-tracking branch 'origin/master' into merge
  4. Merge branch 'delete-after' into merge
  5. add Config.delete_after
  6. Merge branch 'delete-after' into merge
  7. fix os.walk() exclusions for new upstream code
  8. Merge branch 'master' into merge
Commits on Mar 1, 2012
  1. add --delay-updates option
  2. finish merge
Commits on Jun 16, 2012
  1. Handle hardlinks and duplicate files
  2. hardlink/copy fix
Commits on Jun 17, 2012
  1. remote_copy() doesn't need to know dst_list anymore
Commits on Jun 18, 2012
  1. handle remote->local transfers with local hardlink/copy if possible
  2. sync: add --add-destination, parallelize uploads to multiple destinations
Commits on Jun 20, 2012
  1. sync: refactor parent/child and single process code
Showing with 626 additions and 306 deletions.
  1. +4 −0 S3/Config.py
  2. +200 −70 S3/FileLists.py
  3. +43 −0 S3/SortedDict.py
  4. +374 −236 s3cmd
  5. +5 −0 s3cmd.1
4 S3/Config.py
@@ -50,10 +50,13 @@ class Config(object):
'mtime', # Modification timestamp
'ctime', # Creation timestamp
'mode', # File mode (e.g. rwxr-xr-x = 755)
+ 'md5', # File MD5 (if known)
#'acl', # Full ACL (not yet supported)
]
delete_removed = False
+ delete_after = False
_doc['delete_removed'] = "[sync] Remove remote S3 objects when local file has been deleted"
+ delay_updates = False
gpg_passphrase = ""
gpg_command = ""
gpg_encrypt = "%(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s"
@@ -83,6 +86,7 @@ class Config(object):
website_index = "index.html"
website_error = ""
website_endpoint = "http://%(bucket)s.s3-website-%(location)s.amazonaws.com/"
+ additional_destinations = []
## Creating a singleton
def __new__(self, configfile = None):
270 S3/FileLists.py
@@ -14,8 +14,9 @@
import os
import glob
+import copy
-__all__ = ["fetch_local_list", "fetch_remote_list", "compare_filelists", "filter_exclude_include"]
+__all__ = ["fetch_local_list", "fetch_remote_list", "compare_filelists", "filter_exclude_include", "parse_attrs_header"]
def _fswalk_follow_symlinks(path):
'''
@@ -26,13 +27,15 @@ def _fswalk_follow_symlinks(path):
assert os.path.isdir(path) # only designed for directory argument
walkdirs = [path]
for dirpath, dirnames, filenames in os.walk(path):
+ handle_exclude_include_walk(dirpath, dirnames, [])
for dirname in dirnames:
current = os.path.join(dirpath, dirname)
if os.path.islink(current):
walkdirs.append(current)
for walkdir in walkdirs:
- for value in os.walk(walkdir):
- yield value
+ for dirpath, dirnames, filenames in os.walk(walkdir):
+ handle_exclude_include_walk(dirpath, dirnames, [])
+ yield (dirpath, dirnames, filenames)
def _fswalk(path, follow_symlinks):
'''
@@ -43,8 +46,10 @@ def _fswalk(path, follow_symlinks):
follow_symlinks (bool) indicates whether to descend into symbolically linked directories
'''
if follow_symlinks:
- return _fswalk_follow_symlinks(path)
- return os.walk(path)
+ yield _fswalk_follow_symlinks(path)
+ for dirpath, dirnames, filenames in os.walk(path):
+ handle_exclude_include_walk(dirpath, dirnames, filenames)
+ yield (dirpath, dirnames, filenames)
def filter_exclude_include(src_list):
info(u"Applying --exclude/--include")
@@ -75,8 +80,64 @@ def filter_exclude_include(src_list):
debug(u"PASS: %s" % (file))
return src_list, exclude_list
+def handle_exclude_include_walk(root, dirs, files):
+ cfg = Config()
+ copydirs = copy.copy(dirs)
+ copyfiles = copy.copy(files)
+
+ # exclude dir matches in the current directory
+ # this prevents us from recursing down trees we know we want to ignore
+ for x in copydirs:
+ d = os.path.join(root, x, '')
+ debug(u"CHECK: %s" % d)
+ excluded = False
+ for r in cfg.exclude:
+ if r.search(d):
+ excluded = True
+ debug(u"EXCL-MATCH: '%s'" % (cfg.debug_exclude[r]))
+ break
+ if excluded:
+ ## No need to check for --include if not excluded
+ for r in cfg.include:
+ if r.search(d):
+ excluded = False
+ debug(u"INCL-MATCH: '%s'" % (cfg.debug_include[r]))
+ break
+ if excluded:
+ ## Still excluded - ok, action it
+ debug(u"EXCLUDE: %s" % d)
+ dirs.remove(x)
+ continue
+ else:
+ debug(u"PASS: %s" % (d))
+
+ # exclude file matches in the current directory
+ for x in copyfiles:
+ file = os.path.join(root, x)
+ debug(u"CHECK: %s" % file)
+ excluded = False
+ for r in cfg.exclude:
+ if r.search(file):
+ excluded = True
+ debug(u"EXCL-MATCH: '%s'" % (cfg.debug_exclude[r]))
+ break
+ if excluded:
+ ## No need to check for --include if not excluded
+ for r in cfg.include:
+ if r.search(file):
+ excluded = False
+ debug(u"INCL-MATCH: '%s'" % (cfg.debug_include[r]))
+ break
+ if excluded:
+ ## Still excluded - ok, action it
+ debug(u"EXCLUDE: %s" % file)
+ files.remove(x)
+ continue
+ else:
+ debug(u"PASS: %s" % (file))
+
def fetch_local_list(args, recursive = None):
- def _get_filelist_local(local_uri):
+ def _get_filelist_local(loc_list, local_uri):
info(u"Compiling list of local files...")
if local_uri.isdir():
local_base = deunicodise(local_uri.basename())
@@ -88,7 +149,6 @@ def _get_filelist_local(local_uri):
local_path = deunicodise(local_uri.dirname())
filelist = [( local_path, [], [deunicodise(local_uri.basename())] )]
single_file = True
- loc_list = SortedDict(ignore_case = False)
for root, dirs, files in filelist:
rel_root = root.replace(local_path, local_base, 1)
for f in files:
@@ -112,8 +172,16 @@ def _get_filelist_local(local_uri):
'full_name' : full_name,
'size' : sr.st_size,
'mtime' : sr.st_mtime,
+ 'dev' : sr.st_dev,
+ 'inode' : sr.st_ino,
+ 'uid' : sr.st_uid,
+ 'gid' : sr.st_gid,
+ 'sr': sr # save it all, may need it in preserve_attrs_list
## TODO: Possibly more to save here...
}
+ if 'md5' in cfg.sync_checks:
+ md5 = loc_list.get_md5(relative_file)
+ loc_list.record_hardlink(relative_file, sr.st_dev, sr.st_ino, md5)
return loc_list, single_file
cfg = Config()
@@ -136,8 +204,7 @@ def _get_filelist_local(local_uri):
local_uris.append(uri)
for uri in local_uris:
- list_for_uri, single_file = _get_filelist_local(uri)
- local_list.update(list_for_uri)
+ list_for_uri, single_file = _get_filelist_local(local_list, uri)
## Single file is True if and only if the user
## specified one local URI and that URI represents
@@ -196,7 +263,11 @@ def _get_filelist_remote(remote_uri, recursive = True):
'object_key' : object['Key'],
'object_uri_str' : object_uri_str,
'base_uri' : remote_uri,
+ 'dev' : None,
+ 'inode' : None,
}
+ md5 = object['ETag'][1:-1]
+ rem_list.record_md5(key, md5)
if break_now:
break
return rem_list
@@ -222,6 +293,7 @@ def _get_filelist_remote(remote_uri, recursive = True):
objectlist = _get_filelist_remote(uri)
for key in objectlist:
remote_list[key] = objectlist[key]
+ remote_list.record_md5(key, objectlist.get_md5(key))
else:
for uri in remote_uris:
uri_str = str(uri)
@@ -259,82 +331,140 @@ def _get_filelist_remote(remote_uri, recursive = True):
'md5': response['headers']['etag'].strip('"\''),
'timestamp' : dateRFC822toUnix(response['headers']['date'])
})
+ # get md5 from header if it's present. We would have set that during upload
+ if response['headers'].has_key('x-amz-meta-s3cmd-attrs'):
+ attrs = parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
+ if attrs.has_key('md5'):
+ remote_item.update({'md5': attrs['md5']})
+
remote_list[key] = remote_item
return remote_list
-def compare_filelists(src_list, dst_list, src_remote, dst_remote):
+def parse_attrs_header(attrs_header):
+ attrs = {}
+ for attr in attrs_header.split("/"):
+ key, val = attr.split(":")
+ attrs[key] = val
+ return attrs
+
+
+def compare_filelists(src_list, dst_list, src_remote, dst_remote, delay_updates = False):
def __direction_str(is_remote):
return is_remote and "remote" or "local"
- # We don't support local->local sync, use 'rsync' or something like that instead ;-)
+ def _compare(src_list, dst_lst, src_remote, dst_remote, file):
+ """Return True if src_list[file] matches dst_list[file], else False"""
+ attribs_match = True
+ if not (src_list.has_key(file) and dst_list.has_key(file)):
+ info(u"file does not exist in one side or the other: src_list=%s, dst_list=%s" % (src_list.has_key(file), dst_list.has_key(file)))
+ return False
+
+ ## check size first
+ if 'size' in cfg.sync_checks and dst_list[file]['size'] != src_list[file]['size']:
+ debug(u"xfer: %s (size mismatch: src=%s dst=%s)" % (file, src_list[file]['size'], dst_list[file]['size']))
+ attribs_match = False
+
+ ## check md5
+ compare_md5 = 'md5' in cfg.sync_checks
+ # Multipart-uploaded files don't have a valid md5 sum - it ends with "...-nn"
+ if compare_md5:
+ if (src_remote == True and src_list[file]['md5'].find("-") >= 0) or (dst_remote == True and dst_list[file]['md5'].find("-") >= 0):
+ compare_md5 = False
+ info(u"disabled md5 check for %s" % file)
+ if attribs_match and compare_md5:
+ try:
+ src_md5 = src_list.get_md5(file)
+ dst_md5 = dst_list.get_md5(file)
+ except (IOError,OSError), e:
+ # md5 sum verification failed - ignore that file altogether
+ debug(u"IGNR: %s (disappeared)" % (file))
+ warning(u"%s: file disappeared, ignoring." % (file))
+ raise
+
+ if src_md5 != dst_md5:
+ ## checksums are different.
+ attribs_match = False
+ debug(u"XFER: %s (md5 mismatch: src=%s dst=%s)" % (file, src_md5, dst_md5))
+
+ return attribs_match
+
+ # we don't support local->local sync, use 'rsync' or something like that instead ;-)
assert(not(src_remote == False and dst_remote == False))
info(u"Verifying attributes...")
cfg = Config()
- exists_list = SortedDict(ignore_case = False)
+ ## Items left on src_list will be transferred
+ ## Items left on update_list will be transferred after src_list
+ ## Items left on copy_pairs will be copied from dst1 to dst2
+ update_list = SortedDict(ignore_case = False)
+ ## Items left on dst_list will be deleted
+ copy_pairs = []
+
debug("Comparing filelists (direction: %s -> %s)" % (__direction_str(src_remote), __direction_str(dst_remote)))
- debug("src_list.keys: %s" % src_list.keys())
- debug("dst_list.keys: %s" % dst_list.keys())
- for file in src_list.keys():
- debug(u"CHECK: %s" % file)
- if dst_list.has_key(file):
+ for relative_file in src_list.keys():
+ debug(u"CHECK: %s" % (relative_file))
+
+ if dst_list.has_key(relative_file):
## Was --skip-existing requested?
- if cfg.skip_existing:
- debug(u"IGNR: %s (used --skip-existing)" % (file))
- exists_list[file] = src_list[file]
- del(src_list[file])
- ## Remove from destination-list, all that is left there will be deleted
- del(dst_list[file])
+ if cfg.skip_existing:
+ debug(u"IGNR: %s (used --skip-existing)" % (relative_file))
+ del(src_list[relative_file])
+ del(dst_list[relative_file])
+ continue
+
+ try:
+ compare_result = _compare(src_list, dst_list, src_remote, dst_remote, relative_file)
+ except (IOError,OSError), e:
+ del(src_list[relative_file])
+ del(dst_list[relative_file])
continue
- attribs_match = True
- ## Check size first
- if 'size' in cfg.sync_checks and dst_list[file]['size'] != src_list[file]['size']:
- debug(u"XFER: %s (size mismatch: src=%s dst=%s)" % (file, src_list[file]['size'], dst_list[file]['size']))
- attribs_match = False
-
- ## Check MD5
- compare_md5 = 'md5' in cfg.sync_checks
- # Multipart-uploaded files don't have a valid MD5 sum - it ends with "...-NN"
- if compare_md5:
- if (src_remote == True and src_list[file]['md5'].find("-") >= 0) or (dst_remote == True and dst_list[file]['md5'].find("-") >= 0):
- compare_md5 = False
- info(u"Disabled MD5 check for %s" % file)
- if attribs_match and compare_md5:
- try:
- if src_remote == False and dst_remote == True:
- src_md5 = hash_file_md5(src_list[file]['full_name'])
- dst_md5 = dst_list[file]['md5']
- elif src_remote == True and dst_remote == False:
- src_md5 = src_list[file]['md5']
- dst_md5 = hash_file_md5(dst_list[file]['full_name'])
- elif src_remote == True and dst_remote == True:
- src_md5 = src_list[file]['md5']
- dst_md5 = dst_list[file]['md5']
- except (IOError,OSError), e:
- # MD5 sum verification failed - ignore that file altogether
- debug(u"IGNR: %s (disappeared)" % (file))
- warning(u"%s: file disappeared, ignoring." % (file))
- del(src_list[file])
- del(dst_list[file])
- continue
-
- if src_md5 != dst_md5:
- ## Checksums are different.
- attribs_match = False
- debug(u"XFER: %s (md5 mismatch: src=%s dst=%s)" % (file, src_md5, dst_md5))
-
- if attribs_match:
- ## Remove from source-list, all that is left there will be transferred
- debug(u"IGNR: %s (transfer not needed)" % file)
- exists_list[file] = src_list[file]
- del(src_list[file])
-
- ## Remove from destination-list, all that is left there will be deleted
- del(dst_list[file])
-
- return src_list, dst_list, exists_list
+ if compare_result:
+ debug(u"IGNR: %s (transfer not needed)" % relative_file)
+ del(src_list[relative_file])
+ del(dst_list[relative_file])
+
+ else:
+ # look for matching file in src
+ md5 = src_list.get_md5(relative_file)
+ if md5 is not None and dst_list.by_md5.has_key(md5):
+ # Found one, we want to copy
+ dst1 = list(dst_list.by_md5[md5])[0]
+ debug(u"REMOTE COPY src: %s -> %s" % (dst1, relative_file))
+ copy_pairs.append((dst1, relative_file))
+ del(src_list[relative_file])
+ del(dst_list[relative_file])
+ else:
+ # record that we will get this file transferred to us (before all the copies), so if we come across it later again,
+ # we can copy from _this_ copy (e.g. we only upload it once, and copy thereafter).
+ dst_list.record_md5(relative_file, md5)
+ update_list[relative_file] = src_list[relative_file]
+ del src_list[relative_file]
+ del dst_list[relative_file]
+
+ else:
+ # dst doesn't have this file
+ # look for matching file elsewhere in dst
+ md5 = src_list.get_md5(relative_file)
+ dst1 = dst_list.find_md5_one(md5)
+ if dst1 is not None:
+ # Found one, we want to copy
+ debug(u"REMOTE COPY dst: %s -> %s" % (dst1, relative_file))
+ copy_pairs.append((dst1, relative_file))
+ del(src_list[relative_file])
+ else:
+ # we don't have this file, and we don't have a copy of this file elsewhere. Get it.
+ # record that we will get this file transferred to us (before all the copies), so if we come across it later again,
+ # we can copy from _this_ copy (e.g. we only upload it once, and copy thereafter).
+ dst_list.record_md5(relative_file, md5)
+
+ for f in dst_list.keys():
+ if not src_list.has_key(f) and not update_list.has_key(f):
+ # leave only those not on src_list + update_list
+ del dst_list[f]
+
+ return src_list, dst_list, update_list, copy_pairs
# vim:et:ts=4:sts=4:ai
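
For reference, parse_attrs_header() above consumes the simple key:value/key:value encoding that _build_attr_header() in s3cmd writes into x-amz-meta-s3cmd-attrs; a tiny sketch with made-up values:

    sample = "uid:500/gid:500/mode:33188/mtime:1330498513/md5:0d599f0ec05c3bda8c3b8a68c32a1b47"
    attrs = dict(attr.split(":") for attr in sample.split("/"))
    assert attrs["md5"] == "0d599f0ec05c3bda8c3b8a68c32a1b47"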
43 S3/SortedDict.py
@@ -4,6 +4,7 @@
## License: GPL Version 2
from BidirMap import BidirMap
+import Utils
class SortedDictIterator(object):
def __init__(self, sorted_dict, keys):
@@ -26,6 +27,8 @@ def __init__(self, mapping = {}, ignore_case = True, **kwargs):
"""
dict.__init__(self, mapping, **kwargs)
self.ignore_case = ignore_case
+ self.hardlinks = dict() # { dev: { inode : {'md5':, 'relative_files':}}}
+ self.by_md5 = dict() # {md5: set(relative_files)}
def keys(self):
keys = dict.keys(self)
@@ -45,6 +48,46 @@ def keys(self):
def __iter__(self):
return SortedDictIterator(self, self.keys())
+
+ def record_md5(self, relative_file, md5):
+ if md5 not in self.by_md5:
+ self.by_md5[md5] = set()
+ self.by_md5[md5].add(relative_file)
+
+ def find_md5_one(self, md5):
+ try:
+ return list(self.by_md5.get(md5, set()))[0]
+ except:
+ return None
+
+ def get_md5(self, relative_file):
+ md5 = None
+ if 'md5' in self[relative_file]:
+ return self[relative_file]['md5']
+ md5 = self.get_hardlink_md5(relative_file)
+ if md5 is None:
+ md5 = Utils.hash_file_md5(self[relative_file]['full_name'])
+ self.record_md5(relative_file, md5)
+ self[relative_file]['md5'] = md5
+ return md5
+
+ def record_hardlink(self, relative_file, dev, inode, md5):
+ if dev not in self.hardlinks:
+ self.hardlinks[dev] = dict()
+ if inode not in self.hardlinks[dev]:
+ self.hardlinks[dev][inode] = dict(md5=md5, relative_files=set())
+ self.hardlinks[dev][inode]['relative_files'].add(relative_file)
+
+ def get_hardlink_md5(self, relative_file):
+ md5 = None
+ dev = self[relative_file]['dev']
+ inode = self[relative_file]['inode']
+ try:
+ md5 = self.hardlinks[dev][inode]['md5']
+ except:
+ pass
+ return md5
+
if __name__ == "__main__":
d = { 'AWS' : 1, 'Action' : 2, 'america' : 3, 'Auckland' : 4, 'America' : 5 }
sd = SortedDict(d)
610 s3cmd
@@ -23,6 +23,7 @@ import locale
import subprocess
import htmlentitydefs
import socket
+import shutil
from copy import copy
from optparse import OptionParser, Option, OptionValueError, IndentedHelpFormatter
@@ -582,6 +583,17 @@ def cmd_info(args):
raise
def cmd_sync_remote2remote(args):
+ def _do_deletes(s3, dst_list):
+ # Delete items in destination that are not in source
+ if cfg.dry_run:
+ for key in dst_list:
+ output(u"delete: %s" % dst_list[key]['object_uri_str'])
+ else:
+ for key in dst_list:
+ uri = S3Uri(dst_list[key]['object_uri_str'])
+ s3.object_delete(uri)
+ output(u"deleted: '%s'" % uri)
+
s3 = S3(Config())
# Normalise s3://uri (e.g. assert trailing slash)
@@ -597,9 +609,10 @@ def cmd_sync_remote2remote(args):
src_list, exclude_list = filter_exclude_include(src_list)
- src_list, dst_list, existing_list = compare_filelists(src_list, dst_list, src_remote = True, dst_remote = True)
+ src_list, dst_list, update_list, copy_pairs = compare_filelists(src_list, dst_list, src_remote = True, dst_remote = True, delay_updates = cfg.delay_updates)
src_count = len(src_list)
+ update_count = len(update_list)
dst_count = len(dst_list)
print(u"Summary: %d source files to copy, %d files at destination to delete" % (src_count, dst_count))
@@ -620,34 +633,39 @@ def cmd_sync_remote2remote(args):
warning(u"Exitting now because of --dry-run")
return
+ # if there are copy pairs, we can't do delete_before, on the chance
+ # we need one of the to-be-deleted files as a copy source.
+ if len(copy_pairs) > 0:
+ cfg.delete_after = True
+
# Delete items in destination that are not in source
- if cfg.delete_removed:
- if cfg.dry_run:
- for key in dst_list:
- output(u"delete: %s" % dst_list[key]['object_uri_str'])
- else:
- for key in dst_list:
- uri = S3Uri(dst_list[key]['object_uri_str'])
- s3.object_delete(uri)
- output(u"deleted: '%s'" % uri)
+ if cfg.delete_removed and not cfg.delete_after:
+ _do_deletes(s3, dst_list)
+
+ def _upload(src_list, seq, src_count):
+ file_list = src_list.keys()
+ file_list.sort()
+ for file in file_list:
+ seq += 1
+ item = src_list[file]
+ src_uri = S3Uri(item['object_uri_str'])
+ dst_uri = S3Uri(item['target_uri'])
+ seq_label = "[%d of %d]" % (seq, src_count)
+ extra_headers = copy(cfg.extra_headers)
+ try:
+ response = s3.object_copy(src_uri, dst_uri, extra_headers)
+ output("File %(src)s copied to %(dst)s" % { "src" : src_uri, "dst" : dst_uri })
+ except S3Error, e:
+ error("File %(src)s could not be copied: %(e)s" % { "src" : src_uri, "e" : e })
+ return seq
# Perform the synchronization of files
timestamp_start = time.time()
seq = 0
- file_list = src_list.keys()
- file_list.sort()
- for file in file_list:
- seq += 1
- item = src_list[file]
- src_uri = S3Uri(item['object_uri_str'])
- dst_uri = S3Uri(item['target_uri'])
- seq_label = "[%d of %d]" % (seq, src_count)
- extra_headers = copy(cfg.extra_headers)
- try:
- response = s3.object_copy(src_uri, dst_uri, extra_headers)
- output("File %(src)s copied to %(dst)s" % { "src" : src_uri, "dst" : dst_uri })
- except S3Error, e:
- error("File %(src)s could not be copied: %(e)s" % { "src" : src_uri, "e" : e })
+ seq = _upload(src_list, seq, src_count + update_count)
+ seq = _upload(update_list, seq, src_count + update_count)
+ n_copied, bytes_saved = remote_copy(s3, copy_pairs, destination_base)
+
total_elapsed = time.time() - timestamp_start
outstr = "Done. Copied %d files in %0.1f seconds, %0.2f files/s" % (seq, total_elapsed, seq/total_elapsed)
if seq > 0:
@@ -655,13 +673,15 @@ def cmd_sync_remote2remote(args):
else:
info(outstr)
+ # Delete items in destination that are not in source
+ if cfg.delete_removed and cfg.delete_after:
+ _do_deletes(s3, dst_list)
+
def cmd_sync_remote2local(args):
- def _parse_attrs_header(attrs_header):
- attrs = {}
- for attr in attrs_header.split("/"):
- key, val = attr.split(":")
- attrs[key] = val
- return attrs
+ def _do_deletes(local_list):
+ for key in local_list:
+ os.unlink(local_list[key]['full_name'])
+ output(u"deleted: %s" % local_list[key]['full_name_unicode'])
s3 = S3(Config())
@@ -676,27 +696,33 @@ def cmd_sync_remote2local(args):
remote_list, exclude_list = filter_exclude_include(remote_list)
- remote_list, local_list, existing_list = compare_filelists(remote_list, local_list, src_remote = True, dst_remote = False)
+ remote_list, local_list, update_list, copy_pairs = compare_filelists(remote_list, local_list, src_remote = True, dst_remote = False, delay_updates = cfg.delay_updates)
local_count = len(local_list)
remote_count = len(remote_list)
+ update_count = len(update_list)
+ copy_pairs_count = len(copy_pairs)
- info(u"Summary: %d remote files to download, %d local files to delete" % (remote_count, local_count))
-
- if not os.path.isdir(destination_base):
- ## We were either given a file name (existing or not) or want STDOUT
- if remote_count > 1:
- raise ParameterError("Destination must be a directory when downloading multiple sources.")
- remote_list[remote_list.keys()[0]]['local_filename'] = deunicodise(destination_base)
- else:
- if destination_base[-1] != os.path.sep:
- destination_base += os.path.sep
- for key in remote_list:
- local_filename = destination_base + key
- if os.path.sep != "/":
- local_filename = os.path.sep.join(local_filename.split("/"))
- remote_list[key]['local_filename'] = deunicodise(local_filename)
+ info(u"Summary: %d remote files to download, %d local files to delete, %d local files to hardlink" % (remote_count + update_count, local_count, copy_pairs_count))
+ def _set_local_filename(remote_list, destination_base):
+ if not os.path.isdir(destination_base):
+ ## We were either given a file name (existing or not) or want STDOUT
+ if len(remote_list) > 1:
+ raise ParameterError("Destination must be a directory when downloading multiple sources.")
+ remote_list[remote_list.keys()[0]]['local_filename'] = deunicodise(destination_base)
+ else:
+ if destination_base[-1] != os.path.sep:
+ destination_base += os.path.sep
+ for key in remote_list:
+ local_filename = destination_base + key
+ if os.path.sep != "/":
+ local_filename = os.path.sep.join(local_filename.split("/"))
+ remote_list[key]['local_filename'] = deunicodise(local_filename)
+
+ _set_local_filename(remote_list, destination_base)
+ _set_local_filename(update_list, destination_base)
+
if cfg.dry_run:
for key in exclude_list:
output(u"exclude: %s" % unicodise(key))
@@ -705,94 +731,105 @@ def cmd_sync_remote2local(args):
output(u"delete: %s" % local_list[key]['full_name_unicode'])
for key in remote_list:
output(u"download: %s -> %s" % (remote_list[key]['object_uri_str'], remote_list[key]['local_filename']))
+ for key in update_list:
+ output(u"download: %s -> %s" % (update_list[key]['object_uri_str'], update_list[key]['local_filename']))
warning(u"Exitting now because of --dry-run")
return
- if cfg.delete_removed:
- for key in local_list:
- os.unlink(local_list[key]['full_name'])
- output(u"deleted: %s" % local_list[key]['full_name_unicode'])
-
- total_size = 0
- total_elapsed = 0.0
- timestamp_start = time.time()
- seq = 0
- dir_cache = {}
- file_list = remote_list.keys()
- file_list.sort()
- for file in file_list:
- seq += 1
- item = remote_list[file]
- uri = S3Uri(item['object_uri_str'])
- dst_file = item['local_filename']
- seq_label = "[%d of %d]" % (seq, remote_count)
- try:
- dst_dir = os.path.dirname(dst_file)
- if not dir_cache.has_key(dst_dir):
- dir_cache[dst_dir] = Utils.mkdir_with_parents(dst_dir)
- if dir_cache[dst_dir] == False:
- warning(u"%s: destination directory not writable: %s" % (file, dst_dir))
- continue
+ # if there are copy pairs, we can't do delete_before, on the chance
+ # we need one of the to-be-deleted files as a copy source.
+ if len(copy_pairs) > 0:
+ cfg.delete_after = True
+
+ if cfg.delete_removed and not cfg.delete_after:
+ _do_deletes(local_list)
+
+ def _download(remote_list, seq, total, total_size, dir_cache):
+ file_list = remote_list.keys()
+ file_list.sort()
+ for file in file_list:
+ seq += 1
+ item = remote_list[file]
+ uri = S3Uri(item['object_uri_str'])
+ dst_file = item['local_filename']
+ seq_label = "[%d of %d]" % (seq, total)
try:
- open_flags = os.O_CREAT
- open_flags |= os.O_TRUNC
- # open_flags |= os.O_EXCL
-
- debug(u"dst_file=%s" % unicodise(dst_file))
- # This will have failed should the file exist
- os.close(os.open(dst_file, open_flags))
- # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
- dst_stream = open(dst_file, "wb")
- response = s3.object_get(uri, dst_stream, extra_label = seq_label)
- dst_stream.close()
- if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
- attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
- if attrs.has_key('mode'):
- os.chmod(dst_file, int(attrs['mode']))
- if attrs.has_key('mtime') or attrs.has_key('atime'):
- mtime = attrs.has_key('mtime') and int(attrs['mtime']) or int(time.time())
- atime = attrs.has_key('atime') and int(attrs['atime']) or int(time.time())
- os.utime(dst_file, (atime, mtime))
- ## FIXME: uid/gid / uname/gname handling comes here! TODO
- except OSError, e:
- try: dst_stream.close()
- except: pass
- if e.errno == errno.EEXIST:
- warning(u"%s exists - not overwriting" % (dst_file))
- continue
- if e.errno in (errno.EPERM, errno.EACCES):
- warning(u"%s not writable: %s" % (dst_file, e.strerror))
+ dst_dir = os.path.dirname(dst_file)
+ if not dir_cache.has_key(dst_dir):
+ dir_cache[dst_dir] = Utils.mkdir_with_parents(dst_dir)
+ if dir_cache[dst_dir] == False:
+ warning(u"%s: destination directory not writable: %s" % (file, dst_dir))
continue
- if e.errno == errno.EISDIR:
- warning(u"%s is a directory - skipping over" % dst_file)
+ try:
+ open_flags = os.O_CREAT
+ open_flags |= os.O_TRUNC
+ # open_flags |= os.O_EXCL
+
+ debug(u"dst_file=%s" % unicodise(dst_file))
+ # This will have failed should the file exist
+ os.close(os.open(dst_file, open_flags))
+ # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
+ dst_stream = open(dst_file, "wb")
+ response = s3.object_get(uri, dst_stream, extra_label = seq_label)
+ dst_stream.close()
+ if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
+ attrs = parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
+ if attrs.has_key('mode'):
+ os.chmod(dst_file, int(attrs['mode']))
+ if attrs.has_key('mtime') or attrs.has_key('atime'):
+ mtime = attrs.has_key('mtime') and int(attrs['mtime']) or int(time.time())
+ atime = attrs.has_key('atime') and int(attrs['atime']) or int(time.time())
+ os.utime(dst_file, (atime, mtime))
+ ## FIXME: uid/gid / uname/gname handling comes here! TODO
+ except OSError, e:
+ try: dst_stream.close()
+ except: pass
+ if e.errno == errno.EEXIST:
+ warning(u"%s exists - not overwriting" % (dst_file))
+ continue
+ if e.errno in (errno.EPERM, errno.EACCES):
+ warning(u"%s not writable: %s" % (dst_file, e.strerror))
+ continue
+ if e.errno == errno.EISDIR:
+ warning(u"%s is a directory - skipping over" % dst_file)
+ continue
+ raise e
+ except KeyboardInterrupt:
+ try: dst_stream.close()
+ except: pass
+ warning(u"Exiting after keyboard interrupt")
+ return
+ except Exception, e:
+ try: dst_stream.close()
+ except: pass
+ error(u"%s: %s" % (file, e))
continue
- raise e
- except KeyboardInterrupt:
+ # We have to keep repeating this call because
+ # Python 2.4 doesn't support try/except/finally
+ # construction :-(
try: dst_stream.close()
except: pass
- warning(u"Exiting after keyboard interrupt")
- return
- except Exception, e:
- try: dst_stream.close()
- except: pass
- error(u"%s: %s" % (file, e))
+ except S3DownloadError, e:
+ error(u"%s: download failed too many times. Skipping that file." % file)
continue
- # We have to keep repeating this call because
- # Python 2.4 doesn't support try/except/finally
- # construction :-(
- try: dst_stream.close()
- except: pass
- except S3DownloadError, e:
- error(u"%s: download failed too many times. Skipping that file." % file)
- continue
- speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
- if not Config().progress_meter:
- output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
- (uri, unicodise(dst_file), response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
- seq_label))
- total_size += response["size"]
+ speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+ if not Config().progress_meter:
+ output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+ (uri, unicodise(dst_file), response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
+ seq_label))
+ total_size += response["size"]
+ return seq, total_size
+ total_size = 0
+ total_elapsed = 0.0
+ timestamp_start = time.time()
+ dir_cache = {}
+ seq = 0
+ seq, total_size = _download(remote_list, seq, remote_count + update_count, total_size, dir_cache)
+ seq, total_size = _download(update_list, seq, remote_count + update_count, total_size, dir_cache)
+ local_hardlink(copy_pairs, destination_base)
+
total_elapsed = time.time() - timestamp_start
speed_fmt = formatSize(total_size/total_elapsed, human_readable = True, floating_point = True)
@@ -804,158 +841,253 @@ def cmd_sync_remote2local(args):
else:
info(outstr)
+ if cfg.delete_removed and cfg.delete_after:
+ _do_deletes(local_list)
+
+def local_hardlink(copy_pairs, destination_base):
+ for (dst1, dst2) in copy_pairs:
+ try:
+ os.link(destination_base + dst1, destination_base + dst2)
+ debug(u"Hardlinking %s to %s" % (destination_base + dst1, destination_base + dst2))
+ except:
+ shutil.copy2(destination_base + dst1, destination_base + dst2)
+ debug(u"Hardlinking unavailable, copying %s to %s" % (destination_base + dst1, destination_base + dst2))
+
+def remote_copy(s3, copy_pairs, destination_base):
+ saved_bytes = 0
+ for (dst1, dst2) in copy_pairs:
+ debug(u"Remote Copying from %s to %s" % (dst1, dst2))
+ dst1_uri = S3Uri(destination_base + dst1)
+ dst2_uri = S3Uri(destination_base + dst2)
+ extra_headers = copy(cfg.extra_headers)
+ try:
+ s3.object_copy(dst1_uri, dst2_uri, extra_headers)
+ info = s3.object_info(dst2_uri)
+ saved_bytes = saved_bytes + int(info['headers']['content-length'])
+ except:
+ raise
+ return (len(copy_pairs), saved_bytes)
+
+
def cmd_sync_local2remote(args):
- def _build_attr_header(src):
+ def _build_attr_header(local_list, src):
import pwd, grp
attrs = {}
- src = deunicodise(src)
- try:
- st = os.stat_result(os.stat(src))
- except OSError, e:
- raise InvalidFileError(u"%s: %s" % (unicodise(src), e.strerror))
for attr in cfg.preserve_attrs_list:
if attr == 'uname':
try:
- val = pwd.getpwuid(st.st_uid).pw_name
+ val = pwd.getpwuid(local_list[src]['uid']).pw_name
except KeyError:
attr = "uid"
- val = st.st_uid
- warning(u"%s: Owner username not known. Storing UID=%d instead." % (unicodise(src), val))
+ val = local_list[src].get(uid)
+ warning(u"%s: Owner username not known. Storing UID=%d instead." % (src, val))
elif attr == 'gname':
try:
- val = grp.getgrgid(st.st_gid).gr_name
+ val = grp.getgrgid(local_list[src].get('gid')).gr_name
except KeyError:
attr = "gid"
- val = st.st_gid
- warning(u"%s: Owner groupname not known. Storing GID=%d instead." % (unicodise(src), val))
+ val = local_list[src].get('gid')
+ warning(u"%s: Owner groupname not known. Storing GID=%d instead." % (src, val))
+ elif attr == 'md5':
+ val = local_list.get_md5(src)
else:
- val = getattr(st, 'st_' + attr)
+ val = getattr(local_list[src]['sr'], 'st_' + attr)
attrs[attr] = val
+
+ if 'md5' in attrs and attrs['md5'] is None:
+ del attrs['md5']
+
result = ""
for k in attrs: result += "%s:%s/" % (k, attrs[k])
return { 'x-amz-meta-s3cmd-attrs' : result[:-1] }
- s3 = S3(cfg)
+ def _do_deletes(s3, remote_list):
+ for key in remote_list:
+ uri = S3Uri(remote_list[key]['object_uri_str'])
+ s3.object_delete(uri)
+ output(u"deleted: '%s'" % uri)
- if cfg.encrypt:
- error(u"S3cmd 'sync' doesn't yet support GPG encryption, sorry.")
- error(u"Either use unconditional 's3cmd put --recursive'")
- error(u"or disable encryption with --no-encrypt parameter.")
- sys.exit(1)
+ def _single_process(local_list):
+ for dest in destinations:
+ ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
+ destination_base_uri = S3Uri(dest)
+ if destination_base_uri.type != 's3':
+ raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
+ destination_base = str(destination_base_uri)
+ _child(destination_base, local_list)
+
+ def _parent():
+ # Now that we've done all the disk I/O to look at the local file system and
+ # calculate the md5 for each file, fork for each destination to upload to them separately
+ # and in parallel
+ child_pids = []
+
+ for dest in destinations:
+ ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
+ destination_base_uri = S3Uri(dest)
+ if destination_base_uri.type != 's3':
+ raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
+ destination_base = str(destination_base_uri)
+ child_pid = os.fork()
+ if child_pid == 0:
+ _child(destination_base, local_list)
+ os._exit(0)
+ else:
+ child_pids.append(child_pid)
- ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
- destination_base_uri = S3Uri(args[-1])
- if destination_base_uri.type != 's3':
- raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
- destination_base = str(destination_base_uri)
+ while len(child_pids):
+ (pid, status) = os.wait()
+ child_pids.remove(pid)
+ return
- local_list, single_file_local = fetch_local_list(args[:-1], recursive = True)
- remote_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)
+ def _child(destination_base, local_list):
+ def _set_remote_uri(local_list, destination_base, single_file_local):
+ if len(local_list) > 0:
+ ## Populate 'remote_uri' only if we've got something to upload
+ if not destination_base.endswith("/"):
+ if not single_file_local:
+ raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
+ local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
+ else:
+ for key in local_list:
+ local_list[key]['remote_uri'] = unicodise(destination_base + key)
+
+ def _upload(local_list, seq, total, total_size):
+ file_list = local_list.keys()
+ file_list.sort()
+ for file in file_list:
+ seq += 1
+ item = local_list[file]
+ src = item['full_name']
+ uri = S3Uri(item['remote_uri'])
+ seq_label = "[%d of %d]" % (seq, total)
+ extra_headers = copy(cfg.extra_headers)
+ try:
+ if cfg.preserve_attrs:
+ attr_header = _build_attr_header(local_list, file)
+ debug(u"attr_header: %s" % attr_header)
+ extra_headers.update(attr_header)
+ response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
+ except InvalidFileError, e:
+ warning(u"File can not be uploaded: %s" % e)
+ continue
+ except S3UploadError, e:
+ error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
+ continue
+ speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+ if not cfg.progress_meter:
+ output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+ (item['full_name_unicode'], uri, response["size"], response["elapsed"],
+ speed_fmt[0], speed_fmt[1], seq_label))
+ total_size += response["size"]
+ uploaded_objects_list.append(uri.object())
+ return seq, total_size
- local_count = len(local_list)
- remote_count = len(remote_list)
+ remote_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)
- info(u"Found %d local files, %d remote files" % (local_count, remote_count))
+ local_count = len(local_list)
+ remote_count = len(remote_list)
- local_list, exclude_list = filter_exclude_include(local_list)
+ info(u"Found %d local files, %d remote files" % (local_count, remote_count))
- if single_file_local and len(local_list) == 1 and len(remote_list) == 1:
- ## Make remote_key same as local_key for comparison if we're dealing with only one file
- remote_list_entry = remote_list[remote_list.keys()[0]]
- # Flush remote_list, by the way
- remote_list = { local_list.keys()[0] : remote_list_entry }
+ local_list, exclude_list = filter_exclude_include(local_list)
- local_list, remote_list, existing_list = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True)
+ if single_file_local and len(local_list) == 1 and len(remote_list) == 1:
+ ## Make remote_key same as local_key for comparison if we're dealing with only one file
+ remote_list_entry = remote_list[remote_list.keys()[0]]
+ # Flush remote_list, by the way
+ remote_list = { local_list.keys()[0] : remote_list_entry }
- local_count = len(local_list)
- remote_count = len(remote_list)
+ local_list, remote_list, update_list, copy_pairs = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True, delay_updates = cfg.delay_updates)
- info(u"Summary: %d local files to upload, %d remote files to delete" % (local_count, remote_count))
+ local_count = len(local_list)
+ update_count = len(update_list)
+ copy_count = len(copy_pairs)
+ remote_count = len(remote_list)
- if local_count > 0:
- ## Populate 'remote_uri' only if we've got something to upload
- if not destination_base.endswith("/"):
- if not single_file_local:
- raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
- local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
- else:
+ info(u"Summary: %d local files to upload, %d files to remote copy, %d remote files to delete" % (local_count + update_count, copy_count, remote_count))
+
+ _set_remote_uri(local_list, destination_base, single_file_local)
+ _set_remote_uri(update_list, destination_base, single_file_local)
+
+ if cfg.dry_run:
+ for key in exclude_list:
+ output(u"exclude: %s" % unicodise(key))
for key in local_list:
- local_list[key]['remote_uri'] = unicodise(destination_base + key)
+ output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
+ for key in update_list:
+ output(u"upload: %s -> %s" % (update_list[key]['full_name_unicode'], update_list[key]['remote_uri']))
+ for (dst1, dst2) in copy_pairs:
+ output(u"remote copy: %s -> %s" % (dst1['object_key'], remote_list[dst2]['object_key']))
+ if cfg.delete_removed:
+ for key in remote_list:
+ output(u"delete: %s" % remote_list[key]['object_uri_str'])
+
+ warning(u"Exitting now because of --dry-run")
+ return
- if cfg.dry_run:
- for key in exclude_list:
- output(u"exclude: %s" % unicodise(key))
- if cfg.delete_removed:
- for key in remote_list:
- output(u"delete: %s" % remote_list[key]['object_uri_str'])
- for key in local_list:
- output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
+ # if there are copy pairs, we can't do delete_before, on the chance
+ # we need one of the to-be-deleted files as a copy source.
+ if len(copy_pairs) > 0:
+ cfg.delete_after = True
+
+ if cfg.delete_removed and not cfg.delete_after:
+ _do_deletes(s3, remote_list)
+
+ uploaded_objects_list = []
+ total_size = 0
+ total_elapsed = 0.0
+ timestamp_start = time.time()
+ n, total_size = _upload(local_list, 0, local_count, total_size)
+ n, total_size = _upload(update_list, n, local_count, total_size)
+ n_copies, saved_bytes = remote_copy(s3, copy_pairs, destination_base)
+ if cfg.delete_removed and cfg.delete_after:
+ _do_deletes(s3, remote_list)
+ total_elapsed = time.time() - timestamp_start
+ total_speed = total_elapsed and total_size/total_elapsed or 0.0
+ speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)
+
+ # Only print out the result if any work has been done or
+ # if the user asked for verbose output
+ outstr = "Done. Uploaded %d bytes in %0.1f seconds, %0.2f %sB/s. Copied %d files saving %d bytes transfer." % (total_size, total_elapsed, speed_fmt[0], speed_fmt[1], n_copies, saved_bytes)
+ if total_size + saved_bytes > 0:
+ output(outstr)
+ else:
+ info(outstr)
+
+ if cfg.invalidate_on_cf:
+ if len(uploaded_objects_list) == 0:
+ info("Nothing to invalidate in CloudFront")
+ else:
+ # 'uri' from the last iteration is still valid at this point
+ cf = CloudFront(cfg)
+ result = cf.InvalidateObjects(uri, uploaded_objects_list)
+ if result['status'] == 201:
+ output("Created invalidation request for %d paths" % len(uploaded_objects_list))
+ output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
- warning(u"Exitting now because of --dry-run")
return
- if cfg.delete_removed:
- for key in remote_list:
- uri = S3Uri(remote_list[key]['object_uri_str'])
- s3.object_delete(uri)
- output(u"deleted: '%s'" % uri)
+ # main execution
+ s3 = S3(cfg)
- uploaded_objects_list = []
- total_size = 0
- total_elapsed = 0.0
- timestamp_start = time.time()
- seq = 0
- file_list = local_list.keys()
- file_list.sort()
- for file in file_list:
- seq += 1
- item = local_list[file]
- src = item['full_name']
- uri = S3Uri(item['remote_uri'])
- seq_label = "[%d of %d]" % (seq, local_count)
- extra_headers = copy(cfg.extra_headers)
- try:
- if cfg.preserve_attrs:
- attr_header = _build_attr_header(src)
- debug(u"attr_header: %s" % attr_header)
- extra_headers.update(attr_header)
- response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
- except InvalidFileError, e:
- warning(u"File can not be uploaded: %s" % e)
- continue
- except S3UploadError, e:
- error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
- continue
- speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
- if not cfg.progress_meter:
- output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
- (item['full_name_unicode'], uri, response["size"], response["elapsed"],
- speed_fmt[0], speed_fmt[1], seq_label))
- total_size += response["size"]
- uploaded_objects_list.append(uri.object())
+ if cfg.encrypt:
+ error(u"S3cmd 'sync' doesn't yet support GPG encryption, sorry.")
+ error(u"Either use unconditional 's3cmd put --recursive'")
+ error(u"or disable encryption with --no-encrypt parameter.")
+ sys.exit(1)
- total_elapsed = time.time() - timestamp_start
- total_speed = total_elapsed and total_size/total_elapsed or 0.0
- speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)
+ local_list, single_file_local = fetch_local_list(args[:-1], recursive = True)
- # Only print out the result if any work has been done or
- # if the user asked for verbose output
- outstr = "Done. Uploaded %d bytes in %0.1f seconds, %0.2f %sB/s" % (total_size, total_elapsed, speed_fmt[0], speed_fmt[1])
- if total_size > 0:
- output(outstr)
+ destinations = [args[-1]]
+ if cfg.additional_destinations:
+ destinations = destinations + cfg.additional_destinations
+
+ if 'fork' not in os.__all__ or len(destinations) < 2:
+ _single_process(local_list)
else:
- info(outstr)
+ _parent()
- if cfg.invalidate_on_cf:
- if len(uploaded_objects_list) == 0:
- info("Nothing to invalidate in CloudFront")
- else:
- # 'uri' from the last iteration is still valid at this point
- cf = CloudFront(cfg)
- result = cf.InvalidateObjects(uri, uploaded_objects_list)
- if result['status'] == 201:
- output("Created invalidation request for %d paths" % len(uploaded_objects_list))
- output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
def cmd_sync(args):
if (len(args) < 2):
@@ -1499,6 +1631,9 @@ def main():
optparser.add_option( "--delete-removed", dest="delete_removed", action="store_true", help="Delete remote objects with no corresponding local file [sync]")
optparser.add_option( "--no-delete-removed", dest="delete_removed", action="store_false", help="Don't delete remote objects.")
+ optparser.add_option( "--delete-after", dest="delete_after", action="store_true", help="Perform deletes after new uploads [sync]")
+ optparser.add_option( "--delay-updates", dest="delay_updates", action="store_true", help="Put all updated files into place at end [sync]")
+ optparser.add_option( "--add-destination", dest="additional_destinations", action="append", help="Additional destination for parallel uploads, in addition to last arg. May be repeated.")
optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.")
optparser.add_option( "--no-preserve", dest="preserve_attrs", action="store_false", help="Don't store FS attributes")
optparser.add_option( "--exclude", dest="exclude", action="append", metavar="GLOB", help="Filenames and paths matching GLOB will be excluded from sync")
@@ -1674,6 +1809,9 @@ def main():
## Some CloudFront.Cmd.Options() options are not settable from command line
pass
+ if options.additional_destinations:
+ cfg.additional_destinations = options.additional_destinations
+
## Set output and filesystem encoding for printing out filenames.
sys.stdout = codecs.getwriter(cfg.encoding)(sys.stdout, "replace")
sys.stderr = codecs.getwriter(cfg.encoding)(sys.stderr, "replace")
5 s3cmd.1
@@ -186,6 +186,11 @@ Delete remote objects with no corresponding local file
\fB\-\-no\-delete\-removed\fR
Don't delete remote objects.
.TP
+\fB\-\-delete\-after\fR
+Perform deletes after new uploads [sync].
+\fB\-\-delay\-updates\fR
+Put all updated files into place at end [sync]
+.TP
\fB\-p\fR, \fB\-\-preserve\fR
Preserve filesystem attributes (mode, ownership,
timestamps). Default for [sync] command.