Add support for multipart uploads #78

Merged · 1 commit · Jan 18, 2018
Changes from all commits
1 change: 1 addition & 0 deletions lib/app.js
@@ -51,6 +51,7 @@ module.exports = function (options) {
app.delete('/:bucket', controllers.bucketExists, controllers.deleteBucket);
app.put('/:bucket', controllers.putBucket);
app.put('/:bucket/:key(*)', controllers.bucketExists, controllers.putObject);
app.post('/:bucket/:key(*)', controllers.bucketExists, controllers.postObject);
app.get('/:bucket/:key(*)', controllers.bucketExists, controllers.getObject);
app.head('/:bucket/:key(*)', controllers.getObject);
app.delete('/:bucket/:key(*)', controllers.bucketExists, controllers.deleteObject);
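The new POST route above is what makes the three-step multipart flow reachable: initiate with POST /:bucket/:key?uploads, send each part with PUT /:bucket/:key?uploadId=...&partNumber=N, and complete with POST /:bucket/:key?uploadId=... (handled by postObject and putObjectMultipart in lib/controllers.js below). A minimal client-side sketch using the aws-sdk v2 client; the endpoint, port, bucket name, and credentials are assumptions for illustration (s3rver does not enforce credentials):

var AWS = require('aws-sdk');

var s3 = new AWS.S3({
  endpoint: 'http://localhost:4568', // assumed local s3rver address
  s3ForcePathStyle: true,            // path-style URLs match the /:bucket/:key routes
  accessKeyId: 'S3RVER',             // placeholder credentials
  secretAccessKey: 'S3RVER'
});

var params = { Bucket: 'bucket1', Key: 'big-object' };

// POST /:bucket/:key?uploads -> postObject (initiate)
s3.createMultipartUpload(params, function (err, init) {
  if (err) { return console.error(err); }
  // PUT /:bucket/:key?uploadId=...&partNumber=1 -> putObjectMultipart
  s3.uploadPart({
    Bucket: params.Bucket,
    Key: params.Key,
    UploadId: init.UploadId,
    PartNumber: 1,
    Body: Buffer.alloc(5 * 1024 * 1024)
  }, function (err, part) {
    if (err) { return console.error(err); }
    // POST /:bucket/:key?uploadId=... -> postObject (complete)
    s3.completeMultipartUpload({
      Bucket: params.Bucket,
      Key: params.Key,
      UploadId: init.UploadId,
      MultipartUpload: { Parts: [{ ETag: part.ETag, PartNumber: 1 }] }
    }, function (err, data) {
      if (err) { return console.error(err); }
      console.log('combined object at', data.Location);
    });
  });
});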
168 changes: 121 additions & 47 deletions lib/controllers.js
@@ -6,7 +6,9 @@ var FileStore = require('./file-store'),
xml2js = require('xml2js'),
async = require('async'),
path = require('path'),
    ReadableStream = require('stream').Readable,
    crypto = require('crypto'),
    url = require('url');

module.exports = function (rootDirectory, logger, indexDocument, errorDocument, fs) {
var fileStore = new FileStore(rootDirectory, fs);
@@ -121,6 +123,73 @@ module.exports = function (rootDirectory, logger, indexDocument, errorDocument,
});
};

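  // Resolves the x-amz-copy-source header into a source bucket and key,
  // verifies both exist, and delegates to fileStore.copyObject (replacing
  // metadata when x-amz-metadata-directive is REPLACE).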
var handleCopyObject = function (key, req, res) {
var template;
var copy = req.headers['x-amz-copy-source'];
copy = copy.charAt(0) === '/' ? copy : '/' + copy;
var srcObjectParams = copy.split('/'),
srcBucket = srcObjectParams[1],
srcObject = srcObjectParams.slice(2).join('/');
fileStore.getBucket(srcBucket, function (err, bucket) {
if (err) {
logger.error('No bucket found for "%s"', srcBucket);
template = templateBuilder.buildBucketNotFound(srcBucket);
return buildXmlResponse(res, 404, template);
}
fileStore.getObject(bucket, srcObject, function (err) {
if (err) {
logger.error('Object "%s" in bucket "%s" does not exist', srcObject, bucket.name);
template = templateBuilder.buildKeyNotFound(srcObject);
return buildXmlResponse(res, 404, template);
}

var replaceMetadata = req.headers['x-amz-metadata-directive'] === 'REPLACE';
fileStore.copyObject({
request: req,
srcKey: srcObject,
srcBucket: bucket,
destBucket: req.bucket,
destKey: key,
replaceMetadata: replaceMetadata

}, function (err, object) {
if (err) {
logger.error('Error copying object "%s" from bucket "%s" into bucket "%s" with key of "%s"',
srcObject, bucket.name, req.bucket.name, key);
template = templateBuilder.buildError('InternalError',
'We encountered an internal error. Please try again.');
return buildXmlResponse(res, 500, template);
}

logger.info('Copied object "%s" from bucket "%s" into bucket "%s" with key of "%s"',
srcObject, bucket.name, req.bucket.name, key);
template = templateBuilder.buildCopyObject(object);
return buildXmlResponse(res, 200, template);
});
});
});
};

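  // Stores a single upload part as an ordinary object whose key joins the
  // uploadId and part number ("<uploadId>_<partNumber>"); the parts are
  // stitched together later by fileStore.combineObjectParts.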
var putObjectMultipart = function (req, res) {
var partKey = req.query.uploadId + '_' + req.query.partNumber;
if (req.headers['x-amz-copy-source']) {
handleCopyObject(partKey, req, res);
} else {
fileStore.putObject(req.bucket, partKey, req, function (err, key) {
if (err) {
logger.error('Error uploading object "%s" to bucket "%s"',
partKey, req.bucket.name, err);
var template = templateBuilder.buildError('InternalError',
'We encountered an internal error. Please try again.');
return buildXmlResponse(res, 500, template);
}
logger.info('Stored object "%s" in bucket "%s" successfully', partKey, req.bucket.name);
res.header('ETag', '"' + key.md5 + '"');
return res.status(200).end();
});
}
};

/**
 * The following methods correspond to the S3 API. For more information visit:
* http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html
@@ -267,66 +336,71 @@ module.exports = function (rootDirectory, logger, indexDocument, errorDocument,
});
},
    putObject: function (req, res) {
      if (req.query.uploadId) {
        // ?uploadId=...&partNumber=... means this PUT is one part of a
        // multipart upload rather than a whole object
        return putObjectMultipart(req, res);
      }

      if (req.headers['x-amz-copy-source']) {
        handleCopyObject(req.params.key, req, res);
      } else {
        fileStore.putObject(req.bucket, req.params.key, req, function (err, key) {
          if (err) {
            logger.error('Error uploading object "%s" to bucket "%s"',
              req.params.key, req.bucket.name, err);
            var template = templateBuilder.buildError('InternalError',
              'We encountered an internal error. Please try again.');
            return buildXmlResponse(res, 500, template);
          }
          logger.info('Stored object "%s" in bucket "%s" successfully', req.params.key, req.bucket.name);
          res.header('ETag', '"' + key.md5 + '"');
          return res.status(200).end();
        });
      }
    },
    postObject: function (req, res) {
      if (req.query.uploads !== undefined) {
        // POST /:bucket/:key?uploads initiates a multipart upload
        var uploadId = crypto.randomBytes(16).toString('hex');
        return buildXmlResponse(res, 200,
          templateBuilder.buildInitiateMultipartUploadResult(req.bucket.name, req.params.key, uploadId));
      } else {
        // POST /:bucket/:key?uploadId=... completes one: the request body is
        // a CompleteMultipartUpload XML document listing the parts
        var completeMultipartUploadXml = '';

        req.on('data', function (data) {
          completeMultipartUploadXml += data.toString('utf8');
        });

        req.on('end', function () {
          xml2js.parseString(completeMultipartUploadXml, function (err, result) {
            if (err) {
              logger.error('Error completing multipart upload "%s" for object "%s" in bucket "%s"',
                req.query.uploadId, req.params.key, req.bucket.name, err);
              var template = templateBuilder.buildError('XMLParseError', err.message);
              return buildXmlResponse(res, 400, template);
            }

            var parts = result.CompleteMultipartUpload.Part.map(function (part) {
              return {
                number: part.PartNumber[0],
                etag: part.ETag[0].replace(/"/g, '')
              };
            });

            fileStore.combineObjectParts(req.bucket, req.params.key, req.query.uploadId, parts, req, function (err, key) {
              if (err) {
                logger.error('Error uploading object "%s" to bucket "%s"',
                  req.params.key, req.bucket.name, err);
                var template = templateBuilder.buildError('InternalError',
                  'We encountered an internal error. Please try again.');
                return buildXmlResponse(res, 500, template);
              }
              logger.info('Stored object "%s" in bucket "%s" successfully', req.params.key, req.bucket.name);
              var location = req.protocol + '://' + req.get('Host') + url.parse(req.originalUrl).pathname;
              return buildXmlResponse(res, 200,
                templateBuilder.buildCompleteMultipartUploadResult(req.bucket.name, req.params.key, location, key)
              );
            });
          });
        });
      }
    },
deleteObject: function (req, res) {
var key = req.params.key;
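For reference, the CompleteMultipartUpload body that postObject parses has the following shape (hypothetical values), and xml2js wraps every child element in an array by default, which is why the handler indexes PartNumber[0] and ETag[0]:

// Request body sent by the client to POST /:bucket/:key?uploadId=...:
//
//   <CompleteMultipartUpload>
//     <Part><PartNumber>1</PartNumber><ETag>"etag-of-part-1"</ETag></Part>
//     <Part><PartNumber>2</PartNumber><ETag>"etag-of-part-2"</ETag></Part>
//   </CompleteMultipartUpload>
//
// xml2js.parseString yields (roughly):
var result = {
  CompleteMultipartUpload: {
    Part: [
      { PartNumber: ['1'], ETag: ['"etag-of-part-1"'] },
      { PartNumber: ['2'], ETag: ['"etag-of-part-2"'] }
    ]
  }
};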
57 changes: 52 additions & 5 deletions lib/file-store.js
@@ -2,6 +2,7 @@
var path = require('path'),
async = require('async'),
crypto = require('crypto'),
PassThrough = require('stream').PassThrough,
utils = require('./utils'),
_ = require('lodash');

@@ -280,14 +281,12 @@ var FileStore = function (rootDirectory,fs) {
});
};

  var putObject = function (bucket, key, req, done) {
    var keyName = path.join(bucket.name, key);
    var dirName = path.join(rootDirectory, keyName);
    fs.mkdirpSync(dirName);
    var contentFile = path.join(dirName, CONTENT_FILE);
    var metaFile = path.join(dirName, METADATA_FILE);
var writeStream = req.pipe(fs.createWriteStream(contentFile));
writeStream.on('error', function (err) {
return done('Error writing file');
@@ -383,6 +382,53 @@ var FileStore = function (rootDirectory,fs) {
});
};

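  // Chains an array of readable streams into a single PassThrough: each
  // source is piped with { end: false } so the sink stays open between
  // parts, and the sink is only ended once the list is exhausted.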
var concatStreams = function (passThrough, streams) {
var stream = streams.shift();
if (!stream) {
passThrough.end();
return passThrough;
}
stream.once('end', function () { concatStreams(passThrough, streams); });
stream.pipe(passThrough, { end: false });
return passThrough;
};

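  // Rebuilds the final object from its uploaded parts: sorts them by part
  // number, streams each part's content file into the destination content
  // file, deletes the part directories, then writes fresh metadata.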
var combineObjectParts = function (bucket, key, uploadId, parts, req, done) {
    // sort numerically; part numbers arrive as strings from the parsed XML
    var sortedParts = _.sortBy(parts, function (part) { return parseInt(part.number, 10); });
var partPaths = _.map(sortedParts, function (part) {
return path.resolve(getBucketPath(bucket.name), uploadId + '_' + part.number);
});
var partStreams = _.map(partPaths, function (partPath) { return fs.createReadStream(path.join(partPath, CONTENT_FILE)); });
var combinedPartsStream = concatStreams(new PassThrough(), partStreams);
var keyName = path.join(bucket.name, key);
var dirName = path.join(rootDirectory, keyName);
fs.mkdirpSync(dirName);
var contentFile = path.join(dirName, CONTENT_FILE);
var metaFile = path.join(dirName, METADATA_FILE);
var writeStream = combinedPartsStream.pipe(fs.createWriteStream(contentFile));
writeStream.on('error', function (err) {
return done(err);
});
writeStream.on('finish', function () {
writeStream.end();
_.forEach(partPaths, function (partPath) { fs.removeSync(partPath); });
createMetaData({
contentFile: contentFile,
type: req.headers['content-type'],
encoding: req.headers['content-encoding'],
disposition: req.headers['content-disposition'],
key: key,
metaFile: metaFile,
headers: req.headers
}, function (err, metaData) {
if (err) {
return done(err);
}
return done(null, new S3Object(metaData));
});
});
};

return {
getBuckets: getBuckets,
getBucket: getBucket,
@@ -393,7 +439,8 @@ var FileStore = function (rootDirectory,fs) {
putObject: putObject,
copyObject: copyObject,
getObjectExists: getObjectExists,
    deleteObject: deleteObject,
    combineObjectParts: combineObjectParts
};
};
module.exports = FileStore;
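concatStreams above is private to FileStore, so here is a standalone sketch of the same stream-chaining technique (re-declared for illustration; the fromString helper is made up for the example):

var PassThrough = require('stream').PassThrough;
var Readable = require('stream').Readable;

function concatStreams(passThrough, streams) {
  var stream = streams.shift();
  if (!stream) {
    passThrough.end(); // all sources drained: finish the sink
    return passThrough;
  }
  stream.once('end', function () { concatStreams(passThrough, streams); });
  stream.pipe(passThrough, { end: false }); // keep the sink open between parts
  return passThrough;
}

// fromString: tiny helper producing a readable over a fixed string
function fromString(text) {
  var r = new Readable();
  r.push(text);
  r.push(null);
  return r;
}

concatStreams(new PassThrough(), [fromString('part1,'), fromString('part2')])
  .pipe(process.stdout); // prints "part1,part2"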
25 changes: 25 additions & 0 deletions lib/xml-template-builder.js
@@ -171,6 +171,31 @@ var xml = function () {
header: true,
indent: ' '
});
},
buildInitiateMultipartUploadResult: function (bucket, key, uploadId) {
return jstoxml.toXML({
InitiateMultipartUploadResult: {
Bucket: bucket,
Key: key,
UploadId: uploadId
}
}, {
header: true,
indent: ' '
});
},
buildCompleteMultipartUploadResult: function (bucket, key, location, item) {
return jstoxml.toXML({
CompleteMultipartUploadResult: {
Location: location,
Bucket: bucket,
Key: key,
ETag: '"' + item.md5 + '"'
}
}, {
header: true,
indent: ' '
});
}
};
};
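For reference, the initiate result built above renders roughly like this (hypothetical bucket, key, and uploadId; the exact XML header and whitespace come from jstoxml):

var jstoxml = require('jstoxml');

console.log(jstoxml.toXML({
  InitiateMultipartUploadResult: {
    Bucket: 'bucket1',
    Key: 'big-object',
    UploadId: '0123456789abcdef0123456789abcdef'
  }
}, { header: true, indent: '  ' }));
// Prints (roughly):
//   <?xml version="1.0" encoding="UTF-8"?>
//   <InitiateMultipartUploadResult>
//     <Bucket>bucket1</Bucket>
//     <Key>big-object</Key>
//     <UploadId>0123456789abcdef0123456789abcdef</UploadId>
//   </InitiateMultipartUploadResult>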
22 changes: 22 additions & 0 deletions test/test.js
@@ -479,6 +479,28 @@ describe('S3rver Tests', function () {
});
});

    it('should upload a managed upload <=5MB', function (done) {
      var params = { Bucket: buckets[0], Key: 'multi/directory/path/multipart', Body: Buffer.alloc(5e+6) }; // 5MB
      s3Client.upload(params, function (err, data) {
        if (err) {
          return done(err);
        }
        (/"[a-fA-F0-9]{32}"/).test(data.ETag).should.equal(true);
        done();
      });
    });

    it('should upload a managed upload >5MB (multipart upload)', function (done) {
      var params = { Bucket: buckets[0], Key: 'multi/directory/path/multipart', Body: Buffer.alloc(2e+7) }; // 20MB
      s3Client.upload(params, function (err, data) {
        if (err) {
          return done(err);
        }
        // s3rver returns a plain 32-hex-char MD5 for the combined object,
        // unlike real S3's "<md5>-<partCount>" multipart ETag
        (/"[a-fA-F0-9]{32}"/).test(data.ETag).should.equal(true);
        done();
      });
    });

it('should find a text file in a multi directory path', function (done) {
s3Client.getObject({Bucket: buckets[0], Key: 'multi/directory/path/text'}, function (err, object) {
if (err) {
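A note on the two managed-upload tests above: the SDK's upload() only switches to the multipart API once the body exceeds its configured part size (5 MB by default), so only the 20 MB case exercises the new multipart routes. Part size and concurrency can be tuned through upload()'s options argument, e.g. (illustrative values; 5 MB is S3's minimum part size):

s3Client.upload(params, { partSize: 5 * 1024 * 1024, queueSize: 2 }, function (err, data) {
  if (err) { return console.error(err); }
  console.log('uploaded with ETag', data.ETag);
});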