Skip to content

Commit

Permalink
worker: enable stdio
Browse files Browse the repository at this point in the history
Provide `stdin`, `stdout` and `stderr` options for the `Worker`
constructor, and make these available to the worker thread
under their usual names.

The default for `stdin` is an empty stream, the default for
`stdout` and `stderr` is redirecting to the parent thread’s
corresponding stdio streams.

PR-URL: #20876
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Shingo Inoue <leko.noor@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: John-David Dalton <john.david.dalton@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
  • Loading branch information
addaleax authored and targos committed Jun 13, 2018
1 parent c97fb91 commit 6b1a887
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 11 deletions.
44 changes: 43 additions & 1 deletion doc/api/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ Most Node.js APIs are available inside of it.
Notable differences inside a Worker environment are:

- The [`process.stdin`][], [`process.stdout`][] and [`process.stderr`][]
properties are set to `null`.
may be redirected by the parent thread.
- The [`require('worker').isMainThread`][] property is set to `false`.
- The [`require('worker').parentPort`][] message port is available,
- [`process.exit()`][] does not stop the whole program, just the single thread,
Expand Down Expand Up @@ -313,6 +313,13 @@ if (isMainThread) {
described in the [HTML structured clone algorithm][], and an error will be
thrown if the object cannot be cloned (e.g. because it contains
`function`s).
* stdin {boolean} If this is set to `true`, then `worker.stdin` will
provide a writable stream whose contents will appear as `process.stdin`
inside the Worker. By default, no data is provided.
* stdout {boolean} If this is set to `true`, then `worker.stdout` will
not automatically be piped through to `process.stdout` in the parent.
* stderr {boolean} If this is set to `true`, then `worker.stderr` will
not automatically be piped through to `process.stderr` in the parent.

### Event: 'error'
<!-- YAML
Expand Down Expand Up @@ -377,6 +384,41 @@ Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker will
behavior). If the worker is `ref()`ed, calling `ref()` again will have
no effect.

### worker.stderr
<!-- YAML
added: REPLACEME
-->

* {stream.Readable}

This is a readable stream which contains data written to [`process.stderr`][]
inside the worker thread. If `stderr: true` was not passed to the
[`Worker`][] constructor, then data will be piped to the parent thread's
[`process.stderr`][] stream.

### worker.stdin
<!-- YAML
added: REPLACEME
-->

* {null|stream.Writable}

If `stdin: true` was passed to the [`Worker`][] constructor, this is a
writable stream. The data written to this stream will be made available in
the worker thread as [`process.stdin`][].

### worker.stdout
<!-- YAML
added: REPLACEME
-->

* {stream.Readable}

This is a readable stream which contains data written to [`process.stdout`][]
inside the worker thread. If `stdout: true` was not passed to the
[`Worker`][] constructor, then data will be piped to the parent thread's
[`process.stdout`][] stream.

### worker.terminate([callback])
<!-- YAML
added: REPLACEME
Expand Down
14 changes: 7 additions & 7 deletions lib/internal/process/stdio.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ const {
ERR_UNKNOWN_STDIN_TYPE,
ERR_UNKNOWN_STREAM_TYPE
} = require('internal/errors').codes;
const { isMainThread } = require('internal/worker');
const {
isMainThread,
workerStdio
} = require('internal/worker');

exports.setup = setupStdio;

Expand All @@ -17,8 +20,7 @@ function setupStdio() {

function getStdout() {
if (stdout) return stdout;
if (!isMainThread)
return new (require('stream').Writable)({ write(b, e, cb) { cb(); } });
if (!isMainThread) return workerStdio.stdout;
stdout = createWritableStdioStream(1);
stdout.destroySoon = stdout.destroy;
stdout._destroy = function(er, cb) {
Expand All @@ -34,8 +36,7 @@ function setupStdio() {

function getStderr() {
if (stderr) return stderr;
if (!isMainThread)
return new (require('stream').Writable)({ write(b, e, cb) { cb(); } });
if (!isMainThread) return workerStdio.stderr;
stderr = createWritableStdioStream(2);
stderr.destroySoon = stderr.destroy;
stderr._destroy = function(er, cb) {
Expand All @@ -51,8 +52,7 @@ function setupStdio() {

function getStdin() {
if (stdin) return stdin;
if (!isMainThread)
return new (require('stream').Readable)({ read() { this.push(null); } });
if (!isMainThread) return workerStdio.stdin;

const tty_wrap = process.binding('tty_wrap');
const fd = 0;
Expand Down
166 changes: 163 additions & 3 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const EventEmitter = require('events');
const assert = require('assert');
const path = require('path');
const util = require('util');
const { Readable, Writable } = require('stream');
const {
ERR_INVALID_ARG_TYPE,
ERR_WORKER_NEED_ABSOLUTE_PATH,
Expand All @@ -29,13 +30,20 @@ const isMainThread = threadId === 0;

const kOnMessageListener = Symbol('kOnMessageListener');
const kHandle = Symbol('kHandle');
const kName = Symbol('kName');
const kPort = Symbol('kPort');
const kPublicPort = Symbol('kPublicPort');
const kDispose = Symbol('kDispose');
const kOnExit = Symbol('kOnExit');
const kOnMessage = Symbol('kOnMessage');
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
const kOnErrorMessage = Symbol('kOnErrorMessage');
const kParentSideStdio = Symbol('kParentSideStdio');
const kWritableCallbacks = Symbol('kWritableCallbacks');
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
const kStartedReading = Symbol('kStartedReading');
const kWaitingStreams = Symbol('kWaitingStreams');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');

const debug = util.debuglog('worker');

Expand Down Expand Up @@ -129,6 +137,72 @@ function setupPortReferencing(port, eventEmitter, eventName) {
}


class ReadableWorkerStdio extends Readable {
constructor(port, name) {
super();
this[kPort] = port;
this[kName] = name;
this[kIncrementsPortRef] = true;
this[kStartedReading] = false;
this.on('end', () => {
if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
this[kPort].unref();
});
}

_read() {
if (!this[kStartedReading] && this[kIncrementsPortRef]) {
this[kStartedReading] = true;
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}

this[kPort].postMessage({
type: 'stdioWantsMoreData',
stream: this[kName]
});
}
}

class WritableWorkerStdio extends Writable {
constructor(port, name) {
super({ decodeStrings: false });
this[kPort] = port;
this[kName] = name;
this[kWritableCallbacks] = [];
}

_write(chunk, encoding, cb) {
this[kPort].postMessage({
type: 'stdioPayload',
stream: this[kName],
chunk,
encoding
});
this[kWritableCallbacks].push(cb);
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}

_final(cb) {
this[kPort].postMessage({
type: 'stdioPayload',
stream: this[kName],
chunk: null
});
cb();
}

[kStdioWantsMoreDataCallback]() {
const cbs = this[kWritableCallbacks];
this[kWritableCallbacks] = [];
for (const cb of cbs)
cb();
if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
this[kPort].unref();
}
}

class Worker extends EventEmitter {
constructor(filename, options = {}) {
super();
Expand All @@ -154,8 +228,25 @@ class Worker extends EventEmitter {
this[kPort].on('message', (data) => this[kOnMessage](data));
this[kPort].start();
this[kPort].unref();
this[kPort][kWaitingStreams] = 0;
debug(`[${threadId}] created Worker with ID ${this.threadId}`);

let stdin = null;
if (options.stdin)
stdin = new WritableWorkerStdio(this[kPort], 'stdin');
const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
if (!options.stdout) {
stdout[kIncrementsPortRef] = false;
pipeWithoutWarning(stdout, process.stdout);
}
const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
if (!options.stderr) {
stderr[kIncrementsPortRef] = false;
pipeWithoutWarning(stderr, process.stderr);
}

this[kParentSideStdio] = { stdin, stdout, stderr };

const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
Expand All @@ -165,7 +256,8 @@ class Worker extends EventEmitter {
filename,
doEval: !!options.eval,
workerData: options.workerData,
publicPort: port2
publicPort: port2,
hasStdin: !!options.stdin
}, [port2]);
// Actually start the new thread now that everything is in place.
this[kHandle].startThread();
Expand Down Expand Up @@ -197,6 +289,16 @@ class Worker extends EventEmitter {
return this[kOnCouldNotSerializeErr]();
case 'errorMessage':
return this[kOnErrorMessage](message.error);
case 'stdioPayload':
{
const { stream, chunk, encoding } = message;
return this[kParentSideStdio][stream].push(chunk, encoding);
}
case 'stdioWantsMoreData':
{
const { stream } = message;
return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
}
}

assert.fail(`Unknown worker message type ${message.type}`);
Expand All @@ -207,6 +309,18 @@ class Worker extends EventEmitter {
this[kHandle] = null;
this[kPort] = null;
this[kPublicPort] = null;

const { stdout, stderr } = this[kParentSideStdio];
this[kParentSideStdio] = null;

if (!stdout._readableState.ended) {
debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
stdout.push(null);
}
if (!stderr._readableState.ended) {
debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
stderr.push(null);
}
}

postMessage(...args) {
Expand Down Expand Up @@ -243,6 +357,27 @@ class Worker extends EventEmitter {

return this[kHandle].threadId;
}

get stdin() {
return this[kParentSideStdio].stdin;
}

get stdout() {
return this[kParentSideStdio].stdout;
}

get stderr() {
return this[kParentSideStdio].stderr;
}
}

const workerStdio = {};
if (!isMainThread) {
const port = getEnvMessagePort();
port[kWaitingStreams] = 0;
workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
}

let originalFatalException;
Expand All @@ -256,10 +391,14 @@ function setupChild(evalScript) {

port.on('message', (message) => {
if (message.type === 'loadScript') {
const { filename, doEval, workerData, publicPort } = message;
const { filename, doEval, workerData, publicPort, hasStdin } = message;
publicWorker.parentPort = publicPort;
setupPortReferencing(publicPort, publicPort, 'message');
publicWorker.workerData = workerData;

if (!hasStdin)
workerStdio.stdin.push(null);

debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.unref();
Expand All @@ -271,6 +410,14 @@ function setupChild(evalScript) {
require('module').runMain();
}
return;
} else if (message.type === 'stdioPayload') {
const { stream, chunk, encoding } = message;
workerStdio[stream].push(chunk, encoding);
return;
} else if (message.type === 'stdioWantsMoreData') {
const { stream } = message;
workerStdio[stream][kStdioWantsMoreDataCallback]();
return;
}

assert.fail(`Unknown worker message type ${message.type}`);
Expand Down Expand Up @@ -317,11 +464,24 @@ function deserializeError(error) {
error.byteLength).toString('utf8');
}

function pipeWithoutWarning(source, dest) {
const sourceMaxListeners = source._maxListeners;
const destMaxListeners = dest._maxListeners;
source.setMaxListeners(Infinity);
dest.setMaxListeners(Infinity);

source.pipe(dest);

source._maxListeners = sourceMaxListeners;
dest._maxListeners = destMaxListeners;
}

module.exports = {
MessagePort,
MessageChannel,
threadId,
Worker,
setupChild,
isMainThread
isMainThread,
workerStdio
};
Loading

0 comments on commit 6b1a887

Please sign in to comment.