diff --git a/S3/Config.py b/S3/Config.py
index 4e2a58649..458f1e882 100644
--- a/S3/Config.py
+++ b/S3/Config.py
@@ -81,6 +81,12 @@ class Config(object):
     website_index = "index.html"
     website_error = ""
     website_endpoint = "http://%(bucket)s.s3-website-%(location)s.amazonaws.com/"
+    parallel_multipart_download = False
+    parallel_multipart_upload = False
+    parallel_multipart_download_threads = 5
+    parallel_multipart_upload_threads = 5
+    parallel_multipart_download_count = 5
+    parallel_multipart_upload_count = 5
 
     ## Creating a singleton
     def __new__(self, configfile = None):
diff --git a/S3/S3.py b/S3/S3.py
index 7d16336ed..4ba006a7b 100644
--- a/S3/S3.py
+++ b/S3/S3.py
@@ -10,6 +10,8 @@
 import logging
 import mimetypes
 import re
+import Queue
+import threading
 from logging import debug, info, warning, error
 from stat import ST_SIZE
 
@@ -22,6 +24,7 @@
 from SortedDict import SortedDict
 from BidirMap import BidirMap
 from Config import Config
+from Utils import concat_files, hash_file_md5
 from Exceptions import *
 from ACL import ACL, GranteeLogDelivery
 from AccessLog import AccessLog
@@ -60,7 +63,7 @@ def format_param_str(self):
         return param_str and "?" + param_str[1:]
 
     def sign(self):
-        h  = self.method_string + "\n"
+        h = self.method_string + "\n"
         h += self.headers.get("content-md5", "")+"\n"
         h += self.headers.get("content-type", "")+"\n"
         h += self.headers.get("date", "")+"\n"
@@ -70,9 +73,18 @@ def sign(self):
         if self.resource['bucket']:
             h += "/" + self.resource['bucket']
         h += self.resource['uri']
+
+        tmp_params = ""
+        for parameter in self.params:
+            if parameter in ['uploads', 'partNumber', 'uploadId', 'acl', 'location', 'logging', 'torrent']:
+                if self.params[parameter] != "":
+                    tmp_params += '&%s=%s' %(parameter, self.params[parameter])
+                else:
+                    tmp_params += '&%s' %parameter
+        if tmp_params != "":
+            h+='?'+tmp_params[1:].encode('UTF-8')
         debug("SignHeaders: " + repr(h))
         signature = sign_string(h)
-
         self.headers["Authorization"] = "AWS "+self.s3.config.access_key+":"+signature
 
     def get_triplet(self):
@@ -88,7 +100,8 @@ class S3(object):
         PUT = 0x02,
         HEAD = 0x04,
         DELETE = 0x08,
-        MASK = 0x0F,
+        POST = 0x20,
+        MASK = 0xFF,
         )
 
     targets = BidirMap(
@@ -107,6 +120,7 @@ class S3(object):
         OBJECT_PUT = targets["OBJECT"] | http_methods["PUT"],
         OBJECT_GET = targets["OBJECT"] | http_methods["GET"],
         OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
+        OBJECT_POST = targets["OBJECT"] | http_methods["POST"],
         OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
         )
 
@@ -116,12 +130,25 @@ class S3(object):
         "BucketAlreadyExists" : "Bucket '%s' already exists",
         }
 
+    error_codes = {
+        "SIZE_MISMATCH":1,
+        "MD5_MISMATCH":2,
+        "RETRIES_EXCEEDED":3,
+        "UPLOAD_ABORT":4,
+        "MD5_META_NOTFOUND":5,
+        "KEYBOARD_INTERRUPT":6
+        }
+
     ## S3 sometimes sends HTTP-307 response
     redir_map = {}
 
     ## Maximum attempts of re-issuing failed requests
     _max_retries = 5
 
+    ##Default exit status = 0 (SUCCESS)
+    exit_status = 0
+
     def __init__(self, config):
         self.config = config
@@ -309,6 +336,156 @@ def website_delete(self, uri, bucket_location = None):
 
         return response
 
+    def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extra_label = ""):
+        if uri.type != "s3":
+            raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
+
+        if not os.path.isfile(filename):
+            raise InvalidFileError(u"%s is not a regular file" % unicodise(filename))
+        try:
+            file = open(filename, "rb")
+            file_size = os.stat(filename)[ST_SIZE]
+        except (IOError, OSError), e:
+            raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
+
+        parts_size = file_size / cfg.parallel_multipart_upload_count
+        debug("File size=%d parts size=%d" %(file_size, parts_size))
+        if parts_size < 5*1024*1024:
+            warning("File part size is less than minimum required size (5 MB). Disabled parallel multipart upload")
+            return self.object_put(filename, uri, extra_headers = extra_headers, extra_label = extra_label)
+
+        md5_hash = hash_file_md5(filename)
+        info("Calculating md5sum for %s" %filename)
+        headers = SortedDict(ignore_case = True)
+        if extra_headers:
+            headers.update(extra_headers)
+
+        content_type = self.config.mime_type
+        if not content_type and self.config.guess_mime_type:
+            content_type = mimetypes.guess_type(filename)[0]
+        if not content_type:
+            content_type = self.config.default_mime_type
+        debug("Content-Type set to '%s'" % content_type)
+        headers["content-type"] = content_type
+        if self.config.acl_public:
+            headers["x-amz-acl"] = "public-read"
+        if self.config.reduced_redundancy:
+            headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
+
+        headers = {}
+        headers['date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
+        headers['x-amz-meta-md5sum'] = md5_hash
+        initiate_request = self.create_request("OBJECT_POST", headers = headers, uri = uri, uploads='')
+        initiate_response = self.send_request(initiate_request)
+        upload_id = getTextFromXml(initiate_response["data"], ".//UploadId")
+        #Upload single file size
+
+        debug("Upload ID = %s" %upload_id)
+
+        multipart_ranges = []
+        global upload_worker_queue
+        global part_upload_list
+
+        part_upload_list = {}
+        upload_worker_queue = Queue.Queue()
+        i = 1
+        for offset in range(0, file_size, parts_size):
+            start_offset = offset
+
+            if start_offset + parts_size - 1 < file_size:
+                end_offset = start_offset + parts_size - 1
+                if i == cfg.parallel_multipart_upload_count:
+                    end_offset = file_size - 1
+            else:
+                end_offset = file_size - 1
+
+            item = {'part_no':i, 'start_position':start_offset, 'end_position':end_offset, 'uri':uri, 'upload_id':upload_id, 'filename':filename}
+
+            multipart_ranges.append(item)
+            upload_worker_queue.put(item)
+            debug("Part %d start=%d end=%d (part size=%d)" %(i, start_offset, end_offset, parts_size))
+            i+=1
+            if end_offset == file_size - 1:
+                break
+
+        def part_upload_worker():
+            while True:
+                try:
+                    part_info = upload_worker_queue.get_nowait()
+                except Queue.Empty:
+                    return
+                part_number = part_info['part_no']
+                start_position = part_info['start_position']
+                end_position = part_info['end_position']
+                uri = part_info['uri']
+                upload_id = part_info['upload_id']
+                filename = part_info['filename']
+
+                file = open(filename, 'rb')
+
+                headers = SortedDict(ignore_case = True)
+                #if extra_headers:
+                #    headers.update(extra_headers)
+
+                headers["content-length"] = end_position - start_position + 1
+                headers['Expect'] = '100-continue'
+
+                request = self.create_request("OBJECT_PUT", uri = uri, headers = headers, partNumber = part_number, uploadId = upload_id)
+                labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
+                try:
+                    response = self.send_file(request, file, labels, retries = self._max_retries, part_info = part_info)
+                except S3UploadError, e:
+                    self.abort_multipart_upload(uri, upload_id)
+                    file.close()
+                    raise S3UploadError("Failed to upload part-%d to S3" %part_info['part_no'])
+                part_upload_list[part_number] = response["headers"]["etag"].strip('"\'')
+                file.close()
+
+        for i in range(cfg.parallel_multipart_upload_threads):
+            t = threading.Thread(target=part_upload_worker)
+            t.setDaemon(True)
+            t.start()
+
+        timestamp_start = time.time()
+        while threading.activeCount() > 1:
+            time.sleep(0.1)
+        debug("Upload of file parts complete")
+
+        body = "<CompleteMultipartUpload>\n"
+        for part in part_upload_list.keys():
+            body += "  <Part>\n"
+            body += "    <PartNumber>%d</PartNumber>\n" %part
+            body += "    <ETag>%s</ETag>\n" %part_upload_list[part]
+            body += "  </Part>\n"
+        body += "</CompleteMultipartUpload>"
+
+        complete_request = self.create_request("OBJECT_POST", uri = uri, uploadId = upload_id)
+        response = self.send_request(complete_request, body)
+        timestamp_end = time.time()
+
+        object_info = self.object_info(uri)
+        upload_size = int(object_info['headers']['content-length'])
+        #file_md5sum = info['headers']['etag'].strip('"')
+
+        response = {}
+        response["headers"] = object_info["headers"]
+        #response["md5match"] = file_md5sum.strip() == md5_hash_file.strip()
+        response["md5"] = md5_hash
+        #if not response["md5match"]:
+        #    warning("MD5 signatures do not match: computed=%s, received=%s" % (md5_hash_file, file_md5sum))
+        #    warning("Aborting file upload")
+        #    self.abort_multipart_upload(uri, upload_id)
+        #    raise S3UploadError
+
+        response["elapsed"] = timestamp_end - timestamp_start
+        response["size"] = file_size
+        response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
+        if response["size"] != upload_size:
+            warning("Reported size (%s) does not match received size (%s)" % (upload_size, response["size"]))
+            self.abort_multipart_upload(uri, upload_id)
+        return response
+
+
     def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
         # TODO TODO
         # Make it consistent with stream-oriented object_get()
@@ -339,7 +516,7 @@ def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
             headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
         request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
         labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
-        response = self.send_file(request, file, labels)
+        response = self.send_file(request, file, labels, retries = self._max_retries)
         return response
 
     def object_get(self, uri, stream, start_position = 0, extra_label = ""):
@@ -350,6 +527,94 @@ def object_get(self, uri, stream, start_position = 0, extra_label = ""):
         response = self.recv_file(request, stream, labels, start_position)
         return response
 
+    def object_multipart_get(self, uri, stream, cfg, start_position = 0, extra_label = ""):
+        debug("Executing multipart download")
+        if uri.type != "s3":
+            raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
+        object_info = self.object_info(uri)
+        file_size = int(object_info['headers']['content-length'])
+        file_md5sum = object_info['headers']['etag'].strip('"')
+        if len(file_md5sum.split('-')) == 2:
+            try:
+                file_md5sum = object_info['headers']['x-amz-meta-md5sum']
+            except:
+                warning('md5sum meta information not found in multipart uploaded file')
+
+        multipart_ranges = []
+        parts_size = file_size / cfg.parallel_multipart_download_count
+        global worker_queue
+        tmp_dir = os.path.join(os.path.dirname(stream.name),'tmps3')
+        os.makedirs(tmp_dir)
+
+        worker_queue = Queue.Queue()
+        i = 1
+        for offset in range(0, file_size, parts_size):
+            start_offset = offset
+            if start_offset + parts_size - 1 < file_size:
+                end_offset = start_offset + parts_size - 1
+                if i == cfg.parallel_multipart_download_count:
+                    end_offset = file_size - 1
+            else:
+                end_offset = file_size - 1
+
+            part_stream = open(os.path.join(tmp_dir, "%s.part-%d" %(os.path.basename(stream.name), i)),'wb+')
+            item = (i, start_offset, end_offset, uri, part_stream)
+
+            multipart_ranges.append(item)
+            worker_queue.put(item)
+            i+=1
+
+            if end_offset == file_size - 1:
+                break
+
+
+        def get_worker():
+            while True:
+                try:
+                    item = worker_queue.get_nowait()
+                except Queue.Empty:
+                    return
+                offset = item[0]
+                start_position = item[1]
+                end_position = item[2]
+                uri = item[3]
+                stream = item[4]
+                request = self.create_request("OBJECT_GET", uri = uri)
+                labels = { 'source' : unicodise(uri.uri()), 'destination' : unicodise(stream.name), 'extra' : extra_label }
+                self.recv_file(request, stream, labels, start_position, retries = self._max_retries, end_position = end_position)
+
+        for i in range(cfg.parallel_multipart_download_threads):
+            t = threading.Thread(target=get_worker)
+            t.setDaemon(True)
+            t.start()
+
+        timestamp_start = time.time()
+        while threading.activeCount() > 1:
+            time.sleep(0.1)
+        debug("Download of file parts complete")
+        source_streams = map(lambda x: x[4], multipart_ranges)
+        md5_hash_download, download_size = concat_files(stream, True, *source_streams)
+        timestamp_end = time.time()
+        os.rmdir(tmp_dir)
+        stream.flush()
+
+        debug("ReceivedFile: Computed MD5 = %s" % md5_hash_download)
+        response = {}
+        response["headers"] = object_info["headers"]
+        response["md5match"] = file_md5sum.strip() == md5_hash_download.strip()
+        response["md5"] = file_md5sum
+        if not response["md5match"]:
+            warning("MD5 signatures do not match: computed=%s, received=%s" % (md5_hash_download, file_md5sum))
+            self.exit_status = self.error_codes["MD5_MISMATCH"]
+
+        response["elapsed"] = timestamp_end - timestamp_start
+        response["size"] = file_size
+        response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
+        if response["size"] != download_size:
+            warning("Reported size (%s) does not match received size (%s)" % (download_size, response["size"]))
+            self.exit_status = self.error_codes["SIZE_MISMATCH"]
+        return response
+
     def object_delete(self, uri):
         if uri.type != "s3":
             raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
@@ -519,6 +784,7 @@ def create_request(self, operation, uri = None, bucket = None, object = None, headers = None, extra = None, **params):
 
         request = S3Request(self, method_string, resource, headers, params)
         debug("CreateRequest: resource[uri]=" + resource['uri'])
+        debug("Request: headers="+str(headers))
         return request
 
     def _fail_wait(self, retries):
@@ -535,6 +801,7 @@ def send_request(self, request, body = None, retries = _max_retries):
         for header in headers.keys():
             headers[header] = str(headers[header])
         conn = self.get_connection(resource['bucket'])
+        debug("Sending Request: method:%s body: %s uri: %s headers:%s" %(method_string,body,self.format_uri(resource),str(headers)))
         conn.request(method_string, self.format_uri(resource), body, headers)
         response = {}
         http_response = conn.getresponse()
@@ -577,13 +844,25 @@ def send_request(self, request, body = None, retries = _max_retries):
 
         return response
 
-    def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
+    def abort_multipart_upload(self, uri, upload_id):
+        headers = {}
+        headers['date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
+        request = self.create_request("OBJECT_DELETE", headers = headers, uri = uri, UploadId = upload_id)
+        response = self.send_request(request)
+        self.exit_status = self.error_codes["UPLOAD_ABORT"]
+        return response
+
+    def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, part_info = None):
         method_string, resource, headers = request.get_triplet()
         size_left = size_total = headers.get("content-length")
headers.get("content-length") if self.config.progress_meter: progress = self.config.progress_class(labels, size_total) else: - info("Sending file '%s', please wait..." % file.name) + if part_info: + info("Sending file '%s' part-%d, please wait..." % (file.name, part_info['part_no'])) + else: + info("Sending file '%s', please wait..." % file.name) + timestamp_start = time.time() try: conn = self.get_connection(resource['bucket']) @@ -600,15 +879,25 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) warning("Waiting %d sec..." % self._fail_wait(retries)) time.sleep(self._fail_wait(retries)) # Connection error -> same throttle value - return self.send_file(request, file, labels, throttle, retries - 1) + return self.send_file(request, file, labels, throttle, retries - 1, part_info) else: + self.exit_status = self.error_codes["RETRIES_EXCEEDED"] raise S3UploadError("Upload failed for: %s" % resource['uri']) - file.seek(0) + + if part_info: + file.seek(part_info['start_position']) + else: + file.seek(0) + md5_hash = md5() try: while (size_left > 0): #debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name)) - data = file.read(self.config.send_chunk) + if size_left < self.config.send_chunk: + chunk_size = size_left + else: + chunk_size = self.config.send_chunk + data = file.read(chunk_size) md5_hash.update(data) conn.send(data) if self.config.progress_meter: @@ -629,6 +918,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) except Exception, e: if self.config.progress_meter: progress.done("failed") + debug("Retries:"+str(retries)) if retries: if retries < self._max_retries: throttle = throttle and throttle * 5 or 0.01 @@ -637,9 +927,10 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) warning("Waiting %d sec..." % self._fail_wait(retries)) time.sleep(self._fail_wait(retries)) # Connection error -> same throttle value - return self.send_file(request, file, labels, throttle, retries - 1) + return self.send_file(request, file, labels, throttle, retries - 1, part_info) else: debug("Giving up on '%s' %s" % (file.name, e)) + self.exit_status = self.error_codes["RETRIES_EXCEEDED"] raise S3UploadError("Upload failed for: %s" % resource['uri']) timestamp_end = time.time() @@ -659,7 +950,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) redir_hostname = getTextFromXml(response['data'], ".//Endpoint") self.set_hostname(redir_bucket, redir_hostname) warning("Redirected to: %s" % (redir_hostname)) - return self.send_file(request, file, labels) + return self.send_file(request, file, labels, retries, part_info) # S3 from time to time doesn't send ETag back in a response :-( # Force re-upload here. @@ -682,9 +973,10 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries) warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response))) warning("Waiting %d sec..." % self._fail_wait(retries)) time.sleep(self._fail_wait(retries)) - return self.send_file(request, file, labels, throttle, retries - 1) + return self.send_file(request, file, labels, throttle, retries - 1, part_info) else: warning("Too many failures. 
+                    self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
                     raise S3UploadError
 
             ## Non-recoverable error
@@ -693,21 +985,24 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
         debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"]))
         if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest():
             warning("MD5 Sums don't match!")
+            self.exit_status = self.error_codes["MD5_MISMATCH"]
             if retries:
                 warning("Retrying upload of %s" % (file.name))
-                return self.send_file(request, file, labels, throttle, retries - 1)
+                return self.send_file(request, file, labels, throttle, retries - 1, part_info)
             else:
                 warning("Too many failures. Giving up on '%s'" % (file.name))
+                self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
                 raise S3UploadError
 
         return response
 
-    def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
+    def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries, end_position = -1):
         method_string, resource, headers = request.get_triplet()
         if self.config.progress_meter:
             progress = self.config.progress_class(labels, 0)
         else:
             info("Receiving file '%s', please wait..." % stream.name)
+        stream.seek(0)
         timestamp_start = time.time()
         try:
             conn = self.get_connection(resource['bucket'])
@@ -715,9 +1010,12 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
             conn.putrequest(method_string, self.format_uri(resource))
             for header in headers.keys():
                 conn.putheader(header, str(headers[header]))
-            if start_position > 0:
+            if start_position > 0 and end_position == -1:
                 debug("Requesting Range: %d .. end" % start_position)
                 conn.putheader("Range", "bytes=%d-" % start_position)
+            elif end_position != -1:
+                debug("Requesting Range: %d .. %d" % (start_position, end_position))
+                conn.putheader("Range", "bytes=%d-%d" % (start_position, end_position))
             conn.endheaders()
             response = {}
             http_response = conn.getresponse()
@@ -733,8 +1031,9 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.recv_file(request, stream, labels, start_position, retries - 1)
+                return self.recv_file(request, stream, labels, start_position, retries - 1, end_position)
             else:
+                self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
                 raise S3DownloadError("Download failed for: %s" % resource['uri'])
 
         if response["status"] == 307:
@@ -744,12 +1043,12 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
             redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
             self.set_hostname(redir_bucket, redir_hostname)
             warning("Redirected to: %s" % (redir_hostname))
-            return self.recv_file(request, stream, labels)
+            return self.recv_file(request, stream, labels, start_position, retries, end_position)
 
         if response["status"] < 200 or response["status"] > 299:
             raise S3Error(response)
 
-        if start_position == 0:
+        if start_position == 0 and end_position == -1:
             # Only compute MD5 on the fly if we're downloading from beginning
             # Otherwise we'd get a nonsense.
             md5_hash = md5()
@@ -767,7 +1066,7 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
                 this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
                 data = http_response.read(this_chunk)
                 stream.write(data)
-                if start_position == 0:
+                if start_position == 0 and end_position == -1:
                     md5_hash.update(data)
                 current_position += len(data)
                 ## Call progress meter from here...
@@ -782,8 +1081,12 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.recv_file(request, stream, labels, current_position, retries - 1)
+                if end_position != -1:
+                    return self.recv_file(request, stream, labels, current_position, retries - 1)
+                else:
+                    return self.recv_file(request, stream, labels, start_position, retries - 1, end_position)
             else:
+                self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
                 raise S3DownloadError("Download failed for: %s" % resource['uri'])
 
         stream.flush()
@@ -796,30 +1099,41 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
             progress.update()
             progress.done("done")
 
-        if start_position == 0:
-            # Only compute MD5 on the fly if we were downloading from the beginning
-            response["md5"] = md5_hash.hexdigest()
-        else:
-            # Otherwise try to compute MD5 of the output file
-            try:
-                response["md5"] = hash_file_md5(stream.name)
-            except IOError, e:
-                if e.errno != errno.ENOENT:
-                    warning("Unable to open file: %s: %s" % (stream.name, e))
-                warning("Unable to verify MD5. Assume it matches.")
-                response["md5"] = response["headers"]["etag"]
-
-        response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0
+        if end_position == -1:
+            if start_position == 0:
+                # Only compute MD5 on the fly if we were downloading from the beginning
+                response["md5"] = md5_hash.hexdigest()
+            else:
+                # Otherwise try to compute MD5 of the output file
+                try:
+                    response["md5"] = hash_file_md5(stream.name)
+                except IOError, e:
+                    if e.errno != errno.ENOENT:
+                        warning("Unable to open file: %s: %s" % (stream.name, e))
+                    warning("Unable to verify MD5. Assume it matches.")
+                    response["md5"] = response["headers"]["etag"]
+
+            file_md5sum = response["headers"]["etag"].strip('"\'')
+            if len(response["headers"]["etag"].split('-')) == 2:
+                try:
+                    file_md5sum = response['headers']['x-amz-meta-md5sum']
+                except:
+                    warning('md5sum meta information not found in multipart uploaded file')
+                    self.exit_status = self.error_codes["MD5_META_NOTFOUND"]
+
+            response["md5match"] = file_md5sum == response["md5"]
+            debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
+            if not response["md5match"]:
+                warning("MD5 signatures do not match: computed=%s, received=%s" % (
+                    response["md5"], response["headers"]["etag"]))
+                self.exit_status = self.error_codes["MD5_MISMATCH"]
 
         response["elapsed"] = timestamp_end - timestamp_start
         response["size"] = current_position
         response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
         if response["size"] != start_position + long(response["headers"]["content-length"]):
             warning("Reported size (%s) does not match received size (%s)" % (
                 start_position + response["headers"]["content-length"], response["size"]))
-        debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
-        if not response["md5match"]:
-            warning("MD5 signatures do not match: computed=%s, received=%s" % (
-                response["md5"], response["headers"]["etag"]))
+            self.exit_status = self.error_codes["SIZE_MISMATCH"]
 
         return response
 
 __all__.append("S3")
diff --git a/S3/Utils.py b/S3/Utils.py
index 4a83601e1..16b95f06c 100644
--- a/S3/Utils.py
+++ b/S3/Utils.py
@@ -223,6 +223,30 @@ def hash_file_md5(filename):
     return h.hexdigest()
 __all__.append("hash_file_md5")
 
+def concat_files(dest_handle, unlink = True, *source_handles):
+    """
+    Read data from source file handles and write the data into dest_handle
+    Return md5-hash and file size
+    """
+    h = md5()
+    for source in source_handles:
+        source.seek(0)
+        while True:
+            # Hash 32kB chunks
+            data = source.read(32*1024)
+            dest_handle.write(data)
+            if not data:
+                break
+            h.update(data)
+
+        if unlink == True:
+            source.close()
+            os.unlink(source.name)
+
+    return h.hexdigest(), dest_handle.tell()
+__all__.append("concat_files")
+
+
 def mkdir_with_parents(dir_name):
     """
     mkdir_with_parents(dst_dir)
diff --git a/s3cmd b/s3cmd
index 7d93a5bf7..b665f7a60 100755
--- a/s3cmd
+++ b/s3cmd
@@ -32,6 +32,16 @@ from distutils.spawn import find_executable
 def output(message):
     sys.stdout.write(message + "\n")
 
+def clean_tempfiles(dest_handler):
+    path = os.path.join(os.path.dirname(dest_handler.name),'tmps3')
+    if os.path.exists(path):
+        for f in os.listdir(path):
+            fpath = os.path.join(path,f)
+            os.unlink(fpath)
+        os.rmdir(path)
+        if os.stat(dest_handler.name).st_size == 0:
+            os.unlink(dest_handler.name)
+
 def check_args_type(args, type, verbose_type):
     for arg in args:
         if S3Uri(arg).type != type:
@@ -315,6 +325,8 @@ def cmd_object_put(args):
             debug(u"Removing temporary encrypted file: %s" % unicodise(full_name))
             os.remove(full_name)
 
+    return s3.exit_status
+
 def cmd_object_get(args):
     cfg = Config()
     s3 = S3(cfg)
@@ -449,6 +461,8 @@ def cmd_object_get(args):
         output(u"File %s saved as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s)" %
             (uri, destination, response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1]))
 
+    return s3.exit_status
+
 def cmd_object_del(args):
     for uri_str in args:
         uri = S3Uri(uri_str)
@@ -563,7 +577,10 @@ def cmd_info(args):
                 output(u"   File size: %s" % info['headers']['content-length'])
                 output(u"   Last mod:  %s" % info['headers']['last-modified'])
                 output(u"   MIME type: %s" % info['headers']['content-type'])
-                output(u"   MD5 sum:   %s" % info['headers']['etag'].strip('"'))
+                if info['headers'].has_key('x-amz-meta-md5sum'):
+                    output(u"   MD5 sum:   %s" % info['headers']['x-amz-meta-md5sum'].strip('"'))
+                else:
+                    output(u"   MD5 sum:   %s" % info['headers']['etag'].strip('"'))
             else:
                 info = s3.bucket_info(uri)
                 output(u"%s (bucket):" % uri.uri())
@@ -663,6 +680,11 @@ def cmd_sync_remote2local(args):
                 attrs[key] = val
         return attrs
 
+    if cfg.parallel_multipart_download:
+        #Disable progress meter
+        cfg.progress_meter = False
+
+
     s3 = S3(Config())
 
     destination_base = args[-1]
@@ -744,7 +766,10 @@ def cmd_sync_remote2local(args):
 
                 os.close(os.open(dst_file, open_flags)) # Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
                 dst_stream = open(dst_file, "wb")
-                response = s3.object_get(uri, dst_stream, extra_label = seq_label)
+                if cfg.parallel_multipart_download == False:
+                    response = s3.object_get(uri, dst_stream, extra_label = seq_label)
+                else:
+                    response = s3.object_multipart_get(uri, dst_stream, cfg, extra_label = seq_label)
                 dst_stream.close()
                 if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
                     attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
@@ -756,7 +781,9 @@ def cmd_sync_remote2local(args):
                         os.utime(dst_file, (atime, mtime))
                     ## FIXME: uid/gid / uname/gname handling comes here! TODO
             except OSError, e:
-                try: dst_stream.close()
+                try:
+                    dst_stream.close()
+                    clean_tempfiles(dst_stream)
                 except: pass
                 if e.errno == errno.EEXIST:
                     warning(u"%s exists - not overwriting" % (dst_file))
@@ -769,19 +796,25 @@ def cmd_sync_remote2local(args):
                     continue
                 raise e
             except KeyboardInterrupt:
-                try: dst_stream.close()
+                try:
+                    dst_stream.close()
+                    clean_tempfiles(dst_stream)
                 except: pass
                 warning(u"Exiting after keyboard interrupt")
-                return
+                return s3.error_codes["KEYBOARD_INTERRUPT"]
             except Exception, e:
-                try: dst_stream.close()
+                try:
+                    dst_stream.close()
+                    clean_tempfiles(dst_stream)
                 except: pass
                 error(u"%s: %s" % (file, e))
                 continue
             # We have to keep repeating this call because
            # Python 2.4 doesn't support try/except/finally
             # construction :-(
-            try: dst_stream.close()
+            try:
+                dst_stream.close()
+                clean_tempfiles(dst_stream)
             except: pass
         except S3DownloadError, e:
             error(u"%s: download failed too many times. Skipping that file." % file)
@@ -804,6 +837,8 @@ def cmd_sync_remote2local(args):
     else:
         info(outstr)
 
+    return s3.exit_status
+
 def cmd_sync_local2remote(args):
     def _build_attr_header(src):
         import pwd, grp
@@ -835,6 +870,10 @@ def cmd_sync_local2remote(args):
         for k in attrs: result += "%s:%s/" % (k, attrs[k])
         return { 'x-amz-meta-s3cmd-attrs' : result[:-1] }
 
+    if cfg.parallel_multipart_upload:
+        #Disable progress meter
+        cfg.progress_meter = False
+
     s3 = S3(cfg)
 
     if cfg.encrypt:
@@ -919,7 +958,10 @@ def cmd_sync_local2remote(args):
                 attr_header = _build_attr_header(src)
                 debug(u"attr_header: %s" % attr_header)
                 extra_headers.update(attr_header)
-            response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
+            if cfg.parallel_multipart_upload:
+                response = s3.object_multipart_upload(src, uri, cfg, extra_headers, extra_label = seq_label)
+            else:
+                response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
         except InvalidFileError, e:
             warning(u"File can not be uploaded: %s" % e)
             continue
@@ -956,6 +998,8 @@ def cmd_sync_local2remote(args):
             if result['status'] == 201:
                 output("Created invalidation request for %d paths" % len(uploaded_objects_list))
                 output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
+    return s3.exit_status
+
 
 def cmd_sync(args):
     if (len(args) < 2):
@@ -1729,7 +1773,8 @@ def main():
         sys.exit(1)
 
     try:
-        cmd_func(args)
+        rv = cmd_func(args)
+        sys.exit(rv)
     except S3Error, e:
         error(u"S3 error: %s" % e)
         sys.exit(1)
@@ -1803,7 +1848,7 @@ if __name__ == '__main__':
 
     except KeyboardInterrupt:
        sys.stderr.write("See ya!\n")
-        sys.exit(1)
+        sys.exit(6)
 
     except Exception, e:
         report_exception(e)
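
For reference, a hypothetical usage sketch (not part of the patch) of how the new parallel_multipart_* options added in S3/Config.py drive the upload path added in S3/S3.py. The import paths and the S3/Config/S3Uri constructors follow the existing s3cmd module layout, the bucket and file names are placeholders, and the numeric values are simply the patch defaults.

# Hypothetical example -- not part of the patch. Assumes the patched
# S3, Config and S3Uri modules are importable exactly as s3cmd imports them.
from S3.S3 import S3
from S3.Config import Config
from S3.S3Uri import S3Uri

cfg = Config()                              # singleton configuration object
cfg.parallel_multipart_upload = True        # option introduced by this patch
cfg.parallel_multipart_upload_count = 5     # number of parts (patch default)
cfg.parallel_multipart_upload_threads = 5   # worker threads (patch default)

s3 = S3(cfg)
# object_multipart_upload() falls back to plain object_put() when
# file_size / parallel_multipart_upload_count is below the 5 MB minimum.
response = s3.object_multipart_upload("backup.tar", S3Uri("s3://example-bucket/backup.tar"), cfg)
print "uploaded %(size)d bytes in %(elapsed).1f seconds" % response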