Parallel multipart upload and download support for s3cmd #10

Closed
@t3rm1n4l wants to merge 16 commits

1 participant

@t3rm1n4l

I have developed parallel multipart upload and download support for s3cmd.

Parallel upload and download can be enabled with the following configuration options:
parallel_multipart_download = False
parallel_multipart_upload = False
parallel_multipart_download_threads = 5
parallel_multipart_upload_threads = 5
parallel_multipart_download_count = 5
parallel_multipart_upload_count = 5
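
As a rough, hedged sketch of how these settings interact (not part of the patch itself): the code divides the object size by the *_count value to get a split size, and falls back to a plain single-request transfer when the resulting part would be smaller than S3's 5 MB minimum part size. The numbers below are made-up examples.

    # Hypothetical illustration of the part-size calculation (Python 2 style,
    # matching the codebase). file_size and the count value are example numbers.
    file_size = 200 * 1024 * 1024                 # a 200 MB object
    parallel_multipart_upload_count = 5           # split count from the config

    parts_size = file_size / parallel_multipart_upload_count
    if parts_size < 5 * 1024 * 1024:
        # S3 requires every part except the last to be at least 5 MB,
        # so the patch falls back to an ordinary single-request upload.
        print "part size below 5 MB; falling back to a single-request upload"
    else:
        print "uploading %d parts of about %d bytes each" % (
            parallel_multipart_upload_count, parts_size)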

Please merge my changes into s3cmd.

Looking into your review comments.

Thanks
Sarath Lakshman
http://www.sarathlakshman.com

t3rm1n4l added some commits
@t3rm1n4l t3rm1n4l Adding changes to `recv_file` to support partial file download by specifying start-position and end-position in bytes
a8f80ea
@t3rm1n4l t3rm1n4l Added `concat_files()` function
A function that takes a destination file handle and a list of source file handles, concatenates the source files' data, and writes it into the destination file
e60ae89
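
A hedged usage sketch of the helper (illustrative only; the file names are made up, and it assumes the s3cmd source tree is on the Python path so the S3 package is importable):

    # Build two tiny "split" files to stand in for downloaded parts, then merge
    # them with concat_files() from S3/Utils.py.
    from S3.Utils import concat_files

    for name, data in [("bigfile.part-1", "hello "), ("bigfile.part-2", "world\n")]:
        f = open(name, "wb")
        f.write(data)
        f.close()

    dest = open("bigfile.bin", "wb")
    parts = [open("bigfile.part-1", "rb"), open("bigfile.part-2", "rb")]
    # unlink=True makes concat_files() delete each part as soon as it is copied
    md5sum, total_size = concat_files(dest, True, *parts)
    dest.close()
    print "md5 =", md5sum, "size =", total_size
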
@t3rm1n4l t3rm1n4l Added `object_multipart_get()` function
* object_multipart_get() - Downloads a file from S3 by fetching multiple byte-range splits in parallel with a worker thread pool, then merges the split files and performs md5 checksum verification
* Added new Config parameters to specify the number of worker threads, the file split count, and to toggle parallel split download on and off
8772c9a
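
The worker-pool pattern that commit describes, reduced to a minimal hedged sketch (Python 2 style, matching the codebase; the real worker calls recv_file() with a byte range instead of printing):

    # Queue of work items drained by daemon threads; the main thread waits
    # until only it remains. The counts below are placeholder values.
    import Queue, threading, time

    worker_queue = Queue.Queue()
    for part_no in range(1, 6):            # e.g. parallel_multipart_download_count = 5
        worker_queue.put(part_no)

    def get_worker():
        while True:
            try:
                part_no = worker_queue.get_nowait()
            except Queue.Empty:
                return
            print "downloading part", part_no   # placeholder for recv_file(...)

    for i in range(3):                     # e.g. parallel_multipart_download_threads = 3
        t = threading.Thread(target=get_worker)
        t.setDaemon(True)
        t.start()

    while threading.activeCount() > 1:     # wait for the workers to finish
        time.sleep(0.1)
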
@t3rm1n4l t3rm1n4l Added changes to s3cmd for switching parallel split download on/off based on configuration file
7bacce0
@t3rm1n4l t3rm1n4l Cleanup handlers for temporary files, and a disk-usage improvement to the concat_files() method that removes split files as soon as their data has been read.
e515259
@t3rm1n4l t3rm1n4l Changed threading.active_count() to threading.activeCount() for backward compatibility with python 2.4
16c3db4
@t3rm1n4l t3rm1n4l Added parameter signing support to the auth signature calculation method sign()
abf8a6b
@t3rm1n4l t3rm1n4l Added multipart upload support. Multipart upload can be enabled by adding parallel_multipart_upload = True to the config file
14d3015
@t3rm1n4l t3rm1n4l Replaced email.Util.formatdate() with the time module to ensure compatibility with python 2.4
9f3003c
@t3rm1n4l t3rm1n4l FIX: Correct split numbering and asymmetric file split issue 077bf4c
@t3rm1n4l t3rm1n4l s3cmd info - fix to show correct md5sum for multipart uploaded file based on custom md5 meta header
bd70d2b
@t3rm1n4l t3rm1n4l Fix for python 2.4 daemon threads: use setDaemon(True) instead of daemon=True
942b19f
@t3rm1n4l t3rm1n4l Disable progress bar for multipart upload b5f6590
@t3rm1n4l t3rm1n4l Added meta-based md5 verification for the non-parallel downloader for files uploaded with multipart upload
2fb607a
@t3rm1n4l t3rm1n4l Added exit_status for s3cmd program
s3cmd does not return meaningful exit status codes, so callers cannot tell whether the program succeeded or failed, or why.
This commit adds exit statuses for the s3cmd sync upload, sync download, get and put operations.
Exit codes are as follows: SIZE_MISMATCH=1, MD5_MISMATCH=2, RETRIES_EXCEEDED=3, UPLOAD_ABORT=4, MD5_META_NOTFOUND=5, KEYBOARD_INTERRUPT=6
b30fd20
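
With these codes in place a caller can distinguish failure modes. An illustrative, hypothetical check from Python (assumes s3cmd is installed; the bucket and key are placeholders):

    # Inspect the exit status of an s3cmd get and map it to the new codes.
    import subprocess

    rc = subprocess.call(["s3cmd", "get", "s3://example-bucket/bigfile.bin", "bigfile.bin"])
    if rc == 0:
        print "download ok"
    elif rc == 2:
        print "MD5_MISMATCH: downloaded data failed checksum verification"
    elif rc == 3:
        print "RETRIES_EXCEEDED: gave up after repeated failures"
    else:
        print "s3cmd failed with exit status", rc
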
@t3rm1n4l t3rm1n4l Added separate config parameters for thread limit and split count for download and upload

added download config parameters parallel_multipart_download_threads (thread count), parallel_multipart_download_count (split count)
added upload config parameters parallel_multipart_upload_threads (thread count), parallel_multipart_upload_count (split count)
be7845f
@t3rm1n4l

There is a bug in the code for the exit_status definition. I am closing this pull request and sending another one.

@t3rm1n4l t3rm1n4l closed this
Showing with 437 additions and 48 deletions.
  1. +6 −0 S3/Config.py
  2. +352 −38 S3/S3.py
  3. +24 −0 S3/Utils.py
  4. +55 −10 s3cmd
6 S3/Config.py
@@ -81,6 +81,12 @@ class Config(object):
website_index = "index.html"
website_error = ""
website_endpoint = "http://%(bucket)s.s3-website-%(location)s.amazonaws.com/"
+ parallel_multipart_download = False
+ parallel_multipart_upload = False
+ parallel_multipart_download_threads = 5
+ parallel_multipart_upload_threads = 5
+ parallel_multipart_download_count = 5
+ parallel_multipart_upload_count = 5
## Creating a singleton
def __new__(self, configfile = None):
390 S3/S3.py
@@ -10,6 +10,8 @@
import logging
import mimetypes
import re
+import Queue
+import threading
from logging import debug, info, warning, error
from stat import ST_SIZE
@@ -22,6 +24,7 @@
from SortedDict import SortedDict
from BidirMap import BidirMap
from Config import Config
+from Utils import concat_files, hash_file_md5
from Exceptions import *
from ACL import ACL, GranteeLogDelivery
from AccessLog import AccessLog
@@ -60,7 +63,7 @@ def format_param_str(self):
return param_str and "?" + param_str[1:]
def sign(self):
- h = self.method_string + "\n"
+ h = self.method_string + "\n"
h += self.headers.get("content-md5", "")+"\n"
h += self.headers.get("content-type", "")+"\n"
h += self.headers.get("date", "")+"\n"
@@ -70,9 +73,18 @@ def sign(self):
if self.resource['bucket']:
h += "/" + self.resource['bucket']
h += self.resource['uri']
+
+ tmp_params = ""
+ for parameter in self.params:
+ if parameter in ['uploads', 'partNumber', 'uploadId', 'acl', 'location', 'logging', 'torrent']:
+ if self.params[parameter] != "":
+ tmp_params += '&%s=%s' %(parameter, self.params[parameter])
+ else:
+ tmp_params += '&%s' %parameter
+ if tmp_params != "":
+ h+='?'+tmp_params[1:].encode('UTF-8')
debug("SignHeaders: " + repr(h))
signature = sign_string(h)
-
self.headers["Authorization"] = "AWS "+self.s3.config.access_key+":"+signature
def get_triplet(self):
@@ -88,7 +100,8 @@ class S3(object):
PUT = 0x02,
HEAD = 0x04,
DELETE = 0x08,
- MASK = 0x0F,
+ POST = 0x20,
+ MASK = 0xFF,
)
targets = BidirMap(
@@ -107,6 +120,7 @@ class S3(object):
OBJECT_PUT = targets["OBJECT"] | http_methods["PUT"],
OBJECT_GET = targets["OBJECT"] | http_methods["GET"],
OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
+ OBJECT_POST = targets["OBJECT"] | http_methods["POST"],
OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
)
@@ -116,12 +130,25 @@ class S3(object):
"BucketAlreadyExists" : "Bucket '%s' already exists",
}
+ error_codes = {
+ "SIZE_MISMATCH":1,
+ "MD5_MISMATCH":2,
+ "RETRIES_EXCEEDED":3,
+ "UPLOAD_ABORT":4,
+ "MD5_META_NOTFOUND":5,
+ "KEYBOARD_INTERRUPT":6
+ }
+
+
## S3 sometimes sends HTTP-307 response
redir_map = {}
## Maximum attempts of re-issuing failed requests
_max_retries = 5
+ ##Default exit status = 0 (SUCCESS)
+ exit_status = 0
+
def __init__(self, config):
self.config = config
@@ -309,6 +336,156 @@ def website_delete(self, uri, bucket_location = None):
return response
+ def object_multipart_upload(self, filename, uri, cfg, extra_headers = None, extra_label = ""):
+ if uri.type != "s3":
+ raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
+
+ if not os.path.isfile(filename):
+ raise InvalidFileError(u"%s is not a regular file" % unicodise(filename))
+ try:
+ file = open(filename, "rb")
+ file_size = os.stat(filename)[ST_SIZE]
+ except (IOError, OSError), e:
+ raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
+
+ parts_size = file_size / cfg.parallel_multipart_upload_count
+ debug("File size=%d parts size=%d" %(file_size, parts_size))
+ if parts_size < 5*1024*1024:
+ warning("File part size is less than minimum required size (5 MB). Disabled parallel multipart upload")
+ return self.object_put(filename, uri, extra_headers = extra_headers, extra_label = extra_label)
+
+ md5_hash = hash_file_md5(filename)
+ info("Calculating md5sum for %s" %filename)
+ headers = SortedDict(ignore_case = True)
+ if extra_headers:
+ headers.update(extra_headers)
+
+ content_type = self.config.mime_type
+ if not content_type and self.config.guess_mime_type:
+ content_type = mimetypes.guess_type(filename)[0]
+ if not content_type:
+ content_type = self.config.default_mime_type
+ debug("Content-Type set to '%s'" % content_type)
+ headers["content-type"] = content_type
+ if self.config.acl_public:
+ headers["x-amz-acl"] = "public-read"
+ if self.config.reduced_redundancy:
+ headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
+
+ headers = {}
+ headers['date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
+ headers['x-amz-meta-md5sum'] = md5_hash
+ initiate_request = self.create_request("OBJECT_POST", headers = headers, uri = uri, uploads='')
+ initiate_response = self.send_request(initiate_request)
+ upload_id = getTextFromXml(initiate_response["data"], ".//UploadId")
+ #Upload single file size
+
+ debug("Upload ID = %s" %upload_id)
+
+ multipart_ranges = []
+ global upload_worker_queue
+ global part_upload_list
+
+ part_upload_list = {}
+ upload_worker_queue = Queue.Queue()
+ i = 1
+ for offset in range(0, file_size, parts_size):
+ start_offset = offset
+
+ if start_offset + parts_size - 1 < file_size:
+ end_offset = start_offset + parts_size - 1
+ if i == cfg.parallel_multipart_upload_count:
+ end_offset = file_size - 1
+ else:
+ end_offset = file_size - 1
+
+ item = {'part_no':i, 'start_position':start_offset, 'end_position':end_offset, 'uri':uri, 'upload_id':upload_id, 'filename':filename}
+
+ multipart_ranges.append(item)
+ upload_worker_queue.put(item)
+ debug("Part %d start=%d end=%d (part size=%d)" %(i, start_offset, end_offset, parts_size))
+ i+=1
+ if end_offset == file_size - 1:
+ break
+
+ def part_upload_worker():
+ while True:
+ try:
+ part_info = upload_worker_queue.get_nowait()
+ except Queue.Empty:
+ return
+ part_number = part_info['part_no']
+ start_position = part_info['start_position']
+ end_position = part_info['end_position']
+ uri = part_info['uri']
+ upload_id = part_info['upload_id']
+ filename = part_info['filename']
+
+ file = open(filename, 'rb')
+
+ headers = SortedDict(ignore_case = True)
+ #if extra_headers:
+ # headers.update(extra_headers)
+
+ headers["content-length"] = end_position - start_position + 1
+ headers['Expect'] = '100-continue'
+
+ request = self.create_request("OBJECT_PUT", uri = uri, headers = headers, partNumber = part_number, uploadId = upload_id)
+ labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
+ try:
+ response = self.send_file(request, file, labels, retries = self._max_retries, part_info = part_info)
+ except S3UploadError, e:
+ self.abort_multipart_upload(uri, upload_id)
+ file.close()
+ raise S3UploadError("Failed to upload part-%d to S3" %part_info['part_no'])
+ part_upload_list[part_number] = response["headers"]["etag"].strip('"\'')
+ file.close()
+
+ for i in range(cfg.parallel_multipart_upload_threads):
+ t = threading.Thread(target=part_upload_worker)
+ t.setDaemon(True)
+ t.start()
+
+ timestamp_start = time.time()
+ while threading.activeCount() > 1:
+ time.sleep(0.1)
+ debug("Upload of file parts complete")
+
+ body = "<CompleteMultipartUpload>\n"
+ for part in part_upload_list.keys():
+ body += " <Part>\n"
+ body += " <PartNumber>%d</PartNumber>\n" %part
+ body += " <ETag>%s</ETag>\n" %part_upload_list[part]
+ body += " </Part>\n"
+ body += "</CompleteMultipartUpload>"
+
+ complete_request = self.create_request("OBJECT_POST", uri = uri, uploadId = upload_id)
+ response = self.send_request(complete_request, body)
+ timestamp_end = time.time()
+
+ object_info = self.object_info(uri)
+ upload_size = int(object_info['headers']['content-length'])
+ #file_md5sum = info['headers']['etag'].strip('"')
+
+ response = {}
+ response["headers"] = object_info["headers"]
+ #response["md5match"] = file_md5sum.strip() == md5_hash_file.strip()
+ response["md5"] = md5_hash
+ #if not response["md5match"]:
+ # warning("MD5 signatures do not match: computed=%s, received=%s" % (md5_hash_file, file_md5sum))
+ # warning("Aborting file upload")
+ # self.abort_multipart_upload(uri, upload_id)
+ # raise S3UploadError
+
+ response["elapsed"] = timestamp_end - timestamp_start
+ response["size"] = file_size
+ response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
+ if response["size"] != upload_size:
+ warning("Reported size (%s) does not match received size (%s)" % (upload_size, response["size"]))
+ self.abort_multipart_upload()
+ return response
+
+
def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
# TODO TODO
# Make it consistent with stream-oriented object_get()
@@ -339,7 +516,7 @@ def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
- response = self.send_file(request, file, labels)
+ response = self.send_file(request, file, labels, retries = self._max_retries)
return response
def object_get(self, uri, stream, start_position = 0, extra_label = ""):
@@ -350,6 +527,94 @@ def object_get(self, uri, stream, start_position = 0, extra_label = ""):
response = self.recv_file(request, stream, labels, start_position)
return response
+ def object_multipart_get(self, uri, stream, cfg, start_position = 0, extra_label = ""):
+ debug("Executing multipart download")
+ if uri.type != "s3":
+ raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
+ object_info = self.object_info(uri)
+ file_size = int(object_info['headers']['content-length'])
+ file_md5sum = object_info['headers']['etag'].strip('"')
+ if len(file_md5sum.split('-')) == 2:
+ try:
+ file_md5sum = object_info['headers']['x-amz-meta-md5sum']
+ except:
+ warning('md5sum meta information not found in multipart uploaded file')
+
+ multipart_ranges = []
+ parts_size = file_size / cfg.parallel_multipart_download_count
+ global worker_queue
+ tmp_dir = os.path.join(os.path.dirname(stream.name),'tmps3')
+ os.makedirs(tmp_dir)
+
+ worker_queue = Queue.Queue()
+ i = 1
+ for offset in range(0, file_size, parts_size):
+ start_offset = offset
+ if start_offset + parts_size - 1 < file_size:
+ end_offset = start_offset + parts_size - 1
+ if i == cfg.parallel_multipart_download_count:
+ end_offset = file_size - 1
+ else:
+ end_offset = file_size - 1
+
+ part_stream = open(os.path.join(tmp_dir, "%s.part-%d" %(os.path.basename(stream.name), i)),'wb+')
+ item = (i, start_offset, end_offset, uri, part_stream)
+
+ multipart_ranges.append(item)
+ worker_queue.put(item)
+ i+=1
+
+ if end_offset == file_size - 1:
+ break
+
+
+ def get_worker():
+ while True:
+ try:
+ item = worker_queue.get_nowait()
+ except Queue.Empty:
+ return
+ offset = item[0]
+ start_position = item[1]
+ end_position = item[2]
+ uri = item[3]
+ stream = item[4]
+ request = self.create_request("OBJECT_GET", uri = uri)
+ labels = { 'source' : unicodise(uri.uri()), 'destination' : unicodise(stream.name), 'extra' : extra_label }
+ self.recv_file(request, stream, labels, start_position, retries = self._max_retries, end_position = end_position)
+
+ for i in range(cfg.parallel_multipart_download_threads):
+ t = threading.Thread(target=get_worker)
+ t.setDaemon(True)
+ t.start()
+
+ timestamp_start = time.time()
+ while threading.activeCount() > 1:
+ time.sleep(0.1)
+ debug("Download of file parts complete")
+ source_streams = map(lambda x: x[4], multipart_ranges)
+ md5_hash_download, download_size = concat_files(stream, True, *source_streams)
+ timestamp_end = time.time()
+ os.rmdir(tmp_dir)
+ stream.flush()
+
+ debug("ReceivedFile: Computed MD5 = %s" % md5_hash_download)
+ response = {}
+ response["headers"] = object_info["headers"]
+ response["md5match"] = file_md5sum.strip() == md5_hash_download.strip()
+ response["md5"] = file_md5sum
+ if not response["md5match"]:
+ warning("MD5 signatures do not match: computed=%s, received=%s" % (md5_hash_download, file_md5sum))
+ self.exit_status = self.error_codes["MD5_MISMATCH"]
+
+ response["elapsed"] = timestamp_end - timestamp_start
+ response["size"] = file_size
+ response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
+ if response["size"] != download_size:
+ warning("Reported size (%s) does not match received size (%s)" % (download_size, response["size"]))
+ self.exit_status = self.error_codes["SIZE_MISMATCH"]
+ return response
+
def object_delete(self, uri):
if uri.type != "s3":
raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
@@ -519,6 +784,7 @@ def create_request(self, operation, uri = None, bucket = None, object = None, he
request = S3Request(self, method_string, resource, headers, params)
debug("CreateRequest: resource[uri]=" + resource['uri'])
+ debug("Request: headers="+str(headers))
return request
def _fail_wait(self, retries):
@@ -535,6 +801,7 @@ def send_request(self, request, body = None, retries = _max_retries):
for header in headers.keys():
headers[header] = str(headers[header])
conn = self.get_connection(resource['bucket'])
+ debug("Sending Request: method:%s body: %s uri: %s headers:%s" %(method_string,body,self.format_uri(resource),str(headers)))
conn.request(method_string, self.format_uri(resource), body, headers)
response = {}
http_response = conn.getresponse()
@@ -577,13 +844,25 @@ def send_request(self, request, body = None, retries = _max_retries):
return response
- def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
+ def abort_multipart_upload(self, uri, upload_id):
+ headers = {}
+ headers['date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
+ request = self.create_request("OBJECT_DELETE", headers = headers, uri = uri, UploadId = upload_id)
+ response = self.send_request(request)
+ self.exit_status = self.error_codes["UPLOAD_ABORT"]
+ return response
+
+ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, part_info = None):
method_string, resource, headers = request.get_triplet()
size_left = size_total = headers.get("content-length")
if self.config.progress_meter:
progress = self.config.progress_class(labels, size_total)
else:
- info("Sending file '%s', please wait..." % file.name)
+ if part_info:
+ info("Sending file '%s' part-%d, please wait..." % (file.name, part_info['part_no']))
+ else:
+ info("Sending file '%s', please wait..." % file.name)
+
timestamp_start = time.time()
try:
conn = self.get_connection(resource['bucket'])
@@ -600,15 +879,25 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
# Connection error -> same throttle value
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, part_info)
else:
+ self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
raise S3UploadError("Upload failed for: %s" % resource['uri'])
- file.seek(0)
+
+ if part_info:
+ file.seek(part_info['start_position'])
+ else:
+ file.seek(0)
+
md5_hash = md5()
try:
while (size_left > 0):
#debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
- data = file.read(self.config.send_chunk)
+ if size_left < self.config.send_chunk:
+ chunk_size = size_left
+ else:
+ chunk_size = self.config.send_chunk
+ data = file.read(chunk_size)
md5_hash.update(data)
conn.send(data)
if self.config.progress_meter:
@@ -629,6 +918,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
except Exception, e:
if self.config.progress_meter:
progress.done("failed")
+ debug("Retries:"+str(retries))
if retries:
if retries < self._max_retries:
throttle = throttle and throttle * 5 or 0.01
@@ -637,9 +927,10 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
# Connection error -> same throttle value
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, part_info)
else:
debug("Giving up on '%s' %s" % (file.name, e))
+ self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
raise S3UploadError("Upload failed for: %s" % resource['uri'])
timestamp_end = time.time()
@@ -659,7 +950,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
self.set_hostname(redir_bucket, redir_hostname)
warning("Redirected to: %s" % (redir_hostname))
- return self.send_file(request, file, labels)
+ return self.send_file(request, file, labels, retries, part_info)
# S3 from time to time doesn't send ETag back in a response :-(
# Force re-upload here.
@@ -682,9 +973,10 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, part_info)
else:
warning("Too many failures. Giving up on '%s'" % (file.name))
+ self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
raise S3UploadError
## Non-recoverable error
@@ -693,21 +985,24 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"]))
if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest():
warning("MD5 Sums don't match!")
+ self.exit_status = self.error_codes["MD5_MISMATCH"]
if retries:
warning("Retrying upload of %s" % (file.name))
- return self.send_file(request, file, labels, throttle, retries - 1)
+ return self.send_file(request, file, labels, throttle, retries - 1, part_info)
else:
warning("Too many failures. Giving up on '%s'" % (file.name))
+ self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
raise S3UploadError
return response
- def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
+ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries, end_position = -1):
method_string, resource, headers = request.get_triplet()
if self.config.progress_meter:
progress = self.config.progress_class(labels, 0)
else:
info("Receiving file '%s', please wait..." % stream.name)
+ stream.seek(0)
timestamp_start = time.time()
try:
conn = self.get_connection(resource['bucket'])
@@ -715,9 +1010,12 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
conn.putrequest(method_string, self.format_uri(resource))
for header in headers.keys():
conn.putheader(header, str(headers[header]))
- if start_position > 0:
+ if start_position > 0 and end_position == -1:
debug("Requesting Range: %d .. end" % start_position)
conn.putheader("Range", "bytes=%d-" % start_position)
+ elif end_position != -1:
+ debug("Requesting Range: %d .. %d" % (start_position, end_position))
+ conn.putheader("Range", "bytes=%d-%d" % (start_position, end_position))
conn.endheaders()
response = {}
http_response = conn.getresponse()
@@ -733,8 +1031,9 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
# Connection error -> same throttle value
- return self.recv_file(request, stream, labels, start_position, retries - 1)
+ return self.recv_file(request, stream, labels, start_position, retries - 1, end_position)
else:
+ self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
raise S3DownloadError("Download failed for: %s" % resource['uri'])
if response["status"] == 307:
@@ -744,12 +1043,12 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
self.set_hostname(redir_bucket, redir_hostname)
warning("Redirected to: %s" % (redir_hostname))
- return self.recv_file(request, stream, labels)
+ return self.recv_file(request, stream, labels, start_position, retries, end_position)
if response["status"] < 200 or response["status"] > 299:
raise S3Error(response)
- if start_position == 0:
+ if start_position == 0 and end_position == -1:
# Only compute MD5 on the fly if we're downloading from beginning
# Otherwise we'd get a nonsense.
md5_hash = md5()
@@ -767,7 +1066,7 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
data = http_response.read(this_chunk)
stream.write(data)
- if start_position == 0:
+ if start_position == 0 and end_position == -1:
md5_hash.update(data)
current_position += len(data)
## Call progress meter from here...
@@ -782,8 +1081,12 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
warning("Waiting %d sec..." % self._fail_wait(retries))
time.sleep(self._fail_wait(retries))
# Connection error -> same throttle value
- return self.recv_file(request, stream, labels, current_position, retries - 1)
+ if end_position != -1:
+ return self.recv_file(request, stream, labels, current_position, retries - 1)
+ else:
+ return self.recv_file(request, stream, labels, start_position, retries - 1, end_position)
else:
+ self.exit_status = self.error_codes["RETRIES_EXCEEDED"]
raise S3DownloadError("Download failed for: %s" % resource['uri'])
stream.flush()
@@ -796,30 +1099,41 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
progress.update()
progress.done("done")
- if start_position == 0:
- # Only compute MD5 on the fly if we were downloading from the beginning
- response["md5"] = md5_hash.hexdigest()
- else:
- # Otherwise try to compute MD5 of the output file
- try:
- response["md5"] = hash_file_md5(stream.name)
- except IOError, e:
- if e.errno != errno.ENOENT:
- warning("Unable to open file: %s: %s" % (stream.name, e))
- warning("Unable to verify MD5. Assume it matches.")
- response["md5"] = response["headers"]["etag"]
-
- response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0
+ if end_position == -1:
+ if start_position == 0:
+ # Only compute MD5 on the fly if we were downloading from the beginning
+ response["md5"] = md5_hash.hexdigest()
+ else:
+ # Otherwise try to compute MD5 of the output file
+ try:
+ response["md5"] = hash_file_md5(stream.name)
+ except IOError, e:
+ if e.errno != errno.ENOENT:
+ warning("Unable to open file: %s: %s" % (stream.name, e))
+ warning("Unable to verify MD5. Assume it matches.")
+ response["md5"] = response["headers"]["etag"]
+
+ file_md5sum = response["headers"]["etag"].strip('"\'')
+ if len(response["headers"]["etag"].split('-')) == 2:
+ try:
+ file_md5sum = response['headers']['x-amz-meta-md5sum']
+ except:
+ warning('md5sum meta information not found in multipart uploaded file')
+ self.exit_status = self.error_codes["MD5_META_NOTFOUND"]
+
+ response["md5match"] = file_md5sum == response["md5"]
+ debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
+ if not response["md5match"]:
+ warning("MD5 signatures do not match: computed=%s, received=%s" % (
+ response["md5"], response["headers"]["etag"]))
+ self.exit_status = self.error_codes["MD5_MISMATCH"]
response["elapsed"] = timestamp_end - timestamp_start
response["size"] = current_position
response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
if response["size"] != start_position + long(response["headers"]["content-length"]):
warning("Reported size (%s) does not match received size (%s)" % (
start_position + response["headers"]["content-length"], response["size"]))
- debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
- if not response["md5match"]:
- warning("MD5 signatures do not match: computed=%s, received=%s" % (
- response["md5"], response["headers"]["etag"]))
+ self.exit_status = self.error_codes["SIZE_MISMATCH"]
return response
__all__.append("S3")
24 S3/Utils.py
@@ -223,6 +223,30 @@ def hash_file_md5(filename):
return h.hexdigest()
__all__.append("hash_file_md5")
+def concat_files(dest_handle, unlink = True, *source_handles):
+ """
+ Read data from source file handles and write the data into dest_handle
+ Return md5-hash and file size
+ """
+ h = md5()
+ for source in source_handles:
+ source.seek(0)
+ while True:
+ # Hash 32kB chunks
+ data = source.read(32*1024)
+ dest_handle.write(data)
+ if not data:
+ break
+ h.update(data)
+
+ if unlink == True:
+ source.close()
+ os.unlink(source.name)
+
+ return h.hexdigest(), dest_handle.tell()
+__all__.append("concat_files")
+
+
def mkdir_with_parents(dir_name):
"""
mkdir_with_parents(dst_dir)
65 s3cmd
@@ -32,6 +32,16 @@ from distutils.spawn import find_executable
def output(message):
sys.stdout.write(message + "\n")
+def clean_tempfiles(dest_handler):
+ path = os.path.join(os.path.dirname(dest_handler.name),'tmps3')
+ if os.path.exists(path):
+ for f in os.listdir(path):
+ fpath = os.path.join(path,f)
+ os.unlink(fpath)
+ os.rmdir(path)
+ if os.stat(dest_handler.name).st_size == 0:
+ os.unlink(dest_handler.name)
+
def check_args_type(args, type, verbose_type):
for arg in args:
if S3Uri(arg).type != type:
@@ -315,6 +325,8 @@ def cmd_object_put(args):
debug(u"Removing temporary encrypted file: %s" % unicodise(full_name))
os.remove(full_name)
+ return s3.exit_status
+
def cmd_object_get(args):
cfg = Config()
s3 = S3(cfg)
@@ -449,6 +461,8 @@ def cmd_object_get(args):
output(u"File %s saved as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s)" %
(uri, destination, response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1]))
+ return s3.exit_status
+
def cmd_object_del(args):
for uri_str in args:
uri = S3Uri(uri_str)
@@ -563,7 +577,10 @@ def cmd_info(args):
output(u" File size: %s" % info['headers']['content-length'])
output(u" Last mod: %s" % info['headers']['last-modified'])
output(u" MIME type: %s" % info['headers']['content-type'])
- output(u" MD5 sum: %s" % info['headers']['etag'].strip('"'))
+ if info['headers'].has_key('x-amz-meta-md5sum'):
+ output(u" MD5 sum: %s" % info['headers']['x-amz-meta-md5sum'].strip('"'))
+ else:
+ output(u" MD5 sum: %s" % info['headers']['etag'].strip('"'))
else:
info = s3.bucket_info(uri)
output(u"%s (bucket):" % uri.uri())
@@ -663,6 +680,11 @@ def cmd_sync_remote2local(args):
attrs[key] = val
return attrs
+ if cfg.parallel_multipart_download:
+ #Disable progress meter
+ cfg.progress_meter = False
+
+
s3 = S3(Config())
destination_base = args[-1]
@@ -744,7 +766,10 @@ def cmd_sync_remote2local(args):
os.close(os.open(dst_file, open_flags))
# Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
dst_stream = open(dst_file, "wb")
- response = s3.object_get(uri, dst_stream, extra_label = seq_label)
+ if cfg.parallel_multipart_download == False:
+ response = s3.object_get(uri, dst_stream, extra_label = seq_label)
+ else:
+ response = s3.object_multipart_get(uri, dst_stream, cfg, extra_label = seq_label)
dst_stream.close()
if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
@@ -756,7 +781,9 @@ def cmd_sync_remote2local(args):
os.utime(dst_file, (atime, mtime))
## FIXME: uid/gid / uname/gname handling comes here! TODO
except OSError, e:
- try: dst_stream.close()
+ try:
+ dst_stream.close()
+ clean_tempfiles(dst_stream)
except: pass
if e.errno == errno.EEXIST:
warning(u"%s exists - not overwriting" % (dst_file))
@@ -769,19 +796,25 @@ def cmd_sync_remote2local(args):
continue
raise e
except KeyboardInterrupt:
- try: dst_stream.close()
+ try:
+ dst_stream.close()
+ clean_tempfiles(dst_stream)
except: pass
warning(u"Exiting after keyboard interrupt")
- return
+ return s3.error_codes["KEYBOARD_INTERRUPT"]
except Exception, e:
- try: dst_stream.close()
+ try:
+ dst_stream.close()
+ clean_tempfiles(dst_stream)
except: pass
error(u"%s: %s" % (file, e))
continue
# We have to keep repeating this call because
# Python 2.4 doesn't support try/except/finally
# construction :-(
- try: dst_stream.close()
+ try:
+ dst_stream.close()
+ clean_tempfiles(dst_stream)
except: pass
except S3DownloadError, e:
error(u"%s: download failed too many times. Skipping that file." % file)
@@ -804,6 +837,8 @@ def cmd_sync_remote2local(args):
else:
info(outstr)
+ return s3.exit_status
+
def cmd_sync_local2remote(args):
def _build_attr_header(src):
import pwd, grp
@@ -835,6 +870,10 @@ def cmd_sync_local2remote(args):
for k in attrs: result += "%s:%s/" % (k, attrs[k])
return { 'x-amz-meta-s3cmd-attrs' : result[:-1] }
+ if cfg.parallel_multipart_upload:
+ #Disable progress meter
+ cfg.progress_meter = False
+
s3 = S3(cfg)
if cfg.encrypt:
@@ -919,7 +958,10 @@ def cmd_sync_local2remote(args):
attr_header = _build_attr_header(src)
debug(u"attr_header: %s" % attr_header)
extra_headers.update(attr_header)
- response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
+ if cfg.parallel_multipart_upload:
+ response = s3.object_multipart_upload(src, uri, cfg, extra_headers, extra_label = seq_label)
+ else:
+ response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
except InvalidFileError, e:
warning(u"File can not be uploaded: %s" % e)
continue
@@ -956,6 +998,8 @@ def cmd_sync_local2remote(args):
if result['status'] == 201:
output("Created invalidation request for %d paths" % len(uploaded_objects_list))
output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
+ return s3.exit_status
+
def cmd_sync(args):
if (len(args) < 2):
@@ -1729,7 +1773,8 @@ def main():
sys.exit(1)
try:
- cmd_func(args)
+ rv = cmd_func(args)
+ sys.exit(rv)
except S3Error, e:
error(u"S3 error: %s" % e)
sys.exit(1)
@@ -1803,7 +1848,7 @@ if __name__ == '__main__':
except KeyboardInterrupt:
sys.stderr.write("See ya!\n")
- sys.exit(1)
+ sys.exit(6)
except Exception, e:
report_exception(e)