From a8f80ea0bda5d80993ff86c3a31dd76f23832358 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Wed, 19 Oct 2011 17:04:21 +0530 Subject: [PATCH 01/16] Adding changes to `recv_file` to support partial file download by specifying start-position and end-position in bytes --- S3/S3.py | 52 ++++++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/S3/S3.py b/S3/S3.py index 7d16336ed..95a04798d 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -702,7 +702,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) return response - def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries): + def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries, end_position = -1): method_string, resource, headers = request.get_triplet() if self.config.progress_meter: progress = self.config.progress_class(labels, 0) @@ -715,9 +715,12 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ conn.putrequest(method_string, self.format_uri(resource)) for header in headers.keys(): conn.putheader(header, str(headers[header])) - if start_position > 0: + if start_position > 0 and end_position == -1: debug("Requesting Range: %d .. end" % start_position) conn.putheader("Range", "bytes=%d-" % start_position) + elif end_position != -1: + debug("Requesting Range: %d .. %d" % (start_position, end_position)) + conn.putheader("Range", "bytes=%d-%d" % (start_position, end_position)) conn.endheaders() response = {} http_response = conn.getresponse() @@ -733,7 +736,7 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ warning("Waiting %d sec..." % self._fail_wait(retries)) time.sleep(self._fail_wait(retries)) # Connection error -> same throttle value - return self.recv_file(request, stream, labels, start_position, retries - 1) + return self.recv_file(request, stream, labels, start_position, retries - 1, end_position) else: raise S3DownloadError("Download failed for: %s" % resource['uri']) @@ -744,12 +747,12 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ redir_hostname = getTextFromXml(response['data'], ".//Endpoint") self.set_hostname(redir_bucket, redir_hostname) warning("Redirected to: %s" % (redir_hostname)) - return self.recv_file(request, stream, labels) + return self.recv_file(request, stream, labels, start_position, retries, end_position) if response["status"] < 200 or response["status"] > 299: raise S3Error(response) - if start_position == 0: + if start_position == 0 and end_position == -1: # Only compute MD5 on the fly if we're downloading from beginning # Otherwise we'd get a nonsense. md5_hash = md5() @@ -767,7 +770,7 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left data = http_response.read(this_chunk) stream.write(data) - if start_position == 0: + if start_position == 0 and end_position == -1: md5_hash.update(data) current_position += len(data) ## Call progress meter from here... 
@@ -796,30 +799,31 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ progress.update() progress.done("done") - if start_position == 0: - # Only compute MD5 on the fly if we were downloading from the beginning - response["md5"] = md5_hash.hexdigest() - else: - # Otherwise try to compute MD5 of the output file - try: - response["md5"] = hash_file_md5(stream.name) - except IOError, e: - if e.errno != errno.ENOENT: - warning("Unable to open file: %s: %s" % (stream.name, e)) - warning("Unable to verify MD5. Assume it matches.") - response["md5"] = response["headers"]["etag"] - - response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0 + if end_position == -1: + if start_position == 0: + # Only compute MD5 on the fly if we were downloading from the beginning + response["md5"] = md5_hash.hexdigest() + else: + # Otherwise try to compute MD5 of the output file + try: + response["md5"] = hash_file_md5(stream.name) + except IOError, e: + if e.errno != errno.ENOENT: + warning("Unable to open file: %s: %s" % (stream.name, e)) + warning("Unable to verify MD5. Assume it matches.") + response["md5"] = response["headers"]["etag"] + + response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0 + debug("ReceiveFile: Computed MD5 = %s" % response["md5"]) + if not response["md5match"]: + warning("MD5 signatures do not match: computed=%s, received=%s" % ( + response["md5"], response["headers"]["etag"])) response["elapsed"] = timestamp_end - timestamp_start response["size"] = current_position response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) if response["size"] != start_position + long(response["headers"]["content-length"]): warning("Reported size (%s) does not match received size (%s)" % ( start_position + response["headers"]["content-length"], response["size"])) - debug("ReceiveFile: Computed MD5 = %s" % response["md5"]) - if not response["md5match"]: - warning("MD5 signatures do not match: computed=%s, received=%s" % ( - response["md5"], response["headers"]["etag"])) return response __all__.append("S3") From e60ae89f25e233e012142b5ec4d744a49db845b2 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Wed, 19 Oct 2011 17:43:20 +0530 Subject: [PATCH 02/16] Added `concat_files()` function A function that takes destination file handle and list of source file handles, concatenate source files data and write into destination file --- S3/Utils.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/S3/Utils.py b/S3/Utils.py index 4a83601e1..bc4568ae9 100644 --- a/S3/Utils.py +++ b/S3/Utils.py @@ -223,6 +223,24 @@ def hash_file_md5(filename): return h.hexdigest() __all__.append("hash_file_md5") +def concat_files(dest_handle, *source_handles): + """ + Read data from source file handles and write the data into dest_handle + Return md5-hash and file size + """ + h = md5() + for source in source_handles: + source.seek(0) + while True: + # Hash 32kB chunks + data = source.read(32*1024) + dest_handle.write(data) + if not data: + break + h.update(data) + return h.hexdigest(), dest_handle.tell() +__all__.append("concat_files") + def mkdir_with_parents(dir_name): """ mkdir_with_parents(dst_dir) From 8772c9a703b20ea5a71decfbc7ca5718cf00665e Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Wed, 19 Oct 2011 18:01:46 +0530 Subject: [PATCH 03/16] Added `object_multipart_get()` function * object_multipart_get() - Download a file from s3 by parallel download of multiple split files using 
Worker thread pool, merge split files and perform md5 checksum verification * Added new parameters in Config to specify worker thread numbers, file split count and to toggle parallel split download on and off --- S3/Config.py | 3 ++ S3/S3.py | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/S3/Config.py b/S3/Config.py index 4e2a58649..a1998b748 100644 --- a/S3/Config.py +++ b/S3/Config.py @@ -81,6 +81,9 @@ class Config(object): website_index = "index.html" website_error = "" website_endpoint = "http://%(bucket)s.s3-website-%(location)s.amazonaws.com/" + parallel_multipart_download = False + parallel_multipart_threads = 10 + parallel_multipart_count = 20 ## Creating a singleton def __new__(self, configfile = None): diff --git a/S3/S3.py b/S3/S3.py index 95a04798d..65e31d47f 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -10,6 +10,8 @@ import logging import mimetypes import re +import Queue +import threading from logging import debug, info, warning, error from stat import ST_SIZE @@ -22,6 +24,7 @@ from SortedDict import SortedDict from BidirMap import BidirMap from Config import Config +from Utils import concat_files from Exceptions import * from ACL import ACL, GranteeLogDelivery from AccessLog import AccessLog @@ -350,6 +353,84 @@ def object_get(self, uri, stream, start_position = 0, extra_label = ""): response = self.recv_file(request, stream, labels, start_position) return response + def object_multipart_get(self, uri, stream, cfg, start_position = 0, extra_label = ""): + debug("Executing multipart download") + if uri.type != "s3": + raise ValueError("Expected URI type 's3', got '%s'" % uri.type) + info = self.object_info(uri) + file_size = int(info['headers']['content-length']) + file_md5sum = info['headers']['etag'].strip('"') + + multipart_ranges = [] + parts_size = file_size / cfg.parallel_multipart_count + global worker_queue + tmp_dir = os.path.join(os.path.dirname(stream.name),'tmps3') + os.makedirs(tmp_dir) + + worker_queue = Queue.Queue() + i = 0 + for offset in range(0, file_size, parts_size): + start_offset = offset + if start_offset + parts_size < file_size - 1: + end_offset = start_offset + parts_size - 1 + else: + end_offset = file_size - 1 + + part_stream = open(os.path.join(tmp_dir, "%s.part-%d" %(os.path.basename(stream.name), i)),'wb+') + item = (i, start_offset, end_offset, uri, part_stream) + + multipart_ranges.append(item) + worker_queue.put(item) + i+=1 + + def get_worker(): + while True: + try: + item = worker_queue.get_nowait() + except Queue.Empty: + return + offset = item[0] + start_position = item[1] + end_position = item[2] + uri = item[3] + stream = item[4] + request = self.create_request("OBJECT_GET", uri = uri) + labels = { 'source' : unicodise(uri.uri()), 'destination' : unicodise(stream.name), 'extra' : extra_label } + self.recv_file(request, stream, labels, start_position, retries = self._max_retries, end_position = end_position) + + for i in range(cfg.parallel_multipart_threads): + t = threading.Thread(target=get_worker) + t.daemon = True + t.start() + + timestamp_start = time.time() + while threading.active_count() > 1: + time.sleep(0.1) + debug("Download of file parts complete") + source_streams = map(lambda x: x[4], multipart_ranges) + md5_hash_download, download_size = concat_files(stream, *source_streams) + timestamp_end = time.time() + for item in multipart_ranges: + item[4].close() + os.unlink(item[4].name) + os.rmdir(tmp_dir) + stream.flush() + + debug("ReceivedFile: Computed MD5 = %s" % 
md5_hash_download) + response = {} + response["headers"] = info["headers"] + response["md5match"] = file_md5sum.strip() == md5_hash_download.strip() + response["md5"] = file_md5sum + if not response["md5match"]: + warning("MD5 signatures do not match: computed=%s, received=%s" % (md5_hash_download, file_md5sum)) + + response["elapsed"] = timestamp_end - timestamp_start + response["size"] = file_size + response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) + if response["size"] != download_size: + warning("Reported size (%s) does not match received size (%s)" % (download_size, response["size"])) + return response + def object_delete(self, uri): if uri.type != "s3": raise ValueError("Expected URI type 's3', got '%s'" % uri.type) From 7bacce0def3ead528a8f0074dfaa6bbcdedbb475 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Wed, 19 Oct 2011 18:11:04 +0530 Subject: [PATCH 04/16] Added changes to s3cmd for switching parallel split download on/off based on configuration file --- s3cmd | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/s3cmd b/s3cmd index 7d93a5bf7..48ebf5671 100755 --- a/s3cmd +++ b/s3cmd @@ -663,6 +663,11 @@ def cmd_sync_remote2local(args): attrs[key] = val return attrs + if cfg.parallel_multipart_download: + #Disable progress meter + cfg.progress_meter = False + + s3 = S3(Config()) destination_base = args[-1] @@ -744,7 +749,10 @@ def cmd_sync_remote2local(args): os.close(os.open(dst_file, open_flags)) # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode. dst_stream = open(dst_file, "wb") - response = s3.object_get(uri, dst_stream, extra_label = seq_label) + if cfg.parallel_multipart_download == False: + response = s3.object_get(uri, dst_stream, extra_label = seq_label) + else: + response = s3.object_multipart_get(uri, dst_stream, cfg, extra_label = seq_label) dst_stream.close() if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs: attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs']) From e515259595fcf835019defb0a7775429cb89b850 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Thu, 20 Oct 2011 16:19:18 +0530 Subject: [PATCH 05/16] Cleanup handlers for temporary files and disk usage improvement to concat_files() method to readily remove split files once data is read. 
--- S3/S3.py | 5 +---- S3/Utils.py | 8 +++++++- s3cmd | 26 ++++++++++++++++++++++---- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/S3/S3.py b/S3/S3.py index 65e31d47f..cec9eb722 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -408,11 +408,8 @@ def get_worker(): time.sleep(0.1) debug("Download of file parts complete") source_streams = map(lambda x: x[4], multipart_ranges) - md5_hash_download, download_size = concat_files(stream, *source_streams) + md5_hash_download, download_size = concat_files(stream, True, *source_streams) timestamp_end = time.time() - for item in multipart_ranges: - item[4].close() - os.unlink(item[4].name) os.rmdir(tmp_dir) stream.flush() diff --git a/S3/Utils.py b/S3/Utils.py index bc4568ae9..16b95f06c 100644 --- a/S3/Utils.py +++ b/S3/Utils.py @@ -223,7 +223,7 @@ def hash_file_md5(filename): return h.hexdigest() __all__.append("hash_file_md5") -def concat_files(dest_handle, *source_handles): +def concat_files(dest_handle, unlink = True, *source_handles): """ Read data from source file handles and write the data into dest_handle Return md5-hash and file size @@ -238,9 +238,15 @@ def concat_files(dest_handle, *source_handles): if not data: break h.update(data) + + if unlink == True: + source.close() + os.unlink(source.name) + return h.hexdigest(), dest_handle.tell() __all__.append("concat_files") + def mkdir_with_parents(dir_name): """ mkdir_with_parents(dst_dir) diff --git a/s3cmd b/s3cmd index 48ebf5671..5c64135f8 100755 --- a/s3cmd +++ b/s3cmd @@ -32,6 +32,16 @@ from distutils.spawn import find_executable def output(message): sys.stdout.write(message + "\n") +def clean_tempfiles(dest_handler): + path = os.path.join(os.path.dirname(dest_handler.name),'tmps3') + if os.path.exists(path): + for f in os.listdir(path): + fpath = os.path.join(path,f) + os.unlink(fpath) + os.rmdir(path) + if os.stat(dest_handler.name).st_size == 0: + os.unlink(dest_handler.name) + def check_args_type(args, type, verbose_type): for arg in args: if S3Uri(arg).type != type: @@ -764,7 +774,9 @@ def cmd_sync_remote2local(args): os.utime(dst_file, (atime, mtime)) ## FIXME: uid/gid / uname/gname handling comes here! TODO except OSError, e: - try: dst_stream.close() + try: + dst_stream.close() + clean_tempfiles(dst_stream) except: pass if e.errno == errno.EEXIST: warning(u"%s exists - not overwriting" % (dst_file)) @@ -777,19 +789,25 @@ def cmd_sync_remote2local(args): continue raise e except KeyboardInterrupt: - try: dst_stream.close() + try: + dst_stream.close() + clean_tempfiles(dst_stream) except: pass warning(u"Exiting after keyboard interrupt") return except Exception, e: - try: dst_stream.close() + try: + dst_stream.close() + clean_tempfiles(dst_stream) except: pass error(u"%s: %s" % (file, e)) continue # We have to keep repeating this call because # Python 2.4 doesn't support try/except/finally # construction :-( - try: dst_stream.close() + try: + dst_stream.close() + clean_tempfiles(dst_stream) except: pass except S3DownloadError, e: error(u"%s: download failed too many times. Skipping that file." 
% file) From 16c3db43dc18398a1e7e0ac2791807ca0f356be9 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Thu, 20 Oct 2011 16:20:46 +0530 Subject: [PATCH 06/16] Changed threading.active_count() to threading.activeCount() for backward compatibility with python 2.4 --- S3/S3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/S3/S3.py b/S3/S3.py index cec9eb722..8ac7618e1 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -404,7 +404,7 @@ def get_worker(): t.start() timestamp_start = time.time() - while threading.active_count() > 1: + while threading.activeCount() > 1: time.sleep(0.1) debug("Download of file parts complete") source_streams = map(lambda x: x[4], multipart_ranges) From abf8a6b42450682bec78fbd63c7101363c1e00b1 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Mon, 24 Oct 2011 17:11:42 +0530 Subject: [PATCH 07/16] Added parameters signing support for auth signature calculation method sign() --- S3/S3.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/S3/S3.py b/S3/S3.py index 8ac7618e1..c13cfa454 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -63,7 +63,7 @@ def format_param_str(self): return param_str and "?" + param_str[1:] def sign(self): - h = self.method_string + "\n" + h = self.method_string + "\n" h += self.headers.get("content-md5", "")+"\n" h += self.headers.get("content-type", "")+"\n" h += self.headers.get("date", "")+"\n" @@ -73,9 +73,20 @@ def sign(self): if self.resource['bucket']: h += "/" + self.resource['bucket'] h += self.resource['uri'] + + tmp_params = "" + for parameter in self.params: + if parameter in ['uploads', 'partNumber', 'uploadId', 'acl', 'location', 'logging', 'torrent']: + if self.params[parameter] != "": + tmp_params += '&%s=%s' %(parameter, self.params[parameter]) + else: + tmp_params += '&%s' %parameter + + if tmp_params != "": + h+='?'+tmp_params[1:].encode('UTF-8') + debug("SignHeaders: " + repr(h)) signature = sign_string(h) - self.headers["Authorization"] = "AWS "+self.s3.config.access_key+":"+signature def get_triplet(self): From 14d3015085b88cff26755661acc9b0b70097eacc Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Mon, 24 Oct 2011 17:55:02 +0530 Subject: [PATCH 08/16] Added multipart upload support. 
Multipart upload can be enabled by adding parallel_multipart_upload = True in config file --- S3/Config.py | 1 + S3/S3.py | 210 +++++++++++++++++++++++++++++++++++++++++++++------ s3cmd | 5 +- 3 files changed, 193 insertions(+), 23 deletions(-) diff --git a/S3/Config.py b/S3/Config.py index a1998b748..751d13f41 100644 --- a/S3/Config.py +++ b/S3/Config.py @@ -82,6 +82,7 @@ class Config(object): website_error = "" website_endpoint = "http://%(bucket)s.s3-website-%(location)s.amazonaws.com/" parallel_multipart_download = False + parallel_multipart_upload = False parallel_multipart_threads = 10 parallel_multipart_count = 20 diff --git a/S3/S3.py b/S3/S3.py index c13cfa454..8f1716fb3 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -24,11 +24,12 @@ from SortedDict import SortedDict from BidirMap import BidirMap from Config import Config -from Utils import concat_files +from Utils import concat_files, hash_file_md5 from Exceptions import * from ACL import ACL, GranteeLogDelivery from AccessLog import AccessLog from S3Uri import S3Uri +from email.utils import formatdate __all__ = [] class S3Request(object): @@ -81,10 +82,8 @@ def sign(self): tmp_params += '&%s=%s' %(parameter, self.params[parameter]) else: tmp_params += '&%s' %parameter - if tmp_params != "": h+='?'+tmp_params[1:].encode('UTF-8') - debug("SignHeaders: " + repr(h)) signature = sign_string(h) self.headers["Authorization"] = "AWS "+self.s3.config.access_key+":"+signature @@ -102,7 +101,8 @@ class S3(object): PUT = 0x02, HEAD = 0x04, DELETE = 0x08, - MASK = 0x0F, + POST = 0x20, + MASK = 0xFF, ) targets = BidirMap( @@ -121,6 +121,7 @@ class S3(object): OBJECT_PUT = targets["OBJECT"] | http_methods["PUT"], OBJECT_GET = targets["OBJECT"] | http_methods["GET"], OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"], + OBJECT_POST = targets["OBJECT"] | http_methods["POST"], OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"], ) @@ -323,6 +324,145 @@ def website_delete(self, uri, bucket_location = None): return response + def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extra_label = ""): + if uri.type != "s3": + raise ValueError("Expected URI type 's3', got '%s'" % uri.type) + + if not os.path.isfile(filename): + raise InvalidFileError(u"%s is not a regular file" % unicodise(filename)) + try: + file = open(filename, "rb") + file_size = os.stat(filename)[ST_SIZE] + except (IOError, OSError), e: + raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror)) + + parts_size = file_size / cfg.parallel_multipart_count + if parts_size < 5*1024*1024: + warning("File part size is less than minimum required size (5 MB). 
Disabled parallel multipart upload") + return self.object_put(filename, uri, extra_headers = extra_headers, extra_label = extra_label) + + headers = SortedDict(ignore_case = True) + if extra_headers: + headers.update(extra_headers) + + content_type = self.config.mime_type + if not content_type and self.config.guess_mime_type: + content_type = mimetypes.guess_type(filename)[0] + if not content_type: + content_type = self.config.default_mime_type + debug("Content-Type set to '%s'" % content_type) + headers["content-type"] = content_type + if self.config.acl_public: + headers["x-amz-acl"] = "public-read" + if self.config.reduced_redundancy: + headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" + + headers = {} + headers['date'] = formatdate(usegmt=True) + initiate_request = self.create_request("OBJECT_POST", headers = headers, uri = uri, uploads='') + initiate_response = self.send_request(initiate_request) + upload_id = getTextFromXml(initiate_response["data"], ".//UploadId") + #Upload single file size + + debug("Upload ID = %s" %upload_id) + + multipart_ranges = [] + global upload_worker_queue + global part_upload_list + + part_upload_list = {} + upload_worker_queue = Queue.Queue() + i = 1 + for offset in range(0, file_size, parts_size): + start_offset = offset + if start_offset + parts_size < file_size - 1: + end_offset = start_offset + parts_size - 1 + else: + end_offset = file_size - 1 + + item = {'part_no':i, 'start_position':start_offset, 'end_position':end_offset, 'uri':uri, 'upload_id':upload_id, 'filename':filename} + + multipart_ranges.append(item) + upload_worker_queue.put(item) + i+=1 + + def part_upload_worker(): + while True: + try: + part_info = upload_worker_queue.get_nowait() + except Queue.Empty: + return + part_number = part_info['part_no'] + start_position = part_info['start_position'] + end_position = part_info['end_position'] + uri = part_info['uri'] + upload_id = part_info['upload_id'] + filename = part_info['filename'] + + file = open(filename, 'rb') + + headers = SortedDict(ignore_case = True) + #if extra_headers: + # headers.update(extra_headers) + + headers["content-length"] = end_position - start_position + 1 + headers['Expect'] = '100-continue' + + request = self.create_request("OBJECT_PUT", uri = uri, headers = headers, partNumber = part_number, uploadId = upload_id) + labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label } + try: + response = self.send_file(request, file, labels, retries = self._max_retries, part_info = part_info) + except S3UploadError, e: + self.abort_multipart_upload(uri, upload_id) + file.close() + raise S3UploadError("Failed to upload part-%d to S3" %part_info['part_no']) + part_upload_list[part_number] = response["headers"]["etag"].strip('"\'') + file.close() + + for i in range(cfg.parallel_multipart_threads): + t = threading.Thread(target=part_upload_worker) + t.daemon = True + t.start() + + timestamp_start = time.time() + while threading.activeCount() > 1: + time.sleep(0.1) + debug("Upload of file parts complete") + + body = "\n" + for part in part_upload_list.keys(): + body += " \n" + body += " %d\n" %part + body += " %s\n" %part_upload_list[part] + body += " \n" + body += "" + + complete_request = self.create_request("OBJECT_POST", uri = uri, uploadId = upload_id) + response = self.send_request(complete_request, body) + timestamp_end = time.time() + + object_info = self.object_info(uri) + upload_size = int(object_info['headers']['content-length']) + #file_md5sum = 
info['headers']['etag'].strip('"') + + response = {} + response["headers"] = object_info["headers"] + #response["md5match"] = file_md5sum.strip() == md5_hash_file.strip() + response["md5"] = md5_hash + #if not response["md5match"]: + # warning("MD5 signatures do not match: computed=%s, received=%s" % (md5_hash_file, file_md5sum)) + # warning("Aborting file upload") + # self.abort_multipart_upload(uri, upload_id) + # raise S3UploadError + + response["elapsed"] = timestamp_end - timestamp_start + response["size"] = file_size + response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) + if response["size"] != upload_size: + warning("Reported size (%s) does not match received size (%s)" % (upload_size, response["size"])) + return response + + def object_put(self, filename, uri, extra_headers = None, extra_label = ""): # TODO TODO # Make it consistent with stream-oriented object_get() @@ -353,7 +493,7 @@ def object_put(self, filename, uri, extra_headers = None, extra_label = ""): headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" request = self.create_request("OBJECT_PUT", uri = uri, headers = headers) labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label } - response = self.send_file(request, file, labels) + response = self.send_file(request, file, labels, retries = self._max_retries) return response def object_get(self, uri, stream, start_position = 0, extra_label = ""): @@ -608,6 +748,7 @@ def create_request(self, operation, uri = None, bucket = None, object = None, he request = S3Request(self, method_string, resource, headers, params) debug("CreateRequest: resource[uri]=" + resource['uri']) + debug("Request: headers="+str(headers)) return request def _fail_wait(self, retries): @@ -624,6 +765,7 @@ def send_request(self, request, body = None, retries = _max_retries): for header in headers.keys(): headers[header] = str(headers[header]) conn = self.get_connection(resource['bucket']) + debug("Sending Request: method:%s body: %s uri: %s headers:%s" %(method_string,body,self.format_uri(resource),str(headers))) conn.request(method_string, self.format_uri(resource), body, headers) response = {} http_response = conn.getresponse() @@ -666,13 +808,24 @@ def send_request(self, request, body = None, retries = _max_retries): return response - def send_file(self, request, file, labels, throttle = 0, retries = _max_retries): + def abort_multipart_upload(self, uri, upload_id): + headers = {} + headers['date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) + request = self.create_request("OBJECT_DELETE", headers = headers, uri = uri, UploadId = upload_id) + response = self.send_request(request) + return response + + def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, part_info = None): method_string, resource, headers = request.get_triplet() size_left = size_total = headers.get("content-length") if self.config.progress_meter: progress = self.config.progress_class(labels, size_total) else: - info("Sending file '%s', please wait..." % file.name) + if part_info: + info("Sending file '%s' part-%d, please wait..." % (file.name, part_info['part_no'])) + else: + info("Sending file '%s', please wait..." % file.name) + timestamp_start = time.time() try: conn = self.get_connection(resource['bucket']) @@ -689,15 +842,24 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) warning("Waiting %d sec..." 
% self._fail_wait(retries)) time.sleep(self._fail_wait(retries)) # Connection error -> same throttle value - return self.send_file(request, file, labels, throttle, retries - 1) + return self.send_file(request, file, labels, throttle, retries - 1, part_info) else: raise S3UploadError("Upload failed for: %s" % resource['uri']) - file.seek(0) + + if part_info: + file.seek(part_info['start_position']) + else: + file.seek(0) + md5_hash = md5() try: while (size_left > 0): #debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name)) - data = file.read(self.config.send_chunk) + if size_left < self.config.send_chunk: + chunk_size = size_left + else: + chunk_size = self.config.send_chunk + data = file.read(chunk_size) md5_hash.update(data) conn.send(data) if self.config.progress_meter: @@ -718,6 +880,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) except Exception, e: if self.config.progress_meter: progress.done("failed") + debug("Retries:"+str(retries)) if retries: if retries < self._max_retries: throttle = throttle and throttle * 5 or 0.01 @@ -726,7 +889,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) warning("Waiting %d sec..." % self._fail_wait(retries)) time.sleep(self._fail_wait(retries)) # Connection error -> same throttle value - return self.send_file(request, file, labels, throttle, retries - 1) + return self.send_file(request, file, labels, throttle, retries - 1, part_info) else: debug("Giving up on '%s' %s" % (file.name, e)) raise S3UploadError("Upload failed for: %s" % resource['uri']) @@ -748,7 +911,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) redir_hostname = getTextFromXml(response['data'], ".//Endpoint") self.set_hostname(redir_bucket, redir_hostname) warning("Redirected to: %s" % (redir_hostname)) - return self.send_file(request, file, labels) + return self.send_file(request, file, labels, retries, part_info) # S3 from time to time doesn't send ETag back in a response :-( # Force re-upload here. @@ -771,7 +934,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response))) warning("Waiting %d sec..." % self._fail_wait(retries)) time.sleep(self._fail_wait(retries)) - return self.send_file(request, file, labels, throttle, retries - 1) + return self.send_file(request, file, labels, throttle, retries - 1, part_info) else: warning("Too many failures. Giving up on '%s'" % (file.name)) raise S3UploadError @@ -779,15 +942,17 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) ## Non-recoverable error raise S3Error(response) - debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"])) - if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest(): - warning("MD5 Sums don't match!") - if retries: - warning("Retrying upload of %s" % (file.name)) - return self.send_file(request, file, labels, throttle, retries - 1) - else: - warning("Too many failures. 
Giving up on '%s'" % (file.name)) - raise S3UploadError + if not part_info: + debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"])) + if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest(): + warning("MD5 Sums don't match!") + if retries: + warning("Retrying upload of %s" % (file.name)) + return self.send_file(request, file, labels, throttle, retries - 1, part_info) + else: + warning("Too many failures. Giving up on '%s'" % (file.name)) + raise S3UploadError + return response @@ -797,6 +962,7 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ progress = self.config.progress_class(labels, 0) else: info("Receiving file '%s', please wait..." % stream.name) + stream.seek(0) timestamp_start = time.time() try: conn = self.get_connection(resource['bucket']) diff --git a/s3cmd b/s3cmd index 5c64135f8..1c4112c1b 100755 --- a/s3cmd +++ b/s3cmd @@ -945,7 +945,10 @@ def cmd_sync_local2remote(args): attr_header = _build_attr_header(src) debug(u"attr_header: %s" % attr_header) extra_headers.update(attr_header) - response = s3.object_put(src, uri, extra_headers, extra_label = seq_label) + if cfg.parallel_multipart_upload: + response = s3.object_multipart_upload(src, uri, cfg, extra_headers, extra_label = seq_label) + else: + response = s3.object_put(src, uri, extra_headers, extra_label = seq_label) except InvalidFileError, e: warning(u"File can not be uploaded: %s" % e) continue From 9f3003c9ed5fda96232027091f0429d4efede858 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Tue, 25 Oct 2011 13:40:43 +0530 Subject: [PATCH 09/16] Replaced email.Util.formatdate() with time module - ensure compatibility with python 2.4 --- S3/S3.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/S3/S3.py b/S3/S3.py index 8f1716fb3..509b07c45 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -29,7 +29,6 @@ from ACL import ACL, GranteeLogDelivery from AccessLog import AccessLog from S3Uri import S3Uri -from email.utils import formatdate __all__ = [] class S3Request(object): @@ -358,7 +357,7 @@ def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extr headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" headers = {} - headers['date'] = formatdate(usegmt=True) + headers['date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) initiate_request = self.create_request("OBJECT_POST", headers = headers, uri = uri, uploads='') initiate_response = self.send_request(initiate_request) upload_id = getTextFromXml(initiate_response["data"], ".//UploadId") From 077bf4c408e0663ed43f9028c099a3adfe7aadb9 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Thu, 27 Oct 2011 15:04:06 +0530 Subject: [PATCH 10/16] FIX: Correct split numbering, unsymmetric file split issue --- S3/S3.py | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/S3/S3.py b/S3/S3.py index 509b07c45..d4ca19f99 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -335,11 +335,15 @@ def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extr except (IOError, OSError), e: raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror)) - parts_size = file_size / cfg.parallel_multipart_count + parts_size = file_size / cfg.parallel_multipart_count + debug("File size=%d parts size=%d" %(file_size, parts_size)) if parts_size < 5*1024*1024: warning("File part size is less than minimum required size (5 MB). 
Disabled parallel multipart upload") return self.object_put(filename, uri, extra_headers = extra_headers, extra_label = extra_label) + uri_original = uri + uri = S3Uri(uri.uri()+'_S3__tmp') + headers = SortedDict(ignore_case = True) if extra_headers: headers.update(extra_headers) @@ -374,8 +378,11 @@ def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extr i = 1 for offset in range(0, file_size, parts_size): start_offset = offset - if start_offset + parts_size < file_size - 1: + + if start_offset + parts_size - 1 < file_size: end_offset = start_offset + parts_size - 1 + if i == cfg.parallel_multipart_count: + end_offset = file_size - 1 else: end_offset = file_size - 1 @@ -383,7 +390,10 @@ def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extr multipart_ranges.append(item) upload_worker_queue.put(item) + debug("Part %d start=%d end=%d (part size=%d)" %(i, start_offset, end_offset, parts_size)) i+=1 + if end_offset == file_size - 1: + break def part_upload_worker(): while True: @@ -507,9 +517,9 @@ def object_multipart_get(self, uri, stream, cfg, start_position = 0, extra_label debug("Executing multipart download") if uri.type != "s3": raise ValueError("Expected URI type 's3', got '%s'" % uri.type) - info = self.object_info(uri) - file_size = int(info['headers']['content-length']) - file_md5sum = info['headers']['etag'].strip('"') + object_info = self.object_info(uri) + file_size = int(object_info['headers']['content-length']) + file_md5sum = object_info['headers']['etag'].strip('"') multipart_ranges = [] parts_size = file_size / cfg.parallel_multipart_count @@ -518,11 +528,13 @@ def object_multipart_get(self, uri, stream, cfg, start_position = 0, extra_label os.makedirs(tmp_dir) worker_queue = Queue.Queue() - i = 0 + i = 1 for offset in range(0, file_size, parts_size): start_offset = offset - if start_offset + parts_size < file_size - 1: + if start_offset + parts_size - 1 < file_size: end_offset = start_offset + parts_size - 1 + if i == cfg.parallel_multipart_count: + end_offset = file_size - 1 else: end_offset = file_size - 1 @@ -533,6 +545,10 @@ def object_multipart_get(self, uri, stream, cfg, start_position = 0, extra_label worker_queue.put(item) i+=1 + if end_offset == file_size - 1: + break + + def get_worker(): while True: try: @@ -565,7 +581,7 @@ def get_worker(): debug("ReceivedFile: Computed MD5 = %s" % md5_hash_download) response = {} - response["headers"] = info["headers"] + response["headers"] = object_info["headers"] response["md5match"] = file_md5sum.strip() == md5_hash_download.strip() response["md5"] = file_md5sum if not response["md5match"]: @@ -1039,7 +1055,10 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ warning("Waiting %d sec..." 
% self._fail_wait(retries)) time.sleep(self._fail_wait(retries)) # Connection error -> same throttle value - return self.recv_file(request, stream, labels, current_position, retries - 1) + if end_position != -1: + return self.recv_file(request, stream, labels, current_position, retries - 1) + else: + return self.recv_file(request, stream, labels, start_position, retries - 1, end_position) else: raise S3DownloadError("Download failed for: %s" % resource['uri']) From bd70d2b1fcac272021d7c3704e2ef8c4f89a8fed Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Tue, 1 Nov 2011 21:21:46 +0530 Subject: [PATCH 11/16] s3cmd info - fix to show correct md5sum for multipart uploaded file based on custom md5 meta header --- S3/S3.py | 5 ++--- s3cmd | 5 ++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/S3/S3.py b/S3/S3.py index d4ca19f99..b7387bb28 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -341,9 +341,8 @@ def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extr warning("File part size is less than minimum required size (5 MB). Disabled parallel multipart upload") return self.object_put(filename, uri, extra_headers = extra_headers, extra_label = extra_label) - uri_original = uri - uri = S3Uri(uri.uri()+'_S3__tmp') - + md5_hash = hash_file_md5(filename) + info("Calculating md5sum for %s" %filename) headers = SortedDict(ignore_case = True) if extra_headers: headers.update(extra_headers) diff --git a/s3cmd b/s3cmd index 1c4112c1b..461bccd09 100755 --- a/s3cmd +++ b/s3cmd @@ -573,7 +573,10 @@ def cmd_info(args): output(u" File size: %s" % info['headers']['content-length']) output(u" Last mod: %s" % info['headers']['last-modified']) output(u" MIME type: %s" % info['headers']['content-type']) - output(u" MD5 sum: %s" % info['headers']['etag'].strip('"')) + if info['headers'].has_key('x-amz-meta-md5sum'): + output(u" MD5 sum: %s" % info['headers']['x-amz-meta-md5sum'].strip('"')) + else: + output(u" MD5 sum: %s" % info['headers']['etag'].strip('"')) else: info = s3.bucket_info(uri) output(u"%s (bucket):" % uri.uri()) From 942b19ff8a53048e91c89efd3b6f3199172c44af Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Thu, 3 Nov 2011 13:10:18 +0530 Subject: [PATCH 12/16] Fix for python 2.4 daemon thread by using setDaemon(True) by replacing daemon=True --- S3/S3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/S3/S3.py b/S3/S3.py index b7387bb28..abb8aeebd 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -429,7 +429,7 @@ def part_upload_worker(): for i in range(cfg.parallel_multipart_threads): t = threading.Thread(target=part_upload_worker) - t.daemon = True + t.setDaemon(True) t.start() timestamp_start = time.time() @@ -565,7 +565,7 @@ def get_worker(): for i in range(cfg.parallel_multipart_threads): t = threading.Thread(target=get_worker) - t.daemon = True + t.setDaemon(True) t.start() timestamp_start = time.time() From b5f65906956bec8b3658c81d167b08c0935b9ea5 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Thu, 3 Nov 2011 15:59:35 +0530 Subject: [PATCH 13/16] Disable progress bar for multipart upload --- s3cmd | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/s3cmd b/s3cmd index 461bccd09..81635e750 100755 --- a/s3cmd +++ b/s3cmd @@ -864,6 +864,10 @@ def cmd_sync_local2remote(args): for k in attrs: result += "%s:%s/" % (k, attrs[k]) return { 'x-amz-meta-s3cmd-attrs' : result[:-1] } + if cfg.parallel_multipart_upload: + #Disable progress meter + cfg.progress_meter = False + s3 = S3(cfg) if cfg.encrypt: From 2fb607a5500211a2b9d5b0574f9bdf6a16f7ece9 
Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Thu, 3 Nov 2011 16:49:01 +0530 Subject: [PATCH 14/16] Added meta based md5 verfication for non-parallel downloader for files uploaded with multipart upload --- S3/S3.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/S3/S3.py b/S3/S3.py index abb8aeebd..278bb7093 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -361,6 +361,7 @@ def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extr headers = {} headers['date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) + headers['x-amz-meta-md5sum'] = md5_hash initiate_request = self.create_request("OBJECT_POST", headers = headers, uri = uri, uploads='') initiate_response = self.send_request(initiate_request) upload_id = getTextFromXml(initiate_response["data"], ".//UploadId") @@ -519,6 +520,11 @@ def object_multipart_get(self, uri, stream, cfg, start_position = 0, extra_label object_info = self.object_info(uri) file_size = int(object_info['headers']['content-length']) file_md5sum = object_info['headers']['etag'].strip('"') + if len(file_md5sum.split('-')) == 2: + try: + file_md5sum = object_info['headers']['x-amz-meta-md5sum'] + except: + warning('md5sum meta information not found in multipart uploaded file') multipart_ranges = [] parts_size = file_size / cfg.parallel_multipart_count @@ -1085,7 +1091,14 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ warning("Unable to verify MD5. Assume it matches.") response["md5"] = response["headers"]["etag"] - response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0 + file_md5sum = response["headers"]["etag"].strip('"\'') + if len(response["headers"]["etag"].split('-')) == 2: + try: + file_md5sum = response['headers']['x-amz-meta-md5sum'] + except: + warning('md5sum meta information not found in multipart uploaded file') + + response["md5match"] = file_md5sum == response["md5"] debug("ReceiveFile: Computed MD5 = %s" % response["md5"]) if not response["md5match"]: warning("MD5 signatures do not match: computed=%s, received=%s" % ( From b30fd2054f7dca64f7fca926b6b8c7adad58aa44 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Sun, 6 Nov 2011 14:35:45 +0530 Subject: [PATCH 15/16] Added exit_status for s3cmd program s3cmd does not return valid exit status codes. 
Hence it is unable to identify whether the program succeeded or failed (with cause of failure) This commit adds exit status for s3cmd sync upload, sync download, get and put operations Exit codes are as follows : SIZE_MISMATCH=1, MD5_MISMATCH=2, RETRIES_EXCEEDED=3, UPLOAD_ABORT=4, MD5_META_NOTFOUND=5, KEYBOARD_INTERRUPT=6 --- S3/S3.py | 47 ++++++++++++++++++++++++++++++++++++----------- s3cmd | 15 ++++++++++++--- 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/S3/S3.py b/S3/S3.py index 278bb7093..614e61ef7 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -130,12 +130,25 @@ class S3(object): "BucketAlreadyExists" : "Bucket '%s' already exists", } + error_codes = { + "SIZE_MISMATCH":1, + "MD5_MISMATCH":2, + "RETRIES_EXCEEDED":3, + "UPLOAD_ABORT":4, + "MD5_META_NOTFOUND":5, + "KEYBOARD_INTERRUPT":6 + } + + ## S3 sometimes sends HTTP-307 response redir_map = {} ## Maximum attempts of re-issuing failed requests _max_retries = 5 + ##Default exit status = 0 (SUCCESS) + exit_status = 0 + def __init__(self, config): self.config = config @@ -469,6 +482,7 @@ def part_upload_worker(): response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) if response["size"] != upload_size: warning("Reported size (%s) does not match received size (%s)" % (upload_size, response["size"])) + self.abort_multipart_upload() return response @@ -591,12 +605,14 @@ def get_worker(): response["md5"] = file_md5sum if not response["md5match"]: warning("MD5 signatures do not match: computed=%s, received=%s" % (md5_hash_download, file_md5sum)) + self.exit_status = self.error_codes["MD5_MISMATCH"] response["elapsed"] = timestamp_end - timestamp_start response["size"] = file_size response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) if response["size"] != download_size: warning("Reported size (%s) does not match received size (%s)" % (download_size, response["size"])) + self.exit_status = self.error_codes["SIZE_MISMATCH"] return response def object_delete(self, uri): @@ -833,6 +849,7 @@ def abort_multipart_upload(self, uri, upload_id): headers['date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) request = self.create_request("OBJECT_DELETE", headers = headers, uri = uri, UploadId = upload_id) response = self.send_request(request) + self.exit_status = self.error_codes["UPLOAD_ABORT"] return response def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, part_info = None): @@ -864,6 +881,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, # Connection error -> same throttle value return self.send_file(request, file, labels, throttle, retries - 1, part_info) else: + self.exit_status = self.error_codes["RETRIES_EXCEEDED"] raise S3UploadError("Upload failed for: %s" % resource['uri']) if part_info: @@ -912,6 +930,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, return self.send_file(request, file, labels, throttle, retries - 1, part_info) else: debug("Giving up on '%s' %s" % (file.name, e)) + self.exit_status = self.error_codes["RETRIES_EXCEEDED"] raise S3UploadError("Upload failed for: %s" % resource['uri']) timestamp_end = time.time() @@ -957,22 +976,23 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, return self.send_file(request, file, labels, throttle, retries - 1, part_info) else: warning("Too many failures. 
Giving up on '%s'" % (file.name)) + self.exit_status = self.error_codes["RETRIES_EXCEEDED"] raise S3UploadError ## Non-recoverable error raise S3Error(response) - if not part_info: - debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"])) - if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest(): - warning("MD5 Sums don't match!") - if retries: - warning("Retrying upload of %s" % (file.name)) - return self.send_file(request, file, labels, throttle, retries - 1, part_info) - else: - warning("Too many failures. Giving up on '%s'" % (file.name)) - raise S3UploadError - + debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"])) + if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest(): + warning("MD5 Sums don't match!") + self.exit_status = self.error_codes["MD5_MISMATCH"] + if retries: + warning("Retrying upload of %s" % (file.name)) + return self.send_file(request, file, labels, throttle, retries - 1, part_info) + else: + warning("Too many failures. Giving up on '%s'" % (file.name)) + self.exit_status = self.error_codes["RETRIES_EXCEEDED"] + raise S3UploadError return response @@ -1013,6 +1033,7 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ # Connection error -> same throttle value return self.recv_file(request, stream, labels, start_position, retries - 1, end_position) else: + self.exit_status = self.error_codes["RETRIES_EXCEEDED"] raise S3DownloadError("Download failed for: %s" % resource['uri']) if response["status"] == 307: @@ -1065,6 +1086,7 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ else: return self.recv_file(request, stream, labels, start_position, retries - 1, end_position) else: + self.exit_status = self.error_codes["RETRIES_EXCEEDED"] raise S3DownloadError("Download failed for: %s" % resource['uri']) stream.flush() @@ -1097,18 +1119,21 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_ file_md5sum = response['headers']['x-amz-meta-md5sum'] except: warning('md5sum meta information not found in multipart uploaded file') + self.exit_status = self.error_codes["MD5_META_NOTFOUND"] response["md5match"] = file_md5sum == response["md5"] debug("ReceiveFile: Computed MD5 = %s" % response["md5"]) if not response["md5match"]: warning("MD5 signatures do not match: computed=%s, received=%s" % ( response["md5"], response["headers"]["etag"])) + self.exit_status = self.error_codes["MD5_MISMATCH"] response["elapsed"] = timestamp_end - timestamp_start response["size"] = current_position response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1) if response["size"] != start_position + long(response["headers"]["content-length"]): warning("Reported size (%s) does not match received size (%s)" % ( start_position + response["headers"]["content-length"], response["size"])) + self.exit_status = self.error_codes["SIZE_MISMATCH"] return response __all__.append("S3") diff --git a/s3cmd b/s3cmd index 81635e750..b665f7a60 100755 --- a/s3cmd +++ b/s3cmd @@ -325,6 +325,8 @@ def cmd_object_put(args): debug(u"Removing temporary encrypted file: %s" % unicodise(full_name)) os.remove(full_name) + return s3.exit_status + def cmd_object_get(args): cfg = Config() s3 = S3(cfg) @@ -459,6 +461,8 @@ def cmd_object_get(args): output(u"File %s saved as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s)" % (uri, destination, response["size"], response["elapsed"], speed_fmt[0], 
speed_fmt[1])) + return s3.exit_status + def cmd_object_del(args): for uri_str in args: uri = S3Uri(uri_str) @@ -797,7 +801,7 @@ def cmd_sync_remote2local(args): clean_tempfiles(dst_stream) except: pass warning(u"Exiting after keyboard interrupt") - return + return s3.error_codes["KEYBOARD_INTERRUPT"] except Exception, e: try: dst_stream.close() @@ -833,6 +837,8 @@ def cmd_sync_remote2local(args): else: info(outstr) + return s3.exit_status + def cmd_sync_local2remote(args): def _build_attr_header(src): import pwd, grp @@ -992,6 +998,8 @@ def cmd_sync_local2remote(args): if result['status'] == 201: output("Created invalidation request for %d paths" % len(uploaded_objects_list)) output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id'])) + return s3.exit_status + def cmd_sync(args): if (len(args) < 2): @@ -1765,7 +1773,8 @@ def main(): sys.exit(1) try: - cmd_func(args) + rv = cmd_func(args) + sys.exit(rv) except S3Error, e: error(u"S3 error: %s" % e) sys.exit(1) @@ -1839,7 +1848,7 @@ if __name__ == '__main__': except KeyboardInterrupt: sys.stderr.write("See ya!\n") - sys.exit(1) + sys.exit(6) except Exception, e: report_exception(e) From be7845ffb94a4dc894df014ae61e75fcaa7511f6 Mon Sep 17 00:00:00 2001 From: Sarath Lakshman Date: Sun, 6 Nov 2011 20:26:39 +0530 Subject: [PATCH 16/16] Added separate config parameters for thread limit and split count wrt to download and upload added download config parameters parallel_multipart_download_threads (thread count), parallel_multipart_download_count (split count) added upload config parameters parallel_multipart_upload_threads (thread count), parallel_multipart_upload_count (split count) --- S3/Config.py | 6 ++++-- S3/S3.py | 12 ++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/S3/Config.py b/S3/Config.py index 751d13f41..458f1e882 100644 --- a/S3/Config.py +++ b/S3/Config.py @@ -83,8 +83,10 @@ class Config(object): website_endpoint = "http://%(bucket)s.s3-website-%(location)s.amazonaws.com/" parallel_multipart_download = False parallel_multipart_upload = False - parallel_multipart_threads = 10 - parallel_multipart_count = 20 + parallel_multipart_download_threads = 5 + parallel_multipart_upload_threads = 5 + parallel_multipart_download_count = 5 + parallel_multipart_upload_count = 5 ## Creating a singleton def __new__(self, configfile = None): diff --git a/S3/S3.py b/S3/S3.py index 614e61ef7..4ba006a7b 100644 --- a/S3/S3.py +++ b/S3/S3.py @@ -348,7 +348,7 @@ def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extr except (IOError, OSError), e: raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror)) - parts_size = file_size / cfg.parallel_multipart_count + parts_size = file_size / cfg.parallel_multipart_upload_count debug("File size=%d parts size=%d" %(file_size, parts_size)) if parts_size < 5*1024*1024: warning("File part size is less than minimum required size (5 MB). 
Disabled parallel multipart upload") @@ -394,7 +394,7 @@ def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extr if start_offset + parts_size - 1 < file_size: end_offset = start_offset + parts_size - 1 - if i == cfg.parallel_multipart_count: + if i == cfg.parallel_multipart_upload_count: end_offset = file_size - 1 else: end_offset = file_size - 1 @@ -441,7 +441,7 @@ def part_upload_worker(): part_upload_list[part_number] = response["headers"]["etag"].strip('"\'') file.close() - for i in range(cfg.parallel_multipart_threads): + for i in range(cfg.parallel_multipart_upload_threads): t = threading.Thread(target=part_upload_worker) t.setDaemon(True) t.start() @@ -541,7 +541,7 @@ def object_multipart_get(self, uri, stream, cfg, start_position = 0, extra_label warning('md5sum meta information not found in multipart uploaded file') multipart_ranges = [] - parts_size = file_size / cfg.parallel_multipart_count + parts_size = file_size / cfg.parallel_multipart_download_count global worker_queue tmp_dir = os.path.join(os.path.dirname(stream.name),'tmps3') os.makedirs(tmp_dir) @@ -552,7 +552,7 @@ def object_multipart_get(self, uri, stream, cfg, start_position = 0, extra_label start_offset = offset if start_offset + parts_size - 1 < file_size: end_offset = start_offset + parts_size - 1 - if i == cfg.parallel_multipart_count: + if i == cfg.parallel_multipart_download_count: end_offset = file_size - 1 else: end_offset = file_size - 1 @@ -583,7 +583,7 @@ def get_worker(): labels = { 'source' : unicodise(uri.uri()), 'destination' : unicodise(stream.name), 'extra' : extra_label } self.recv_file(request, stream, labels, start_position, retries = self._max_retries, end_position = end_position) - for i in range(cfg.parallel_multipart_threads): + for i in range(cfg.parallel_multipart_download_threads): t = threading.Thread(target=get_worker) t.setDaemon(True) t.start()
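The sketch below is not part of the patch series; it is an illustrative summary, in plain Python, of the byte-range splitting that object_multipart_get() and object_multipart_upload() perform after the PATCH 10/16 fix for the unsymmetric file split issue. The part count corresponds to the parallel_multipart_download_count / parallel_multipart_upload_count options added to S3/Config.py in PATCH 16/16, and per the commit messages the feature itself is toggled with parallel_multipart_download = True / parallel_multipart_upload = True in the s3cmd config file. The helper name compute_part_ranges() is hypothetical; the patches inline this logic in the two methods.

# Hypothetical helper, mirroring the range computation inlined in
# object_multipart_get() / object_multipart_upload() (post PATCH 10/16).
def compute_part_ranges(file_size, part_count):
    """Return a list of (part_no, start, end) byte ranges, 1-based part numbers.

    Every part is parts_size bytes except the last, which absorbs the
    remainder and always ends at file_size - 1. Assumes file_size >= part_count.
    """
    parts_size = file_size // part_count
    ranges = []
    part_no = 1
    for start in range(0, file_size, parts_size):
        end = start + parts_size - 1
        # The final part always runs to the last byte of the object.
        if part_no == part_count or end >= file_size - 1:
            end = file_size - 1
        ranges.append((part_no, start, end))
        if end == file_size - 1:
            break
        part_no += 1
    return ranges

if __name__ == "__main__":
    # Example: a 1 GiB object split into 5 parts (parallel_multipart_download_count = 5).
    # Each range maps onto a "Range: bytes=start-end" header passed to recv_file(),
    # or onto one partNumber of the multipart upload handled by send_file().
    for part_no, start, end in compute_part_ranges(1024 * 1024 * 1024, 5):
        print("part %d: bytes=%d-%d" % (part_no, start, end))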