Index: S3/Config.py
===================================================================
--- S3/Config.py (revision 437)
+++ S3/Config.py (working copy)
@@ -74,7 +74,9 @@
urlencoding_mode = "normal"
log_target_prefix = ""
reduced_redundancy = False
- follow_symlinks=False
+ parallel = False
+ workers = 10
+ follow_symlinks=False
## Creating a singleton
def __new__(self, configfile = None):
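For context, Config here is a singleton: every Config() call returns the same instance, so the two class-level defaults added above become visible to all of the worker code once option parsing overwrites them. A minimal sketch of that mechanism, leaving out the config-file and option merging the real class does:

class Config(object):
    _instance = None
    parallel = False    # new default added by this patch
    workers = 10        # new default added by this patch

    def __new__(cls, configfile=None):
        if cls._instance is None:
            cls._instance = object.__new__(cls)
        return cls._instance

Config().workers = 4            # e.g. set while parsing --workers
assert Config() is Config()     # same object on every call
assert Config().workers == 4    # so every worker thread sees the parsed value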
Index: s3cmd
===================================================================
--- s3cmd (revision 437)
+++ s3cmd (working copy)
@@ -23,11 +23,17 @@
import subprocess
import htmlentitydefs
+import Queue
+import threading
+
from copy import copy
from optparse import OptionParser, Option, OptionValueError, IndentedHelpFormatter
from logging import debug, info, warning, error
from distutils.spawn import find_executable
+
+
+
def output(message):
sys.stdout.write(message + "\n")
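The two new imports drive everything below. Every command this patch touches follows the same producer/consumer shape: fill a Queue.Queue, start cfg.workers daemon threads to drain it, poll active_count() so Ctrl-C still works, then join. A self-contained sketch of that shape in the patch's own Python 2 idiom (the item names are illustrative only):

import Queue
import threading
import time

q = Queue.Queue()

def worker():
    while True:
        try:
            (item, seq, total) = q.get_nowait()
        except Queue.Empty:
            return                    # queue drained: let the thread finish
        try:
            print "[%d of %d] processing %s" % (seq, total, item)
        finally:
            q.task_done()             # always account for the item

items = ["a", "b", "c", "d"]
seq = 0
for item in items:
    seq += 1
    q.put((item, seq, len(items)))

for i in range(3):                    # cfg.workers in the patch
    t = threading.Thread(target=worker)
    t.daemon = True                   # workers must not outlive main
    t.start()

# Poll before joining: Queue.join() cannot be interrupted by Ctrl-C
# in Python 2, which is why the patch spins on active_count() first.
while threading.active_count() > 1:
    time.sleep(.1)
q.join()

Queue.Queue does its own locking, so put(), get_nowait() and task_done() need no extra synchronization.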
@@ -317,7 +323,6 @@
def cmd_object_put(args):
cfg = Config()
- s3 = S3(cfg)
if len(args) == 0:
raise ParameterError("Nothing to upload. Expecting a local file or directory and a S3 URI destination.")
@@ -356,43 +361,86 @@
warning(u"Exitting now because of --dry-run")
return
+
+ if cfg.parallel and len(local_list) > 1:
+ # Disable the progress meter for parallel uploads.
+ cfg.progress_meter = False
+ #Initialize Queue
+ global q
+ q = Queue.Queue()
+
+ seq = 0
+ for key in local_list:
+ seq += 1
+ q.put([local_list[key],seq,len(local_list)])
- seq = 0
- for key in local_list:
- seq += 1
+ for i in range(cfg.workers):
+ t = threading.Thread(target=put_worker)
+ t.daemon = True
+ t.start()
- uri_final = S3Uri(local_list[key]['remote_uri'])
+ # Poll before joining so a KeyboardInterrupt can still kill the process;
+ # Queue.join() alone blocks until every queued item has completed.
+ while threading.active_count() > 1:
+ time.sleep(.1)
- extra_headers = copy(cfg.extra_headers)
- full_name_orig = local_list[key]['full_name']
- full_name = full_name_orig
- seq_label = "[%d of %d]" % (seq, local_count)
- if Config().encrypt:
- exitcode, full_name, extra_headers["x-amz-meta-s3tools-gpgenc"] = gpg_encrypt(full_name_orig)
+ q.join()
+ else:
+ seq = 0
+ for key in local_list:
+ seq += 1
+ do_put_work(local_list[key],seq,len(local_list))
+
+def put_worker():
+ while True:
try:
- response = s3.object_put(full_name, uri_final, extra_headers, extra_label = seq_label)
- except S3UploadError, e:
- error(u"Upload of '%s' failed too many times. Skipping that file." % full_name_orig)
- continue
- except InvalidFileError, e:
- warning(u"File can not be uploaded: %s" % e)
- continue
- speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
- if not Config().progress_meter:
- output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
- (unicodise(full_name_orig), uri_final, response["size"], response["elapsed"],
- speed_fmt[0], speed_fmt[1], seq_label))
- if Config().acl_public:
- output(u"Public URL of the object is: %s" %
- (uri_final.public_url()))
- if Config().encrypt and full_name != full_name_orig:
- debug(u"Removing temporary encrypted file: %s" % unicodise(full_name))
- os.remove(full_name)
+ (item,seq,total) = q.get_nowait()
+ except Queue.Empty:
+ return
+ try:
+ do_put_work(item,seq,total)
+ except Exception, e:
+ report_exception(e)
+ # (the worker keeps going; task_done() below must run so q.join() can return)
+ q.task_done()
-def cmd_object_get(args):
+def do_put_work(item,seq,total):
cfg = Config()
s3 = S3(cfg)
+ uri_final = S3Uri(item['remote_uri'])
+ extra_headers = copy(cfg.extra_headers)
+ full_name_orig = item['full_name']
+ full_name = full_name_orig
+ seq_label = "[%d of %d]" % (seq, total)
+ if Config().encrypt:
+ exitcode, full_name, extra_headers["x-amz-meta-s3tools-gpgenc"] = gpg_encrypt(full_name_orig)
+ if not Config().progress_meter:
+ output(u"File '%s' started %s" %
+ (unicodise(full_name_orig), seq_label))
+ try:
+ response = s3.object_put(full_name, uri_final, extra_headers, extra_label = seq_label)
+ except S3UploadError, e:
+ error(u"Upload of '%s' failed too many times. Skipping that file." % full_name_orig)
+ return
+ except InvalidFileError, e:
+ warning(u"File can not be uploaded: %s" % e)
+ return
+ speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+ if not Config().progress_meter:
+ output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+ (unicodise(full_name_orig), uri_final, response["size"], response["elapsed"],
+ speed_fmt[0], speed_fmt[1], seq_label))
+ if Config().acl_public:
+ output(u"Public URL of the object is: %s" %
+ (uri_final.public_url()))
+ if Config().encrypt and full_name != full_name_orig:
+ debug(u"Removing temporary encrypted file: %s" % unicodise(full_name))
+ os.remove(full_name)
+
+def cmd_object_get(args):
+ cfg = Config()
+
## Check arguments:
## if not --recursive:
## - first N arguments must be S3Uri
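The dispatch block above (parallel when --parallel is set and there is more than one file, otherwise the old sequential loop) reappears almost verbatim for get and for both sync directions below. Purely to make that repeated control flow explicit, a hypothetical helper the patch does not actually define (it assumes the script's Queue/threading/time imports):

def run_tasks(work_fn, worker_fn, items, cfg):
    # items maps key -> task dict, like local_list / remote_list above.
    if cfg.parallel and len(items) > 1:
        cfg.progress_meter = False   # interleaved progress bars are unreadable
        global q
        q = Queue.Queue()
        seq = 0
        for key in items:
            seq += 1
            q.put([items[key], seq, len(items)])
        for i in range(cfg.workers):
            t = threading.Thread(target=worker_fn)
            t.daemon = True
            t.start()
        # Poll so a KeyboardInterrupt can still kill the process.
        while threading.active_count() > 1:
            time.sleep(.1)
        q.join()
    else:
        seq = 0
        for key in items:
            seq += 1
            work_fn(items[key], seq, len(items))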
@@ -465,59 +513,107 @@
warning(u"Exitting now because of --dry-run")
return
- seq = 0
- for key in remote_list:
- seq += 1
- item = remote_list[key]
- uri = S3Uri(item['object_uri_str'])
- ## Encode / Decode destination with "replace" to make sure it's compatible with current encoding
- destination = unicodise_safe(item['local_filename'])
- seq_label = "[%d of %d]" % (seq, remote_count)
+ if cfg.parallel and len(remote_list) > 1:
+ # Disable the progress meter for parallel downloads.
+ cfg.progress_meter = False
+ #Initialize Queue
+ global q
+ q = Queue.Queue()
+
+ seq = 0
+ for key in remote_list:
+ seq += 1
+ q.put([remote_list[key],seq,len(remote_list)])
- start_position = 0
+ for i in range(cfg.workers):
+ t = threading.Thread(target=get_worker)
+ t.daemon = True
+ t.start()
+
+ # Poll before joining so a KeyboardInterrupt can still kill the process;
+ # Queue.join() alone blocks until every queued item has completed.
+ while threading.active_count() > 1:
+ time.sleep(.1)
- if destination == "-":
- ## stdout
- dst_stream = sys.__stdout__
- else:
- ## File
+ q.join()
+ else:
+ seq = 0
+ for key in remote_list:
+ seq += 1
+ do_get_work(remote_list[key],seq,len(remote_list))
+
+def get_worker():
+ while True:
+ try:
+ (item,seq,total) = q.get_nowait()
+ except Queue.Empty:
+ return
+ try:
+ do_get_work(item,seq,total)
+ except ParameterError, e:
+ error(u"Parameter problem: %s" % e)
+ except S3Error, e:
+ error(u"S3 error: %s" % e)
+ except Exception, e:
+ report_exception(e)
+ # (the worker keeps going; task_done() below must run so q.join() can return)
+ q.task_done()
+
+def do_get_work(item,seq,total):
+ cfg = Config()
+ s3 = S3(cfg)
+ uri = S3Uri(item['object_uri_str'])
+ ## Encode / Decode destination with "replace" to make sure it's compatible with current encoding
+ destination = unicodise_safe(item['local_filename'])
+ seq_label = "[%d of %d]" % (seq, total)
+ start_position = 0
+
+ if destination == "-":
+ ## stdout
+ dst_stream = sys.__stdout__
+ else:
+ ## File
+ try:
+ file_exists = os.path.exists(destination)
try:
- file_exists = os.path.exists(destination)
- try:
+ dst_stream = open(destination, "ab")
+ except IOError, e:
+ if e.errno == errno.ENOENT:
+ basename = destination[:destination.rindex(os.path.sep)]
+ info(u"Creating directory: %s" % basename)
+ os.makedirs(basename)
dst_stream = open(destination, "ab")
- except IOError, e:
- if e.errno == errno.ENOENT:
- basename = destination[:destination.rindex(os.path.sep)]
- info(u"Creating directory: %s" % basename)
- os.makedirs(basename)
- dst_stream = open(destination, "ab")
- else:
- raise
- if file_exists:
- if Config().get_continue:
- start_position = dst_stream.tell()
- elif Config().force:
- start_position = 0L
- dst_stream.seek(0L)
- dst_stream.truncate()
- elif Config().skip_existing:
- info(u"Skipping over existing file: %s" % (destination))
- continue
- else:
- dst_stream.close()
- raise ParameterError(u"File %s already exists. Use either of --force / --continue / --skip-existing or give it a new name." % destination)
- except IOError, e:
- error(u"Skipping %s: %s" % (destination, e.strerror))
- continue
- response = s3.object_get(uri, dst_stream, start_position = start_position, extra_label = seq_label)
- if response["headers"].has_key("x-amz-meta-s3tools-gpgenc"):
- gpg_decrypt(destination, response["headers"]["x-amz-meta-s3tools-gpgenc"])
- response["size"] = os.stat(destination)[6]
- if not Config().progress_meter and destination != "-":
- speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
- output(u"File %s saved as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s)" %
- (uri, destination, response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1]))
-
+ else:
+ raise
+ if file_exists:
+ if Config().get_continue:
+ start_position = dst_stream.tell()
+ elif Config().force:
+ start_position = 0L
+ dst_stream.seek(0L)
+ dst_stream.truncate()
+ elif Config().skip_existing:
+ info(u"Skipping over existing file: %s" % (destination))
+ return
+ else:
+ dst_stream.close()
+ raise ParameterError(u"File %s already exists. Use either of --force / --continue / --skip-existing or give it a new name." % destination)
+ except IOError, e:
+ error(u"Skipping %s: %s" % (destination, e.strerror))
+ return
+ if not Config().progress_meter and destination != "-":
+ output(u"File %s started %s" %
+ (uri, seq_label))
+ response = s3.object_get(uri, dst_stream, start_position = start_position, extra_label = seq_label)
+ if response["headers"].has_key("x-amz-meta-s3tools-gpgenc"):
+ gpg_decrypt(destination, response["headers"]["x-amz-meta-s3tools-gpgenc"])
+ response["size"] = os.stat(destination)[6]
+ if not Config().progress_meter and destination != "-":
+ speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+ output(u"File %s saved as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s)" %
+ (uri, destination, response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1]))
+
def cmd_object_del(args):
for uri_str in args:
uri = S3Uri(uri_str)
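do_get_work() keeps the original resume semantics: the destination is opened with open(destination, "ab"), which leaves the file position at end-of-file, and the flags then pick the start offset. A condensed, self-contained sketch of just that decision (flag names follow the diff; SkipFile is a hypothetical marker):

class SkipFile(Exception):
    """Hypothetical marker: the caller should return without downloading."""

def choose_start(dst_stream, file_exists, get_continue, force, skip_existing):
    if not file_exists:
        return 0
    if get_continue:
        return dst_stream.tell()    # "ab" open: tell() is the current size
    if force:
        dst_stream.seek(0)
        dst_stream.truncate()       # restart the download from scratch
        return 0
    if skip_existing:
        raise SkipFile(dst_stream.name)
    raise ValueError("File already exists; use --force / --continue / --skip-existing")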
@@ -822,13 +918,6 @@
return src_list, dst_list, exists_list
def cmd_sync_remote2local(args):
- def _parse_attrs_header(attrs_header):
- attrs = {}
- for attr in attrs_header.split("/"):
- key, val = attr.split(":")
- attrs[key] = val
- return attrs
-
s3 = S3(Config())
destination_base = args[-1]
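This hunk only relocates _parse_attrs_header; it reappears below as a nested helper of do_remote2local_work(). The header it parses, x-amz-meta-s3cmd-attrs, is the "key:val/key:val" string that _build_attr_header() produces on upload. A round-trip sketch built from the two helpers as they appear in this diff (note the parsed values come back as strings, which is why the callers int() them):

def build_attrs_header(attrs):
    # Mirrors _build_attr_header(): e.g. "mode:420/mtime:1331712000"
    result = ""
    for k in attrs:
        result += "%s:%s/" % (k, attrs[k])
    return result[:-1]

def parse_attrs_header(attrs_header):
    # Mirrors _parse_attrs_header()
    attrs = {}
    for attr in attrs_header.split("/"):
        key, val = attr.split(":")
        attrs[key] = val
    return attrs

hdr = build_attrs_header({"mode": 420, "mtime": 1331712000})
assert parse_attrs_header(hdr) == {"mode": "420", "mtime": "1331712000"}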
@@ -884,117 +973,133 @@
total_elapsed = 0.0
timestamp_start = time.time()
seq = 0
- dir_cache = {}
file_list = remote_list.keys()
file_list.sort()
- for file in file_list:
- seq += 1
- item = remote_list[file]
- uri = S3Uri(item['object_uri_str'])
- dst_file = item['local_filename']
- seq_label = "[%d of %d]" % (seq, remote_count)
+
+ build_dir_structure(remote_list)
+
+ if cfg.parallel and len(file_list) > 1:
+ # Disable the progress meter for parallel downloads.
+ cfg.progress_meter = False
+ #Initialize Queue
+ global q
+ q = Queue.Queue()
+
+ seq = 0
+ for file in file_list:
+ seq += 1
+ q.put([remote_list[file],seq,len(file_list)])
+
+ for i in range(cfg.workers):
+ t = threading.Thread(target=sync_remote2local_worker)
+ t.daemon = True
+ t.start()
+
+ # Poll before joining so a KeyboardInterrupt can still kill the process;
+ # Queue.join() alone blocks until every queued item has completed.
+ while threading.active_count() > 1:
+ time.sleep(.1)
+
+ q.join()
+ else:
+ for file in file_list:
+ seq += 1
+ do_remote2local_work(remote_list[file],seq,len(file_list))
+
+ # Summary totals ("Done. Downloaded ...") omitted: aggregating them safely across worker threads would need a lock.
+
+def sync_remote2local_worker():
+ while True:
try:
- dst_dir = os.path.dirname(dst_file)
- if not dir_cache.has_key(dst_dir):
- dir_cache[dst_dir] = Utils.mkdir_with_parents(dst_dir)
- if dir_cache[dst_dir] == False:
- warning(u"%s: destination directory not writable: %s" % (file, dst_dir))
- continue
- try:
- open_flags = os.O_CREAT
- open_flags |= os.O_TRUNC
- # open_flags |= os.O_EXCL
+ (item,seq,total) = q.get_nowait()
+ except Queue.Empty:
+ return
+ try:
+ do_remote2local_work(item,seq,total)
+ except Exception, e:
+ report_exception(e)
+ # (the worker keeps going; task_done() below must run so q.join() can return)
+ q.task_done()
- debug(u"dst_file=%s" % unicodise(dst_file))
- # This will have failed should the file exist
- os.close(os.open(dst_file, open_flags))
- # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
- dst_stream = open(dst_file, "wb")
- response = s3.object_get(uri, dst_stream, extra_label = seq_label)
- dst_stream.close()
- if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
- attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
- if attrs.has_key('mode'):
- os.chmod(dst_file, int(attrs['mode']))
- if attrs.has_key('mtime') or attrs.has_key('atime'):
- mtime = attrs.has_key('mtime') and int(attrs['mtime']) or int(time.time())
- atime = attrs.has_key('atime') and int(attrs['atime']) or int(time.time())
- os.utime(dst_file, (atime, mtime))
- ## FIXME: uid/gid / uname/gname handling comes here! TODO
- except OSError, e:
- try: dst_stream.close()
- except: pass
- if e.errno == errno.EEXIST:
- warning(u"%s exists - not overwriting" % (dst_file))
- continue
- if e.errno in (errno.EPERM, errno.EACCES):
- warning(u"%s not writable: %s" % (dst_file, e.strerror))
- continue
- raise e
- except KeyboardInterrupt:
- try: dst_stream.close()
- except: pass
- warning(u"Exiting after keyboard interrupt")
- return
- except Exception, e:
- try: dst_stream.close()
- except: pass
- error(u"%s: %s" % (file, e))
- continue
- # We have to keep repeating this call because
- # Python 2.4 doesn't support try/except/finally
- # construction :-(
+def build_dir_structure(file_list):
+ #Builds directory structure first to avoid race conditions with threading
+ dir_cache = {}
+ for item in file_list:
+ dst_file = file_list[item]['local_filename']
+ dst_dir = os.path.dirname(dst_file)
+ if not dir_cache.has_key(dst_dir):
+ dir_cache[dst_dir] = Utils.mkdir_with_parents(dst_dir)
+ if dir_cache[dst_dir] == False:
+ warning(u"%s: destination directory not writable: %s" % (item, dst_dir))
+ continue
+
+def do_remote2local_work(item,seq,total):
+ def _parse_attrs_header(attrs_header):
+ attrs = {}
+ for attr in attrs_header.split("/"):
+ key, val = attr.split(":")
+ attrs[key] = val
+ return attrs
+
+ cfg = Config()
+ s3 = S3(cfg)
+ uri = S3Uri(item['object_uri_str'])
+ dst_file = item['local_filename']
+ seq_label = "[%d of %d]" % (seq, total)
+ try:
+ dst_dir = os.path.dirname(dst_file)
+ try:
+ open_flags = os.O_CREAT
+ open_flags |= os.O_TRUNC
+ # open_flags |= os.O_EXCL
+
+ debug(u"dst_file=%s" % unicodise(dst_file))
+ # This will have failed should the file exist
+ os.close(os.open(dst_file, open_flags))
+ # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
+ dst_stream = open(dst_file, "wb")
+ if not cfg.progress_meter:
+ output(u"File '%s' started %s" %
+ (uri, seq_label))
+ response = s3.object_get(uri, dst_stream, extra_label = seq_label)
+ dst_stream.close()
+ if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
+ attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
+ if attrs.has_key('mode'):
+ os.chmod(dst_file, int(attrs['mode']))
+ if attrs.has_key('mtime') or attrs.has_key('atime'):
+ mtime = attrs.has_key('mtime') and int(attrs['mtime']) or int(time.time())
+ atime = attrs.has_key('atime') and int(attrs['atime']) or int(time.time())
+ os.utime(dst_file, (atime, mtime))
+ ## FIXME: uid/gid / uname/gname handling comes here! TODO
+ except OSError, e:
try: dst_stream.close()
except: pass
- except S3DownloadError, e:
- error(u"%s: download failed too many times. Skipping that file." % file)
- continue
- speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
- if not Config().progress_meter:
- output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
- (uri, unicodise(dst_file), response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
- seq_label))
- total_size += response["size"]
+ if e.errno == errno.EEXIST:
+ warning(u"%s exists - not overwriting" % (dst_file))
+ return
+ if e.errno in (errno.EPERM, errno.EACCES):
+ warning(u"%s not writable: %s" % (dst_file, e.strerror))
+ return
+ raise e
- total_elapsed = time.time() - timestamp_start
- speed_fmt = formatSize(total_size/total_elapsed, human_readable = True, floating_point = True)
+ # We have to keep repeating this call because
+ # Python 2.4 doesn't support try/except/finally
+ # construction :-(
+ try: dst_stream.close()
+ except: pass
+ except S3DownloadError, e:
+ error(u"%s: download failed too many times. Skipping that file." % dst_file)
+ return
+ speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+ if not Config().progress_meter:
+ output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+ (uri, unicodise(dst_file), response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
+ seq_label))
- # Only print out the result if any work has been done or
- # if the user asked for verbose output
- outstr = "Done. Downloaded %d bytes in %0.1f seconds, %0.2f %sB/s" % (total_size, total_elapsed, speed_fmt[0], speed_fmt[1])
- if total_size > 0:
- output(outstr)
- else:
- info(outstr)
def cmd_sync_local2remote(args):
- def _build_attr_header(src):
- import pwd, grp
- attrs = {}
- src = deunicodise(src)
- st = os.stat_result(os.stat(src))
- for attr in cfg.preserve_attrs_list:
- if attr == 'uname':
- try:
- val = pwd.getpwuid(st.st_uid).pw_name
- except KeyError:
- attr = "uid"
- val = st.st_uid
- warning(u"%s: Owner username not known. Storing UID=%d instead." % (unicodise(src), val))
- elif attr == 'gname':
- try:
- val = grp.getgrgid(st.st_gid).gr_name
- except KeyError:
- attr = "gid"
- val = st.st_gid
- warning(u"%s: Owner groupname not known. Storing GID=%d instead." % (unicodise(src), val))
- else:
- val = getattr(st, 'st_' + attr)
- attrs[attr] = val
- result = ""
- for k in attrs: result += "%s:%s/" % (k, attrs[k])
- return { 'x-amz-meta-s3cmd-attrs' : result[:-1] }
+
s3 = S3(cfg)
if cfg.encrypt:
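build_dir_structure() exists because the per-file Utils.mkdir_with_parents() call that the sequential loop made would race under threads: two workers can both see a directory missing and both try to create it, and the loser fails with EEXIST. Creating the whole tree up front, before any worker starts, sidesteps that. The other standard fix, shown here only as a sketch in the patch's Python 2 idiom, is to tolerate EEXIST:

import errno
import os

def mkdir_p(path):
    # Race-tolerant makedirs: if another thread created `path` between the
    # existence check and this call, swallow EEXIST instead of failing.
    try:
        os.makedirs(path)
    except OSError, e:
        if e.errno != errno.EEXIST:
            raise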
@@ -1066,44 +1171,106 @@
seq = 0
file_list = local_list.keys()
file_list.sort()
- for file in file_list:
- seq += 1
- item = local_list[file]
- src = item['full_name']
- uri = S3Uri(item['remote_uri'])
- seq_label = "[%d of %d]" % (seq, local_count)
- extra_headers = copy(cfg.extra_headers)
- if cfg.preserve_attrs:
- attr_header = _build_attr_header(src)
- debug(u"attr_header: %s" % attr_header)
- extra_headers.update(attr_header)
- try:
- response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
- except S3UploadError, e:
- error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
- continue
- except InvalidFileError, e:
- warning(u"File can not be uploaded: %s" % e)
- continue
- speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
- if not cfg.progress_meter:
- output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
- (item['full_name_unicode'], uri, response["size"], response["elapsed"],
- speed_fmt[0], speed_fmt[1], seq_label))
- total_size += response["size"]
+
+ if cfg.parallel and len(file_list) > 1:
+ # Disable the progress meter for parallel uploads.
+ cfg.progress_meter = False
+ #Initialize Queue
+ global q
+ q = Queue.Queue()
- total_elapsed = time.time() - timestamp_start
- total_speed = total_elapsed and total_size/total_elapsed or 0.0
- speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)
+ seq = 0
+ for file in file_list:
+ seq += 1
+ q.put([local_list[file],seq,len(file_list)])
- # Only print out the result if any work has been done or
- # if the user asked for verbose output
- outstr = "Done. Uploaded %d bytes in %0.1f seconds, %0.2f %sB/s" % (total_size, total_elapsed, speed_fmt[0], speed_fmt[1])
- if total_size > 0:
- output(outstr)
+ for i in range(cfg.workers):
+ t = threading.Thread(target=sync_local2remote_worker)
+ t.daemon = True
+ t.start()
+
+ # Poll before joining so a KeyboardInterrupt can still kill the process;
+ # Queue.join() alone blocks until every queued item has completed.
+ while threading.active_count() > 1:
+ time.sleep(.1)
+
+ q.join()
else:
- info(outstr)
+ seq = 0
+ for file in file_list:
+ seq += 1
+ do_local2remote_work(local_list[file],seq,len(file_list))
+
+ # Summary totals ("Done. Uploaded ...") omitted: aggregating them safely across worker threads would need a lock.
+
+def sync_local2remote_worker():
+ while True:
+ try:
+ (item,seq,total) = q.get_nowait()
+ except Queue.Empty:
+ return
+ try:
+ do_local2remote_work(item,seq,total)
+ except Exception, e:
+ report_exception(e)
+ # (the worker keeps going; task_done() below must run so q.join() can return)
+ q.task_done()
+
+def do_local2remote_work(item,seq,total):
+ def _build_attr_header(src):
+ import pwd, grp
+ attrs = {}
+ src = deunicodise(src)
+ st = os.stat_result(os.stat(src))
+ for attr in cfg.preserve_attrs_list:
+ if attr == 'uname':
+ try:
+ val = pwd.getpwuid(st.st_uid).pw_name
+ except KeyError:
+ attr = "uid"
+ val = st.st_uid
+ warning(u"%s: Owner username not known. Storing UID=%d instead." % (unicodise(src), val))
+ elif attr == 'gname':
+ try:
+ val = grp.getgrgid(st.st_gid).gr_name
+ except KeyError:
+ attr = "gid"
+ val = st.st_gid
+ warning(u"%s: Owner groupname not known. Storing GID=%d instead." % (unicodise(src), val))
+ else:
+ val = getattr(st, 'st_' + attr)
+ attrs[attr] = val
+ result = ""
+ for k in attrs: result += "%s:%s/" % (k, attrs[k])
+ return { 'x-amz-meta-s3cmd-attrs' : result[:-1] }
+
+ cfg = Config()
+ s3 = S3(cfg)
+ src = item['full_name']
+ uri = S3Uri(item['remote_uri'])
+ seq_label = "[%d of %d]" % (seq, total)
+ extra_headers = copy(cfg.extra_headers)
+ if cfg.preserve_attrs:
+ attr_header = _build_attr_header(src)
+ debug(u"attr_header: %s" % attr_header)
+ extra_headers.update(attr_header)
+ if not cfg.progress_meter:
+ output(u"File '%s' started %s" %
+ (item['full_name_unicode'], seq_label))
+ try:
+ response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
+ except S3UploadError, e:
+ error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
+ return
+ except InvalidFileError, e:
+ warning(u"File can not be uploaded: %s" % e)
+ return
+ speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+ if not cfg.progress_meter:
+ output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+ (item['full_name_unicode'], uri, response["size"], response["elapsed"],
+ speed_fmt[0], speed_fmt[1], seq_label))
+
def cmd_sync(args):
if (len(args) < 2):
raise ParameterError("Too few parameters! Expected: %s" % commands['sync']['param'])
@@ -1658,6 +1825,9 @@
optparser.add_option("-d", "--debug", dest="verbosity", action="store_const", const=logging.DEBUG, help="Enable debug output.")
optparser.add_option( "--version", dest="show_version", action="store_true", help="Show s3cmd version (%s) and exit." % (PkgInfo.version))
optparser.add_option("-F", "--follow-symlinks", dest="follow_symlinks", action="store_true", default=False, help="Follow symbolic links as if they are regular files")
+
+ optparser.add_option( "--parallel", dest="parallel", action="store_true", help="Download and upload files in parallel.")
+ optparser.add_option( "--workers", dest="workers", default=10, help="Sets the number of workers to run for uploading and downloading files (can only be used in conjunction with the --parallel argument)")
optparser.set_usage(optparser.usage + " COMMAND [parameters]")
optparser.set_description('S3cmd is a tool for managing objects in '+
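With the patch applied, the new flags compose with the existing commands; the bucket and paths here are hypothetical:

s3cmd put --parallel --workers 20 *.log s3://example-bucket/logs/
s3cmd get --parallel --recursive s3://example-bucket/logs/ logs/
s3cmd sync --parallel --workers 4 local/dir/ s3://example-bucket/dir/

--workers only has an effect together with --parallel, and --parallel forcibly disables the per-file progress meter.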
@@ -1903,6 +2073,7 @@
except KeyboardInterrupt:
sys.stderr.write("See ya!\n")
+
sys.exit(1)
except Exception, e: