* S3/S3.py, s3cmd, S3/Config.py, s3cmd.1: Added --continue for
  'get' command, improved 'get' failure resiliency.



git-svn-id: https://s3tools.svn.sourceforge.net/svnroot/s3tools/s3cmd/trunk@267 830e0280-6d2a-0410-9c65-932aecc39d9d
mludvig committed Nov 25, 2008
parent eb9a4b6 · commit 9197e62
Showing 6 changed files with 109 additions and 52 deletions.
2 changes: 2 additions & 0 deletions ChangeLog
@@ -1,5 +1,7 @@
 2008-11-24  Michal Ludvig  <michal@logix.cz>
 
+    * S3/S3.py, s3cmd, S3/Config.py, s3cmd.1: Added --continue for
+      'get' command, improved 'get' failure resiliency.
     * S3/Progress.py: Support for progress meter not starting in 0.
     * S3/S3.py: improved retrying in send_request() and send_file()
3 changes: 3 additions & 0 deletions NEWS
@@ -6,6 +6,9 @@ s3cmd 0.9.9 - ???
   prefix with --recursive (-r)
 * Copying and moving objects, within or between buckets.
   (Andrew Ryan)
+* Continue getting partially downloaded files with --continue
+* Improved resistance to communication errors (Connection
+  reset by peer, etc.)
 
 s3cmd 0.9.8.4 - 2008-11-07
 =============
1 change: 1 addition & 0 deletions S3/Config.py
@@ -24,6 +24,7 @@ class Config(object):
     recv_chunk = 4096
     human_readable_sizes = False
     force = False
+    get_continue = False
     recursive = False
     acl_public = False
     proxy_host = ""
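This one-line change works because s3cmd keeps run-time options as class-level attributes on Config, so get_continue defaults to False until the option parser flips it. A minimal sketch of that pattern, assuming a hypothetical apply_options helper (s3cmd's real option handling lives in main() and is not shown in this diff):

    class Config(object):
        # Class-level defaults, mirroring S3/Config.py in the diff
        force = False
        get_continue = False

    def apply_options(cfg, parsed):
        # Hypothetical helper: copy attributes that optparse actually set
        # onto the config object, leaving class defaults otherwise intact.
        for attr in ("force", "get_continue"):
            value = getattr(parsed, attr, None)
            if value is not None:
                setattr(cfg, attr, value)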
135 changes: 87 additions & 48 deletions S3/S3.py
@@ -189,11 +189,11 @@ def object_put(self, filename, uri, extra_headers = None):
         response = self.send_file(request, file)
         return response
 
-    def object_get(self, uri, stream):
+    def object_get(self, uri, stream, start_position):
         if uri.type != "s3":
             raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
         request = self.create_request("OBJECT_GET", uri = uri)
-        response = self.recv_file(request, stream)
+        response = self.recv_file(request, stream, start_position)
         return response
 
     def object_delete(self, uri):
@@ -399,35 +399,29 @@ def send_file(self, request, file, throttle = 0, retries = _max_retries):
                 conn.putheader(header, str(headers[header]))
             conn.endheaders()
         except Exception, e:
+            if self.config.progress_meter:
+                progress.done("failed")
             if retries:
                 warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
                 return self.send_file(request, file, throttle, retries - 1)
             else:
-                raise S3UploadError("Request failed for: %s" % resource['uri'])
+                raise S3UploadError("Upload failed for: %s" % resource['uri'])
         file.seek(0)
         md5_hash = md5.new()
         try:
             while (size_left > 0):
                 debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
                 data = file.read(self.config.send_chunk)
                 md5_hash.update(data)
+                conn.send(data)
                 if self.config.progress_meter:
                     progress.update(delta_position = len(data))
-                else:
-                    debug("SendFile: Sending %d bytes to the server" % len(data))
-                conn.send(data)
                 size_left -= len(data)
                 if throttle:
                     time.sleep(throttle)
-                ## Call progress meter from here
-                debug("Sent %d bytes (%d %% of %d)" % (
-                    (size_total - size_left),
-                    (size_total - size_left) * 100 / size_total,
-                    size_total))
             md5_computed = md5_hash.hexdigest()
             response = {}
             http_response = conn.getresponse()
@@ -442,15 +436,15 @@ def send_file(self, request, file, throttle = 0, retries = _max_retries):
                 progress.done("failed")
             if retries:
                 throttle = throttle and throttle * 5 or 0.01
-                warning("Request failed: %s (%s)" % (resource['uri'], e))
+                warning("Upload failed: %s (%s)" % (resource['uri'], e))
                 warning("Retrying on lower speed (throttle=%0.2f)" % throttle)
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
                 return self.send_file(request, file, throttle, retries - 1)
             else:
                 debug("Giving up on '%s' %s" % (file.name, e))
-                raise S3UploadError("Request failed for: %s" % resource['uri'])
+                raise S3UploadError("Upload failed for: %s" % resource['uri'])
 
         timestamp_end = time.time()
         response["elapsed"] = timestamp_end - timestamp_start
@@ -490,24 +484,40 @@ def send_file(self, request, file, throttle = 0, retries = _max_retries):
             raise S3Error(response)
         return response
 
-    def recv_file(self, request, stream):
+    def recv_file(self, request, stream, start_position = 0, retries = _max_retries):
         method_string, resource, headers = request
         if self.config.progress_meter:
             progress = self.config.progress_class(stream.name, 0)
         else:
             info("Receiving file '%s', please wait..." % stream.name)
         timestamp_start = time.time()
-        conn = self.get_connection(resource['bucket'])
-        conn.connect()
-        conn.putrequest(method_string, self.format_uri(resource))
-        for header in headers.keys():
-            conn.putheader(header, str(headers[header]))
-        conn.endheaders()
-        response = {}
-        http_response = conn.getresponse()
-        response["status"] = http_response.status
-        response["reason"] = http_response.reason
-        response["headers"] = convertTupleListToDict(http_response.getheaders())
+        try:
+            conn = self.get_connection(resource['bucket'])
+            conn.connect()
+            conn.putrequest(method_string, self.format_uri(resource))
+            for header in headers.keys():
+                conn.putheader(header, str(headers[header]))
+            if start_position > 0:
+                debug("Requesting Range: %d .. end" % start_position)
+                conn.putheader("Range", "bytes=%d-" % start_position)
+            conn.endheaders()
+            response = {}
+            http_response = conn.getresponse()
+            response["status"] = http_response.status
+            response["reason"] = http_response.reason
+            response["headers"] = convertTupleListToDict(http_response.getheaders())
+            debug("Response: %s" % response)
+        except Exception, e:
+            if self.config.progress_meter:
+                progress.done("failed")
+            if retries:
+                warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
+                warning("Waiting %d sec..." % self._fail_wait(retries))
+                time.sleep(self._fail_wait(retries))
+                # Connection error -> same throttle value
+                return self.recv_file(request, stream, start_position, retries - 1)
+            else:
+                raise S3DownloadError("Download failed for: %s" % resource['uri'])
 
         if response["status"] == 307:
             ## RedirectPermanent
@@ -521,38 +531,67 @@ def recv_file(self, request, stream):
         if response["status"] < 200 or response["status"] > 299:
             raise S3Error(response)
 
-        md5_hash = md5.new()
-        size_left = size_total = int(response["headers"]["content-length"])
+        if start_position == 0:
+            # Only compute MD5 on the fly if we're downloading from beginning
+            # Otherwise we'd get a nonsense.
+            md5_hash = md5.new()
+        size_left = int(response["headers"]["content-length"])
+        size_total = start_position + size_left
+        current_position = start_position
+
         if self.config.progress_meter:
             progress.total_size = size_total
-        size_recvd = 0
-        while (size_recvd < size_total):
-            this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
-            debug("ReceiveFile: Receiving up to %d bytes from the server" % this_chunk)
-            data = http_response.read(this_chunk)
-            debug("ReceiveFile: Writing %d bytes to file '%s'" % (len(data), stream.name))
-            stream.write(data)
-            md5_hash.update(data)
-            size_recvd += len(data)
-            ## Call progress meter from here...
+            progress.initial_position = current_position
+            progress.current_position = current_position
+
+        try:
+            while (current_position < size_total):
+                this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
+                data = http_response.read(this_chunk)
+                stream.write(data)
+                if start_position == 0:
+                    md5_hash.update(data)
+                current_position += len(data)
+                ## Call progress meter from here...
+                if self.config.progress_meter:
+                    progress.update(delta_position = len(data))
+            conn.close()
+        except Exception, e:
             if self.config.progress_meter:
-                progress.update(delta_position = len(data))
+                progress.done("failed")
+            if retries:
+                warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
+                warning("Waiting %d sec..." % self._fail_wait(retries))
+                time.sleep(self._fail_wait(retries))
+                # Connection error -> same throttle value
+                return self.recv_file(request, stream, current_position, retries - 1)
             else:
-                debug("Received %d bytes (%d %% of %d)" % (
-                    size_recvd,
-                    size_recvd * 100 / size_total,
-                    size_total))
-        conn.close()
+                raise S3DownloadError("Download failed for: %s" % resource['uri'])
+
+        stream.flush()
         progress.done("done")
         timestamp_end = time.time()
-        response["md5"] = md5_hash.hexdigest()
+        if start_position == 0:
+            # Only compute MD5 on the fly if we were downloading from the beginning
+            response["md5"] = md5_hash.hexdigest()
+        else:
+            # Otherwise try to compute MD5 of the output file
+            try:
+                response["md5"] = hash_file_md5(stream.name)
+            except IOError, e:
+                if e.errno != errno.ENOENT:
+                    warning("Unable to open file: %s: %s" % (stream.name, e))
+                warning("Unable to verify MD5. Assume it matches.")
+                response["md5"] = response["headers"]["etag"]
+
         response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0
         response["elapsed"] = timestamp_end - timestamp_start
-        response["size"] = size_recvd
+        response["size"] = current_position
         response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
-        if response["size"] != long(response["headers"]["content-length"]):
+        if response["size"] != start_position + long(response["headers"]["content-length"]):
             warning("Reported size (%s) does not match received size (%s)" % (
-                response["headers"]["content-length"], response["size"]))
+                start_position + response["headers"]["content-length"], response["size"]))
         debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
         if not response["md5match"]:
             warning("MD5 signatures do not match: computed=%s, received=%s" % (
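The key mechanism in the new recv_file is the HTTP Range request: when start_position is non-zero, the server is asked for bytes=START- only, so Content-Length covers just the remainder, which is why size_total becomes start_position + size_left. As the diff's comments note, an on-the-fly MD5 is only meaningful for a from-scratch download; a resumed file must be re-hashed from disk (hash_file_md5) or taken on trust against the ETag. A standalone sketch of the same resume technique using only the Python standard library (URL and filename are placeholders, not part of the commit):

    import os
    import urllib.request

    def resume_download(url, destination):
        # Resume from however many bytes are already on disk
        start = os.path.getsize(destination) if os.path.exists(destination) else 0
        req = urllib.request.Request(url)
        if start > 0:
            # Ask only for the missing tail; a server that cannot satisfy
            # the range may answer 416, which urlopen raises as HTTPError.
            req.add_header("Range", "bytes=%d-" % start)
        with urllib.request.urlopen(req) as resp, open(destination, "ab") as out:
            # 206 Partial Content means the Range header was honoured;
            # a plain 200 means the server resent the whole file.
            if start > 0 and resp.status != 206:
                out.seek(0)
                out.truncate()
            while True:
                chunk = resp.read(4096)
                if not chunk:
                    break
                out.write(chunk)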
17 changes: 13 additions & 4 deletions s3cmd
@@ -253,6 +253,7 @@ def cmd_object_get(args):
         uri_arg = args.pop(0)
         uri = S3Uri(uri_arg)
 
+        start_position = 0
         if destination_file:
             destination = destination_file
         elif destination_dir:
@@ -265,14 +266,21 @@ def cmd_object_get(args):
             dst_stream = sys.stdout
         else:
             ## File
-            if not Config().force and os.path.exists(destination):
-                raise ParameterError("File %s already exists. Use --force to overwrite it" % destination)
             try:
-                dst_stream = open(destination, "wb")
+                dst_stream = open(destination, "ab")
+                if os.path.exists(destination):
+                    if Config().get_continue:
+                        start_position = dst_stream.tell()
+                    elif Config().force:
+                        start_position = 0L
+                        dst_stream.seek(0L)
+                        dst_stream.truncate()
+                    else:
+                        raise ParameterError("File %s already exists. Use either --force or --continue or give it a new name." % destination)
             except IOError, e:
                 error("Skipping %s: %s" % (destination, e.strerror))
                 continue
-        response = s3.object_get(uri, dst_stream)
+        response = s3.object_get(uri, dst_stream, start_position = start_position)
         if response["headers"].has_key("x-amz-meta-s3tools-gpgenc"):
             gpg_decrypt(destination, response["headers"]["x-amz-meta-s3tools-gpgenc"])
         response["size"] = os.stat(destination)[6]
@@ -965,6 +973,7 @@ def main():
     optparser.add_option("-e", "--encrypt", dest="encrypt", action="store_true", help="Encrypt files before uploading to S3.")
     optparser.add_option(      "--no-encrypt", dest="encrypt", action="store_false", help="Don't encrypt files.")
     optparser.add_option("-f", "--force", dest="force", action="store_true", help="Force overwrite and other dangerous operations.")
+    optparser.add_option(      "--continue", dest="get_continue", action="store_true", help="Continue getting a partially downloaded file (only for [get] command).")
     optparser.add_option("-r", "--recursive", dest="recursive", action="store_true", help="Recursive upload, download or removal.")
     optparser.add_option("-P", "--acl-public", dest="acl_public", action="store_true", help="Store objects with ACL allowing read for anyone.")
     optparser.add_option(      "--acl-private", dest="acl_public", action="store_false", help="Store objects with default ACL allowing access for you only.")
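The cmd_object_get hunk above encodes a three-way decision for an existing destination file: --continue resumes from the current end of the file (opening in append mode and reading the position with tell()), --force truncates and restarts from zero, and otherwise the command refuses to touch the file. A minimal sketch of that decision as a standalone function; this is a restatement for clarity, not s3cmd's exact code, and open_destination is a hypothetical name:

    import os

    def open_destination(destination, get_continue=False, force=False):
        # Returns (stream, start_position) following the same three-way rule
        # as cmd_object_get: continue, overwrite, or refuse.
        exists = os.path.exists(destination)
        stream = open(destination, "ab")
        if exists:
            if get_continue:
                # Append position equals the bytes already present on disk
                start_position = stream.tell()
            elif force:
                stream.seek(0)
                stream.truncate()
                start_position = 0
            else:
                stream.close()
                raise ValueError("File %s already exists. "
                                 "Use either --force or --continue." % destination)
        else:
            start_position = 0
        return stream, start_position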
3 changes: 3 additions & 0 deletions s3cmd.1
@@ -113,6 +113,9 @@ Options common for all commands (where it makes sense indeed):
 \fB\-f\fR, \fB\-\-force\fR
 Force overwrite and other dangerous operations.
 .TP
+\fB\-\-continue\fR
+Continue getting a partially downloaded file (only for \fIget\fR command). This comes handy once download of a large file, say an ISO image, from a S3 bucket fails and a partially downloaded file is left on the disk. Unfortunately \fIput\fR command doesn't support restarting of failed upload due to Amazon S3 limitation.
+.TP
 \fB\-P\fR, \fB\-\-acl\-public\fR
 Store objects with permissions allowing read for anyone.
 .TP
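In practice the new option pairs with a failed transfer: if a command such as s3cmd get s3://mybucket/disk.iso dies partway through, rerunning it as s3cmd get --continue s3://mybucket/disk.iso appends only the missing tail instead of starting over, while adding --force restarts the download from zero (bucket and object names here are hypothetical examples, not from the commit).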
