Skip to content

Commit

Permalink
Merge ef9b829 into 8e559ae
Browse files Browse the repository at this point in the history
  • Loading branch information
gcampax committed Sep 6, 2019
2 parents 8e559ae + ef9b829 commit 7c4fe55
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 9 deletions.
12 changes: 9 additions & 3 deletions lib/parallelize-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,22 @@ const stream = worker(workerData.args, workerData.shard);

const input = new Stream.Readable({
objectMode: true,
highWaterMark: 1000,

read() {}
read() {
parentPort.postMessage({ busy: false });
}
});
parentPort.on('message', (msg) => {
let ok;
if (msg.data)
input.push(msg.data);
ok = input.push(msg.data);
else if (msg.end)
input.push(null);
ok = input.push(null);
else
throw new Error('unrecognized message: ' + JSON.stringify(msg));
if (!ok)
parentPort.postMessage({ busy: true });
});

const output = new Stream.Writable({
Expand Down
38 changes: 32 additions & 6 deletions lib/parallelize.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// See COPYING for details
"use strict";

const assert = require('assert');
const Stream = require('stream');

let Worker;
Expand All @@ -24,28 +25,45 @@ function singleparallelize(workerPath, args) {
}

module.exports = function parallelize(N, workerPath, args) {
if (N <= 0)
N = 1;
if (N === 1 || Worker === null) {
if (N !== 1)
console.error('Worker thread support not available, falling back to single-threaded execution');
return singleparallelize(workerPath, args);
}

let workers = [];
let busy = [];
for (let i = 0; i < N; i++) {
busy.push(false);
workers.push(new Worker(require.resolve('./parallelize-worker'), {
workerData: { args, workerPath, shard: i },
}));
}
let buffer = null;

// round robin
let rr = 0;
const stream = new Stream.Duplex({
objectMode: true,

read() {},
write(data, encoding, callback) {
workers[rr].postMessage({ data, end: false });
rr = (rr+1) % N;
assert(buffer === null);

// find the first worker that is not busy
let idx = 0;
while (idx < N && busy[idx])
idx++;
if (idx === N) {
// all workers are busy, save this data item
// without calling callback
buffer = {
data, callback
};
return;
}

workers[idx].postMessage({ data, end: false });
callback();
},
final(callback) {
Expand All @@ -56,9 +74,17 @@ module.exports = function parallelize(N, workerPath, args) {
});

let waitCount = N;
for (let worker of workers) {
for (let i = 0; i < workers.length; i++) {
let worker = workers[i];
worker.on('message', (msg) => {
if (msg.data) {
if (Object.prototype.hasOwnProperty.call(msg, 'busy')) {
busy[i] = msg.busy;
if (!msg.busy && buffer) {
worker.postMessage({ data: buffer.data, end: false });
buffer.callback();
buffer = null;
}
} if (msg.data) {
stream.push(msg.data);
} else if (msg.end) {
waitCount--;
Expand Down

0 comments on commit 7c4fe55

Please sign in to comment.