Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

initial

  • Loading branch information...
commit 2dfa14f0cd3e4b40c73de61236b7a31eb6866fe7 0 parents
Yuriy Bogdanov authored
54 README.md
@@ -0,0 +1,54 @@
+
+# Introduction
+node-narrow is a library which shrinks a given callback parallel execution in a limited number of threads, receiving the bunch of data (array of tasks).
+
+Inspired from [node-async#queue](https://github.com/caolan/async#queue).
+
+# Synopsis
+Push a big bunch of data and handle it in maximum 5 simultaneous threads:
+
+ var Narrow = require('narrow');
+
+ var narrow = new Narrow(5, function(str, callback){
+ setTimeout(function(){
+ callback(null, str.toUpperCase());
+ }, 1000)
+ })
+
+ var tasks = [];
+ for (var i = 0; i < 10; i++) {
+ tasks.push('something' + i);
+ }
+
+ var start = new Date;
+
+ narrow.on('complete', function(result){
+ console.log('%s after %dms', result, new Date - start);
+ });
+
+ narrow.pushAll(tasks, function(){
+ console.log('done');
+ })
+
+Will output:
+
+ SOMETHING0 after 1002ms
+ SOMETHING1 after 1012ms
+ SOMETHING2 after 1012ms
+ SOMETHING3 after 1012ms
+ SOMETHING4 after 1012ms
+ SOMETHING5 after 2012ms
+ SOMETHING6 after 2012ms
+ SOMETHING7 after 2013ms
+ SOMETHING8 after 2016ms
+ SOMETHING9 after 2016ms
+ done
+
+Timeouts support
+
+ todo: document
+
+See more examples in [examples](https://github.com/0ctave/node-narrow/tree/master/examples) directory.
+
+# Installation
+ npm install narrow
34 examples/push.js
@@ -0,0 +1,34 @@
+
+require.paths.unshift(__dirname + '/../lib');
+
+var Narrow = require('narrow');
+
+/**
+ * in 1 thread
+ */
+var narrow = new Narrow(function(str, callback){
+ setTimeout(function(){
+ callback(null, str.toUpperCase());
+ }, 1000)
+})
+
+var start = new Date;
+for (var i = 0; i < 10; i++) {
+ narrow.push('something' + i, function(err, result){
+ console.log('%s after %dms', result, new Date - start);
+ });
+}
+
+/* output:
+
+ SOMETHING0 after 1002ms
+ SOMETHING1 after 2012ms
+ SOMETHING2 after 3012ms
+ SOMETHING3 after 4012ms
+ SOMETHING4 after 5012ms
+ SOMETHING5 after 6016ms
+ SOMETHING6 after 7016ms
+ SOMETHING7 after 8016ms
+ SOMETHING8 after 9016ms
+ SOMETHING9 after 10016ms
+*/
34 examples/push5.js
@@ -0,0 +1,34 @@
+
+require.paths.unshift(__dirname + '/../lib');
+
+var Narrow = require('narrow');
+
+/**
+ * in 5 threads
+ */
+var narrow = new Narrow(5, function(str, callback){
+ setTimeout(function(){
+ callback(null, str.toUpperCase());
+ }, 1000)
+})
+
+var start = new Date;
+for (var i = 0; i < 10; i++) {
+ narrow.push('something' + i, function(err, result){
+ console.log('%s after %dms', result, new Date - start);
+ });
+}
+
+/* output:
+
+ SOMETHING0 after 1001ms
+ SOMETHING1 after 1012ms
+ SOMETHING2 after 1012ms
+ SOMETHING3 after 1012ms
+ SOMETHING4 after 1012ms
+ SOMETHING5 after 2016ms
+ SOMETHING6 after 2017ms
+ SOMETHING7 after 2017ms
+ SOMETHING8 after 2017ms
+ SOMETHING9 after 2017ms
+*/
43 examples/pushAll.js
@@ -0,0 +1,43 @@
+
+require.paths.unshift(__dirname + '/../lib');
+
+var Narrow = require('narrow');
+
+/**
+ * in 1 thread
+ */
+var narrow = new Narrow(function(str, callback){
+ setTimeout(function(){
+ callback(null, str.toUpperCase());
+ }, 1000)
+})
+
+var tasks = [];
+for (var i = 0; i < 10; i++) {
+ tasks.push('something' + i);
+}
+
+var start = new Date;
+
+narrow.on('complete', function(result){
+ console.log('%s after %dms', result, new Date - start);
+});
+
+narrow.pushAll(tasks, function(){
+ console.log('done');
+})
+
+/* output:
+
+ SOMETHING0 after 1003ms
+ SOMETHING1 after 2013ms
+ SOMETHING2 after 3013ms
+ SOMETHING3 after 4013ms
+ SOMETHING4 after 5013ms
+ SOMETHING5 after 6013ms
+ SOMETHING6 after 7017ms
+ SOMETHING7 after 8017ms
+ SOMETHING8 after 9017ms
+ SOMETHING9 after 10017ms
+ done
+*/
43 examples/pushAll5.js
@@ -0,0 +1,43 @@
+
+require.paths.unshift(__dirname + '/../lib');
+
+var Narrow = require('narrow');
+
+/**
+ * in 5 threads
+ */
+var narrow = new Narrow(5, function(str, callback){
+ setTimeout(function(){
+ callback(null, str.toUpperCase());
+ }, 1000)
+})
+
+var tasks = [];
+for (var i = 0; i < 10; i++) {
+ tasks.push('something' + i);
+}
+
+var start = new Date;
+
+narrow.on('complete', function(result){
+ console.log('%s after %dms', result, new Date - start);
+});
+
+narrow.pushAll(tasks, function(){
+ console.log('done');
+})
+
+/* output:
+
+ SOMETHING0 after 1002ms
+ SOMETHING1 after 1012ms
+ SOMETHING2 after 1012ms
+ SOMETHING3 after 1012ms
+ SOMETHING4 after 1012ms
+ SOMETHING5 after 2012ms
+ SOMETHING6 after 2012ms
+ SOMETHING7 after 2013ms
+ SOMETHING8 after 2016ms
+ SOMETHING9 after 2016ms
+ done
+*/
244 lib/narrow.js
@@ -0,0 +1,244 @@
+/*
+ Copyright 2011 Yuriy Bogdanov <chinsay@gmail.com>
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to
+ deal in the Software without restriction, including without limitation the
+ rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ sell copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+/**
+ * Narrow construct
+ *
+ * usage:
+ *
+ * // 1 thread (default)
+ * var narrow = new Narrow(function(task, callback){})
+ * // 10 threads
+ * var narrow = new Narrow(10, function(task, callback){})
+ *
+ *
+ */
+var Narrow = function(threads, body) {
+
+ if (arguments.length < 2) {
+ body = threads;
+ threads = 1;
+ }
+
+ this.body = body;
+ this.threads = threads;
+
+ this._running = true;
+ this._timeout = 0;
+ this._buffer = [];
+ this._inprog = 0;
+ this._complete = 0;
+ this._total = 0;
+
+ // Process new tasks when free
+ this.on('free', this._process.bind(this));
+}
+
+// Inherits from process.EventEmitter
+Narrow.prototype.__proto__ = process.EventEmitter.prototype;
+
+/**
+ * Starts narrow execution
+ */
+Narrow.prototype.start = function() {
+ this._running = true;
+ this._checkFree();
+}
+
+/**
+ * Stops execution, no new tasks will be spawned after this call
+ * Tasks which are in progress, will finish the exection in a normally
+ *
+ */
+Narrow.prototype.stop = function() {
+ this._running = false;
+}
+
+/**
+ * Pushes one task to a buffer
+ * Will execute callback with error/result when task will be completed
+ *
+ */
+Narrow.prototype.push = function(data, callback) {
+ this._push(data, callback);
+ this._checkFree();
+}
+
+/**
+ * Pushes a bunch of tasks to a buffer
+ * Will execute callback with error/results when all task will be completed
+ *
+ */
+Narrow.prototype.pushAll = function(tasks, callback) {
+
+ if (!tasks instanceof Array) {
+ throw new Error("tasks list should be instance of Array");
+ }
+
+ if (callback) {
+ var done = 0, data = [], cb = function(err, d){
+ if (d !== undefined) data.push(d);
+ if (++done >= tasks.length) callback(null, data.length ? data : null);
+ }
+ }
+
+ for (var i = 0; i < tasks.length; i++) {
+ this._push(tasks[i], cb);
+ }
+
+ this._checkFree();
+}
+
+/**
+ * Sets 'total' number of tasks. Useful when you push tasks one by one, using Narrow.push()
+ */
+Narrow.prototype.__defineSetter__('total', function(total) {
+ this._total = total;
+})
+Narrow.prototype.__defineGetter__('total', function() {
+ return this._total;
+})
+
+/**
+ * Sets 'timeout' (millis) for a single task
+ * If the task timed out, error will be thrown to a callback and 'error' event of Narrow
+ *
+ */
+Narrow.prototype.__defineSetter__('timeout', function(timeout) {
+ this._timeout = timeout;
+})
+Narrow.prototype.__defineGetter__('timeout', function() {
+ return this._timeout;
+})
+
+/**
+ * (private) Internal push
+ */
+Narrow.prototype._push = function(data, callback) {
+ this._buffer.push({
+ data : data,
+ callback : callback
+ })
+}
+
+/**
+ * (private) Checks if there are some free workers, emits 'free' event
+ */
+Narrow.prototype._checkFree = function() {
+ var self = this;
+ if (this._inprog < this.threads) {
+ self.emit('free');
+ }
+}
+
+/**
+ * (private)
+ * This function executes in each tick of Narrow
+ * It checks if there are free workers, than push the tasks from buffer
+ *
+ */
+Narrow.prototype._process = function() {
+
+ // Eat the buffer until maxium threads reached
+ while (this._buffer.length) {
+ if (!this._running) return;
+ if (this._inprog >= this.threads) return;
+ if (this._total && this._inprog >= this._total) return;
+ this._doTask(this._buffer.shift());
+ }
+}
+
+/**
+ * (private)
+ * Executes a given task
+ *
+ */
+Narrow.prototype._doTask = function(task) {
+ var self = this;
+
+ this._inprog++;
+ var called = false, t = null;
+
+ function taskDone(err) {
+ // call this func only once
+ if (called) return;
+ called = true;
+
+ // clear timeout
+ if (t) clearTimeout(t);
+ // check if pipe is running
+ if (!self._inprog) return;
+
+ // counters
+ self._inprog--;
+ self._complete++;
+
+ // Emit 'complete' event, pass task itself and the result to it
+ // emit('complete', resultArg1, resultArg2, task.data)
+ if (!err) {
+ var args = Array.prototype.slice.call(arguments, 1);
+ args.unshift('complete');
+ args.push(task.data);
+ self.emit.apply(self, args);
+ }
+
+ // execute task binded callback
+ if (task.callback) {
+ task.callback.apply(null, arguments);
+ }
+
+ if (err) {
+ self.emit('error', err, task.data);
+ }
+
+ // if the total specified and number of complete tasks + tasks in progress reached total value
+ if (self._total && self._complete + self._inprog >= self._total) {
+ // try to stop the execution if there are some tasks in progress
+ self.stop();
+ // if there are no tasks in progress left, just emit 'end'
+ if (!self._inprog) {
+ this.emit('end');
+ }
+ }
+
+ self.emit('free');
+ }
+
+ process.nextTick(function(){
+ try {
+ // If per-task timeout specified, init the timer
+ if (self._timeout) {
+ t = setTimeout(function(){
+ taskDone(new Error('Timeout'))
+ }, self._timeout);
+ }
+ // Execute task body
+ self.body(task.data, taskDone);
+ }
+ catch (e) {
+ // Same callback but with catched error
+ taskDone(e);
+ }
+ })
+}
+
+module.exports = Narrow;
114 lib/test.js
@@ -0,0 +1,114 @@
+/*
+ Copyright 2011 Yuriy Bogdanov <chinsay@gmail.com>
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to
+ deal in the Software without restriction, including without limitation the
+ rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ sell copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+/**
+ * Simple test module
+ */
+
+var assert = require('assert');
+
+function Test(name, module) {
+ this.queue = [];
+ this.name = name;
+ this.current = null;
+ for (var test in module) {
+ this.queue.push({
+ name : test,
+ fn : module[test]
+ });
+ }
+
+ this.assert = {};
+ Object.keys(assert).forEach(function(fn){
+ this.assert[fn] = function(){
+ try {
+ assert[fn].apply(assert, arguments);
+ }
+ catch (err) {
+ console.error('test "%s" FAILED: %s', this.current.name, err.stack || err);
+ }
+ }.bind(this)
+ }.bind(this));
+}
+
+Test.prototype.run = function(callback) {
+ console.log('------------------------------------')
+ console.log('running "%s":', this.name);
+ this._next(function(){
+ console.log('done "%s"', this.name);
+ if (callback) callback();
+ }.bind(this));
+}
+
+Test.prototype._next = function(callback) {
+ if (this.queue.length) {
+ this.current = this.queue.shift();
+ console.log('test "%s"', this.current.name);
+ try {
+ this.current.fn(this.assert, function(){
+ this._next(callback);
+ }.bind(this));
+ if (this.current.fn.length < 2) {
+ this._next(callback);
+ }
+ }
+ catch (err) {
+ console.error('test "%s" FAILED: %s', this.current.name, err.stack || err);
+ }
+ }
+ else {
+ callback();
+ }
+}
+
+function Suite(name) {
+ this.tests = [];
+ this.name = name;
+ this.current = null;
+}
+
+Suite.prototype.add = function(test) {
+ this.tests.push(test);
+}
+
+Suite.prototype.run = function(callback) {
+ console.log('Running suite "%s"', this.name);
+ this._next(function(){
+ console.log('done "%s"!', this.name);
+ if (callback) callback();
+ }.bind(this));
+}
+
+Suite.prototype._next = function(callback) {
+ if (this.tests.length) {
+ this.current = this.tests.shift();
+ this.current.run(function(){
+ this._next(callback);
+ }.bind(this));
+ }
+ else {
+ callback();
+ }
+}
+
+module.exports.Test = Test;
+module.exports.Suite = Suite;
13 package.json
@@ -0,0 +1,13 @@
+{
+ "name": "narrow",
+ "description": "Library shrinks a given callback parallel execution concurrency in a limited number of threads, receiving the bunch of data (array of tasks).",
+ "version": "v0.0.1",
+ "url": "http://github.com/0ctave/node-narrow",
+ "author": "Yuriy Bogdanov <chinsay@gmail.com>",
+ "main": "lib/narrow",
+ "engines": {
+ "node": ">=0.3.6"
+ },
+ "dependencies" : {
+ }
+}
127 test/base.js
@@ -0,0 +1,127 @@
+
+require.paths.unshift(__dirname + '/../lib');
+
+var Narrow = require('narrow'),
+ Test = require('test').Test;
+
+var test = module.exports = new Test('Base', {
+
+ 'test single push' : function(assert, done) {
+
+ var taskValue = 'some task';
+
+ var narrow = new Narrow(function(task, callback){
+ assert.strictEqual(task, taskValue);
+ process.nextTick(function(){
+ callback(null, task.toUpperCase());
+ })
+ })
+
+ narrow.push(taskValue, function(err, result){
+ assert.ok(!err);
+ assert.strictEqual(result, taskValue.toUpperCase());
+ done();
+ })
+ },
+
+ 'test single push error' : function(assert, done) {
+
+ var taskValue = 'some task',
+ errorValue = 'something went wrong';
+
+ var narrow = new Narrow(function(task, callback){
+ process.nextTick(function(){
+ callback(errorValue);
+ })
+ })
+
+ var i = 0, done2 = function(){
+ if (++i == 2) done();
+ }
+
+ narrow.on('error', function(err, task){
+ assert.strictEqual(task, taskValue);
+ assert.strictEqual(err, errorValue);
+ done2();
+ })
+
+ narrow.push(taskValue, function(err){
+ assert.strictEqual(err, errorValue);
+ done2();
+ })
+ },
+
+ 'test single push emit complete' : function(assert, done) {
+
+ var taskValue = 'some task';
+
+ var narrow = new Narrow(function(task, callback){
+ assert.strictEqual(task, taskValue);
+ process.nextTick(function(){
+ callback(null, task.toUpperCase());
+ })
+ })
+
+ narrow.push(taskValue);
+
+ narrow.on('complete', function(result, task){
+ assert.strictEqual(result, taskValue.toUpperCase());
+ assert.strictEqual(task, taskValue);
+ done();
+ })
+ },
+
+ 'test pushAll' : function(assert, done) {
+
+ var tasks = ['foo', 'bar', 'baz'];
+
+ var narrow = new Narrow(function(task, callback){
+ process.nextTick(function(){
+ callback(null, task.toUpperCase());
+ })
+ })
+
+ narrow.pushAll(tasks, function(err, result){
+ assert.ok(!err);
+ assert.deepEqual(result, ['FOO', 'BAR', 'BAZ']);
+ done();
+ })
+ },
+
+ 'test pushAll error' : function(assert, done) {
+
+ var tasks = ['foo', 'bar', 'baz'],
+ errorValue = 'something went wrong';
+
+ var narrow = new Narrow(function(task, callback){
+ process.nextTick(function(){
+ if (task == 'bar') {
+ callback(errorValue);
+ }
+ else {
+ callback(null, task.toUpperCase());
+ }
+ })
+ })
+
+ var i = 0, done2 = function(){
+ if (++i == 2) done();
+ }
+
+ narrow.on('error', function(err){
+ assert.strictEqual(err, errorValue);
+ done2();
+ })
+
+ narrow.pushAll(tasks, function(err, result){
+ assert.ok(!err);
+ assert.deepEqual(result, ['FOO', 'BAZ']);
+ done2();
+ })
+ }
+
+})
+
+if (!module.parent) {
+ test.run();
+}
13 test/index.js
@@ -0,0 +1,13 @@
+
+require.paths.unshift(__dirname + '/../lib');
+
+var Narrow = require('narrow'),
+ Suite = require('test').Suite;
+
+var suite = new Suite('Narrow testing');
+
+['base'].forEach(function(test){
+ suite.add(require('./' + test));
+})
+
+suite.run();
Please sign in to comment.
Something went wrong with that request. Please try again.