Multipart copy #74

@@ -65,6 +65,9 @@ class Config(object):
mime_type = ""
enable_multipart = True
multipart_chunk_size_mb = 15 # MB
+ # Minimum object size for remote S3-to-S3 multipart (byte-range) copy is 5 GB
+ #multipart_copy_size = (5 * 1024 * 1024 * 1024) - 1
+ multipart_copy_size = 5 * 1024 * 1024 * 1024
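+ # e.g. with this 5 GB default, a 12 GB object is copied in three parts (5 + 5 + 2 GB)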
# List of checks to be performed for 'sync'
sync_checks = ['size', 'md5'] # 'weak-timestamp'
# List of compiled REGEXPs
@@ -110,4 +110,100 @@ def abort_upload(self):
response = self.s3.send_request(request)
return response
+
+class MultiPartCopy(MultiPartUpload):
+
+ def __init__(self, s3, src_uri, dst_uri, src_size, headers_baseline = {}):
+ self.s3 = s3
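+ # alias src as "file" and dst as "uri" so the inherited MultiPartUpload methods work unchanged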
+ self.file = self.src_uri = src_uri
+ self.uri = self.dst_uri = dst_uri
+ # ...
+ self.src_size = src_size
+ self.parts = {}
+ self.headers_baseline = headers_baseline
+ self.upload_id = self.initiate_multipart_copy()
+
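+ # S3 manages a multipart copy with the same initiate/complete/abort
+ # requests as a multipart upload, so these wrappers just delegate to
+ # the inherited MultiPartUpload methods.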
+ def initiate_multipart_copy(self):
+ return self.initiate_multipart_upload()
+
+ def complete_multipart_copy(self):
+ return self.complete_multipart_upload()
+
+ def abort_copy(self):
+ return self.abort_upload()
+
+
+ def copy_all_parts(self):
+ """
+ Execute a full multipart copy of a remote file.
+ Fills self.parts with the {seq: etag} mapping used to complete the copy.
+ """
+ if not self.upload_id:
+ raise RuntimeError("Attempting to use a multipart copy that has not been initiated.")
+
+ size_left = file_size = self.src_size
+ self.chunk_size = self.s3.config.multipart_copy_size
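+ # ceiling division: Python 2's integer "/" rounds down, and the
+ # "% ... and 1" term adds one extra part for any remainder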
+ nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
+ debug("MultiPart: Copying %s in %d parts" % (self.src_uri, nr_parts))
+
+ seq = 1
+ while size_left > 0:
+ offset = self.chunk_size * (seq - 1)
+ current_chunk_size = min(file_size - offset, self.chunk_size)
+ size_left -= current_chunk_size
+ labels = {
+ 'source' : unicodise(self.src_uri.uri()),
+ 'destination' : unicodise(self.uri.uri()),
+ 'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+ }
+ try:
+ self.copy_part(seq, offset, current_chunk_size, labels)
+ except:
+ # TODO: recover from some "retriable" errors?
+ error(u"Upload copy of '%s' part %d failed. Aborting multipart upload copy." % (self.src_uri, seq))
+ self.abort_copy()
+ raise
+ seq += 1
+
+ debug("MultiPart: Copy finished: %d parts", seq - 1)
+
+ def copy_part(self, seq, offset, chunk_size, labels):
+ """
+ Copy a remote file chunk
+ http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
+ http://docs.amazonwebservices.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
+ """
+ debug("Copying part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))
+
+ # set up headers with copy-params.
+ # Examples:
+ # x-amz-copy-source: /source_bucket/sourceObject
+ # x-amz-copy-source-range:bytes=first-last
+ # x-amz-copy-source-if-match: etag
+ # x-amz-copy-source-if-none-match: etag
+ # x-amz-copy-source-if-unmodified-since: time_stamp
+ # x-amz-copy-source-if-modified-since: time_stamp
+ headers = { "x-amz-copy-source": "/%s/%s" % (self.src_uri.bucket(), self.s3.urlencode_string(self.src_uri.object())) }
+
+ # send a byte-range header from part 2 onwards, or when even the first
+ # part spans the full multipart_copy_size chunk
+ if (seq > 1) or (chunk_size >= self.s3.config.multipart_copy_size):
+ # a 10 byte file has bytes=0-9
+ headers["x-amz-copy-source-range"] = "bytes=%d-%d" % (offset, (offset + chunk_size - 1))
+
+ query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
+
+ request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
+ response = self.s3.send_request(request)
+
+ # NOTE: Amazon sends whitespace while the copy progresses, which
+ # accumulates in the response body and can confuse the XML parser.
+ # Strip newlines before extracting the ETag from the XML response data.
+ data = response["data"].replace("\n", '')
+ self.parts[seq] = getTextFromXml(data, "ETag")
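+ # complete_multipart_copy() sends these part-number/ETag pairs back to S3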
+
+ # TODO: how to fail if no ETag found ... raise Exception?
+ #debug("Uploaded copy part %i of %r (%s bytes): etag=%s" % (seq, self.upload_id, chunk_size, self.parts[seq]))
+
+ return response
+
# vim:et:ts=4:sts=4:ai
@@ -25,7 +25,7 @@
from BidirMap import BidirMap
from Config import Config
from Exceptions import *
-from MultiPart import MultiPartUpload
+from MultiPart import MultiPartUpload, MultiPartCopy
from S3Uri import S3Uri
try:
@@ -403,21 +403,41 @@ def object_delete(self, uri):
response = self.send_request(request)
return response
+
def object_copy(self, src_uri, dst_uri, extra_headers = None):
if src_uri.type != "s3":
raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
if dst_uri.type != "s3":
raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type)
headers = SortedDict(ignore_case = True)
- headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
- ## TODO: For now COPY, later maybe add a switch?
- headers['x-amz-metadata-directive'] = "COPY"
+
+ # TODO: where do ACL headers go for copy? Should we copy ACL from source?
if self.config.acl_public:
headers["x-amz-acl"] = "public-read"
if self.config.reduced_redundancy:
headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
# if extra_headers:
# headers.update(extra_headers)
+
+ ## Multipart decision - only use multipart copy for remote S3 objects larger than 5 GB
+ multipart = False
+ # TODO: does this need a new config option, e.g. enable_multipart_copy?
+ if self.config.enable_multipart:
+ # get size of remote src only if multipart is enabled
+ src_info = self.object_info(src_uri)
+ size = int(src_info["headers"]["content-length"])
+
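+ # S3's single-request object copy tops out at 5 GB; anything larger
+ # has to be copied part-by-part with UploadPartCopy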
+ if size > self.config.multipart_copy_size:
+ multipart = True
+
+ if multipart:
+ # Multipart copies follow a different request flow - hand off here
+ return self.copy_file_multipart(src_uri, dst_uri, size, headers)
+
+ ## Not multipart...
+ ## TODO: For now COPY, later maybe add a switch?
+ headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
+ headers['x-amz-metadata-directive'] = "COPY"
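+ # COPY re-uses the source object's metadata; REPLACE would take it from this request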
request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
response = self.send_request(request)
return response
@@ -762,6 +782,18 @@ def send_file_multipart(self, file, headers, uri, size):
response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
return response
+ def copy_file_multipart(self, src_uri, dst_uri, size, headers):
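+ # mirrors send_file_multipart(): run the part copies, complete the upload,
+ # and report elapsed time and effective speed on the response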
+ debug("copying multi-part ..." )
+ timestamp_start = time.time()
+ multicopy = MultiPartCopy(self, src_uri, dst_uri, size, headers)
+ multicopy.copy_all_parts()
+ response = multicopy.complete_multipart_copy()
+ timestamp_end = time.time()
+ response["elapsed"] = timestamp_end - timestamp_start
+ response["size"] = size
+ response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
+ return response
+
def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
method_string, resource, headers = request.get_triplet()
if self.config.progress_meter: