Permalink
Browse files

delete part data once uploaded to be sure it's out of memory

  • Loading branch information...
1 parent 9b419f5 commit cb7c86fb14d23a8e68d4812cb6ca7d0544ffd9f6 Aaron Silverman committed Jul 1, 2013
Showing with 15 additions and 16 deletions.
  1. +1 −1 README.md
  2. +13 −14 lib/multipartupload.js
  3. +1 −1 package.json
View
@@ -95,7 +95,7 @@ The following options can be passed to the MultiPartUpload constructor -
* ```file``` The path to the file (required if stream is not being supplied)
* ```headers``` Any additional headers to include on the requests
* ```partSize``` The minimum size of the parts to upload (defaults to 5MB).
-* ```maxConcurrentParts``` The maximum number of concurrent parts that can be uploading at any one time (default is 100)
+* ```batchSize``` The maximum number of concurrent parts that can be uploading at any one time (default is 4)
* ```maxUploadSize``` The maximum size of the file to upload (defaults to infinity). Useful if there is a stream with unknown length.
* ```noDisk``` If true, parts will be kept in-memory instead of written to temp files (defaults to false).
View
@@ -1,6 +1,6 @@
var _ = require('lodash'),
EventEmitter = require('events').EventEmitter,
- batches = require('batches'),
+ Batch = require('batch'),
fs = require('fs'),
path = require('path'),
os = require('os'),
@@ -34,17 +34,13 @@ function MultiPartUpload(opts, callback) {
this.client = opts.client;
this.partSize = opts.partSize || 5242880; // 5MB default
this.uploadId = null;
+ this.uploads = new Batch();
this.noDisk = opts.noDisk;
this.maxUploadSize = opts.maxUploadSize || 1/0; // infinity default
this.currentUploadSize = 0;
this.aborted = false;
- if( opts.batchSize ){
- this.uploads.concurrency(opts.batchSize);
- }
-
- this.maxConcurrentParts = opts.maxConcurrentParts || 100; // maximum number of simultaneous uploads
- this.uploads = batches.batch({concurrent: this.maxConcurrentParts, collectData: true});
+ this.uploads.concurrency(opts.batchSize ||4); // 4 simultaneous uploads by default
// initialise the tmp directory based on opts (fallback to os.tmpDir())
this.tmpDir = !this.noDisk && (opts.tmpDir || os.tmpDir());
@@ -155,7 +151,7 @@ MultiPartUpload.prototype._handleStream = function(stream, callback) {
if (part.stream && part.stream.writable) {
part.stream.end();
}
- mpu.uploads.add(mpu._uploadPart.bind(mpu, part));
+ mpu.uploads.push(mpu._uploadPart.bind(mpu, part));
}
function abortUpload(part) {
@@ -247,13 +243,13 @@ MultiPartUpload.prototype._uploadPart = function(part, callback) {
result = {part: part.id, etag: etag, size: part.length};
mpu.emit('uploaded', result);
-
- // Remove the temporary file
+ // Remove the temporary file / Clean up
if (!mpu.noDisk) {
fs.unlink(part.fileName, function(err) {
return callback(err, result);
});
} else {
+ delete part.data;
return callback(null, result);
}
@@ -280,18 +276,21 @@ MultiPartUpload.prototype._uploadPart = function(part, callback) {
req.write(part.data);
req.end();
}
+
mpu.emit('uploading', part.id);
};
/**
Indicates that all uploads have been started and that we should wait for completion
**/
-MultiPartUpload.prototype._completeUploads = function(callback) {
-
+MultiPartUpload.prototype._completeUploads = function(callback) {
+
var mpu = this;
- this.uploads.when(function(err, results) {
+ this.uploads.end(function(err, results) {
+
if (err) return callback(err);
+
var size = 0, parts;
parts = _.map(results, function(value) {
size += value.size;
@@ -322,7 +321,7 @@ MultiPartUpload.prototype._abortUploads = function(callback) {
var mpu = this;
- this.uploads.when(function(err, results) {
+ this.uploads.end(function(err, results) {
if (err) return callback(err);
View
@@ -14,7 +14,7 @@
"author": "Nathan Oehlman",
"license": "BSD",
"dependencies": {
- "batches": "0.1.x",
+ "batch": "0.5.x",
"xml2js": "0.2.x",
"lodash": "1.0.x"
},

0 comments on commit cb7c86f

Please sign in to comment.