Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

First commit.

  • Loading branch information...
commit 56be6f2041642992cdc1f97043a3b60a95264fda 0 parents
@jaredhanson authored
11 Makefile
@@ -0,0 +1,11 @@
+NODE = node
+TEST = expresso
+TESTS ?= test/*.test.js
+
+test:
+ @NODE_ENV=test $(TEST) -I lib $(TEST_FLAGS) $(TESTS)
+
+test-cov:
+ @$(MAKE) test TEST_FLAGS="--cov"
+
+.PHONY: test test-cov
1  README
@@ -0,0 +1 @@
+A simple worker pool in JavaScript, targeting Node.js.
22 examples/callback.js
@@ -0,0 +1,22 @@
+var workerpool = require('../lib/worker-pool');
+
+var pool = new workerpool.Pool(function(delay, finish) {
+ if (delay < 0) {
+ finish(new Error('delay must be greater than zero'));
+ return;
+ }
+
+ console.log('Working for ' + delay + ' milliseconds...');
+ setTimeout(function() { finish(delay) }, delay);
+});
+
+function callback(err, result) {
+ if (err) {
+ console.log('ERROR: ' + err.name + ' ' + err.message);
+ return;
+ }
+ console.log('Worked for ' + result + ' milliseconds.');
+}
+
+pool.add(1000, callback);
+pool.add(-1, callback);
12 examples/delay.js
@@ -0,0 +1,12 @@
+var workerpool = require('../lib/worker-pool');
+
+var pool = new workerpool.Pool(function(delay, finish) {
+ console.log('Working for ' + delay + ' milliseconds...');
+ setTimeout(finish, delay);
+});
+
+
+pool.add(1000);
+pool.add(2000);
+pool.add(3000);
+pool.add(1000);
1  index.js
@@ -0,0 +1 @@
+module.exports = require('./lib/worker-pool');
3  lib/worker-pool/index.js
@@ -0,0 +1,3 @@
+var Pool = require('./pool');
+
+exports.Pool = Pool;
72 lib/worker-pool/pool.js
@@ -0,0 +1,72 @@
+var sys = require('sys');
+var EventEmitter = require('events').EventEmitter;
+var Worker = require('./worker');
+
+function Pool (options, workerFunc) {
+ if (typeof options == 'function') {
+ workerFunc = options;
+ options = {};
+ }
+
+ if (!workerFunc) { throw new Error('Pool requires a worker function') };
+ if (workerFunc.length < 1) { throw new Error('Worker function must take finish function as an argument') };
+
+ options = options || {};
+
+ this.workerFunc = workerFunc;
+ this.queue = [];
+ this.workers = this._createWorkers(options.size || 3);
+ EventEmitter.call(this);
+};
+
+sys.inherits(Pool, EventEmitter);
+
+
+Pool.prototype.add = function() {
+ var args = Array.prototype.slice.call(arguments);
+ var callback;
+ if (args && args.length) {
+ if (typeof args[args.length - 1] == 'function') {
+ callback = args.pop();
+ }
+ }
+
+ this.queue.push({args: args, callback: callback});
+ this.emit('add');
+ this.work();
+
+ // Allow chaining.
+ return this;
+}
+
+Pool.prototype.work = function() {
+ var i;
+ for (i = 0; i < this.workers.length; i++) {
+ var worker = this.workers[i];
+ if (!worker.working) {
+ return worker.work();
+ }
+ }
+};
+
+Pool.prototype.shift = function() {
+ return this.queue.shift();
+};
+
+Pool.prototype.report = function() {
+ if (this.queue.length == 0) {
+ this.emit('idle');
+ }
+};
+
+Pool.prototype._createWorkers = function(count) {
+ var array = [];
+ var i;
+ for (i = 0; i < count; i++) {
+ array.push(new Worker(this, this.workerFunc));
+ }
+ return array;
+};
+
+
+module.exports = Pool;
41 lib/worker-pool/worker.js
@@ -0,0 +1,41 @@
+function Worker (pool, workFunc) {
+ this.pool = pool;
+ this.workFunc = workFunc;
+ this.working = false;
+};
+
+Worker.prototype.work = function() {
+ if (this.working) { return; }
+
+ var job = this.pool.shift();
+ if (!job) { return; }
+
+ var self = this;
+ var args = job.args || [];
+ var callback = job.callback;
+
+ function finish(res) {
+ if (callback) {
+ if (res instanceof Error) {
+ callback.call(self, res);
+ } else {
+ callback.call(self, null, res);
+ }
+ }
+
+ self.pool.report();
+ self.working = false;
+ self.work();
+ }
+ args.push(finish);
+
+ this.working = true;
+ try {
+ return this.workFunc.apply(this, args);
+ } catch (err) {
+ return finish(err);
+ }
+};
+
+
+module.exports = Worker;
90 test/pool.test.js
@@ -0,0 +1,90 @@
+var Pool = require('worker-pool').Pool;
+var assert = require('assert');
+
+
+module.exports = {
+
+ 'test default options': function() {
+ var pool = new Pool(function(finish) {});
+ assert.equal(pool.workers.length, 3);
+ assert.equal(pool.queue.length, 0);
+ },
+
+ 'test creates correct number of workers': function() {
+ var pool = new Pool({ size: 5 }, function(finish) {});
+ assert.equal(pool.workers.length, 5);
+ },
+
+ 'test passes zero arguments to worker function': function() {
+ var fini;
+ var pool = new Pool(function(finish) {
+ fini = finish;
+ });
+
+ pool.add();
+ assert.type(fini, 'function');
+ },
+
+ 'test passes single argument to worker function': function() {
+ var a1;
+ var fini;
+ var pool = new Pool(function(x, finish) {
+ a1 = x;
+ fini = finish;
+ });
+
+ pool.add(1);
+ assert.equal(a1, 1);
+ assert.type(fini, 'function');
+ },
+
+ 'test passes multiple arguments to worker function': function() {
+ var a1;
+ var a2;
+ var fini;
+ var pool = new Pool(function(x, y, finish) {
+ a1 = x;
+ a2 = y;
+ fini = finish;
+ });
+
+ pool.add(1, 2);
+ assert.equal(a1, 1);
+ assert.equal(a2, 2);
+ assert.type(fini, 'function');
+ },
+
+ 'test accumulates work in queue': function() {
+ var args = [];
+ var addCount = 0;
+ var pool = new Pool(function(x, finish) { args.push(x) });
+
+
+ pool.on('add', function() { addCount++ });
+ pool.add(1);
+ pool.add(2);
+ pool.add(3);
+ pool.add(4);
+ pool.add(5);
+
+ assert.deepEqual(args, [1, 2, 3]);
+ assert.equal(addCount, 5);
+ assert.equal(pool.queue.length, 2);
+ },
+
+ 'test throws without worker function': function() {
+ assert.throws(
+ function() {
+ var pool = new Pool();
+ },
+ Error
+ );
+
+ assert.throws(
+ function() {
+ var pool = new Pool({});
+ },
+ Error
+ );
+ }
+};
Please sign in to comment.
Something went wrong with that request. Please try again.