
Merge branch 'future'

2 parents 5b885b2 + 86eb449 · commit 4612664f476bb5aac847d825255237b4c8b290f8 · @SaltwaterC committed Jan 30, 2012
Showing with 381 additions and 18 deletions.
  1. +1 −0 .npmignore
  2. +7 −0 CHANGELOG.md
  3. +3 −3 TESTING.md
  4. +246 −4 lib/aws.js
  5. +80 −8 lib/internals.js
  6. +3 −3 package.json
  7. +37 −0 tests/s3-multipart-upload.js
  8. +4 −0 tools/createtemp.sh
1 .npmignore
@@ -1,2 +1,3 @@
/.project
/.settings
+/node_modules
7 CHANGELOG.md
@@ -1,3 +1,10 @@
+## v0.6.4
+ * s3.post() low-level method for initiating / completing a multipart upload.
+ * The file Request Body Handler now supports byte ranges in order to upload the parts of a multipart upload.
+ * Removes some old 0.5 cruft. Requests to S3 with query parameters that didn't contain paths were not signed properly.
+ * S3 multipart upload API: initUpload(), abortUpload(), completeUpload(), putFilePart(), putStreamPart(), putBufferPart(), putFileMultipart().
+ * Excludes node 0.6.9 from the supported versions due to [#2636](https://github.com/joyent/node/issues/2636).
+
## v0.6.3
* Uses the idea from [#16](https://github.com/livelycode/aws-lib/pull/16) of aws-lib in order to make the query APIs request signing more stable.
* Added Amazon STS (Security Token Service) support.
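The multipart helpers listed in the changelog above are exercised end to end by tests/s3-multipart-upload.js further down. As a rough orientation only (this sketch is not part of the commit; the credentials, bucket, and file names are placeholders), the high-level wrapper could be driven like this:

var s3 = require('aws2js').load('s3');
s3.setCredentials('<accessKeyId>', '<secretAccessKey>'); // placeholder credentials
s3.setBucket('<bucket-name>'); // placeholder bucket
// false = no canned ACL, {} = no extra headers, 5242880 bytes = the 5 MiB minimum part size
s3.putFileMultipart('backup.tar', './backup.tar', false, {}, 5242880, function (err, res) {
    if (err) {
        console.error('multipart upload failed', err);
    } else {
        console.log('multipart upload complete', res);
    }
});

Files at or under 5 MiB fall back to the plain s3.putFile() upload, as the putFileMultipart() implementation in lib/aws.js below shows.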
6 TESTING.md
@@ -1,6 +1,6 @@
## Requirements
- * $AWS_ACCEESS_KEY_ID and $AWS_SECRET_ACCESS_KEY environment variables which target to proper accessKeyId and secretAccessKey credentials. These are used for all the tests, therefore if the credentials are from a IAM user, they must be permissive enough in order to succesfully run all the tests.
- * $AWS2JS_S3_BUCKET environment variable which indicates the S3 bucket where the tests should upload their contents.
- * A 3rd party HTTP client implementation: [http-get](https://github.com/SaltwaterC/http-get). Install by running `npm install http-get`.
+ * AWS_ACCEESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables which point to valid accessKeyId and secretAccessKey credentials. These are used for all the tests; therefore, if the credentials belong to an IAM user, they must be permissive enough to successfully run all the tests.
+ * AWS2JS_S3_BUCKET environment variable which indicates the S3 bucket where the tests should upload their contents.
+ * The following *nix utilities: dd, md5sum, and cut, plus the existence of /dev/urandom, for the putFileMultipart test.
* The tests directory must be writable by the system user that runs the tests.
250 lib/aws.js
@@ -310,6 +310,36 @@ var client = function (config) {
}
);
};
+ /**
+ * Wraps the POST requests to the S3 API for the multipart initiate / complete operations
+ * @param path
+ * @param headers
+ * @param body
+ * @param callback
+ */
+ config.post = function (path, headers, body, callback) {
+ internals.checkConfig(config);
+ path = internals.checkPath(path, callback);
+ if ( ! path) {
+ return;
+ }
+ if (body.file) {
+ body.file = p.resolve(body.file);
+ }
+ internals.standardHeaders(config, 'POST', headers, path, body, function (err, hdrs) {
+ if (err) {
+ callback(err);
+ } else {
+ hdrs.expect = '100-continue';
+ internals.makeRequest(config, {
+ method: 'POST',
+ path: path,
+ headers: hdrs
+ }, body, 'xml', callback);
+ }
+ }
+ );
+ };
/* the S3 helpers */
/**
* Sets the bucket name
@@ -386,10 +416,10 @@ var client = function (config) {
file = p.resolve(file);
headers = internals.normalizeHeaders(headers);
headers = internals.checkAcl(acl, headers, callback);
- headers.expect = '100-continue';
if ( ! headers) {
return;
}
+ headers.expect = '100-continue';
var md5 = crypto.createHash('md5');
var bf = fs.ReadStream(file);
bf.on('data', function (chunk) {
@@ -421,10 +451,10 @@ var client = function (config) {
config.putStream = function (path, stream, acl, headers, callback) {
headers = internals.normalizeHeaders(headers);
headers = internals.checkAcl(acl, headers, callback);
- headers.expect = '100-continue';
if ( ! headers) {
return;
}
+ headers.expect = '100-continue';
config.put(path, headers, stream, callback);
};
/**
@@ -438,10 +468,10 @@ var client = function (config) {
config.putBuffer = function (path, buffer, acl, headers, callback) {
headers = internals.normalizeHeaders(headers);
headers = internals.checkAcl(acl, headers, callback);
- headers.expect = '100-continue';
if ( ! headers) {
return;
}
+ headers.expect = '100-continue';
if ( ! headers['content-md5']) {
var md5 = crypto.createHash('md5');
md5.update(buffer);
@@ -523,7 +553,219 @@ var client = function (config) {
});
};
/**
- * Exposes the escapePath() helper
+ * Initiates a multipart upload
+ * @param path
+ * @param acl
+ * @param headers
+ * @param callback
+ */
+ config.initUpload = function (path, acl, headers, callback) {
+ headers = internals.normalizeHeaders(headers);
+ headers = internals.checkAcl(acl, headers, callback);
+ if ( ! headers) {
+ return;
+ }
+ config.post(path + '?uploads', headers, '', function (err, res) {
+ if (err) {
+ callback(err);
+ } else {
+ callback(null, {bucket: res.Bucket, key: res.Key, uploadId: res.UploadId});
+ }
+ });
+ };
+ /**
+ * Aborts a multipart upload
+ * @param path
+ * @param uploadId
+ * @param callback
+ */
+ config.abortUpload = function (path, uploadId, callback) {
+ config.del(path + '?uploadId=' + uploadId, callback);
+ };
+ /**
+ * Completes a multipart upload
+ * @param path
+ * @param uploadId
+ * @param uploadParts
+ * @param callback
+ */
+ config.completeUpload = function (path, uploadId, uploadParts, callback) {
+ var xml = '<CompleteMultipartUpload>';
+ for (var i in uploadParts) {
+ xml += '<Part><PartNumber>' + i + '</PartNumber><ETag>' + uploadParts[i] + '</ETag></Part>';
+ }
+ xml += '</CompleteMultipartUpload>';
+ config.post(path + '?uploadId=' + uploadId, {}, xml, callback);
+ };
+ /**
+ * Uploads a file part of a multipart upload
+ * @param path
+ * @param partNumber
+ * @param uploadId
+ * @param fileHandler
+ * @param callback
+ */
+ config.putFilePart = function (path, partNumber, uploadId, fileHandler, callback) {
+ fileHandler.file = p.resolve(fileHandler.file);
+ fileHandler.options = internals.filterByteRange(fileHandler.options, false, callback);
+ if ( ! fileHandler.options) {
+ return;
+ }
+ var md5 = crypto.createHash('md5');
+ var bf = fs.ReadStream(fileHandler.file, fileHandler.options);
+ bf.on('data', function (chunk) {
+ md5.update(chunk);
+ });
+ bf.on('end', function () {
+ var headers = {expect: '100-continue'};
+ headers['content-md5'] = md5.digest('base64');
+ config.put(path + '?partNumber=' + partNumber + '&uploadId=' + uploadId, headers, fileHandler, function (err, res) {
+ if (err) {
+ err.partNumber = partNumber;
+ callback(err);
+ } else {
+ callback(null, {
+ partNumber: partNumber,
+ ETag: res.etag
+ });
+ }
+ });
+ });
+ bf.on('error', function (error) {
+ callback(error);
+ });
+ };
+ /**
+ * Uploads a stream part of a multipart upload
+ * @param path
+ * @param partNumber
+ * @param uploadId
+ * @param stream
+ * @param headers
+ * @param callback
+ */
+ config.putStreamPart = function (path, partNumber, uploadId, stream, headers, callback) {
+ headers.expect = '100-continue';
+ config.put(path + '?partNumber=' + partNumber + '&uploadId=' + uploadId, headers, stream, function (err, res) {
+ if (err) {
+ err.partNumber = partNumber;
+ callback(err);
+ } else {
+ callback(null, {
+ partNumber: partNumber,
+ ETag: res.etag
+ });
+ }
+ });
+ };
+ /**
+ * Uploads a buffer part of a multipart upload
+ * @param path
+ * @param partNumber
+ * @param uploadId
+ * @param buffer
+ * @param callback
+ */
+ config.putBufferPart = function (path, partNumber, uploadId, buffer, callback) {
+ var headers = {expect: '100-continue'};
+ var md5 = crypto.createHash('md5');
+ md5.update(buffer);
+ headers['content-md5'] = md5.digest('base64');
+ config.put(path + '?partNumber=' + partNumber + '&uploadId=' + uploadId, headers, buffer, function (err, res) {
+ if (err) {
+ err.partNumber = partNumber;
+ callback(err);
+ } else {
+ callback(null, {
+ partNumber: partNumber,
+ ETag: res.etag
+ });
+ }
+ });
+ };
+ /**
+ * Uploads a file by using the S3 multipart upload API
+ * @param path
+ * @param file
+ * @param acl
+ * @param headers
+ * @param partSize
+ * @param callback
+ */
+ config.putFileMultipart = function (path, file, acl, headers, partSize, callback) {
+ if (partSize < 5242880) {
+ throw new Error('The part size must be at least 5242880 bytes.');
+ }
+ file = p.resolve(file);
+ fs.stat(file, function (err, res) {
+ if (err) {
+ callback(err);
+ } else {
+ var size = res.size;
+ if (size <= 5242880) { // fallback to s3.putFile()
+ config.putFile(path, file, acl, headers, callback);
+ } else { // multipart upload
+ config.initUpload(path, acl, headers, function (err, res) {
+ if (err) {
+ callback(err);
+ } else {
+ var uploadId = res.uploadId;
+ var count = Math.ceil(size / partSize);
+ var errors = [];
+ var aborted = false;
+ var uploadParts = [];
+ var finished = 0;
+ var putFilePart = function (path, partNumber, uploadId, fileHandler, callback) {
+ if ( ! aborted) {
+ config.putFilePart(path, partNumber, uploadId, fileHandler, function (err, res) {
+ if ( ! aborted) {
+ if (err) {
+ errors[partNumber]++;
+ if (errors[partNumber] == 10) {
+ aborted = true;
+ config.abortUpload(path, uploadId, function (err, res) {
+ if ( ! err) {
+ err = new Error('Part ' + partNumber + ' failed the upload 10 times. Aborting the multipart upload.');
+ err.partNumber = partNumber;
+ } else {
+ err.partNumber = partNumber;
+ }
+ callback(err);
+ });
+ } else {
+ setTimeout(function () {
+ putFilePart(path, partNumber, uploadId, fileHandler, callback);
+ }, 500 * errors[partNumber]);
+ }
+ } else {
+ uploadParts[res.partNumber] = res.ETag;
+ finished++;
+ if (finished == count) {
+ config.completeUpload(path, uploadId, uploadParts, callback);
+ }
+ }
+ }
+ });
+ }
+ };
+ for (var partNumber = 1; partNumber <= count; partNumber++) {
+ errors[partNumber] = 0;
+ putFilePart(path, partNumber, uploadId, {
+ file: file,
+ options: {
+ start: (partNumber - 1) * partSize,
+ end: partNumber * partSize - 1
+ }
+ }, callback);
+ }
+ }
+ });
+ }
+ }
+ });
+ };
+ /**
+ * Exposes the deprecated escapePath() helper
* @param path
* @return string
*/
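The helpers added above can also be driven by hand when more control over the parts is needed. A rough sketch, not part of the commit, assuming the credentials and bucket are already set and ./big.bin is a placeholder name for a local 10 MiB file; error handling is reduced to a single abort:

var s3 = require('aws2js').load('s3'); // assumes setCredentials() / setBucket() were already called
var partSize = 5242880; // 5 MiB, the minimum part size
var parts = {}; // partNumber -> ETag map, the format completeUpload() expects

s3.initUpload('big.bin', false, {}, function (err, upload) {
    if (err) {
        return console.error(err);
    }
    var pending = 2; // two 5 MiB byte ranges cover the whole 10 MiB file
    for (var partNumber = 1; partNumber <= 2; partNumber++) {
        (function (partNumber) {
            s3.putFilePart('big.bin', partNumber, upload.uploadId, {
                file: './big.bin',
                options: {
                    start: (partNumber - 1) * partSize, // same arithmetic as putFileMultipart() above
                    end: partNumber * partSize - 1
                }
            }, function (err, res) {
                if (err) { // give up so the uploaded parts don't linger on S3
                    return s3.abortUpload('big.bin', upload.uploadId, console.error);
                }
                parts[res.partNumber] = res.ETag;
                if (--pending === 0) {
                    s3.completeUpload('big.bin', upload.uploadId, parts, console.log);
                }
            });
        })(partNumber);
    }
});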
88 lib/internals.js
@@ -12,7 +12,7 @@ var Stream = require('stream').Stream;
/* the internally used modules */
require('./Object.watch.js');
-require('./Buffer.toByteArray.js')
+require('./Buffer.toByteArray.js');
var cfg = require('./config.js');
var tools = require('./tools.js');
@@ -31,6 +31,7 @@ var checkConfig = function (config) {
}
};
exports.checkConfig = checkConfig;
+// TODO: break it into smaller pieces since it's almost 300 LOC
/**
* The muscle of this library aka the function that makes all the request - response machinery
* @param config
@@ -65,14 +66,21 @@ var makeRequest = function (config, options, body, handler, callback, requestId)
if ( ! body.file) {
throw new Error('Invalid body handler, expecting a file path.');
}
+ if (body.options) {
+ body.options = filterByteRange(body.options, options.headers['content-length'], callback);
+ if ( ! body.options) {
+ return;
+ }
+ options.headers['content-length'] = body.options.length;
+ delete(body.options.length);
+ }
}
break;
default:
throw new Error('Invalid request body handler. Expecting a String or Object.');
break;
}
}
- var data = [];
if (handler != 'xml' && handler != 'buffer' && handler != 'stream' && handler != 'json' && handler != null) {
try {
if ( ! handler.file) {
@@ -113,6 +121,7 @@ var makeRequest = function (config, options, body, handler, callback, requestId)
}
}
var aborted = false;
+ var data = [];
var request = https.request(options, function (response) {
var parseXml = function (data) {
libxml2js(new Buffer(data).toString(), function (error, result) {
@@ -263,12 +272,25 @@ var makeRequest = function (config, options, body, handler, callback, requestId)
request.end();
return;
}
- if (body instanceof Stream) {
- body.pipe(request);
+ if (body instanceof Stream) { // some requests don't like body.pipe(request). bummer.
+ body.on('data', function (chunk) {
+ if ( ! aborted) {
+ request.write(chunk);
+ }
+ });
+ body.on('end', function () {
+ if ( ! aborted) {
+ request.end();
+ }
+ });
return;
}
if ( ! haveBodyHandler) {
- var bfile = fs.ReadStream(body.file);
+ if (body.options) {
+ var bfile = fs.ReadStream(body.file, body.options);
+ } else {
+ var bfile = fs.ReadStream(body.file);
+ }
bfile.on('data', function (chunk) {
if ( ! aborted) {
request.write(chunk);
@@ -292,6 +314,59 @@ var makeRequest = function (config, options, body, handler, callback, requestId)
};
exports.makeRequest = makeRequest;
/**
+ * Filters the byte range for a ReadStream
+ * @param range
+ * @param callback
+ * @return object / bool
+ */
+var filterByteRange = function (range, length, callback) {
+ if ( ! isNaN(Number(range.start))) {
+ if (range.start < 0) {
+ range.start = 0;
+ }
+ if (length) {
+ if (range.start >= length) {
+ var err = new Error('The start value of a byte range can\'t be more than the file length.');
+ callback(err);
+ return false;
+ }
+ }
+ }
+ if ( ! isNaN(Number(range.end))) {
+ if (length) {
+ if (range.end > length - 1) {
+ range.end = length - 1;
+ }
+ }
+ if (range.end < 0) {
+ var err = new Error('The end value of a byte range can\'t be less than 0.');
+ callback(err);
+ return false;
+ }
+ }
+ if (Number(range.end) > 0 && isNaN(Number(range.start))) {
+ range.start = 0; // the node.js upstream is dumb about this ...
+ }
+ if (Number(range.start) >= 0 && isNaN(Number(range.end))) {
+ range.end = length - 1;
+ }
+ if (range.start > range.end) {
+ var err = new Error('The start value of a byte range must be lower than the end value.');
+ callback(err);
+ return false;
+ }
+ if ( ! isNaN(range.start) && ! isNaN(range.end)) {
+ if (length) {
+ range.length = range.end - range.start + 1;
+ }
+ } else {
+ delete(range.start);
+ delete(range.end);
+ }
+ return range;
+};
+exports.filterByteRange = filterByteRange;
+/**
* Creates HMAC signatures for signing the requests as required by the AWS APIs
* @param secretAccessKey
* @param toSign
@@ -395,9 +470,6 @@ var authorize = function (config, method, headers, path) {
for (var key in sorted) {
toSign += key + ':' + sorted[key] + '\n';
}
- if (path.substring(0, 2) == '/?') {
- path = '/';
- }
if (config.useBucket) {
path = '/' + config.useBucket + path;
}
6 package.json
@@ -1,15 +1,15 @@
{
"name": "aws2js",
"main": "./lib/aws.js",
- "version": "0.6.3",
+ "version": "0.6.4",
"description": "AWS (Amazon Web Services) APIs client implementation for node.js",
"dependencies": {
"libxmljs": ">=0.5.0",
"libxml-to-js": ">=0.3.9",
- "mime-magic": ">=0.2.3"
+ "mime-magic": ">=0.2.4"
},
"engines": {
- "node": ">=0.4.10"
+ "node": ">=0.4.10 <=0.6.8"
},
"homepage": "https://github.com/SaltwaterC/aws2js",
"author": {
37 tests/s3-multipart-upload.js
@@ -0,0 +1,37 @@
+var cp = require('child_process');
+var assert = require('assert');
+var crypto = require('crypto');
+var fs = require('fs');
+var s3 = require('../').load('s3');
+
+s3.setCredentials(process.env.AWS_ACCEESS_KEY_ID, process.env.AWS_SECRET_ACCESS_KEY);
+s3.setBucket(process.env.AWS2JS_S3_BUCKET);
+
+cp.execFile('../tools/createtemp.sh', function (err, res) {
+ assert.ifError(err);
+ var tempMd5 = res.replace(/\s/g, '');
+ s3.putFileMultipart('10M.tmp', './10M.tmp', false, {}, 5242880, function (err, res) {
+ assert.ifError(err);
+ s3.get('10M.tmp', {file: './10M.tmp'}, function (err, res) {
+ assert.ifError(err);
+ var md5 = crypto.createHash('md5');
+ var rs = fs.ReadStream('./10M.tmp');
+ rs.on('data', function (data) {
+ md5.update(data);
+ });
+ rs.on('end', function () {
+ var dlMd5 = md5.digest('hex');
+ assert.deepEqual(tempMd5, dlMd5);
+ fs.unlink('./10M.tmp', function (err) {
+ assert.ifError(err);
+ s3.del('10M.tmp', function (err, res) {
+ assert.ifError(err);
+ });
+ });
+ });
+ rs.on('error', function (err) {
+ assert.ifError(err);
+ });
+ });
+ });
+});
4 tools/createtemp.sh
@@ -0,0 +1,4 @@
+#!/bin/sh
+
+dd if=/dev/urandom of=10M.tmp bs=1M count=10 2>/dev/null
+md5sum 10M.tmp | cut -d' ' -f1
