Skip to content
This repository has been archived by the owner on Nov 4, 2018. It is now read-only.

Commit

Permalink
handle remote->local transfers with local hardlink/copy if possible
Browse files Browse the repository at this point in the history
Reworked some of the hardlink / same file detection code to be a
little more general purpose.  Now it can be used to detect duplicate
files on either remote or local side.

When transferring remote->local, if we already have a copy (same
md5sum) of a file locally that we would otherwise transfer, don't
transfer, but hardlink it.  Should hardlink not be avaialble (e.g. on
Windows), use shutil.copy2() instead.  This lets us avoid the second
download completely.

_get_filelist_local() grew an initial list argument.  This lets us
avoid copying / merging / updating a bunch of different lists back
into one - it starts as one list and grows.  Much cleaner (and the
fact these were separate cost me several hours of debugging to track
down why something would get set, like the by_md5 hash, only to have
it be empty shortly thereafter.
  • Loading branch information
Matt Domsch authored and mdomsch committed Jun 18, 2012
1 parent cdf25f9 commit f881b16
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 36 deletions.
26 changes: 14 additions & 12 deletions S3/FileLists.py
Expand Up @@ -137,7 +137,7 @@ def handle_exclude_include_walk(root, dirs, files):
debug(u"PASS: %s" % (file)) debug(u"PASS: %s" % (file))


def fetch_local_list(args, recursive = None): def fetch_local_list(args, recursive = None):
def _get_filelist_local(local_uri): def _get_filelist_local(loc_list, local_uri):
info(u"Compiling list of local files...") info(u"Compiling list of local files...")
if local_uri.isdir(): if local_uri.isdir():
local_base = deunicodise(local_uri.basename()) local_base = deunicodise(local_uri.basename())
Expand All @@ -149,7 +149,6 @@ def _get_filelist_local(local_uri):
local_path = deunicodise(local_uri.dirname()) local_path = deunicodise(local_uri.dirname())
filelist = [( local_path, [], [deunicodise(local_uri.basename())] )] filelist = [( local_path, [], [deunicodise(local_uri.basename())] )]
single_file = True single_file = True
loc_list = SortedDict(ignore_case = False)
for root, dirs, files in filelist: for root, dirs, files in filelist:
rel_root = root.replace(local_path, local_base, 1) rel_root = root.replace(local_path, local_base, 1)
for f in files: for f in files:
Expand All @@ -173,15 +172,16 @@ def _get_filelist_local(local_uri):
'full_name' : full_name, 'full_name' : full_name,
'size' : sr.st_size, 'size' : sr.st_size,
'mtime' : sr.st_mtime, 'mtime' : sr.st_mtime,
'nlink' : sr.st_nlink, # record hardlink information
'dev' : sr.st_dev, 'dev' : sr.st_dev,
'inode' : sr.st_ino, 'inode' : sr.st_ino,
'uid' : sr.st_uid, 'uid' : sr.st_uid,
'gid' : sr.st_gid, 'gid' : sr.st_gid,
'sr': sr # save it all, may need it in preserve_attrs_list 'sr': sr # save it all, may need it in preserve_attrs_list
## TODO: Possibly more to save here... ## TODO: Possibly more to save here...
} }
loc_list.record_hardlink(relative_file, sr.st_dev, sr.st_ino) if 'md5' in cfg.sync_checks:
md5 = loc_list.get_md5(relative_file)
loc_list.record_hardlink(relative_file, sr.st_dev, sr.st_ino, md5)
return loc_list, single_file return loc_list, single_file


cfg = Config() cfg = Config()
Expand All @@ -204,8 +204,7 @@ def _get_filelist_local(local_uri):
local_uris.append(uri) local_uris.append(uri)


for uri in local_uris: for uri in local_uris:
list_for_uri, single_file = _get_filelist_local(uri) list_for_uri, single_file = _get_filelist_local(local_list, uri)
local_list.update(list_for_uri)


## Single file is True if and only if the user ## Single file is True if and only if the user
## specified one local URI and that URI represents ## specified one local URI and that URI represents
Expand Down Expand Up @@ -264,7 +263,6 @@ def _get_filelist_remote(remote_uri, recursive = True):
'object_key' : object['Key'], 'object_key' : object['Key'],
'object_uri_str' : object_uri_str, 'object_uri_str' : object_uri_str,
'base_uri' : remote_uri, 'base_uri' : remote_uri,
'nlink' : 1, # S3 doesn't support hardlinks itself
'dev' : None, 'dev' : None,
'inode' : None, 'inode' : None,
} }
Expand Down Expand Up @@ -406,7 +404,7 @@ def _compare(src_list, dst_lst, src_remote, dst_remote, file):
debug("Comparing filelists (direction: %s -> %s)" % (__direction_str(src_remote), __direction_str(dst_remote))) debug("Comparing filelists (direction: %s -> %s)" % (__direction_str(src_remote), __direction_str(dst_remote)))


for relative_file in src_list.keys(): for relative_file in src_list.keys():
debug(u"CHECK: %s: %s" % (relative_file, src_list.get_md5(relative_file))) debug(u"CHECK: %s" % (relative_file))


if dst_list.has_key(relative_file): if dst_list.has_key(relative_file):
## Was --skip-existing requested? ## Was --skip-existing requested?
Expand All @@ -416,7 +414,14 @@ def _compare(src_list, dst_lst, src_remote, dst_remote, file):
del(dst_list[relative_file]) del(dst_list[relative_file])
continue continue


if _compare(src_list, dst_list, src_remote, dst_remote, relative_file): try:
compare_result = _compare(src_list, dst_list, src_remote, dst_remote, relative_file)
except (IOError,OSError), e:
del(src_list[relative_file])
del(dst_list[relative_file])
continue

if compare_result:
debug(u"IGNR: %s (transfer not needed)" % relative_file) debug(u"IGNR: %s (transfer not needed)" % relative_file)
del(src_list[relative_file]) del(src_list[relative_file])
del(dst_list[relative_file]) del(dst_list[relative_file])
Expand All @@ -434,7 +439,6 @@ def _compare(src_list, dst_lst, src_remote, dst_remote, file):
else: else:
# record that we will get this file transferred to us (before all the copies), so if we come across it later again, # record that we will get this file transferred to us (before all the copies), so if we come across it later again,
# we can copy from _this_ copy (e.g. we only upload it once, and copy thereafter). # we can copy from _this_ copy (e.g. we only upload it once, and copy thereafter).
debug(u"REMOTE COPY src before")
dst_list.record_md5(relative_file, md5) dst_list.record_md5(relative_file, md5)
update_list[relative_file] = src_list[relative_file] update_list[relative_file] = src_list[relative_file]
del src_list[relative_file] del src_list[relative_file]
Expand All @@ -448,15 +452,13 @@ def _compare(src_list, dst_lst, src_remote, dst_remote, file):
if dst1 is not None: if dst1 is not None:
# Found one, we want to copy # Found one, we want to copy
debug(u"REMOTE COPY dst: %s -> %s" % (dst1, relative_file)) debug(u"REMOTE COPY dst: %s -> %s" % (dst1, relative_file))
# FIXME this blows up when dst1 is not in dst_list, because we added it below in record_md5 but it's not really in dst_list.
copy_pairs.append((dst1, relative_file)) copy_pairs.append((dst1, relative_file))
del(src_list[relative_file]) del(src_list[relative_file])
else: else:
# we don't have this file, and we don't have a copy of this file elsewhere. Get it. # we don't have this file, and we don't have a copy of this file elsewhere. Get it.
# record that we will get this file transferred to us (before all the copies), so if we come across it later again, # record that we will get this file transferred to us (before all the copies), so if we come across it later again,
# we can copy from _this_ copy (e.g. we only upload it once, and copy thereafter). # we can copy from _this_ copy (e.g. we only upload it once, and copy thereafter).
dst_list.record_md5(relative_file, md5) dst_list.record_md5(relative_file, md5)
debug(u"REMOTE COPY dst before")


for f in dst_list.keys(): for f in dst_list.keys():
if not src_list.has_key(f) and not update_list.has_key(f): if not src_list.has_key(f) and not update_list.has_key(f):
Expand Down
31 changes: 8 additions & 23 deletions S3/SortedDict.py
Expand Up @@ -27,7 +27,7 @@ def __init__(self, mapping = {}, ignore_case = True, **kwargs):
""" """
dict.__init__(self, mapping, **kwargs) dict.__init__(self, mapping, **kwargs)
self.ignore_case = ignore_case self.ignore_case = ignore_case
self.hardlinks = dict() self.hardlinks = dict() # { dev: { inode : {'md5':, 'relative_files':}}}
self.by_md5 = dict() # {md5: set(relative_files)} self.by_md5 = dict() # {md5: set(relative_files)}


def keys(self): def keys(self):
Expand Down Expand Up @@ -60,39 +60,24 @@ def find_md5_one(self, md5):
except: except:
return None return None



def get_md5(self, relative_file): def get_md5(self, relative_file):
md5 = None md5 = None
if 'md5' in self[relative_file]: if 'md5' in self[relative_file]:
return self[relative_file]['md5'] return self[relative_file]['md5']
if self.is_hardlinked(relative_file): # speedup by getting it from one of the hardlinks already processed md5 = self.get_hardlink_md5(relative_file)
md5 = self.get_hardlink_md5(relative_file) if md5 is None:
if md5 is None: md5 = Utils.hash_file_md5(self[relative_file]['full_name'])
md5 = Utils.hash_file_md5(self[relative_file]['full_name']) self.record_md5(relative_file, md5)
self.record_md5(relative_file, md5) self[relative_file]['md5'] = md5
self.set_hardlink_md5(relative_file, md5)
else:
md5 = Utils.hash_file_md5(self[relative_file]['full_name'])
self[relative_file]['md5'] = md5
l.record_md5(relative_file, md5)
return md5 return md5


def record_hardlink(self, relative_file, dev, inode): def record_hardlink(self, relative_file, dev, inode, md5):
if dev not in self.hardlinks: if dev not in self.hardlinks:
self.hardlinks[dev] = dict() self.hardlinks[dev] = dict()
if inode not in self.hardlinks[dev]: if inode not in self.hardlinks[dev]:
self.hardlinks[dev][inode] = dict(md5=None, relative_files=set()) self.hardlinks[dev][inode] = dict(md5=md5, relative_files=set())
self.hardlinks[dev][inode]['relative_files'].add(relative_file) self.hardlinks[dev][inode]['relative_files'].add(relative_file)


def set_hardlink_md5(self, relative_file, md5):
dev = self[relative_file]['dev']
inode = self[relative_file]['inode']
self.record_hardlink(relative_file, dev, inode)
self.hardlinks[dev][inode]['md5'] = md5

def is_hardlinked(self, relative_file):
return self[relative_file]['nlink'] > 1

def get_hardlink_md5(self, relative_file): def get_hardlink_md5(self, relative_file):
md5 = None md5 = None
dev = self[relative_file]['dev'] dev = self[relative_file]['dev']
Expand Down
14 changes: 13 additions & 1 deletion s3cmd
Expand Up @@ -23,6 +23,7 @@ import locale
import subprocess import subprocess
import htmlentitydefs import htmlentitydefs
import socket import socket
import shutil


from copy import copy from copy import copy
from optparse import OptionParser, Option, OptionValueError, IndentedHelpFormatter from optparse import OptionParser, Option, OptionValueError, IndentedHelpFormatter
Expand Down Expand Up @@ -700,8 +701,9 @@ def cmd_sync_remote2local(args):
local_count = len(local_list) local_count = len(local_list)
remote_count = len(remote_list) remote_count = len(remote_list)
update_count = len(update_list) update_count = len(update_list)
copy_pairs_count = len(copy_pairs)


info(u"Summary: %d remote files to download, %d local files to delete" % (remote_count + update_count, local_count)) info(u"Summary: %d remote files to download, %d local files to delete, %d local files to hardlink" % (remote_count + update_count, local_count, copy_pairs_count))


def _set_local_filename(remote_list, destination_base): def _set_local_filename(remote_list, destination_base):
if not os.path.isdir(destination_base): if not os.path.isdir(destination_base):
Expand Down Expand Up @@ -826,6 +828,7 @@ def cmd_sync_remote2local(args):
seq = 0 seq = 0
seq, total_size = _download(remote_list, seq, remote_count + update_count, total_size, dir_cache) seq, total_size = _download(remote_list, seq, remote_count + update_count, total_size, dir_cache)
seq, total_size = _download(update_list, seq, remote_count + update_count, total_size, dir_cache) seq, total_size = _download(update_list, seq, remote_count + update_count, total_size, dir_cache)
local_hardlink(copy_pairs, destination_base)


total_elapsed = time.time() - timestamp_start total_elapsed = time.time() - timestamp_start
speed_fmt = formatSize(total_size/total_elapsed, human_readable = True, floating_point = True) speed_fmt = formatSize(total_size/total_elapsed, human_readable = True, floating_point = True)
Expand All @@ -841,6 +844,15 @@ def cmd_sync_remote2local(args):
if cfg.delete_removed and cfg.delete_after: if cfg.delete_removed and cfg.delete_after:
_do_deletes(local_list) _do_deletes(local_list)


def local_hardlink(copy_pairs, destination_base):
for (dst1, dst2) in copy_pairs:
try:
os.link(destination_base + dst1, destination_base + dst2)
debug(u"Hardlinking %s to %s" % (destination_base + dst1, destination_base + dst2))
except:
shutil.copy2(destination_base + dst1, destination_base + dst2)
debug(u"Hardlinking unavailable, copying %s to %s" % (destination_base + dst1, destination_base + dst2))

def remote_copy(s3, copy_pairs, destination_base): def remote_copy(s3, copy_pairs, destination_base):
saved_bytes = 0 saved_bytes = 0
for (dst1, dst2) in copy_pairs: for (dst1, dst2) in copy_pairs:
Expand Down

0 comments on commit f881b16

Please sign in to comment.