diff --git a/lib/parallelize-worker.js b/lib/parallelize-worker.js index 9091df420..b0395cdae 100644 --- a/lib/parallelize-worker.js +++ b/lib/parallelize-worker.js @@ -17,16 +17,22 @@ const stream = worker(workerData.args, workerData.shard); const input = new Stream.Readable({ objectMode: true, + highWaterMark: 1000, - read() {} + read() { + parentPort.postMessage({ busy: false }); + } }); parentPort.on('message', (msg) => { + let ok; if (msg.data) - input.push(msg.data); + ok = input.push(msg.data); else if (msg.end) - input.push(null); + ok = input.push(null); else throw new Error('unrecognized message: ' + JSON.stringify(msg)); + if (!ok) + parentPort.postMessage({ busy: true }); }); const output = new Stream.Writable({ diff --git a/lib/parallelize.js b/lib/parallelize.js index ee08c7800..35f554248 100644 --- a/lib/parallelize.js +++ b/lib/parallelize.js @@ -9,6 +9,7 @@ // See COPYING for details "use strict"; +const assert = require('assert'); const Stream = require('stream'); let Worker; @@ -24,6 +25,8 @@ function singleparallelize(workerPath, args) { } module.exports = function parallelize(N, workerPath, args) { + if (N <= 0) + N = 1; if (N === 1 || Worker === null) { if (N !== 1) console.error('Worker thread support not available, falling back to single-threaded execution'); @@ -31,21 +34,36 @@ module.exports = function parallelize(N, workerPath, args) { } let workers = []; + let busy = []; for (let i = 0; i < N; i++) { + busy.push(false); workers.push(new Worker(require.resolve('./parallelize-worker'), { workerData: { args, workerPath, shard: i }, })); } + let buffer = null; - // round robin - let rr = 0; const stream = new Stream.Duplex({ objectMode: true, read() {}, write(data, encoding, callback) { - workers[rr].postMessage({ data, end: false }); - rr = (rr+1) % N; + assert(buffer === null); + + // find the first worker that is not busy + let idx = 0; + while (idx < N && busy[idx]) + idx++; + if (idx === N) { + // all workers are busy, save this data item + // without calling callback + buffer = { + data, callback + }; + return; + } + + workers[idx].postMessage({ data, end: false }); callback(); }, final(callback) { @@ -56,9 +74,17 @@ module.exports = function parallelize(N, workerPath, args) { }); let waitCount = N; - for (let worker of workers) { + for (let i = 0; i < workers.length; i++) { + let worker = workers[i]; worker.on('message', (msg) => { - if (msg.data) { + if (Object.prototype.hasOwnProperty.call(msg, 'busy')) { + busy[i] = msg.busy; + if (!msg.busy && buffer) { + worker.postMessage({ data: buffer.data, end: false }); + buffer.callback(); + buffer = null; + } + } if (msg.data) { stream.push(msg.data); } else if (msg.end) { waitCount--;