Reworked Multipart upload

- Converted to non-threaded upload again
  (will add threading for all uploads, not only multipart, later on)
- Using S3.send_file() instead of S3.send_request()
- Don't read the data in the main loop; only compute the offset and chunk size
  and leave it to S3.send_file() to read the data (see the sketch after this list).
- Re-enabled progress indicator.
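
A minimal sketch of the new main loop, for illustration only: iter_parts() is a
hypothetical helper, but the arithmetic mirrors MultiPartUpload.upload_all_parts()
in the diff below. The loop only computes per-part offsets and sizes;
S3.send_file() does the actual reading.

    import os
    from stat import ST_SIZE

    def iter_parts(filename, chunk_size):
        # Yield (part_id, offset, current_chunk_size) for each part;
        # the caller passes these to S3.send_file() instead of reading data here.
        size_left = file_size = os.stat(filename)[ST_SIZE]
        part_id = 1
        while size_left > 0:
            offset = chunk_size * (part_id - 1)
            current_chunk_size = min(file_size - offset, chunk_size)
            size_left -= current_chunk_size
            yield part_id, offset, current_chunk_size
            part_id += 1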

Still broken:
- "s3cmd sync" doesn't work with multipart uploaded files because
  the ETag no longer contains MD5sum of the file. MAJOR!
- Multipart upload abort is not triggered on all failures.
- s3cmd commands "mplist" and "mpabort" to be added.
- s3cmd should resume failed multipart uploads.
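
For context on the ETag issue above (not part of this commit): for multipart
uploads S3 appears to return an ETag that is the MD5 of the concatenated binary
MD5 digests of the individual parts, followed by "-<number of parts>". This is
observed behaviour, not a documented guarantee, and it is why the ETag can no
longer be compared against the local file's MD5 sum. A hedged sketch, assuming
the comparison uses the same chunk size as the upload:

    from hashlib import md5

    def multipart_etag(filename, chunk_size):
        # Reproduce the observed multipart ETag format:
        # md5 of the concatenated per-part md5 digests, plus "-<part count>".
        digests = []
        f = open(filename, "rb")
        while True:
            data = f.read(chunk_size)
            if not data:
                break
            digests.append(md5(data).digest())
        f.close()
        return '"%s-%d"' % (md5(b"".join(digests)).hexdigest(), len(digests))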
commit a184e0dea035142de3fc24c6bc899e6a4ddeaefd (1 parent: 92ba05a)
Authored by @mludvig
Showing with 55 additions and 82 deletions.
  1. +45 −71 S3/MultiPart.py
  2. +10 −11 S3/S3.py
S3/MultiPart.py (116 lines changed)
@@ -2,53 +2,16 @@
## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
## License: GPL Version 2
-from Queue import Queue
-from threading import Thread
+import os
+from stat import ST_SIZE
from logging import debug, info, warning, error
-from Utils import getTextFromXml
-
-class Worker(Thread):
- """
- Thread executing tasks from a given tasks queue
- """
- def __init__(self, tasks):
- super(Worker, self).__init__()
- self.tasks = tasks
- self.daemon = True
- self.start()
-
- def run(self):
- while True:
- func, args, kargs = self.tasks.get()
- func(*args, **kargs)
- self.tasks.task_done()
-
-class ThreadPool(object):
- """
- Pool of threads consuming tasks from a queue
- """
- def __init__(self, num_threads):
- self.tasks = Queue(num_threads)
- for _ in range(num_threads):
- Worker(self.tasks)
-
- def add_task(self, func, *args, **kargs):
- """
- Add a task to the queue
- """
- self.tasks.put((func, args, kargs))
-
- def wait_completion(self):
- """
- Wait for completion of all the tasks in the queue
- """
- self.tasks.join()
+from Utils import getTextFromXml, formatSize, unicodise
+from Exceptions import S3UploadError
class MultiPartUpload(object):
MIN_CHUNK_SIZE_MB = 5 # 5MB
MAX_CHUNK_SIZE_MB = 5120 # 5GB
- MAX_CHUNKS = 100
MAX_FILE_SIZE = 42949672960 # 5TB
def __init__(self, s3, file, uri):
@@ -66,11 +29,10 @@ def initiate_multipart_upload(self):
request = self.s3.create_request("OBJECT_POST", uri = self.uri, extra = "?uploads")
response = self.s3.send_request(request)
data = response["data"]
- s3, key, upload_id = getTextFromXml(data, "Bucket"), getTextFromXml(data, "Key"), getTextFromXml(data, "UploadId")
- self.upload_id = upload_id
- return s3, key, upload_id
+ self.upload_id = getTextFromXml(data, "UploadId")
+ return self.upload_id
- def upload_all_parts(self, num_threads, chunk_size):
+ def upload_all_parts(self):
"""
Execute a full multipart upload on a file
Returns the id/etag dict
@@ -79,50 +41,52 @@ def upload_all_parts(self, num_threads, chunk_size):
if not self.upload_id:
raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")
+ size_left = file_size = os.stat(self.file.name)[ST_SIZE]
+ self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
+ nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
+ debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
+
id = 1
- if num_threads > 1:
- debug("MultiPart: Uploading in %d threads" % num_threads)
- pool = ThreadPool(num_threads)
- else:
- debug("MultiPart: Uploading in a single thread")
-
- while True:
- if id == self.MAX_CHUNKS:
- data = self.file.read(-1)
- else:
- data = self.file.read(chunk_size)
- if not data:
- break
- if num_threads > 1:
- pool.add_task(self.upload_part, data, id)
- else:
- self.upload_part(data, id)
+ while size_left > 0:
+ offset = self.chunk_size * (id - 1)
+ current_chunk_size = min(file_size - offset, self.chunk_size)
+ size_left -= current_chunk_size
+ labels = {
+ 'source' : unicodise(self.file.name),
+ 'destination' : unicodise(self.uri.uri()),
+ 'extra' : "[part %d of %d, %s]" % (id, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+ }
+ try:
+ self.upload_part(id, offset, current_chunk_size, labels)
+ except S3UploadError, e:
+ error(u"Upload of '%s' part %d failed too many times. Aborting multipart upload." % (self.file.name, id))
+ self.abort_upload()
+ raise e
id += 1
- if num_threads > 1:
- debug("Thread pool with %i threads and %i tasks awaiting completion." % (num_threads, id))
- pool.wait_completion()
+ debug("MultiPart: Upload finished: %d parts", id - 1)
- def upload_part(self, data, id):
+ def upload_part(self, id, offset, chunk_size, labels):
"""
Upload a file chunk
http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
"""
# TODO implement Content-MD5
- content_length = str(len(data))
- debug("Uploading part %i of %r (%s bytes)" % (id, self.upload_id, content_length))
- headers = { "content-length": content_length }
+ debug("Uploading part %i of %r (%s bytes)" % (id, self.upload_id, chunk_size))
+ headers = { "content-length": chunk_size }
query_string = "?partNumber=%i&uploadId=%s" % (id, self.upload_id)
request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
- response = self.s3.send_request(request, body = data)
-
+ response = self.s3.send_file(request, self.file, labels, offset = offset, chunk_size = chunk_size)
self.parts[id] = response["headers"]["etag"]
+ return response
def complete_multipart_upload(self):
"""
Finish a multipart upload
http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html
"""
+ debug("MultiPart: Completing upload: %s" % self.upload_id)
+
parts_xml = []
part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
for id, etag in self.parts.items():
@@ -135,4 +99,14 @@ def complete_multipart_upload(self):
return response
+ def abort_upload(self):
+ """
+ Abort multipart upload
+ http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html
+ """
+ debug("MultiPart: Aborting upload: %s" % self.upload_id)
+ request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
+ response = self.s3.send_request(request)
+ return response
+
# vim:et:ts=4:sts=4:ai
S3/S3.py (21 lines changed)
@@ -616,7 +616,7 @@ def send_request(self, request, body = None, retries = _max_retries):
return response
- def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
+ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1):
method_string, resource, headers = request.get_triplet()
size_left = size_total = headers.get("content-length")
if self.config.progress_meter:
@@ -639,15 +639,15 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
# Connection error -> same throttle value
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
else:
raise S3UploadError("Upload failed for: %s" % resource['uri'])
- file.seek(0)
+ file.seek(offset)
md5_hash = md5()
try:
while (size_left > 0):
#debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
- data = file.read(self.config.send_chunk)
+ data = file.read(min(self.config.send_chunk, size_left))
md5_hash.update(data)
conn.send(data)
if self.config.progress_meter:
@@ -676,7 +676,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
# Connection error -> same throttle value
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
else:
debug("Giving up on '%s' %s" % (file.name, e))
raise S3UploadError("Upload failed for: %s" % resource['uri'])
@@ -698,7 +698,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
self.set_hostname(redir_bucket, redir_hostname)
warning("Redirected to: %s" % (redir_hostname))
- return self.send_file(request, file, labels)
+ return self.send_file(request, file, labels, offset = offset, chunk_size = chunk_size)
# S3 from time to time doesn't send ETag back in a response :-(
# Force re-upload here.
@@ -721,7 +721,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
else:
warning("Too many failures. Giving up on '%s'" % (file.name))
raise S3UploadError
@@ -734,7 +734,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("MD5 Sums don't match!")
if retries:
warning("Retrying upload of %s" % (file.name))
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
else:
warning("Too many failures. Giving up on '%s'" % (file.name))
raise S3UploadError
@@ -743,13 +743,12 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
def send_file_multipart(self, file, headers, uri, size):
upload = MultiPartUpload(self, file, uri)
- bucket, key, upload_id = upload.initiate_multipart_upload()
+ upload_id = upload.initiate_multipart_upload()
num_threads = self.config.multipart_num_threads
chunk_size = self.config.multipart_chunk_size_mb * 1024 * 1024
- file.seek(0)
- upload.upload_all_parts(num_threads, chunk_size)
+ upload.upload_all_parts()
response = upload.complete_multipart_upload()
response["speed"] = 0 # XXX
return response