-
Notifications
You must be signed in to change notification settings - Fork 146
/
Pool.js
394 lines (353 loc) · 11.4 KB
/
Pool.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
var Promise = require('./Promise');
var WorkerHandler = require('./WorkerHandler');
var environment = require('./environment');
var DebugPortAllocator = require('./debug-port-allocator');
var DEBUG_PORT_ALLOCATOR = new DebugPortAllocator();
/**
* A pool to manage workers
* @param {String} [script] Optional worker script
* @param {Object} [options] Available options: maxWorkers: Number
* @constructor
*/
function Pool(script, options) {
if (typeof script === 'string') {
this.script = script || null;
}
else {
this.script = null;
options = script;
}
this.workers = []; // queue with all workers
this.tasks = []; // queue with tasks awaiting execution
options = options || {};
this.forkArgs = options.forkArgs || [];
this.forkOpts = options.forkOpts || {};
this.debugPortStart = (options.debugPortStart || 43210);
this.nodeWorker = options.nodeWorker;
// configuration
if (options && 'maxWorkers' in options) {
validateMaxWorkers(options.maxWorkers);
this.maxWorkers = options.maxWorkers;
}
else {
this.maxWorkers = Math.max((environment.cpus || 4) - 1, 1);
}
if (options && 'minWorkers' in options) {
if(options.minWorkers === 'max') {
this.minWorkers = Math.max((environment.cpus || 4) - 1, 1);
} else {
validateMinWorkers(options.minWorkers);
this.minWorkers = options.minWorkers;
this.maxWorkers = Math.max(this.minWorkers, this.maxWorkers); // in case minWorkers is higher than maxWorkers
}
this._ensureMinWorkers();
}
this._boundNext = this._next.bind(this);
if (this.nodeWorker === 'thread') {
WorkerHandler.ensureWorkerThreads();
}
}
/**
* Execute a function on a worker.
*
* Example usage:
*
* var pool = new Pool()
*
* // call a function available on the worker
* pool.exec('fibonacci', [6])
*
* // offload a function
* function add(a, b) {
* return a + b
* };
* pool.exec(add, [2, 4])
* .then(function (result) {
* console.log(result); // outputs 6
* })
* .catch(function(error) {
* console.log(error);
* });
*
* @param {String | Function} method Function name or function.
* If `method` is a string, the corresponding
* method on the worker will be executed
* If `method` is a Function, the function
* will be stringified and executed via the
* workers built-in function `run(fn, args)`.
* @param {Array} [params] Function arguments applied when calling the function
* @return {Promise.<*, Error>} result
*/
Pool.prototype.exec = function (method, params) {
// validate type of arguments
if (params && !Array.isArray(params)) {
throw new TypeError('Array expected as argument "params"');
}
if (typeof method === 'string') {
var resolver = Promise.defer();
// add a new task to the queue
var tasks = this.tasks;
var task = {
method: method,
params: params,
resolver: resolver,
timeout: null
};
tasks.push(task);
// replace the timeout method of the Promise with our own,
// which starts the timer as soon as the task is actually started
var originalTimeout = resolver.promise.timeout;
resolver.promise.timeout = function timeout (delay) {
if (tasks.indexOf(task) !== -1) {
// task is still queued -> start the timer later on
task.timeout = delay;
return resolver.promise;
}
else {
// task is already being executed -> start timer immediately
return originalTimeout.call(resolver.promise, delay);
}
};
// trigger task execution
this._next();
return resolver.promise;
}
else if (typeof method === 'function') {
// send stringified function and function arguments to worker
return this.exec('run', [String(method), params]);
}
else {
throw new TypeError('Function or string expected as argument "method"');
}
};
/**
* Create a proxy for current worker. Returns an object containing all
* methods available on the worker. The methods always return a promise.
*
* @return {Promise.<Object, Error>} proxy
*/
Pool.prototype.proxy = function () {
if (arguments.length > 0) {
throw new Error('No arguments expected');
}
var pool = this;
return this.exec('methods')
.then(function (methods) {
var proxy = {};
methods.forEach(function (method) {
proxy[method] = function () {
return pool.exec(method, Array.prototype.slice.call(arguments));
}
});
return proxy;
});
};
/**
* Creates new array with the results of calling a provided callback function
* on every element in this array.
* @param {Array} array
* @param {function} callback Function taking two arguments:
* `callback(currentValue, index)`
* @return {Promise.<Array>} Returns a promise which resolves with an Array
* containing the results of the callback function
* executed for each of the array elements.
*/
/* TODO: implement map
Pool.prototype.map = function (array, callback) {
};
*/
/**
* Grab the first task from the queue, find a free worker, and assign the
* worker to the task.
* @protected
*/
Pool.prototype._next = function () {
if (this.tasks.length > 0) {
// there are tasks in the queue
// find an available worker
var worker = this._getWorker();
if (worker) {
// get the first task from the queue
var me = this;
var task = this.tasks.shift();
// check if the task is still pending (and not cancelled -> promise rejected)
if (task.resolver.promise.pending) {
// send the request to the worker
var promise = worker.exec(task.method, task.params, task.resolver)
.then(me._boundNext)
.catch(function () {
// if the worker crashed and terminated, remove it from the pool
if (worker.terminated) {
me._removeWorker(worker);
// If minWorkers set, spin up new workers to replace the crashed ones
me._ensureMinWorkers();
}
me._next(); // trigger next task in the queue
});
// start queued timer now
if (typeof task.timeout === 'number') {
promise.timeout(task.timeout);
}
} else {
// The task taken was already complete (either rejected or resolved), so just trigger next task in the queue
me._next();
}
}
}
};
/**
* Get an available worker. If no worker is available and the maximum number
* of workers isn't yet reached, a new worker will be created and returned.
* If no worker is available and the maximum number of workers is reached,
* null will be returned.
*
* @return {WorkerHandler | null} worker
* @private
*/
Pool.prototype._getWorker = function() {
// find a non-busy worker
var workers = this.workers;
for (var i = 0; i < workers.length; i++) {
var worker = workers[i];
if (worker.busy() === false) {
return worker;
}
}
if (workers.length < this.maxWorkers) {
// create a new worker
worker = new WorkerHandler(this.script, {
forkArgs: this.forkArgs,
forkOpts: this.forkOpts,
debugPort: DEBUG_PORT_ALLOCATOR.nextAvailableStartingAt(this.debugPortStart),
nodeWorker: this.nodeWorker
});
workers.push(worker);
return worker;
}
return null;
};
/**
* Remove a worker from the pool. For example after a worker terminated for
* whatever reason
* @param {WorkerHandler} worker
* @protected
*/
Pool.prototype._removeWorker = function(worker) {
DEBUG_PORT_ALLOCATOR.releasePort(worker.debugPort)
// terminate the worker (if not already terminated)
worker.terminate();
this._removeWorkerFromList(worker);
};
/**
* Remove a worker from the pool list.
* @param {WorkerHandler} worker
* @protected
*/
Pool.prototype._removeWorkerFromList = function(worker) {
// remove from the list with workers
var index = this.workers.indexOf(worker);
if (index != -1) {
this.workers.splice(index, 1);
}
};
/**
* Close all active workers. Tasks currently being executed will be finished first.
* @param {boolean} [force=false] If false (default), the workers are terminated
* after finishing all tasks currently in
* progress. If true, the workers will be
* terminated immediately.
* @param {number} [timeout] If provided and non-zero, worker termination promise will be rejected
* after timeout if worker process has not been terminated.
* @return {Promise.<void, Error>}
*/
Pool.prototype.terminate = function (force, timeout) {
var f = function (worker) {
this._removeWorkerFromList(worker);
};
var removeWorker = f.bind(this);
var promises = [];
var workers = this.workers.slice();
workers.forEach(function (worker) {
var termPromise = worker.terminateAndNotify(force, timeout)
.then(removeWorker);
promises.push(termPromise);
});
return Promise.all(promises);
};
// DEPRECATED
/**
* Close all active workers. Unlike terminate, this function does not return a promise.
* @param force
*/
Pool.prototype.clear = function (force) {
this.terminate(force);
};
/**
* Retrieve statistics on tasks and workers.
* @return {{totalWorkers: number, busyWorkers: number, idleWorkers: number, pendingTasks: number, activeTasks: number}} Returns an object with statistics
*/
Pool.prototype.stats = function () {
var totalWorkers = this.workers.length;
var busyWorkers = this.workers.filter(function (worker) {
return worker.busy();
}).length;
return {
totalWorkers: totalWorkers,
busyWorkers: busyWorkers,
idleWorkers: totalWorkers - busyWorkers,
pendingTasks: this.tasks.length,
activeTasks: busyWorkers
};
};
/**
* Ensures that a minimum of minWorkers is up and running
* @protected
*/
Pool.prototype._ensureMinWorkers = function() {
if (this.minWorkers) {
for(var i = this.workers.length; i < this.minWorkers; i++) {
this.workers.push(new WorkerHandler(this.script, {
forkArgs: this.forkArgs,
forkOpts: this.forkOpts,
debugPort: DEBUG_PORT_ALLOCATOR.nextAvailableStartingAt(this.debugPortStart)
}));
}
}
};
/**
* Ensure that the maxWorkers option is an integer >= 1
* @param {*} maxWorkers
* @returns {boolean} returns true maxWorkers has a valid value
*/
function validateMaxWorkers(maxWorkers) {
if (!isNumber(maxWorkers) || !isInteger(maxWorkers) || maxWorkers < 1) {
throw new TypeError('Option maxWorkers must be an integer number >= 1');
}
}
/**
* Ensure that the minWorkers option is an integer >= 0
* @param {*} minWorkers
* @returns {boolean} returns true when minWorkers has a valid value
*/
function validateMinWorkers(minWorkers) {
if (!isNumber(minWorkers) || !isInteger(minWorkers) || minWorkers < 0) {
throw new TypeError('Option minWorkers must be an integer number >= 0');
}
}
/**
* Test whether a variable is a number
* @param {*} value
* @returns {boolean} returns true when value is a number
*/
function isNumber(value) {
return typeof value === 'number';
}
/**
* Test whether a number is an integer
* @param {number} value
* @returns {boolean} Returns true if value is an integer
*/
function isInteger(value) {
return Math.round(value) == value;
}
module.exports = Pool;