Merge branch 'econnell/master' into econnell-merge

2 parents fb6441c + 3677935 · commit 288b79747f9754156641c1a5537097240d247678 · committed by @mdomsch
Showing with 92 additions and 38 deletions.
  1. +10 −0 S3/FileLists.py
  2. +46 −22 S3/MultiPart.py
  3. +24 −14 S3/S3.py
  4. +12 −2 s3cmd
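
In short: this merge teaches "s3cmd put" to accept '-' as a local source and stream the object body from stdin. Because the total size of a pipe cannot be known in advance, the upload is forced through the multipart code path, which reads and sends one chunk at a time until EOF.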
10 S3/FileLists.py
@@ -140,6 +140,16 @@ def handle_exclude_include_walk(root, dirs, files):
 def fetch_local_list(args, recursive = None):
     def _get_filelist_local(loc_list, local_uri, cache):
         info(u"Compiling list of local files...")
+
+        if deunicodise(local_uri.basename()) == "-":
+            loc_list = SortedDict(ignore_case = False)
+            loc_list["-"] = {
+                'full_name_unicode' : '-',
+                'full_name' : '-',
+                'size' : -1,
+                'mtime' : -1,
+            }
+            return loc_list, True
         if local_uri.isdir():
             local_base = deunicodise(local_uri.basename())
             local_path = deunicodise(local_uri.path())
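
The hunk above makes stdin look like an ordinary file-list entry: a single pseudo-file keyed '-', with size and mtime set to -1 because neither is knowable for a pipe, and a True flag marking it as a single-file listing. A minimal standalone sketch of the same idea, using a plain dict in place of s3cmd's SortedDict (the function name is illustrative, not s3cmd's):

    import os

    def fetch_local_list_sketch(path):
        # '-' becomes a pseudo-entry so downstream code needs no special case.
        if path == "-":
            return {"-": {"full_name": "-", "size": -1, "mtime": -1}}, True
        st = os.stat(path)
        return {path: {"full_name": path, "size": st.st_size, "mtime": st.st_mtime}}, True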
68 S3/MultiPart.py
@@ -42,32 +42,56 @@ def upload_all_parts(self):
         if not self.upload_id:
             raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")
-        size_left = file_size = os.stat(self.file.name)[ST_SIZE]
-        self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
-        nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
-        debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
+        self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
+        if self.file.name != "<stdin>":
+            size_left = file_size = os.stat(self.file.name)[ST_SIZE]
+            nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
+            debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
+        else:
+            debug("MultiPart: Uploading from %s" % (self.file.name))
+
         seq = 1
-        while size_left > 0:
-            offset = self.chunk_size * (seq - 1)
-            current_chunk_size = min(file_size - offset, self.chunk_size)
-            size_left -= current_chunk_size
-            labels = {
-                'source' : unicodise(self.file.name),
-                'destination' : unicodise(self.uri.uri()),
-                'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
-            }
-            try:
-                self.upload_part(seq, offset, current_chunk_size, labels)
-            except:
-                error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
-                self.abort_upload()
-                raise
-            seq += 1
+        if self.file.name != "<stdin>":
+            while size_left > 0:
+                offset = self.chunk_size * (seq - 1)
+                current_chunk_size = min(file_size - offset, self.chunk_size)
+                size_left -= current_chunk_size
+                labels = {
+                    'source' : unicodise(self.file.name),
+                    'destination' : unicodise(self.uri.uri()),
+                    'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+                }
+                try:
+                    self.upload_part(seq, offset, current_chunk_size, labels)
+                except:
+                    error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
+                    self.abort_upload()
+                    raise
+                seq += 1
+        else:
+            while True:
+                buffer = self.file.read(self.chunk_size)
+                offset = self.chunk_size * (seq - 1)
+                current_chunk_size = len(buffer)
+                labels = {
+                    'source' : unicodise(self.file.name),
+                    'destination' : unicodise(self.uri.uri()),
+                    'extra' : "[part %d, %s]" % (seq, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+                }
+                if len(buffer) == 0: # EOF
+                    break
+                try:
+                    self.upload_part(seq, offset, current_chunk_size, labels, buffer)
+                except:
+                    error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
+                    self.abort_upload()
+                    raise
+                seq += 1
         debug("MultiPart: Upload finished: %d parts", seq - 1)
-    def upload_part(self, seq, offset, chunk_size, labels):
+    def upload_part(self, seq, offset, chunk_size, labels, buffer = ''):
         """
         Upload a file chunk
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
@@ -77,7 +101,7 @@ def upload_part(self, seq, offset, chunk_size, labels):
         headers = { "content-length": chunk_size }
         query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
         request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
-        response = self.s3.send_file(request, self.file, labels, offset = offset, chunk_size = chunk_size)
+        response = self.s3.send_file(request, self.file, labels, buffer, offset = offset, chunk_size = chunk_size)
         self.parts[seq] = response["headers"]["etag"]
         return response
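
The stdin branch cannot pre-compute nr_parts the way the file branch does, so it simply reads chunk_size bytes per iteration and stops when read() returns an empty buffer. A self-contained sketch of that loop, with a hypothetical upload_part callable standing in for the real method:

    def upload_all_parts_from_stream(stream, chunk_size, upload_part):
        # Multipart upload of unknown total size: one read() per part, stop at EOF.
        seq = 1
        while True:
            buf = stream.read(chunk_size)
            if not buf:  # read() returns an empty buffer at end of stream
                break
            upload_part(seq, buf)  # hypothetical; s3cmd also passes offset and labels
            seq += 1
        return seq - 1  # number of parts sent

Note that S3 requires every part except the last to be at least 5 MB, which is why the chunk size is derived from the multipart_chunk_size_mb setting rather than chosen arbitrarily.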
38 S3/S3.py
@@ -345,11 +345,15 @@ def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
         if uri.type != "s3":
             raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
-        if not os.path.isfile(filename):
+        if filename != "-" and not os.path.isfile(filename):
             raise InvalidFileError(u"%s is not a regular file" % unicodise(filename))
         try:
-            file = open(filename, "rb")
-            size = os.stat(filename)[ST_SIZE]
+            if filename == "-":
+                file = sys.stdin
+                size = 0
+            else:
+                file = open(filename, "rb")
+                size = os.stat(filename)[ST_SIZE]
         except (IOError, OSError), e:
             raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
@@ -359,7 +363,7 @@ def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
         ## MIME-type handling
         content_type = self.config.mime_type
-        if not content_type and self.config.guess_mime_type:
+        if filename != "-" and not content_type and self.config.guess_mime_type:
             content_type = mime_magic(filename)
         if not content_type:
             content_type = self.config.default_mime_type
@@ -374,8 +378,10 @@ def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
         ## Multipart decision
         multipart = False
+        if not self.config.enable_multipart and filename == "-":
+            raise ParameterError("Multi-part upload is required to upload from stdin")
         if self.config.enable_multipart:
-            if size > self.config.multipart_chunk_size_mb * 1024 * 1024:
+            if size > self.config.multipart_chunk_size_mb * 1024 * 1024 or filename == "-":
                 multipart = True
         if multipart:
             # Multipart requests are quite different... drop here
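
For stdin, object_put reports a size of 0 and cannot fall back to a plain PUT, so the decision above forces multipart for '-' and rejects the combination outright when multipart is disabled. Roughly, as a sketch with illustrative names:

    def should_use_multipart(filename, size, enable_multipart, chunk_size_mb):
        # Sketch of the decision in object_put above.
        if filename == "-" and not enable_multipart:
            raise ValueError("multipart upload is required to upload from stdin")
        return enable_multipart and (filename == "-" or size > chunk_size_mb * 1024 * 1024)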
@@ -625,7 +631,7 @@ def send_request(self, request, body = None, retries = _max_retries):
         return response

-    def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1):
+    def send_file(self, request, file, labels, buffer = '', throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1):
         method_string, resource, headers = request.get_triplet()
         size_left = size_total = headers.get("content-length")
         if self.config.progress_meter:
@@ -648,15 +654,19 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries,
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
+                return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
             else:
                 raise S3UploadError("Upload failed for: %s" % resource['uri'])
-        file.seek(offset)
+        if buffer == '':
+            file.seek(offset)
         md5_hash = md5()
         try:
             while (size_left > 0):
-                #debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
-                data = file.read(min(self.config.send_chunk, size_left))
+                #debug("SendFile: Reading up to %d bytes from '%s' - remaining bytes: %s" % (self.config.send_chunk, file.name, size_left))
+                if buffer == '':
+                    data = file.read(min(self.config.send_chunk, size_left))
+                else:
+                    data = buffer
                 md5_hash.update(data)
                 conn.send(data)
                 if self.config.progress_meter:
@@ -685,7 +695,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries,
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
+                return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
             else:
                 debug("Giving up on '%s' %s" % (file.name, e))
                 raise S3UploadError("Upload failed for: %s" % resource['uri'])
@@ -707,7 +717,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries,
             redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
             self.set_hostname(redir_bucket, redir_hostname)
             warning("Redirected to: %s" % (redir_hostname))
-            return self.send_file(request, file, labels, offset = offset, chunk_size = chunk_size)
+            return self.send_file(request, file, labels, buffer, offset = offset, chunk_size = chunk_size)

         # S3 from time to time doesn't send ETag back in a response :-(
         # Force re-upload here.
@@ -730,7 +740,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries,
                 warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
-                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
+                return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
             else:
                 warning("Too many failures. Giving up on '%s'" % (file.name))
                 raise S3UploadError
@@ -743,7 +753,7 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries,
             warning("MD5 Sums don't match!")
             if retries:
                 warning("Retrying upload of %s" % (file.name))
-                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
+                return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
             else:
                 warning("Too many failures. Giving up on '%s'" % (file.name))
                 raise S3UploadError
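
Every retry, redirect, and re-upload path in send_file now threads the optional buffer argument through, because the two sources behave differently on failure: a regular file can seek(offset) and be re-read, while a pipe cannot be rewound, so the stdin path resends the bytes that upload_all_parts already read into buffer. A simplified sketch of the two modes (names are illustrative, bytes-oriented):

    import hashlib

    def send_body(send, file, offset, size, buffer=b"", chunk=65536):
        # Sketch: seekable-file mode vs. pre-read-buffer mode of send_file.
        md5 = hashlib.md5()
        if buffer:
            md5.update(buffer)   # stdin case: data was read once, upstream
            send(buffer)
        else:
            file.seek(offset)    # file case: safe to rewind and re-read on retry
            left = size
            while left > 0:
                data = file.read(min(chunk, left))
                md5.update(data)
                send(data)
                left -= len(data)
        return md5.hexdigest()   # compared against the ETag S3 returns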
14 s3cmd
@@ -267,7 +267,13 @@ def cmd_object_put(args):
     info(u"Summary: %d local files to upload" % local_count)
     if local_count > 0:
-        if not destination_base.endswith("/"):
+        if not single_file_local:
+            for key in local_list:
+                if key == "-":
+                    raise ParameterError("Cannot specify multiple local files if uploading from '-' (ie stdin)")
+        if single_file_local and local_list.keys()[0] == "-" and destination_base.endswith("/"):
+            raise ParameterError("Destination S3 URI must not end with '/' when uploading from stdin.")
+        elif not destination_base.endswith("/"):
             if not single_file_local:
                 raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
             local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
@@ -279,7 +285,11 @@ def cmd_object_put(args):
         for key in exclude_list:
             output(u"exclude: %s" % unicodise(key))
         for key in local_list:
-            output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
+            if key != "-":
+                nicekey = local_list[key]['full_name_unicode']
+            else:
+                nicekey = "<stdin>"
+            output(u"upload: %s -> %s" % (nicekey, local_list[key]['remote_uri']))

         warning(u"Exitting now because of --dry-run")
         return
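
Taken together, the new checks allow '-' only as the sole local source, and only when the destination S3 URI names an object rather than a directory (no trailing '/'). A condensed sketch of that validation, with illustrative names:

    def validate_put_sources(sources, destination_base):
        # Sketch of the argument checks added in cmd_object_put above.
        if "-" in sources and len(sources) > 1:
            raise ValueError("Cannot specify multiple local files if uploading from '-' (ie stdin)")
        if sources == ["-"] and destination_base.endswith("/"):
            raise ValueError("Destination S3 URI must not end with '/' when uploading from stdin.")

This is what makes pipelines such as "tar czf - somedir | s3cmd put - s3://bucket/somedir.tar.gz" possible (the bucket and key here are illustrative).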
