Skip to content

Commit

Permalink
Throttling added when doing recursive stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
janjongboom committed Jan 25, 2012
1 parent aad02e6 commit 5c50e68
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 1 deletion.
2 changes: 1 addition & 1 deletion janzip/janzip.js
Expand Up @@ -4,7 +4,7 @@
* (c) Jan Jongboom, 2011
*/
var RollingBuffer = require("./rollingbuffer");
var fs = require("fs");
var fs = require("./throttlefs");

var Zip = function () {
var _self = this;
Expand Down
116 changes: 116 additions & 0 deletions janzip/throttlefs.js
@@ -0,0 +1,116 @@
/**
* File system throttling
*
* So when you open alot of files the filesystem will just say 'screw you' and stop working.
* With this (100% fs compatible) module, you will have auto throttling, so we'll have 20 file handles max.
*/
var fs = require("fs");

// number of active handles, plus a queue
var active = 0, processed = 0, scheduled = 0, queue = [];
var threshold = 20;

/**
* This is the function intercepter.
* It gets 'fs' functions, wraps them into throttle code and either executes or queues them
*/
function wrapper (target, funcName, args) {
var callback = args.length && args[args.length - 1];
var self = this;

if (callback && typeof callback === "function") {
scheduled++;

// finished handler
args[args.length - 1] = function () {
processed++;
active--;
callback.apply(this, arguments);
};

// kickoff code
var kickoff = function () {
active++;
target[funcName].apply(self, args);
};

// depending on the number of active items, we either execute it or schedule
queue.push(kickoff);
onItemAdded();
}
// cant find a callback function? then just execute
else {
return target[funcName].apply(this, args);
}
}

// ===============================
// extra methods outside fs
/**
* Throttled file copy
*/
var augment = {
copyFile: function (src, target, callback) {
var srcFile = fs.createReadStream(src);
var targetFile = fs.createWriteStream(target);

targetFile.on("close", function () {
callback(null, src);
});

srcFile.pipe(targetFile);
}
};

// copy all the fs methods
Object.keys(fs).filter(function (m) { return typeof fs[m] === "function"; }).forEach(function (func) {
module.exports[func] = function () {
return wrapper(fs, func, arguments);
};
});

Object.keys(augment).forEach(function (func) {
module.exports[func] = function () {
return wrapper(augment, func, arguments);
};
});

var monitor = 0;

/**
* When an item is added to the queue we'll create a monitoring service
*/
var onItemAdded = function () {
if (!monitor) {
monitor = setInterval(intervalFn, 20);
}
};

/**
* Function that we'll run every X ms.
*/
var intervalFn = function () {
var toProcess = threshold - active;
var ix = 0, item = null;

if (processed === scheduled) {
clearInterval(monitor);
monitor = 0;
}
else {
// can use this for debugging purposes
// console.log("queue length", queue.length, "empty counter", queueEmptyCounter, "processed", processed, "scheduled", scheduled);
}

while (++ix <= toProcess && (item = queue.shift())) {
item();
}
};

// some test code
//module.exports.readFile("./folder.js", "utf8", function (err, data) {
// console.log(err, !!data);
//});
//module.exports.readFile("./example.js", "utf8", function (err, data) {
// console.log(err, !!data);
//});
1 change: 1 addition & 0 deletions throttle-fs.js
@@ -0,0 +1 @@
module.exports = require("./janzip/throttlefs");

0 comments on commit 5c50e68

Please sign in to comment.