Skip to content

Commit

Permalink
Merge branch 'refs/heads/release-2.0.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
kilianc committed Nov 23, 2011
2 parents c7c048c + bde1b22 commit 93be370
Show file tree
Hide file tree
Showing 5 changed files with 502 additions and 366 deletions.
2 changes: 0 additions & 2 deletions .gitignore

This file was deleted.

90 changes: 47 additions & 43 deletions README.md
Expand Up @@ -13,25 +13,36 @@ A powerful utility for function chaining (inspired by [async](https://github.com
## Syntax

```javascript
new FnQueue(functionsList[, callback, concurrencyLevel]);
new FnQueue(functionsList[, callback, concurrencyLevel, isStopped]);
```
##Parameters

1. `functionsList` __(Object)__ a list of Functions. Each function can declare implicit dependencies as arguments and assume you provide a single callback as the last argument.
2. `callback` __(Function(err, data))__ the complete callback in the conventional form of `function (err, data){ ... }`
2. `callback` __(Function(err, data))__ the complete callback in the conventional form of `function (err, data) { ... }`
3. `concurrencyLevel` __(Number/String: defaults to 'auto')__ the concurrency level of the chain execution, can be `'auto'` or `N* = { 1, 2, ... }`
4. `isStopped` __(Boolean: defaults to false)__ if true you must call the start method in order to execute the function list.

##Methods

* __start__: will start the execution, used in combination with `isStopped = true` constructor parameter

##Attributes

* __isVerbose__ _(Boolean)_: will change the instance verbose mode

##Notes

FnQueue runs a list of functions, each passing their results to the dependent function in the list. However, if any of the functions pass an error to the callback, the next function is not executed and the main callback is immediately called with the error.

Each dependency/argument must be named with the label of the dependent function in the `functionsList` (the first constructor argument).
Each function with a dependency will be called with the result of the dependent function as expected. __(YES this is a fucking cool introspection!)__
Each function with a dependency will be called with the result of the dependent function as expected. __(Introspection by [introspect](https://github.com/kilianc/introspect))__

The global callback is called once, on the first error or at the end of the execution. A data object will be provided with the indexed result of the functions.

FnQueue magically resolves all dependencies and executes functions in the right order with the provided concurrency level.

##Example

```javascript
var FnQueue = require('fnqueue');
```
Expand All @@ -44,48 +55,41 @@ Example:

```javascript
new FnQueue({
// this will wait for 'processSomething' and 'searchSomething' and will be called with the respective results
funnyStuff: function(processSomething, searchSomething, callback){
// do something silly
callback(null, 'ciao!');
},
// this will be called instantly
searchSomething: function(callback){
// do something with database
callback(err, results);
},
// this will wait 'searchSomething'
update: function(searchSomething, callback){
// change values inside results and save to db
callback(err); // no needs to return values
},
// this will wait 'searchSomething'
processSomething: function(searchSomething, callback){
var start = new Date().getTime();
// do something slow
var elapsedTime = new Date().getTime() - start;
callback(err, elapsedTime);
}]
},function(err, data){

if(err)
throw err;

console.log(data.searchSomething); // results
console.log(data.update); // undefined
console.log(data.processSomething); // elapsedTime
console.log(data.funnyStuff); // 'ciao!'
// this will wait for 'processSomething' and 'searchSomething' and will be called with the respective results
funnyStuff: function (processSomething, searchSomething, callback) {
// do something silly
callback(null, 'ciao!');
},
// this will be called instantly
searchSomething: function (callback) {
// do something with database
callback(err, results);
},
// this will wait 'searchSomething'
update: function (searchSomething, callback) {
// change values inside results and save to db
callback(err); // no needs to return values
},
// this will wait 'searchSomething'
processSomething: function (searchSomething, callback) {
var start = new Date().getTime();
// do something slow
var elapsedTime = new Date().getTime() - start;
callback(err, elapsedTime);
}]
}, function (err, data) {

if (err) {
throw err;
}

console.log(data.searchSomething); // results
console.log(data.update); // undefined
console.log(data.processSomething); // elapsedTime
console.log(data.funnyStuff); // 'ciao!'
}, 1);
```

##Introspection profiling results

Profiling results are pretty good, Function.toString() took up __~2 seconds__ every __1 Million__ of executions.

Lines of code time (ms) Platform
---------------------------------------------------------------------------------------------------
800 1808ms OSX Lion 2.2 GHz Intel Core i7 / nodejs v6.0.1

## Test

Tests depends on http://vowsjs.org/ then
Expand All @@ -94,7 +98,7 @@ Tests depends on http://vowsjs.org/ then
npm install
npm test

![tests](http://cl.ly/1R2q3h0G2c3a41303R1I/fnqueue_test_v2.png)
![tests](http://f.cl.ly/items/03432M3A0l0r3M142B2w/fnqueue_test_v2.0.2.png)

## License

Expand Down
180 changes: 105 additions & 75 deletions lib/fnqueue.js
@@ -1,121 +1,151 @@
function FnQueue(tasks, callback, concurrecyLevel, stop) {
this.tasks = tasks;
this.callback = callback;
this.callSequence = [];
this.results = {};
this.gotError = false;
this.concurrecyLevel = concurrecyLevel || 'auto';
this.runningNb = 0;
!stop && this.callNextFunction();
var
introspect = require('introspect');

function FnQueue(tasks, callback, concurrecyLevel, isStopped) {
this.tasks = tasks;
this.taskDependencies = {};
this.callback = callback;
this.callSequence = [];
this.results = {};
this.gotError = false;
this.concurrecyLevel = concurrecyLevel || 'auto';
this.runningNb = 0;
this.loadDependencies();

if (typeof this.concurrecyLevel !== 'string' && typeof this.concurrecyLevel !== 'number') {
this.onTaskComplete(null, new Error('Invalid concurrecyLevel: ' + typeof this.concurrecyLevel + ', must be a Number or \'auto\''));
return;
} else if (typeof this.concurrecyLevel === 'number' && this.concurrecyLevel < 1) {
this.onTaskComplete(null, new Error('Invalid concurrecyLevel: ' + this.concurrecyLevel + ' < 1'));
return;
} else if (typeof this.concurrecyLevel === 'string' && this.concurrecyLevel !== 'auto') {
this.onTaskComplete(null, new Error('Invalid concurrecyLevel: ' + this.concurrecyLevel + ' < 1'));
return;
}

!isStopped && this.callNextFunction();
}

FnQueue.verbose = function () {
FnQueue.prototype.isVerbose = true;
return FnQueue;
FnQueue.prototype.isVerbose = true;
return FnQueue;
};

FnQueue.prototype.isVerbose = false;

FnQueue.prototype.loadDependencies = function() {

for(var taskName in this.tasks) {

if (typeof this.tasks[taskName] !== 'function') {
this.onTaskComplete(null, new Error('Invalid Function in list: ' + taskName + ' is not a function'));
return;
}

if (this.tasks[taskName].length === 0) {
this.onTaskComplete(null, new Error('Invalid Function in list: missing callback parameter in ' + taskName));
return;
}

this.taskDependencies[taskName] = introspect(this.tasks[taskName]).slice(0, -1);
}
};

FnQueue.prototype.onTaskComplete = function (taskName, err, result) {

if (this.gotError)
return;
if (this.gotError) {
return;
}

this.runningNb--;
this.runningNb--;

if (err) {
this.gotError = true;
this.callback && this.callback(err, this.getResults());
return;
}
if (err) {
this.gotError = true;
this.callback && this.callback(err, this.getResults());
return;
}

this.results[taskName] = { result: result };
this.callNextFunction();
this.results[taskName] = { result: result };
this.callNextFunction();
};

FnQueue.prototype.getResults = function () {
var results = {};
for(var taskName in this.results)
results[taskName] = this.results[taskName].result;
return results;
var results = {};
for(var taskName in this.results) {
results[taskName] = this.results[taskName].result;
}
return results;
};

FnQueue.prototype.callNextFunction = function () {

var nextFunction, dependencies, finish = true, args;

for(var taskName in this.tasks) {

finish = false;
var nextFunction, dependencies, finish = true, args;

if (this.concurrecyLevel != 'auto' && this.runningNb >= this.concurrecyLevel)
return;
for(var taskName in this.tasks) {

nextFunction = this.tasks[taskName];
finish = false;

if (nextFunction.length === 0) {
this.onTaskComplete(null, new Error('Invalid Function in chain: missing callback parameter'));
return;
}
if (this.concurrecyLevel != 'auto' && this.runningNb >= this.concurrecyLevel) {
return;
}

dependencies = this.getFunctionArguments(nextFunction).slice(0, -1);
args = this.getDependencies(dependencies);
nextFunction = this.tasks[taskName];
dependencies = this.taskDependencies[taskName];
args = this.getDependencies(dependencies);

if (args) {
args.push(this.onTaskComplete.bind(this, taskName));
if (args) {
args.push(this.onTaskComplete.bind(this, taskName));

this.isVerbose && console.log(' - FnQueue: executing: ' + taskName + '(' + dependencies.join(',') + ') { ... }');
this.isVerbose && console.log(' - FnQueue: executing: ' + taskName + '(' + dependencies.join(',') + ') { ... }');

this.runningNb++;
delete this.tasks[taskName];
this.callSequence.push(taskName);
nextFunction.apply(null, args);
this.runningNb++;
delete this.tasks[taskName];
this.callSequence.push(taskName);
nextFunction.apply(null, args);

if (this.concurrecyLevel != 'auto' && this.runningNb >= this.concurrecyLevel)
return;
}
}
if (this.concurrecyLevel != 'auto' && this.runningNb >= this.concurrecyLevel) {
return;
}
}
}

if (!finish && this.runningNb === 0) {
this.onTaskComplete(null, new Error('Unresolvable dependencies: function "' + taskName + '" requires [' + dependencies.join(',') + ']'));
return;
}
if (!finish && this.runningNb === 0) {
this.onTaskComplete(null, new Error('Unresolvable dependencies: function "' + taskName + '" requires [' + dependencies.join(',') + ']'));
return;
}

if (finish && !this.runningNb && this.callback) {
this.callback(null, this.getResults());
this.destroy();
}
if (finish && !this.runningNb && this.callback) {
this.callback(null, this.getResults());
this.destroy();
}
};

FnQueue.prototype.start = FnQueue.prototype.callNextFunction;

FnQueue.prototype.getFunctionArguments = function (fn) {
return (/^function.+\(([a-z0-9\n\r\t ,]*)\)/i).exec(fn.toString())[1].trim().split(/[ ,\n\r\t]+/);
};

FnQueue.prototype.getDependencies = function (dependencies) {

var args = [];
var args = [];

for(var i = 0; i < dependencies.length; i++) {
for(var i = 0; i < dependencies.length; i++) {

if (this.results[dependencies[i]] === undefined) {
return false;
}
if (this.results[dependencies[i]] === undefined) {
return false;
}

var arg = this.results[dependencies[i]];
var arg = this.results[dependencies[i]];

if (arg !== undefined)
args.push(arg.result);
}
if (arg !== undefined) {
args.push(arg.result);
}
}

return args;
return args;
};

FnQueue.prototype.destroy = function () {
delete this.results;
delete this.callback;
delete this.tasks;
delete this.results;
delete this.callback;
delete this.tasks;
};

module.exports = FnQueue;

0 comments on commit 93be370

Please sign in to comment.