Permalink
Browse files

Massive overhaul.

  • Loading branch information...
1 parent 56be6f2 commit 109a8cb907ab0c0c6c3d649e70f90f50dc3b6ee2 @jaredhanson committed Dec 2, 2011
View
@@ -0,0 +1,2 @@
+.DS_Store
+node_modules
View
@@ -0,0 +1,8 @@
+*.md
+.DS_Store
+.git*
+Makefile
+docs/
+examples/
+support/
+test/
View
20 LICENSE
@@ -0,0 +1,20 @@
+(The MIT License)
+
+Copyright (c) 2011 Jared Hanson
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
View
@@ -1,11 +1,8 @@
NODE = node
-TEST = expresso
-TESTS ?= test/*.test.js
+TEST = vows
+TESTS ?= test/*-test.js
test:
- @NODE_ENV=test $(TEST) -I lib $(TEST_FLAGS) $(TESTS)
-
-test-cov:
- @$(MAKE) test TEST_FLAGS="--cov"
+ @NODE_ENV=test NODE_PATH=lib $(TEST) $(TEST_FLAGS) $(TESTS)
.PHONY: test test-cov
View
1 README
@@ -1 +0,0 @@
-A simple worker pool in JavaScript, targeting Node.js.
View
@@ -0,0 +1,30 @@
+# Function Pool
+
+A simple worker pool in JavaScript, targeting Node.js.
+
+## Credits
+
+ - [Jared Hanson](http://github.com/jaredhanson)
+
+## License
+
+(The MIT License)
+
+Copyright (c) 2011 Jared Hanson
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
View
@@ -1,13 +1,13 @@
-var workerpool = require('../lib/worker-pool');
+var functionpool = require('functionpool');
-var pool = new workerpool.Pool(function(delay, finish) {
+var pool = new functionpool.Pool(function(delay, done) {
if (delay < 0) {
- finish(new Error('delay must be greater than zero'));
+ done(new Error('delay must be greater than zero'));
return;
}
console.log('Working for ' + delay + ' milliseconds...');
- setTimeout(function() { finish(delay) }, delay);
+ setTimeout(function() { done(null, delay) }, delay);
});
function callback(err, result) {
@@ -18,5 +18,6 @@ function callback(err, result) {
console.log('Worked for ' + result + ' milliseconds.');
}
+
pool.add(1000, callback);
pool.add(-1, callback);
View
@@ -1,12 +1,17 @@
-var workerpool = require('../lib/worker-pool');
+var functionpool = require('functionpool');
-var pool = new workerpool.Pool(function(delay, finish) {
+var startedAt = Date.now();
+
+var pool = new functionpool.Pool({ size: 3 }, function(delay, done) {
console.log('Working for ' + delay + ' milliseconds...');
- setTimeout(finish, delay);
+ setTimeout(done, delay);
});
+pool.on('idle', function() {
+ console.log('Finished (' + (Date.now() - startedAt) + ' ms)');
+});
-pool.add(1000);
-pool.add(2000);
-pool.add(3000);
-pool.add(1000);
+pool.task(1000);
+pool.task(2000);
+pool.task(3000);
+pool.task(1000);
View
@@ -1 +0,0 @@
-module.exports = require('./lib/worker-pool');
View
@@ -0,0 +1,10 @@
+/**
+ * Module dependencies.
+ */
+var Pool = require('./pool');
+
+
+/**
+ * Expose constructors.
+ */
+exports.Pool = Pool;
View
@@ -0,0 +1,92 @@
+var util = require('util')
+ , EventEmitter = require('events').EventEmitter
+ , Worker = require('./worker');
+
+function Pool(options, fn) {
+ if (typeof options == 'function') {
+ fn = options;
+ options = {};
+ }
+ options = options || {};
+
+ if (!fn) { throw new Error('Pool requires a worker function') };
+ if (fn.length < 1) { throw new Error('Pool worker function must accept done callback as an argument') };
+
+ EventEmitter.call(this);
+ this._fn = fn;
+ this._workers = this._createWorkers(options.size || 5);
+ this._working = 0;
+ this._queue = [];
+
+ var self = this;
+ this.on('task', function() {
+ if (self._workers.length == self._working) { return; }
+ self._dispatch();
+ });
+ this.on('done', function() {
+ // TODO: emit a `drain` event if the pool was previously queuing, and now
+ // has slots available
+ if (self._working == 0) { self.emit('idle'); }
+ if (self._queue.length == 0) { return; }
+ self._dispatch();
+ })
+};
+
+util.inherits(Pool, EventEmitter);
+
+
+Pool.prototype.add =
+Pool.prototype.task = function() {
+ var args = Array.prototype.slice.call(arguments);
+ var callback;
+ if (args && args.length) {
+ if (typeof args[args.length - 1] == 'function') {
+ callback = args.pop();
+ }
+ }
+
+ this._queue.push({args: args, callback: callback});
+ var size = this._queue.length;
+ this.emit('task');
+
+ // If the queue length is zero, a worker was available to accept the task.
+ // Otherwise, all workers are busy, and the task remains in the queue.
+ return (this._queue.length == 0) ? true : false;
+}
+
+Pool.prototype._dispatch = function() {
+ var self = this;
+
+ // Find an available worker to give the next task to.
+ var worker;
+ for (var i = 0, len = this._workers.length; i < len; i++) {
+ var w = this._workers[i];
+ if (!w.busy) {
+ worker = w;
+ break;
+ }
+ }
+
+ if (worker) {
+ var task = this._queue.shift();
+ worker.once('done', function(err, res) {
+ task.callback && task.callback(err, res);
+ self._working--;
+ self.emit('done');
+ });
+
+ this._working++;
+ worker.work(task.args);
+ }
+}
+
+Pool.prototype._createWorkers = function(count) {
+ var array = [];
+ for (var i = 0; i < count; i++) {
+ array.push(new Worker(this._fn));
+ }
+ return array;
+};
+
+
+module.exports = Pool;
View
@@ -0,0 +1,31 @@
+var util = require('util')
+ , EventEmitter = require('events').EventEmitter;
+
+function Worker(fn) {
+ EventEmitter.call(this);
+ this.busy = false;
+ this._fn = fn;
+};
+
+util.inherits(Worker, EventEmitter);
+
+Worker.prototype.work = function(args) {
+ args = args || [];
+
+ var self = this;
+ function done(err, res) {
+ self.busy = false;
+ self.emit('done', err, res);
+ }
+
+ this.busy = true;
+ args.push(done);
+ try {
+ this._fn.apply(this, args);
+ } catch (err) {
+ done(err);
+ }
+};
+
+
+module.exports = Worker;
View
@@ -1,3 +0,0 @@
-var Pool = require('./pool');
-
-exports.Pool = Pool;
View
@@ -1,72 +0,0 @@
-var sys = require('sys');
-var EventEmitter = require('events').EventEmitter;
-var Worker = require('./worker');
-
-function Pool (options, workerFunc) {
- if (typeof options == 'function') {
- workerFunc = options;
- options = {};
- }
-
- if (!workerFunc) { throw new Error('Pool requires a worker function') };
- if (workerFunc.length < 1) { throw new Error('Worker function must take finish function as an argument') };
-
- options = options || {};
-
- this.workerFunc = workerFunc;
- this.queue = [];
- this.workers = this._createWorkers(options.size || 3);
- EventEmitter.call(this);
-};
-
-sys.inherits(Pool, EventEmitter);
-
-
-Pool.prototype.add = function() {
- var args = Array.prototype.slice.call(arguments);
- var callback;
- if (args && args.length) {
- if (typeof args[args.length - 1] == 'function') {
- callback = args.pop();
- }
- }
-
- this.queue.push({args: args, callback: callback});
- this.emit('add');
- this.work();
-
- // Allow chaining.
- return this;
-}
-
-Pool.prototype.work = function() {
- var i;
- for (i = 0; i < this.workers.length; i++) {
- var worker = this.workers[i];
- if (!worker.working) {
- return worker.work();
- }
- }
-};
-
-Pool.prototype.shift = function() {
- return this.queue.shift();
-};
-
-Pool.prototype.report = function() {
- if (this.queue.length == 0) {
- this.emit('idle');
- }
-};
-
-Pool.prototype._createWorkers = function(count) {
- var array = [];
- var i;
- for (i = 0; i < count; i++) {
- array.push(new Worker(this, this.workerFunc));
- }
- return array;
-};
-
-
-module.exports = Pool;
View
@@ -1,41 +0,0 @@
-function Worker (pool, workFunc) {
- this.pool = pool;
- this.workFunc = workFunc;
- this.working = false;
-};
-
-Worker.prototype.work = function() {
- if (this.working) { return; }
-
- var job = this.pool.shift();
- if (!job) { return; }
-
- var self = this;
- var args = job.args || [];
- var callback = job.callback;
-
- function finish(res) {
- if (callback) {
- if (res instanceof Error) {
- callback.call(self, res);
- } else {
- callback.call(self, null, res);
- }
- }
-
- self.pool.report();
- self.working = false;
- self.work();
- }
- args.push(finish);
-
- this.working = true;
- try {
- return this.workFunc.apply(this, args);
- } catch (err) {
- return finish(err);
- }
-};
-
-
-module.exports = Worker;
Oops, something went wrong.

0 comments on commit 109a8cb

Please sign in to comment.