
Adding MultiPartCopy class and enabling it for object_copy when the remote file is greater than 5 GB.
1 parent 19a529a commit a08c404f851d59eaebc88fe41184d6e76acb6b0d @porcupie committed Jul 17, 2012
Showing with 141 additions and 4 deletions.
  1. +3 −0 S3/Config.py
  2. +100 −0 S3/MultiPart.py
  3. +38 −4 S3/S3.py
3 S3/Config.py
@@ -65,6 +65,9 @@ class Config(object):
mime_type = ""
enable_multipart = True
multipart_chunk_size_mb = 15 # MB
+ # Minimum size for using multipart remote s3-to-s3 copy with byte ranges: 5 GB
+ multipart_copy_size_gb = 5 # GB
+ multipart_copy_size = multipart_copy_size_gb * 1024 * 1024 * 1024
# List of checks to be performed for 'sync'
sync_checks = ['size', 'md5'] # 'weak-timestamp'
# List of compiled REGEXPs
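
The two new Config values encode the same 5 GB threshold, once as a display-friendly GB count and once in bytes. A quick sanity check of the arithmetic (plain Python, names taken from the hunk above):

    multipart_copy_size_gb = 5
    multipart_copy_size = multipart_copy_size_gb * 1024 * 1024 * 1024
    assert multipart_copy_size == 5368709120   # 5 GiB, the single-request copy limit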
100 S3/MultiPart.py
@@ -110,4 +110,104 @@ def abort_upload(self):
response = self.s3.send_request(request)
return response
+
+class MultiPartCopy(MultiPartUpload):
+
+ # S3 Config or const? (S3 caps each copied part at 5 GB)
+ MIN_CHUNK_SIZE_MB = 5120 # 5 GB, expressed in MB
+ MAX_CHUNK_SIZE_MB = 42949672960 # FIXME: this is 40 GiB in bytes, not an MB count (was labelled 5 TB)
+
+ def __init__(self, s3, src_uri, dst_uri, src_size, headers_baseline = {}):
+ self.s3 = s3
+ self.file = self.src_uri = src_uri
+ self.uri = self.dst_uri = dst_uri
+ # the aliases let inherited MultiPartUpload methods, which read
+ # self.file and self.uri, work unchanged for the copy case
+ self.src_size = src_size
+ self.parts = {}
+ self.headers_baseline = headers_baseline
+ self.upload_id = self.initiate_multipart_copy()
+
+ def initiate_multipart_copy(self):
+ return self.initiate_multipart_upload()
+
+ def copy_all_parts(self):
+ """
+ Execute a full multipart upload copy on a remote file
+ Returns the seq/etag dict
+ """
+ if not self.upload_id:
+ raise RuntimeError("Attempting to use a multipart copy that has not been initiated.")
+
+ size_left = file_size = self.src_size
+ # A byte range is sent with every part: without one S3 copies the whole
+ # source object, which fails for sources over 5368709120 bytes (5 GiB)
+ self.chunk_size = self.s3.config.multipart_copy_size
+ nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
+ debug("MultiPart: Copying %s in %d parts" % (self.src_uri, nr_parts))
+
+ seq = 1
+ while size_left > 0:
+ offset = self.chunk_size * (seq - 1)
+ current_chunk_size = min(file_size - offset, self.chunk_size)
+ size_left -= current_chunk_size
+ labels = {
+ 'source' : unicodise(self.src_uri.uri()),
+ 'destination' : unicodise(self.uri.uri()),
+ 'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+ }
+ try:
+ self.copy_part(seq, offset, current_chunk_size, labels)
+ except:
+ error(u"Copy of '%s' part %d failed. Aborting multipart copy." % (self.src_uri, seq))
+ self.abort_copy()
+ raise
+ seq += 1
+
+ debug("MultiPart: Copy finished: %d parts", seq - 1)
+
+ def copy_part(self, seq, offset, chunk_size, labels):
+ """
+ Copy a remote file chunk
+ http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
+ http://docs.amazonwebservices.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
+ """
+ debug("Copying part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))
+
+ # set up headers with copy-params; the source object is url-encoded the
+ # same way object_copy() encodes it for a plain copy
+ headers = {
+ "x-amz-copy-source": "/%s/%s" % (self.src_uri.bucket(), self.s3.urlencode_string(self.src_uri.object()))
+ }
+ # HTTP byte ranges are inclusive, hence the -1 on the end offset.
+ # The range header goes on every part: without it S3 would copy the
+ # entire source object into this part.
+ headers["x-amz-copy-source-range"] = "bytes=%d-%d" % (offset, offset + chunk_size - 1)
+
+ # x-amz-copy-source: /source_bucket/sourceObject
+ # x-amz-copy-source-range:bytes=first-last
+ # x-amz-copy-source-if-match: etag
+ # x-amz-copy-source-if-none-match: etag
+ # x-amz-copy-source-if-unmodified-since: time_stamp
+ # x-amz-copy-source-if-modified-since: time_stamp
+
+ query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
+
+ request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
+ response = self.s3.send_request(request)
+
+ # unlike upload_part, the part's ETag comes back in the XML body, not the headers
+ self.parts[seq] = getTextFromXml(response["data"], "ETag")
+
+ return response
+
+ def complete_multipart_copy(self):
+ return self.complete_multipart_upload()
+
+ def abort_copy(self):
+ return self.abort_upload()
+
+
# vim:et:ts=4:sts=4:ai
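
To make the arithmetic in copy_all_parts and copy_part concrete, here is a self-contained sketch (hypothetical 12 GiB source, names invented for illustration) of how an object splits into parts and inclusive byte ranges; the end offset is offset + chunk_size - 1 because HTTP ranges are inclusive:

    chunk_size = 5 * 1024 ** 3    # 5 GiB parts, matching multipart_copy_size
    file_size = 12 * 1024 ** 3    # hypothetical 12 GiB source object
    nr_parts = file_size // chunk_size + (1 if file_size % chunk_size else 0)
    for seq in range(1, nr_parts + 1):
        offset = chunk_size * (seq - 1)
        current = min(file_size - offset, chunk_size)
        print("part %d: x-amz-copy-source-range: bytes=%d-%d"
              % (seq, offset, offset + current - 1))
    # part 1: x-amz-copy-source-range: bytes=0-5368709119
    # part 2: x-amz-copy-source-range: bytes=5368709120-10737418239
    # part 3: x-amz-copy-source-range: bytes=10737418240-12884901887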
42 S3/S3.py
@@ -25,7 +25,7 @@
from BidirMap import BidirMap
from Config import Config
from Exceptions import *
-from MultiPart import MultiPartUpload
+from MultiPart import MultiPartUpload, MultiPartCopy
from S3Uri import S3Uri
try:
@@ -403,21 +403,43 @@ def object_delete(self, uri):
response = self.send_request(request)
return response
+# Remote s3 objects > 5gb cannot be copied with a single Object-PUT copy
+# request, so object_copy() falls through to a multipart copy below.
+
def object_copy(self, src_uri, dst_uri, extra_headers = None):
if src_uri.type != "s3":
raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
if dst_uri.type != "s3":
raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type)
headers = SortedDict(ignore_case = True)
- headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
- ## TODO: For now COPY, later maybe add a switch?
- headers['x-amz-metadata-directive'] = "COPY"
+
+ # TODO: where do ACL headers go for copy?
if self.config.acl_public:
headers["x-amz-acl"] = "public-read"
if self.config.reduced_redundancy:
headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
# if extra_headers:
# headers.update(extra_headers)
+
+ ## Multipart decision: remote s3-to-s3 files over 5gb can only be copied in parts
+ multipart = False
+ # TODO: does this need its own config option, enable_multipart_copy?
+ if self.config.enable_multipart:
+ # get size of remote src only if multipart is enabled
+ src_info = self.object_info(src_uri)
+ size = int(src_info["headers"]["content-length"])
+
+ if size > self.config.multipart_copy_size:
+ multipart = True
+
+ if multipart:
+ # Multipart requests are quite different, so hand off here
+ return self.copy_file_multipart(src_uri, dst_uri, size, headers)
+
+ ## Not multipart...
+ ## TODO: For now COPY, later maybe add a switch?
+ headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
+ headers['x-amz-metadata-directive'] = "COPY"
request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
response = self.send_request(request)
return response
@@ -762,6 +784,18 @@ def send_file_multipart(self, file, headers, uri, size):
response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
return response
+ def copy_file_multipart(self, src_uri, dst_uri, size, headers):
+ debug("copying multi-part ..." )
+ timestamp_start = time.time()
+ multicopy = MultiPartCopy(self, src_uri, dst_uri, size, headers)
+ multicopy.copy_all_parts()
+ response = multicopy.complete_multipart_copy()
+ timestamp_end = time.time()
+ response["elapsed"] = timestamp_end - timestamp_start
+ response["size"] = size
+ response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
+ return response
+
def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
method_string, resource, headers = request.get_triplet()
if self.config.progress_meter:
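
With these changes callers keep using object_copy unchanged; the multipart path is chosen automatically from the source object's Content-Length. A minimal usage sketch (bucket and object names are hypothetical; Config, S3, and S3Uri are this repository's classes, and credentials are assumed to be already configured):

    from S3.Config import Config
    from S3.S3 import S3
    from S3.S3Uri import S3Uri

    s3 = S3(Config())
    # Sources larger than Config.multipart_copy_size (5 GB) are now routed
    # through copy_file_multipart() / MultiPartCopy; smaller ones still use
    # a single Object-PUT copy request.
    response = s3.object_copy(S3Uri("s3://source-bucket/huge-object"),
                              S3Uri("s3://dest-bucket/huge-object"))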
