Permalink
Browse files

Initial commit

  • Loading branch information...
0 parents commit e3da6b9587509cc698b5bc51a0202afaf4c298cd @tim-smart committed Jun 19, 2010
Showing with 148 additions and 0 deletions.
  1. 0 README.md
  2. +125 −0 lib/parallel/index.js
  3. +23 −0 test.js
0 README.md
No changes.
125 lib/parallel/index.js
@@ -0,0 +1,125 @@
+
+var Task = exports.Task = function Task(actions) {
+ this.actions = {};
+ this._emitter = new process.EventEmitter();
+
+ if ('object' === typeof actions) {
+ var keys = Object.keys(actions);
+ for (var i = 0, key; key = keys[i++]; ) {
+ this.add(key, actions[key]);
+ }
+ }
+
+ return this;
+};
+
+Task.prototype.add = function add(name, action) {
+ this.actions[name] = action;
+
+ return this;
+};
+
+Task.prototype.bind = function bind() {
+ var callback = arguments[arguments.length - 1],
+ names = Array.prototype.slice.call(arguments, 0, -1);
+
+ for (var i = 0, name; name = names[i++]; ) {
+ this._emitter.addListener(name, callback);
+ }
+
+ return this;
+};
+
+Task.prototype.run = function run(callback) {
+ var count = 0,
+ action,
+ args,
+ keys = Object.keys(this.actions),
+ self = this;
+
+ for (var i = 0, key; key = keys[i++]; ) {
+ action = this.actions[key][0];
+ args = this.actions[key].slice(1);
+ args.push((function(name, action) {
+ return function() {
+ arguments = Array.prototype.slice.call(arguments);
+ arguments.unshift(name);
+ self._emitter.emit.apply(self._emitter, arguments);
+ callback.apply(self, arguments);
+
+ count--;
+ if (0 >= count) {
+ callback.call(self, null);
+ }
+ };
+ })(key, action));
+
+ count++;
+ action.apply(exports, args);
+ }
+
+ if (0 >= count) {
+ callback(null);
+ }
+
+ return this;
+};
+
+var Sequence = exports.Sequence = function Sequence(tasks) {
+ if (typeof tasks === 'function') {
+ tasks = Array.prototype.slice.call(arguments);
+ }
+ this._tasks = tasks;
+ return this;
+};
+
+Sequence.prototype.add = function add() {
+ var tasks = Array.prototype.slice.call(arguments);
+ this._tasks.push.apply(this._tasks, tasks);
+ return this;
+};
+
+Sequence.prototype.run = function run(callback) {
+ var tasks = this._tasks.slice();
+ var next = function next() {
+ var task = tasks.shift();
+ if (typeof task !== 'function') {
+ return callback();
+ }
+ var args = Array.prototype.slice.call(arguments);
+ args.unshift(next);
+ try {
+ task.apply(this, args);
+ } catch (error) {
+ next(next, error)
+ }
+ };
+ var counter = 0,
+ results = [],
+ error = null;
+ next.parallel = function parallel() {
+ counter++;
+ return function () {
+ if (arguments[0]) {
+ error = arguments[0];
+ }
+ results.push(arguments[1]);
+ counter--;
+ if (counter <= 0) {
+ var temp_results = results, temp_error = error;
+ counter = 0;
+ error = null;
+ results = [];
+ temp_results.unshift(temp_results.slice());
+ temp_results.unshift(temp_error);
+ temp_results.unshift(next);
+ next.apply(null, temp_results);
+ }
+ };
+ };
+
+ if (tasks.length > 0) {
+ tasks.shift()(next);
+ }
+ return this;
+};
23 test.js
@@ -0,0 +1,23 @@
+var parallel = require('./lib/parallel');
+var sys = require('sys');
+
+var task = new parallel.Sequence(
+ function (next) {
+ sys.puts('1');
+ next('2');
+ },
+ function (next, text) {
+ sys.puts(text);
+ process.nextTick(next.parallel());
+ process.nextTick(next.parallel());
+ process.nextTick(next.parallel());
+ },
+ function (next, err, results) {
+ sys.puts('3');
+ sys.puts(sys.inspect(arguments));
+ next();
+ }
+);
+task.run(function () {
+ sys.puts('Done.');
+});

0 comments on commit e3da6b9

Please sign in to comment.