forked from technoweenie/node-chain-gang
/
index.js
117 lines (115 loc) · 3 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
var ChainGang, Worker, events, sys;
var __slice = Array.prototype.slice;
sys = require('sys');
events = require('events');
/**
 * A named job queue serviced by a fixed pool of workers.
 *
 * State set up here:
 *  - index:   job name -> task function for every queued or running job
 *  - queue:   job names waiting for a worker, in insertion order
 *  - events:  EventEmitter used for all notifications
 *  - workers: pool created by build_workers()
 *  - active:  when true, add() kicks an idle worker right away
 *
 * @param {Object} [options]
 * @param {number} [options.workers=3] Pool size; any falsy value
 *   (including 0) falls back to 3.
 */
ChainGang = function(options) {
  var settings = options || {};
  var poolSize = settings.workers || 3;

  this.index = {};
  this.queue = [];
  this.events = new events.EventEmitter();
  this.workers = this.build_workers(poolSize);
  this.active = true;
  return this;
};
/**
 * Queues a task under a unique name.
 *
 * If a job with the same name is already queued or running, the task is
 * not queued again, but the callback (if any) is still attached so the
 * caller hears about the existing job's completion.
 *
 * @param {Function} task      Job body; a worker later invokes it with the
 *                             worker as its only argument.
 * @param {string}   [name]    Unique job name. Defaults to
 *                             default_name_for(task).
 * @param {Function} [callback] Invoked the next time the event called
 *                             `name` is emitted (finish() emits it with
 *                             the job's error, if any).
 * @returns {*} null when the name is already known; otherwise the result
 *              of perform() when the gang is active, or undefined when it
 *              is not.
 */
ChainGang.prototype.add = function(task, name, callback) {
  name = name || this.default_name_for(task);
  if (callback) {
    // One-shot listener: the callback belongs to this run of the job.
    // A permanent listener would fire again whenever the same name is
    // reused later and would accumulate without bound.
    this.events.once(name, callback);
  }
  // Own-property check: a plain `this.index[name] !== undefined` also
  // matches members inherited from Object.prototype, which silently
  // dropped jobs named e.g. "constructor" or "toString".
  if (Object.prototype.hasOwnProperty.call(this.index, name) &&
      this.index[name] !== undefined) {
    return null;
  }
  this.queue.push(name);
  this.index[name] = task;
  this.events.emit('add', name);
  if (this.active) {
    return this.perform();
  }
};
/**
 * Hands work to the first worker that is not currently performing a job.
 *
 * Only one worker is started per call.
 *
 * @returns {*} The result of that worker's perform(), or an empty array
 *              when every worker is busy (or there are no workers).
 */
ChainGang.prototype.perform = function() {
  var pool = this.workers;
  var total = pool.length;
  var slot;
  var candidate;

  for (slot = 0; slot < total; slot += 1) {
    candidate = pool[slot];
    if (candidate.performing) {
      continue;
    }
    return candidate.perform();
  }
  return [];
};
/**
 * Removes the oldest queued job name and pairs it with its task.
 *
 * The entry in `index` is left in place; it is removed by finish().
 *
 * @returns {{name: string, callback: Function}|undefined} The next job,
 *          or undefined when the queue is empty (or the dequeued name is
 *          falsy).
 */
ChainGang.prototype.shift = function() {
  var nextName = this.queue.shift();
  if (!nextName) {
    return undefined;
  }
  return {
    name: nextName,
    callback: this.index[nextName]
  };
};
/**
 * Marks a job as done: forgets its task and notifies listeners.
 *
 * Emits two events, in this order:
 *  - `<name>`    with (err)
 *  - `finished`  with (name, err)
 *
 * @param {string} name  Name of the job that completed.
 * @param {*}      [err] Error raised by the job, if any.
 * @returns {*} Whatever the `finished` emit returns.
 */
ChainGang.prototype.finish = function(name, err) {
  // Drop the entry first so the name is free for re-use by the time
  // listeners run.
  delete this.index[name];

  this.emit(name, err);
  var notified = this.emit('finished', name, err);
  return notified;
};
/**
 * Emits an event on the gang's internal emitter.
 *
 * @param {string} event Event name.
 * @param {...*}   [args] Extra arguments passed through to listeners.
 * @returns {*} The underlying emitter's emit() result.
 */
ChainGang.prototype.emit = function(event) {
  var forwarded = [event];
  var position;

  // Copy every argument after the event name.
  for (position = 1; position < arguments.length; position += 1) {
    forwarded.push(arguments[position]);
  }
  return this.events.emit.apply(this.events, forwarded);
};
/**
 * Subscribes a listener on the gang's internal emitter.
 *
 * @param {string}   event    Event name.
 * @param {Function} listener Handler to register.
 * @returns {*} The result of the emitter's on() call.
 */
ChainGang.prototype.on = function(event, listener) {
  var emitter = this.events;
  return emitter.on(event, listener);
};
/**
 * Registers a listener on the gang's internal emitter via addListener().
 *
 * @param {string}   event    Event name.
 * @param {Function} listener Handler to register.
 * @returns {*} The result of the emitter's addListener() call.
 */
ChainGang.prototype.addListener = function(event, listener) {
  var emitter = this.events;
  return emitter.addListener(event, listener);
};
/**
 * Detaches a listener from the gang's internal emitter.
 *
 * @param {string}   event    Event name.
 * @param {Function} listener Handler to remove.
 * @returns {*} The result of the emitter's removeListener() call.
 */
ChainGang.prototype.removeListener = function(event, listener) {
  var emitter = this.events;
  return emitter.removeListener(event, listener);
};
/**
 * Looks up the listeners registered for an event on the internal emitter.
 *
 * @param {string} event Event name.
 * @returns {*} The result of the emitter's listeners() call.
 */
ChainGang.prototype.listeners = function(event) {
  var emitter = this.events;
  return emitter.listeners(event);
};
/**
 * Creates the worker pool for this gang.
 *
 * @param {number} num How many workers to create.
 * @returns {Worker[]} Newly constructed workers, each bound to this gang.
 */
ChainGang.prototype.build_workers = function(num) {
  var pool = [];
  while (pool.length < num) {
    pool.push(new Worker(this));
  }
  return pool;
};
/**
 * Derives a job name from a task when the caller did not supply one.
 *
 * The name is the hex MD5 digest of task.toString(), so two tasks with
 * identical string forms get the same name. The crypto module is loaded
 * on first use and cached on the instance.
 *
 * @param {*} task Task to name; must have a toString() method.
 * @returns {string} Hex-encoded MD5 digest.
 */
ChainGang.prototype.default_name_for = function(task) {
  if (!this.crypto) {
    this.crypto = require('crypto');
  }
  var hasher = this.crypto.createHash('md5');
  hasher.update(task.toString());
  return hasher.digest('hex');
};
/**
 * A single execution slot belonging to a ChainGang.
 *
 * @param {ChainGang} chain The gang this worker pulls jobs from.
 *
 * `performing` is false while idle and holds the current job's name while
 * a job is running.
 */
Worker = function(chain) {
  this.performing = false;
  this.chain = chain;
  return this;
};
/**
 * Pulls the next job off the gang's queue and runs it.
 *
 * The task is called with this worker as its only argument. The worker
 * stays marked as busy (`performing` holds the job name) until finish()
 * is called on it; this method calls finish() itself only when the task
 * throws synchronously.
 *
 * Events emitted on the gang:
 *  - `starting`      (name)        before the task runs
 *  - `error-<name>`  (err)         when the task throws
 *  - `error`         (err, name)   when the task throws
 *
 * @returns {*} null when the worker is already busy or the queue is
 *              empty; otherwise the task's return value, or the result of
 *              finish() when the task threw.
 */
Worker.prototype.perform = function() {
  var data;
  if (this.performing) {
    return null;
  }
  data = this.chain.shift();
  if (!data) {
    return null;
  }
  this.performing = data.name;
  this.chain.emit('starting', data.name);
  try {
    return data.callback(this);
  } catch (err) {
    this.chain.emit(("error-" + data.name), err);
    this.chain.emit("error", err, data.name);
    // finish() takes the error as its only argument (the job name comes
    // from this.performing). Passing (data.name, err) reported the job
    // name as the error and lost the real exception.
    return this.finish(err);
  }
};
/**
 * Reports the current job as done and moves on to the next one.
 *
 * The gang is notified via chain.finish(jobName, err) while `performing`
 * still holds the job name; the worker is then marked idle and perform()
 * is called again.
 *
 * @param {*} [err] Error produced by the job, if any.
 * @returns {*} The result of the follow-up perform() call.
 */
Worker.prototype.finish = function(err) {
  var completedJob = this.performing;
  this.chain.finish(completedJob, err);
  this.performing = false;
  return this.perform();
};
exports.create = function(options) {
return new ChainGang(options);
};