Skip to content
Browse files

Initial commit

  • Loading branch information...
1 parent 85d70e2 commit 0627f9bc6bbef28f9b71295506e8c3041c3b262a @nathanoehlman committed
Showing with 514 additions and 4 deletions.
  1. +2 −1 .gitignore
  2. +88 −3 README.md
  3. +1 −0 index.js
  4. +250 −0 lib/multipartupload.js
  5. +27 −0 lib/parse.js
  6. +25 −0 package.json
  7. +121 −0 test/multipart.js
View
3 .gitignore
@@ -12,4 +12,5 @@ logs
results
node_modules
-npm-debug.log
+npm-debug.log
+test/auth.json
View
91 README.md
@@ -1,4 +1,89 @@
-knox-mpu
-========
+## knox-mpu
-Multi part upload for Amazon S3 using Knox
+A Node.js client designed to make large file uploads to Amazon S3 via the [MultiPartUpload API](http://docs.amazonwebservices.com/AmazonS3/latest/dev/sdksupportformpu.html) simple and easy. It's built on top of the excellent [Knox](https://github.com/LearnBoost/knox) library from the guys over at LearnBoost.
+
+### Features
+
+* Simple and easy to use
+* Pipe either a file or a stream directly to S3 (no need to know the content length first!)
+* Automatically separates a file/stream into appropriate sized segments for upload
+* Asynchronous uploading of segments
+* Handy events to track your upload progress
+
+_Planned_
+
+* Better error handling (reuploading failed parts, etc)
+
+### Installing
+
+Installation is done via NPM, by running ```npm install knox-mpu```
+
+### Examples
+
+#### Uploading a stream
+
+To upload a stream, simply pass the stream when constructing the MultiPartUpload. The upload will then listen to the stream, and create parts from the incoming data stream. When a part reaches the minimum part size, it will attempt to upload it to S3.
+
+```javascript
+
+// Create a Knox client first
+var client = knox.createClient({ ... }),
+ upload = null;
+
+
+upload = new MultiPartUpload(
+ {
+ client: client,
+ objectName: 'destination.txt', // Amazon S3 object name
+ stream: stream
+ },
+ // Callback handler
+ function(err, res) {
+ // If successful, will return a JSON object containing Location, Bucket, Key and ETag of the object
+ }
+ );
+```
+
+#### Uploading a file
+
+To upload a file, pass the path to the file in the constructor. Knox-mpu will split the file into parts and upload them.
+
+```javascript
+
+// Create a Knox client first
+var client = knox.createClient({ ... }),
+ upload = null;
+
+
+upload = new MultiPartUpload(
+ {
+ client: client,
+ objectName: 'destination.txt', // Amazon S3 object name
+ file: ... // path to the file
+ },
+ // Callback handler
+ function(err, res) {
+ // If successful, will return a JSON object containing Location, Bucket, Key and ETag of the object
+ }
+ );
+```
+### Options
+
+The following options can be passed to the MultiPartUpload constructor -
+
+* ```client``` _Required_ The knox client to use for this upload request
+* ```objectName``` _Required_ The destination object name/path on S3 for this upload
+* ```stream``` The stream to upload (required if file is not being supplied)
+* ```file``` The path to the file (required if stream is not being supplied)
+* ```headers``` Any additional headers to include on the requests
+* ```partSize``` The minimum size of the parts to upload (defaults to 5MB).
+
+### Events
+
+The MultiPartUpload will emit a number of events -
+
+* ```initiated``` Emitted when the multi part upload has been initiated, and received an upload ID. Passes the upload id through as the first argument to the event
+* ```uploading``` Emitted each time a part starts uploading. The part id is passed as the first argument.
+* ```uploaded``` Emitted each time a part finishes uploading. Passes through an object containing the part id and Amazon ETag for the uploaded part.
+* ```failed``` Emitted each time a part upload fails. Passes an object containing the part id and error message
+* ```completed``` Emitted when the upload has completed successfully. Contains the object information from Amazon S3 (location, bucket, key and ETag)
View
1 index.js
@@ -0,0 +1 @@
+module.exports = require('./lib/multipartupload');
View
250 lib/multipartupload.js
@@ -0,0 +1,250 @@
+var _ = require('lodash'),
+ EventEmitter = require('events').EventEmitter,
+ Batch = require('batch'),
+ fs = require('fs'),
+ path = require('path'),
+ os = require('os'),
+ util = require('util'),
+ parse = require('./parse');
+
/**
 * Represents a single Amazon S3 multi-part upload.
 *
 * Validates the supplied options, stores the upload configuration, and then
 * immediately begins uploading either the given stream or the given file.
 * Exactly one of `opts.stream` / `opts.file` must be provided.
 *
 * @param {Object} opts - upload options:
 *                        client (required) - the knox client to use,
 *                        objectName (required) - destination S3 object name,
 *                        stream or file - the data source to upload,
 *                        headers - extra request headers,
 *                        partSize - minimum part size in bytes (default 5MB)
 * @param {Function} callback - callback(err, result) invoked once the whole
 *                              upload completes or fails
 * @throws {Error} when required options are missing or conflicting
 */
function MultiPartUpload(opts, callback) {

    var hasStream = !!opts.stream,
        hasFile = !!opts.file;

    // Guard clauses: fail fast on invalid option combinations
    if (!opts.client || !opts.objectName) {
        throw new Error('MultiPart upload must be created from a client and provide a object name');
    }
    if (!hasStream && !hasFile) {
        throw new Error('MultiPart upload must be passed either a stream or file parameter');
    }
    if (hasStream && hasFile) {
        throw new Error('You cannot provide both a stream and a file to upload');
    }

    this.objectName = opts.objectName;
    this.headers = opts.headers || {};
    this.client = opts.client;
    this.partSize = opts.partSize || 5242880; // 5MB default part size
    this.uploadId = null;
    this.uploads = new Batch();

    // Kick the upload off straight away
    if (hasStream) {
        this._putStream(opts.stream, callback);
    } else {
        this._putFile(opts.file, callback);
    }
}
util.inherits(MultiPartUpload, EventEmitter);
+
/**
 * Starts the multi-part upload by asking S3 for an upload ID.
 *
 * Issues `POST <objectName>?uploads` via the knox client, parses the XML
 * response and records the returned UploadId on this instance. Emits the
 * 'initiated' event with the upload id on success.
 *
 * @param {Function} callback - callback(err, uploadId)
 */
MultiPartUpload.prototype._initiate = function(callback) {

    var mpu = this,
        req = mpu.client.request('POST', mpu.objectName + '?uploads', mpu.headers);

    // Parse S3's XML reply and pull out the upload id
    parse.xmlResponse(req, function(err, body) {
        if (err) {
            return callback(err);
        }
        if (!body.UploadId) {
            return callback('Invalid upload ID');
        }

        mpu.uploadId = body.UploadId;
        mpu.emit('initiated', body.UploadId);
        return callback(null, body.UploadId);
    });

    req.end();
}
+
/**
 * Streams a file to S3 using a multipart form upload
 *
 * Verifies that the file is accessible, opens a read stream for it, and
 * hands the stream off to _putStream, which performs the part-by-part
 * upload.
 *
 * @param {String} file - path of the file to upload
 * @param {Function} callback - callback(err, result)
 */
MultiPartUpload.prototype._putFile = function(file, callback) {
    if (!file) return callback('Invalid file');

    var mpu = this;

    // fs.stat is used instead of the deprecated fs.exists; any stat error
    // (ENOENT, EACCES, ...) means the file cannot be uploaded
    fs.stat(file, function(err) {
        if (err) {
            return callback('File does not exist');
        }

        var stream = fs.createReadStream(file);
        mpu._putStream(stream, callback);
    });
}
+
/**
 * Streams a stream to S3 using a multipart form upload.
 *
 * It will attempt to initialize the upload (if not already started), read the stream in,
 * write the stream to a temporary file of the given partSize, and then start uploading a part
 * each time a part is available
 *
 * @param {Stream} stream - readable stream whose contents will be uploaded
 * @param {Function} callback - callback(err, result) invoked when the whole
 *                              upload completes or fails
 */
MultiPartUpload.prototype._putStream = function(stream, callback) {

    if (!stream) return callback('Invalid stream');

    var mpu = this;

    if (!this.uploadId) {
        this._initiate(function(err, uploadId) {
            // NOTE(review): on initiation failure the callback fires here with
            // an error, but the stream below continues to be consumed, so the
            // callback may be invoked a second time later - confirm intended
            if (err || !uploadId) return callback('Unable to initiate stream upload');
        });
    }
    // Start handling the stream straight away: the 'data' handlers must be
    // attached synchronously so no events are missed while the initiation
    // request is still in flight (parts wait on the 'initiated' event in
    // _uploadPart before actually uploading)
    mpu._handleStream(stream, callback);
}
+
/**
 Handles an incoming stream, divides it into parts, and uploads it to S3.

 Incoming data is buffered into temporary part files under the OS temp
 directory. Once a part file reaches the configured part size it is closed
 and queued for upload; any remaining partial part is flushed when the
 stream ends.

 @param {Stream} stream - readable stream to upload
 @param {Function} callback - callback(err, result) fired once all queued
                              parts have completed (via _completeUploads)
 **/
MultiPartUpload.prototype._handleStream = function(stream, callback) {

    var mpu = this,
        parts = [],
        current;

    // Creates the next part: a temp file write stream plus bookkeeping.
    // Part ids are 1-based, matching the S3 partNumber convention.
    function newPart() {
        // os.tmpDir() was removed in later Node versions in favour of
        // os.tmpdir(); support both spellings
        var tmpDir = (os.tmpdir || os.tmpDir).call(os),
            partId = parts.length + 1,
            partFileName = path.resolve(path.join(tmpDir, 'mpu-' + (mpu.uploadId || Date.now()) + '-' + partId)),
            partFile = fs.createWriteStream(partFileName),
            part = {
                id: partId,
                stream: partFile,
                fileName: partFileName,
                length: 0
            };

        parts.push(part);
        return part;
    }

    // Closes a finished part file and queues its upload in the batch
    function partReady(part) {
        if (!part) return;

        // Ensure the stream is closed
        if (part.stream.writable) {
            part.stream.end();
        }
        mpu.uploads.push(mpu._uploadPart.bind(mpu, part));
    }

    // Buffer incoming data into the current part, rolling over to a new
    // part each time the configured part size is reached
    stream.on('data', function(buffer) {
        if (!current) {
            current = newPart();
        }

        current.stream.write(buffer);
        current.length += buffer.length;

        // Check if we have a complete part
        if (current.length >= mpu.partSize) {
            partReady(current);
            current = null;
        }
    });

    // Flush any partial final part, then wait for the uploads to complete
    stream.on('end', function() {
        if (current) {
            partReady(current);
        }

        return mpu._completeUploads(callback);
    });

    // NOTE(review): on a stream error the temporary part files already
    // written are not cleaned up here - confirm whether that is intentional
    stream.on('error', function(err) {
        return callback(err);
    });
}
+
/**
 Uploads a part, or if we are not ready yet, waits for the upload to be
 initiated and will then upload.

 Streams the temporary part file to S3 via `PUT ?partNumber=..&uploadId=..`,
 records the returned ETag and removes the temporary file on success.

 Emits 'uploading' when the request starts, 'uploaded' on success, and
 'failed' when the part could not be uploaded.

 @param {Object} part - part descriptor ({id, fileName, length, stream})
 @param {Function} callback - callback(err, {part, etag})
 **/
MultiPartUpload.prototype._uploadPart = function(part, callback) {

    // If we haven't started the upload yet, wait for the initialization
    if (!this.uploadId) {
        return this.on('initiated', this._uploadPart.bind(this, part, callback));
    }

    var url = this.objectName + '?partNumber=' + part.id + '&uploadId=' + this.uploadId,
        headers = { 'Content-Length': part.length },
        req = this.client.request('PUT', url, headers),
        partStream = fs.createReadStream(part.fileName),
        mpu = this;

    // Wait for the upload to complete
    req.on('response', function(res) {
        if (res.statusCode !== 200) {
            // Mirror the transport-error path below so 'failed' listeners
            // hear about HTTP-level failures as well
            var failure = {part: part.id, message: 'Upload failed'};
            mpu.emit('failed', failure);
            return callback(failure);
        }

        // Grab the etag and return it
        var etag = res.headers['etag'],
            result = {part: part.id, etag: etag};

        mpu.emit('uploaded', result);

        // Remove the temporary part file now that S3 holds a copy
        fs.unlink(part.fileName, function(err) {
            return callback(err, result);
        });
    });

    // Handle transport-level errors
    req.on('error', function(err) {
        var result = {part: part.id, message: err};
        mpu.emit('failed', result);
        return callback(result);
    });

    partStream.pipe(req);
    mpu.emit('uploading', part.id);
}
+
/**
 Indicates that all uploads have been started and that we should wait for
 completion.

 Once every queued part upload has finished, sends the
 CompleteMultipartUpload request to S3 listing each part number with its
 ETag, then emits 'completed' with the parsed response body.

 @param {Function} callback - callback(err, body) where body contains the
                              Location, Bucket, Key and ETag returned by S3
 **/
MultiPartUpload.prototype._completeUploads = function(callback) {

    var mpu = this;

    this.uploads.end(function(err, results) {

        if (err) return callback(err);

        // S3 requires the parts to be listed in ascending part number order,
        // so sort the results before building the completion XML
        var ordered = _.sortBy(results, function(result) { return result.part; });
        var parts = _.map(ordered, function(value) {
            return util.format('<Part><PartNumber>%d</PartNumber><ETag>%s</ETag></Part>', value.part, value.etag);
        }).join('');

        var req = mpu.client.request('POST', mpu.objectName + '?uploadId=' + mpu.uploadId);

        // Register the response handler
        parse.xmlResponse(req, function(err, body) {
            if (err) return callback(err);
            delete body.$; // drop the root element's XML attributes
            mpu.emit('completed', body);
            return callback(null, body);
        });

        // Write the request
        req.write('<CompleteMultipartUpload>' + parts + '</CompleteMultipartUpload>');
        req.end();
    });
}
+
+module.exports = MultiPartUpload;
View
27 lib/parse.js
@@ -0,0 +1,27 @@
+var xml2js = require('xml2js');
+
+/**
+ Simple helper method to handle XML responses
+ **/
+exports.xmlResponse = xmlResponse = function(req, callback) {
+
+ if (!req) return callback('Invalid request');
+
+ // Handle the response
+ req.on('response', function(res) {
+ var body = '';
+
+ res.on('data', function(chunk){
+ body += chunk;
+ });
+
+ res.on('end', function(){
+ var parser = new xml2js.Parser({explicitArray: false, explicitRoot: false});
+ parser.parseString(body, callback);
+ });
+
+ res.on('error', callback);
+ });
+
+ req.on('error', callback);
+}
View
25 package.json
@@ -0,0 +1,25 @@
+{
+ "name": "knox-mpu",
+ "version": "0.0.1",
+ "description": "Provide multi part upload functionality to Amazon S3 using the knox library",
+ "keywords": ["aws", "amazon", "s3", "knox", "multi", "part", "upload"],
+ "main": "index.js",
+ "scripts": {
+ "test": "mocha --reporter spec -t 0"
+ },
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/nathanoehlman/knox-mpu.git"
+ },
+ "author": "Nathan Oehlman",
+ "license": "BSD",
+ "dependencies": {
+ "batch": "~0.2.0",
+ "knox": "~0.4.0",
+ "xml2js": "~0.2.0",
+ "lodash": "~0.9.0"
+ },
+ "devDependencies": {
+ "mockstream": "0.0.0"
+ }
+}
View
121 test/multipart.js
@@ -0,0 +1,121 @@
+var assert = require('assert'),
+ fs = require('fs'),
+ knox = require('knox'),
+ os = require('os'),
+ path = require('path'),
+ MultiPartUpload = require('..'),
+ mockstream = require('mockstream');
+
// Integration tests for knox-mpu. These talk to a real S3 bucket, so they
// require a local ./auth.json file with valid knox credentials.
describe('Knox multipart form uploads', function() {

    var client = null;

    before(function(done) {
        try {
            var auth = require('./auth.json');
            client = knox.createClient(auth);
            done();
        } catch (err) {
            done('Could not create Knox client - please provide an ./auth.json file');
        }
    });

    it('should be able to pipe a stream directly to Amazon S3 using the multi part upload', function(done) {
        var testLength = 7242880,
            chunkSize = 2048,
            stream = new mockstream.MockDataStream({chunkSize: chunkSize, streamLength: testLength}),
            opts = {
                client: client, objectName: Date.now() + '.txt', stream: stream
            },
            mpu = null;

        // Upload the stream
        mpu = new MultiPartUpload(opts, function(err, body) {
            if (err) return done(err);
            assert.equal(body['Key'], opts.objectName);

            // Clean up after ourselves
            client.deleteFile(opts.objectName, function(err, res) {
                if (err) return done('Could not delete file [' + err + ']');
                return done();
            });
        });

        stream.start();
    });

    it('should be able to upload a small file to S3', function(done) {

        var testLength = 242880,
            chunkSize = 2048,
            stream = new mockstream.MockDataStream({chunkSize: chunkSize, streamLength: testLength}),
            opts = {
                client: client, objectName: Date.now() + '.txt', stream: stream
            },
            mpu = null;

        // Upload the stream (small enough to fit in a single part)
        mpu = new MultiPartUpload(opts, function(err, body) {
            if (err) return done(err);
            assert.equal(body['Key'], opts.objectName);

            // Clean up after ourselves
            client.deleteFile(opts.objectName, function(err, res) {
                if (err) return done('Could not delete file [' + err + ']');
                return done();
            });
        });

        stream.start();
    });

    it('should be able to upload a file to S3', function(done) {

        // Create a temporary file of data for uploading.
        // (os.tmpdir with an os.tmpDir fallback - the capitalised spelling
        // was removed in later Node versions)
        var tempFile = path.resolve(path.join((os.tmpdir || os.tmpDir).call(os), 'knoxmpu-file-upload-test.txt')),
            writeStream = fs.createWriteStream(tempFile),
            mockDataStream = new mockstream.MockDataStream({chunkSize: 2048, streamLength: 6242880});

        mockDataStream.on('data', function(chunk) {
            writeStream.write(chunk);
        });

        mockDataStream.on('end', function() {
            writeStream.end();
        });

        writeStream.on('error', done);
        mockDataStream.start();

        // Upload the file once we have a temporary file
        writeStream.on('close', function() {

            var opts = {
                    client: client, objectName: Date.now() + '.txt', file: tempFile
                },
                mpu = null;

            // Upload the file
            mpu = new MultiPartUpload(opts, function(err, body) {
                if (err) return done(err);
                assert.equal(body['Key'], opts.objectName);

                // Clean up both the S3 object and the local temp file
                client.deleteFile(opts.objectName, function(err, res) {
                    fs.unlink(tempFile, function(err2) {
                        return done((err || err2) ? 'Could not clean up after test' : null);
                    });
                });
            });
        });

    });

})

0 comments on commit 0627f9b

Please sign in to comment.
Something went wrong with that request. Please try again.