Merge branch 'multipart-single'

commit 3f44bd97bf4cd306504e91bd2ebd2b85ad12205c (merge, 2 parents: 0d477b9 + f46250a)
Authored by @mludvig
Showing 179 additions and 17 deletions in 5 changed files:
  1. +3 −0  S3/Config.py
  2. +113 −0 S3/MultiPart.py
  3. +45 −16 S3/S3.py
  4. +3 −0  S3/S3Uri.py
  5. +15 −1 s3cmd
S3/Config.py  (+3 −0)
@@ -63,6 +63,8 @@ class Config(object):
default_mime_type = "binary/octet-stream"
guess_mime_type = True
mime_type = ""
+ enable_multipart = True
+ multipart_chunk_size_mb = 15 # MB
# List of checks to be performed for 'sync'
sync_checks = ['size', 'md5'] # 'weak-timestamp'
# List of compiled REGEXPs
@@ -202,3 +204,4 @@ def dump(self, section, config):
for option in config.option_list():
self.stream.write("%s = %s\n" % (option, getattr(config, option)))
+# vim:et:ts=4:sts=4:ai
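
For reference, a minimal sketch of setting the two new options programmatically; it assumes the shared Config() singleton from S3/Config.py exposes them as plain attributes, and the values shown are only illustrative:

    from S3.Config import Config

    cfg = Config()                     # shared configuration instance
    cfg.enable_multipart = True        # default: True
    cfg.multipart_chunk_size_mb = 50   # default: 15 (MB per uploaded part)

Since Config picks its options up from class-level attributes, the same two names should also be recognised as plain option = value lines in ~/.s3cfg.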
S3/MultiPart.py  (+113 −0, new file)
@@ -0,0 +1,113 @@
+## Amazon S3 Multipart upload support
+## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
+## License: GPL Version 2
+
+import os
+from stat import ST_SIZE
+from logging import debug, info, warning, error
+from Utils import getTextFromXml, formatSize, unicodise
+from Exceptions import S3UploadError
+
+class MultiPartUpload(object):
+
+    MIN_CHUNK_SIZE_MB = 5 # 5MB
+    MAX_CHUNK_SIZE_MB = 5120 # 5GB
+    MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 * 1024 # 5TB
+
+    def __init__(self, s3, file, uri, headers_baseline = {}):
+        self.s3 = s3
+        self.file = file
+        self.uri = uri
+        self.parts = {}
+        self.headers_baseline = headers_baseline
+        self.upload_id = self.initiate_multipart_upload()
+
+    def initiate_multipart_upload(self):
+        """
+        Begin a multipart upload
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html
+        """
+        request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = self.headers_baseline, extra = "?uploads")
+        response = self.s3.send_request(request)
+        data = response["data"]
+        self.upload_id = getTextFromXml(data, "UploadId")
+        return self.upload_id
+
+    def upload_all_parts(self):
+        """
+        Execute a full multipart upload on a file
+        Returns the seq/etag dict
+        TODO use num_processes to thread it
+        """
+        if not self.upload_id:
+            raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")
+
+        size_left = file_size = os.stat(self.file.name)[ST_SIZE]
+        self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
+        nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
+        debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
+
+        seq = 1
+        while size_left > 0:
+            offset = self.chunk_size * (seq - 1)
+            current_chunk_size = min(file_size - offset, self.chunk_size)
+            size_left -= current_chunk_size
+            labels = {
+                'source' : unicodise(self.file.name),
+                'destination' : unicodise(self.uri.uri()),
+                'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+            }
+            try:
+                self.upload_part(seq, offset, current_chunk_size, labels)
+            except:
+                error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
+                self.abort_upload()
+                raise
+            seq += 1
+
+        debug("MultiPart: Upload finished: %d parts", seq - 1)
+
+    def upload_part(self, seq, offset, chunk_size, labels):
+        """
+        Upload a file chunk
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
+        """
+        # TODO implement Content-MD5
+        debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))
+        headers = { "content-length": chunk_size }
+        query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
+        request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
+        response = self.s3.send_file(request, self.file, labels, offset = offset, chunk_size = chunk_size)
+        self.parts[seq] = response["headers"]["etag"]
+        return response
+
+    def complete_multipart_upload(self):
+        """
+        Finish a multipart upload
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html
+        """
+        debug("MultiPart: Completing upload: %s" % self.upload_id)
+
+        parts_xml = []
+        part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
+        for seq, etag in sorted(self.parts.items()):   # S3 expects parts in ascending PartNumber order
+            parts_xml.append(part_xml % (seq, etag))
+        body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml))
+
+        headers = { "content-length": len(body) }
+        request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = headers, extra = "?uploadId=%s" % (self.upload_id))
+        response = self.s3.send_request(request, body = body)
+
+        return response
+
+    def abort_upload(self):
+        """
+        Abort multipart upload
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html
+        """
+        debug("MultiPart: Aborting upload: %s" % self.upload_id)
+        request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
+        response = self.s3.send_request(request)
+        return response
+
+# vim:et:ts=4:sts=4:ai
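
To make the flow concrete, here is a minimal driving sketch; it mirrors what S3.send_file_multipart() below does, and the s3 object, URI and file name are assumed for the example:

    from S3.MultiPart import MultiPartUpload

    src = open("big-backup.tar", "rb")
    upload = MultiPartUpload(s3, src, uri)           # __init__ POSTs ?uploads and keeps the UploadId
    upload.upload_all_parts()                        # one PUT per ?partNumber=N&uploadId=... chunk
    response = upload.complete_multipart_upload()    # POSTs the <CompleteMultipartUpload> body

With the default 15 MB chunk size, a 100 MB file works out to six full parts plus one 10 MB remainder, i.e. nr_parts = 100/15 + (100 % 15 and 1) = 7, matching the integer arithmetic in upload_all_parts().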
S3/S3.py  (+45 −16)
@@ -20,11 +20,12 @@
from Utils import *
from SortedDict import SortedDict
+from AccessLog import AccessLog
+from ACL import ACL, GranteeLogDelivery
from BidirMap import BidirMap
from Config import Config
from Exceptions import *
-from ACL import ACL, GranteeLogDelivery
-from AccessLog import AccessLog
+from MultiPart import MultiPartUpload
from S3Uri import S3Uri
try:
@@ -111,15 +112,16 @@ class S3(object):
PUT = 0x02,
HEAD = 0x04,
DELETE = 0x08,
- MASK = 0x0F,
- )
+ POST = 0x10,
+ MASK = 0x1F,
+ )
targets = BidirMap(
SERVICE = 0x0100,
BUCKET = 0x0200,
OBJECT = 0x0400,
MASK = 0x0700,
- )
+ )
operations = BidirMap(
UNDFINED = 0x0000,
@@ -131,13 +133,14 @@ class S3(object):
OBJECT_GET = targets["OBJECT"] | http_methods["GET"],
OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
+ OBJECT_POST = targets["OBJECT"] | http_methods["POST"],
)
codes = {
"NoSuchBucket" : "Bucket '%s' does not exist",
"AccessDenied" : "Access to bucket '%s' was denied",
"BucketAlreadyExists" : "Bucket '%s' already exists",
- }
+ }
## S3 sometimes sends HTTP-307 response
redir_map = {}
@@ -345,10 +348,12 @@ def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
size = os.stat(filename)[ST_SIZE]
except (IOError, OSError), e:
raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
+
headers = SortedDict(ignore_case = True)
if extra_headers:
headers.update(extra_headers)
- headers["content-length"] = size
+
+ ## MIME-type handling
content_type = self.config.mime_type
if not content_type and self.config.guess_mime_type:
content_type = mime_magic(filename)
@@ -356,10 +361,24 @@ def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
content_type = self.config.default_mime_type
debug("Content-Type set to '%s'" % content_type)
headers["content-type"] = content_type
+
+ ## Other Amazon S3 attributes
if self.config.acl_public:
headers["x-amz-acl"] = "public-read"
if self.config.reduced_redundancy:
headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
+
+ ## Multipart decision
+ multipart = False
+ if self.config.enable_multipart:
+ if size > self.config.multipart_chunk_size_mb * 1024 * 1024:
+ multipart = True
+ if multipart:
+ # Multipart requests are quite different... drop here
+ return self.send_file_multipart(file, headers, uri, size)
+
+ ## Not multipart...
+ headers["content-length"] = size
request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
response = self.send_file(request, file, labels)
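
The multipart decision above boils down to a single size check: with the default multipart_chunk_size_mb of 15 the threshold is 15 * 1024 * 1024 = 15,728,640 bytes. Anything larger is handed off to send_file_multipart(), while smaller files keep the original single-PUT path with an explicit content-length header.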
@@ -558,7 +577,9 @@ def send_request(self, request, body = None, retries = _max_retries):
for header in headers.keys():
headers[header] = str(headers[header])
conn = self.get_connection(resource['bucket'])
- conn.request(method_string, self.format_uri(resource), body, headers)
+ uri = self.format_uri(resource)
+ debug("Sending request method_string=%r, uri=%r, headers=%r, body=(%i bytes)" % (method_string, uri, headers, len(body or "")))
+ conn.request(method_string, uri, body, headers)
response = {}
http_response = conn.getresponse()
response["status"] = http_response.status
@@ -600,7 +621,7 @@ def send_request(self, request, body = None, retries = _max_retries):
return response
- def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
+ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1):
method_string, resource, headers = request.get_triplet()
size_left = size_total = headers.get("content-length")
if self.config.progress_meter:
@@ -623,15 +644,15 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
# Connection error -> same throttle value
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
else:
raise S3UploadError("Upload failed for: %s" % resource['uri'])
- file.seek(0)
+ file.seek(offset)
md5_hash = md5()
try:
while (size_left > 0):
#debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
- data = file.read(self.config.send_chunk)
+ data = file.read(min(self.config.send_chunk, size_left))
md5_hash.update(data)
conn.send(data)
if self.config.progress_meter:
@@ -660,7 +681,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
# Connection error -> same throttle value
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
else:
debug("Giving up on '%s' %s" % (file.name, e))
raise S3UploadError("Upload failed for: %s" % resource['uri'])
@@ -682,7 +703,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
self.set_hostname(redir_bucket, redir_hostname)
warning("Redirected to: %s" % (redir_hostname))
- return self.send_file(request, file, labels)
+ return self.send_file(request, file, labels, offset = offset, chunk_size = chunk_size)
# S3 from time to time doesn't send ETag back in a response :-(
# Force re-upload here.
@@ -705,7 +726,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
else:
warning("Too many failures. Giving up on '%s'" % (file.name))
raise S3UploadError
@@ -718,13 +739,21 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("MD5 Sums don't match!")
if retries:
warning("Retrying upload of %s" % (file.name))
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
else:
warning("Too many failures. Giving up on '%s'" % (file.name))
raise S3UploadError
return response
+ def send_file_multipart(self, file, headers, uri, size):
+ chunk_size = self.config.multipart_chunk_size_mb * 1024 * 1024
+ upload = MultiPartUpload(self, file, uri, headers)
+ upload.upload_all_parts()
+ response = upload.complete_multipart_upload()
+ response["speed"] = 0 # XXX
+ return response
+
def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
method_string, resource, headers = request.get_triplet()
if self.config.progress_meter:
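
The send_file() changes above are what make per-part uploads possible: the new offset and chunk_size parameters are threaded through every retry and redirect path, the initial file.seek(0) becomes file.seek(offset), and each read is capped at min(send_chunk, size_left) so a part never reads past its own chunk boundary.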
S3/S3Uri.py  (+3 −0)
@@ -40,6 +40,9 @@ def __str__(self):
def __unicode__(self):
return self.uri()
+ def __repr__(self):
+ return "<%s: %s>" % (self.__class__.__name__, self.__unicode__())
+
def public_url(self):
raise ValueError("This S3 URI does not have Anonymous URL representation")
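
A quick illustration of the new __repr__, assuming the usual S3Uri factory dispatch where an "s3://" string yields an S3UriS3 instance (bucket and key are made up):

    >>> from S3.S3Uri import S3Uri
    >>> S3Uri("s3://mybucket/backups/big-backup.tar")
    <S3UriS3: s3://mybucket/backups/big-backup.tar>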
s3cmd  (+15 −1)
@@ -837,6 +837,10 @@ def cmd_sync_local2remote(args):
s3 = S3(cfg)
+ ## FIXME
+ cfg.enable_multipart = False
+ warning(u"MultiPart: disabled for 'sync' command. Don't panic, we'll fix it!")
+
if cfg.encrypt:
error(u"S3cmd 'sync' doesn't yet support GPG encryption, sorry.")
error(u"Either use unconditional 's3cmd put --recursive'")
@@ -1526,6 +1530,9 @@ def main():
optparser.add_option( "--encoding", dest="encoding", metavar="ENCODING", help="Override autodetected terminal and filesystem encoding (character set). Autodetected: %s" % preferred_encoding)
optparser.add_option( "--verbatim", dest="urlencoding_mode", action="store_const", const="verbatim", help="Use the S3 name as given on the command line. No pre-processing, encoding, etc. Use with caution!")
+ optparser.add_option( "--disable-multipart", dest="enable_multipart", action="store_false", help="Disable multipart upload on files bigger than --multipart-chunk-size")
+ optparser.add_option( "--multipart-chunk-size-mb", dest="multipart_chunk_size_mb", type="int", action="store", metavar="SIZE", help="Size of each chunk of a multipart upload. Files bigger than SIZE are automatically uploaded as multithreaded-multipart, smaller files are uploaded using the traditional method. SIZE is in Mega-Bytes, default chunk size is %defaultMB, minimum allowed chunk size is 5MB, maximum is 5GB.")
+
optparser.add_option( "--list-md5", dest="list_md5", action="store_true", help="Include MD5 sums in bucket listings (only for 'ls' command).")
optparser.add_option("-H", "--human-readable-sizes", dest="human_readable_sizes", action="store_true", help="Print sizes in human readable form (eg 1kB instead of 1234).")
@@ -1630,7 +1637,7 @@ def main():
if options.check_md5 == False:
try:
cfg.sync_checks.remove("md5")
- except:
+ except Exception:
pass
if options.check_md5 == True and cfg.sync_checks.count("md5") == 0:
cfg.sync_checks.append("md5")
@@ -1649,6 +1656,12 @@ def main():
cfg.update_option("enable", options.enable)
cfg.update_option("acl_public", options.acl_public)
+ ## Check multipart chunk constraints
+ if cfg.multipart_chunk_size_mb < MultiPartUpload.MIN_CHUNK_SIZE_MB:
+ raise ParameterError("Chunk size %d MB is too small, must be >= %d MB. Please adjust --multipart-chunk-size-mb" % (cfg.multipart_chunk_size_mb, MultiPartUpload.MIN_CHUNK_SIZE_MB))
+ if cfg.multipart_chunk_size_mb > MultiPartUpload.MAX_CHUNK_SIZE_MB:
+ raise ParameterError("Chunk size %d MB is too large, must be <= %d MB. Please adjust --multipart-chunk-size-mb" % (cfg.multipart_chunk_size_mb, MultiPartUpload.MAX_CHUNK_SIZE_MB))
+
## CloudFront's cf_enable and Config's enable share the same --enable switch
options.cf_enable = options.enable
@@ -1786,6 +1799,7 @@ if __name__ == '__main__':
from S3.CloudFront import Cmd as CfCmd
from S3.CloudFront import CloudFront
from S3.FileLists import *
+ from S3.MultiPart import MultiPartUpload
main()
sys.exit(0)