forked from caolan/async
Permalink
Newer
100644
807 lines (737 sloc)
24.4 KB
10
// Publish the library: as the CommonJS export when `module.exports` is
// available, otherwise as the `async` property of `root`.
// (`async` and `root` are expected to be defined earlier in the file.)
if (typeof module !== 'undefined' && module.exports) {
    module.exports = async;
}
else {
    root.async = async;
}
24
var _forEach = function (arr, iterator) {
25
if (arr.forEach) {
26
return arr.forEach(iterator);
27
}
28
for (var i = 0; i < arr.length; i += 1) {
33
var _map = function (arr, iterator) {
34
if (arr.map) {
35
return arr.map(iterator);
36
}
44
var _reduce = function (arr, iterator, memo) {
45
if (arr.reduce) {
46
return arr.reduce(iterator, memo);
47
}
48
_forEach(arr, function (x, i, a) {
54
var _keys = function (obj) {
55
if (Object.keys) {
56
return Object.keys(obj);
57
}
59
for (var k in obj) {
60
if (obj.hasOwnProperty(k)) {
61
keys.push(k);
62
}
67
//// exported async module functions ////
69
//// nextTick implementation with browser-compatible fallback ////
70
if (typeof process === 'undefined' || !(process.nextTick)) {
71
async.nextTick = function (fn) {
73
};
74
}
75
else {
76
async.nextTick = process.nextTick;
77
}
80
callback = callback || function () {};
85
_forEach(arr, function (x) {
86
iterator(x, function (err) {
87
if (err) {
92
completed += 1;
93
if (completed === arr.length) {
94
callback();
95
}
102
callback = callback || function () {};
107
var iterate = function () {
108
iterator(arr[completed], function (err) {
109
if (err) {
114
completed += 1;
115
if (completed === arr.length) {
116
callback();
117
}
118
else {
119
iterate();
120
}
126
127
/**
 * Applies `iterator` to every element of `arr`, keeping at most `limit`
 * iterator calls in flight at any time.
 *
 * @param {Array} arr - items to process.
 * @param {number} limit - maximum number of concurrently running iterators;
 *     a value <= 0 (or an empty `arr`) completes immediately.
 * @param {Function} iterator - called as `iterator(item, done)`; it must call
 *     `done(err)` exactly once, either synchronously or later.
 * @param {Function} [callback] - called once: with the first error reported,
 *     or with no arguments after every item has completed.
 */
async.forEachLimit = function (arr, limit, iterator, callback) {
    callback = callback || function () {};
    if (!arr.length || limit <= 0) {
        return callback();
    }
    var completed = 0;
    var started = 0;
    var running = 0;
    var errored = false;

    (function replenish() {
        while (running < limit && started < arr.length && !errored) {
            // Claim the slot BEFORE invoking the iterator. If the iterator
            // calls back synchronously it re-enters replenish(); with the
            // counters updated afterwards the same element would be handed
            // out again and the recursion would never terminate.
            started += 1;
            running += 1;
            iterator(arr[started - 1], function (err) {
                running -= 1;
                if (err) {
                    // Report only the first error and stop scheduling
                    // further items; later callbacks hit the no-op.
                    errored = true;
                    callback(err);
                    callback = function () {};
                    return;
                }
                completed += 1;
                if (completed >= arr.length) {
                    callback();
                }
                else {
                    replenish();
                }
            });
        }
    })();
};
184
eachfn(arr, function (x, callback) {
185
iterator(x.value, function (err, v) {
193
async.map = doParallel(_asyncMap);
194
async.mapSeries = doSeries(_asyncMap);
197
// reduce only has a series version, as doing reduce in parallel won't
198
// work in many situations.
199
async.reduce = function (arr, memo, iterator, callback) {
200
async.forEachSeries(arr, function (x, callback) {
201
iterator(memo, x, function (err, v) {
214
async.reduceRight = function (arr, memo, iterator, callback) {
215
var reversed = _map(arr, function (x) {
216
return x;
217
}).reverse();
228
eachfn(arr, function (x, callback) {
229
iterator(x.value, function (v) {
230
if (v) {
231
results.push(x);
232
}
235
}, function (err) {
236
callback(_map(results.sort(function (a, b) {
243
async.filter = doParallel(_filter);
244
async.filterSeries = doSeries(_filter);
246
async.select = async.filter;
247
async.selectSeries = async.filterSeries;
254
eachfn(arr, function (x, callback) {
255
iterator(x.value, function (v) {
256
if (!v) {
257
results.push(x);
258
}
261
}, function (err) {
262
callback(_map(results.sort(function (a, b) {
269
async.reject = doParallel(_reject);
270
async.rejectSeries = doSeries(_reject);
272
var _detect = function (eachfn, arr, iterator, main_callback) {
273
eachfn(arr, function (x, callback) {
274
iterator(x, function (result) {
275
if (result) {
276
main_callback(x);
277
main_callback = function () {};
287
async.detect = doParallel(_detect);
288
async.detectSeries = doSeries(_detect);
290
async.some = function (arr, iterator, main_callback) {
291
async.forEach(arr, function (x, callback) {
292
iterator(x, function (v) {
293
if (v) {
306
async.every = function (arr, iterator, main_callback) {
307
async.forEach(arr, function (x, callback) {
308
iterator(x, function (v) {
309
if (!v) {
322
async.sortBy = function (arr, iterator, callback) {
323
async.map(arr, function (x, callback) {
324
iterator(x, function (err, criteria) {
325
if (err) {
326
callback(err);
327
}
328
else {
329
callback(null, {value: x, criteria: criteria});
330
}
332
}, function (err, results) {
333
if (err) {
334
return callback(err);
335
}
336
else {
337
var fn = function (left, right) {
338
var a = left.criteria, b = right.criteria;
339
return a < b ? -1 : a > b ? 1 : 0;
340
};
341
callback(null, _map(results.sort(fn), function (x) {
342
return x.value;
343
}));
344
}
345
});
348
async.auto = function (tasks, callback) {
349
callback = callback || function () {};
355
var results = {};
361
var removeListener = function (fn) {
362
for (var i = 0; i < listeners.length; i += 1) {
363
if (listeners[i] === fn) {
370
_forEach(listeners.slice(0), function (fn) {
376
if (_keys(results).length === keys.length) {
377
callback(null, results);
378
callback = function () {};
382
_forEach(keys, function (k) {
383
var task = (tasks[k] instanceof Function) ? [tasks[k]]: tasks[k];
384
var taskCallback = function (err) {
385
if (err) {
386
callback(err);
387
// stop subsequent errors hitting callback multiple times
391
var args = Array.prototype.slice.call(arguments, 1);
392
if (args.length <= 1) {
393
args = args[0];
394
}
395
results[k] = args;
399
var requires = task.slice(0, Math.abs(task.length - 1)) || [];
400
var ready = function () {
401
return _reduce(requires, function (a, x) {
402
return (a && results.hasOwnProperty(x));
406
task[task.length - 1](taskCallback, results);
412
task[task.length - 1](taskCallback, results);
422
if (!tasks.length) {
423
return callback();
424
}
425
var wrapIterator = function (iterator) {
426
return function (err) {
427
if (err) {
430
}
431
else {
432
var args = Array.prototype.slice.call(arguments, 1);
433
var next = iterator.next();
434
if (next) {
435
args.push(wrapIterator(next));
436
}
437
else {
438
args.push(callback);
439
}
440
async.nextTick(function () {
441
iterator.apply(null, args);
442
});
449
async.parallel = function (tasks, callback) {
450
callback = callback || function () {};
451
if (tasks.constructor === Array) {
452
async.map(tasks, function (fn, callback) {
453
if (fn) {
454
fn(function (err) {
455
var args = Array.prototype.slice.call(arguments, 1);
456
if (args.length <= 1) {
457
args = args[0];
458
}
459
callback.call(null, err, args);
460
});
461
}
462
}, callback);
463
}
464
else {
465
var results = {};
466
async.forEach(_keys(tasks), function (k, callback) {
467
tasks[k](function (err) {
468
var args = Array.prototype.slice.call(arguments, 1);
469
if (args.length <= 1) {
470
args = args[0];
471
}
472
results[k] = args;
473
callback(err);
475
}, function (err) {
476
callback(err, results);
477
});
478
}
481
async.series = function (tasks, callback) {
482
callback = callback || function () {};
483
if (tasks.constructor === Array) {
484
async.mapSeries(tasks, function (fn, callback) {
485
if (fn) {
486
fn(function (err) {
487
var args = Array.prototype.slice.call(arguments, 1);
488
if (args.length <= 1) {
489
args = args[0];
490
}
491
callback.call(null, err, args);
492
});
493
}
494
}, callback);
495
}
496
else {
497
var results = {};
498
async.forEachSeries(_keys(tasks), function (k, callback) {
499
tasks[k](function (err) {
500
var args = Array.prototype.slice.call(arguments, 1);
501
if (args.length <= 1) {
502
args = args[0];
503
}
504
results[k] = args;
505
callback(err);
507
}, function (err) {
508
callback(err, results);
509
});
510
}
513
async.iterator = function (tasks) {
514
var makeCallback = function (index) {
515
var fn = function () {
516
if (tasks.length) {
517
tasks[index].apply(null, arguments);
518
}
521
fn.next = function () {
522
return (index < tasks.length - 1) ? makeCallback(index + 1): null;
523
};
524
return fn;
525
};
526
return makeCallback(0);
527
};
528
532
return fn.apply(
533
null, args.concat(Array.prototype.slice.call(arguments))
534
);
549
async.concat = doParallel(_concat);
550
async.concatSeries = doSeries(_concat);
580
async.queue = function (worker, concurrency) {
581
var workers = 0;
582
var q = {
589
if(data.constructor !== Array) {
590
data = [data];
591
}
592
_forEach(data, function(task) {
593
q.tasks.push({
594
data: task,
595
callback: typeof callback === 'function' ? callback : null
596
});
597
if (q.saturated && q.tasks.length == concurrency) {
598
q.saturated();
599
}
600
async.nextTick(q.process);
601
});
604
if (workers < q.concurrency && q.tasks.length) {
605
var task = q.tasks.shift();
606
if(q.empty && q.tasks.length == 0) q.empty();
610
if (task.callback) {
611
task.callback.apply(task, arguments);
612
}
613
if(q.drain && q.tasks.length + workers == 0) q.drain();
620
},
621
running: function () {
622
return workers;
623
},
624
rateLimit: function (timeout) {
625
// mixin the ratelimiter
626
rateLimiter.call(q);
627
// and invoke it
628
q.rateLimit(timeout);
629
630
// return original queue for chaining
631
return q;
643
if (typeof console !== 'undefined') {
644
if (err) {
645
if (console.error) {
646
console.error(err);
647
}
649
else if (console[name]) {
650
_forEach(args, function (x) {
651
console[name](x);
652
});
658
async.log = _console_fn('log');
659
async.dir = _console_fn('dir');
660
/*async.info = _console_fn('info');
661
async.warn = _console_fn('warn');
662
async.error = _console_fn('error');*/
671
var args = Array.prototype.slice.call(arguments);
672
var callback = args.pop();
673
var key = hasher.apply(null, args);
674
if (key in memo) {
675
callback.apply(null, memo[key]);
676
}
677
else if (key in queues) {
678
queues[key].push(callback);
679
}
684
var q = queues[key];
685
delete queues[key];
686
for (var i = 0, l = q.length; i < l; i++) {
687
q[i].apply(null, arguments);
688
}
692
memoized.unmemoized = fn;
698
return (fn.unmemoized || fn).apply(null, arguments);
701
702
/**
 * Add rate limiting capabilities to a queue.
 * The timeout should indicate the minimum number of milliseconds before
 * invoking the next item in the queue.
 * So for a 100/min. rate limit, use (60000 / 100)
 *
 * Invoke like
 *   rateLimiter.call(queue);
 *   queue.rateLimit(timeout);
 *
 * Must be called with `this` set to the queue. The queue needs a `tasks`
 * array of `{ data, callback }` entries, a `push(data, callback)` method and
 * optionally a `drain` function. The mixin replaces `push`, `drain` and
 * `rateLimit` on the queue and adds the `$rateLimit*` / `$processNext*`
 * members below. Already-queued tasks are moved into a private list and are
 * handed to the original `push` one at a time, one per interval tick.
 */
var rateLimiter = function () {
    var queue = this;

    // keep the original methods, so we can invoke them later on
    var _push = queue.push;
    var _drain = queue.drain;

    // take over the tasks that are already waiting
    var internalQueue = queue.tasks.splice(0);
    queue.tasks = [];

    // number of items (waiting or in flight) that still need to finish
    var toProcess = internalQueue.length;

    // placeholder installed as queue.drain so the underlying queue does not
    // report "drained" after every single rate-limited item
    var $emptyDrain = function () {};

    // stop the dispatch timer if one is active
    var stopTimer = function () {
        if (queue.$rateLimitInterval !== null) {
            clearInterval(queue.$rateLimitInterval);
            queue.$rateLimitInterval = null;
        }
    };

    // internal state
    queue.$rateLimitTimeout = 0;
    queue.$rateLimitBusy = false;
    queue.$rateLimitInterval = null;

    // wrapper around push that pushes to our own queue instead of tasks
    queue.push = function (data, callback) {
        if (data.constructor !== Array) {
            data = [data];
        }
        for (var i = 0; i < data.length; i += 1) {
            internalQueue.push({ data: data[i], callback: callback });
        }

        toProcess += data.length;

        // The timer is stopped whenever there is nothing left to dispatch;
        // restart it so items pushed later are still processed. Nothing is
        // started before rateLimit() has configured a timeout.
        if (data.length && queue.$rateLimitTimeout &&
                queue.$rateLimitInterval === null) {
            queue.$processNextRateLimited();
        }
    };

    // kick off the rate limiter; `timeout` defaults to 1000 ms
    queue.rateLimit = function (timeout) {
        queue.$rateLimitTimeout = timeout || 1000;
        queue.$processNextRateLimited();
    };

    // (re)start the interval that dispatches one item per timeout
    queue.$processNextRateLimited = function () {
        // never leave an earlier interval running: it could not be cleared
        // anymore once its id has been overwritten
        stopTimer();

        queue.$rateLimitInterval = setInterval(function () {
            queue.$processNextRateLimitedImpl();
        }, queue.$rateLimitTimeout);
    };

    // process the next item in our rate limited queue
    queue.$processNextRateLimitedImpl = function () {
        var item = internalQueue.shift();
        if (!item) {
            // nothing to dispatch: stop ticking, push() restarts the timer
            stopTimer();
            return;
        }

        // we're busy
        queue.$rateLimitBusy = true;

        // the 'drain' function can be overwritten at this point,
        // as we don't have control over when it happens...
        if (queue.drain !== $emptyDrain) {
            _drain = queue.drain;
            queue.drain = $emptyDrain;
        }

        // invoke original push function of the queue
        _push.call(queue, item.data, function () {
            if (typeof item.callback === "function") {
                item.callback.apply(this, arguments);
            }

            toProcess -= 1;

            if (toProcess === 0) {
                queue.$rateLimitBusy = false;

                stopTimer();

                if (typeof _drain === "function") {
                    _drain.call(queue);
                }
            }
        });
    };

    // the original drain function of a queue needs to be overwritten
    // otherwise the user will be flooded with messages
    queue.drain = $emptyDrain;
};