Skip to content
Permalink
Browse files

process: split worker IO into internal/worker/io.js

- Move `setupProcessStdio` which contains write access to
  the process object into `bootstrap/node.js`
- Move `MessagePort`, `MessageChannel`, `ReadableWorkerStdio`,
  and `WritableWorkerStdio` into `internal/worker/io.js`
- Move more worker-specific bootstrap code into
  `internal/process/worker_thread_only` from `setupChild`
  in `internal/worker.js`, and move the `process._fatalException`
  overwrite into `bootstrap/node.js` for clarity.

PR-URL: #25199
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information...
joyeecheung authored and BridgeAR committed Dec 23, 2018
1 parent 456b1b5 commit 9480e1b795554d1788c17cfede5aed59cae8b698
@@ -140,9 +140,13 @@ function startup() {
}

if (isMainThread) {
mainThreadSetup.setupStdio();
const { getStdout, getStdin, getStderr } =
NativeModule.require('internal/process/stdio').getMainThreadStdio();
setupProcessStdio(getStdout, getStdin, getStderr);
} else {
workerThreadSetup.setupStdio();
const { getStdout, getStdin, getStderr } =
workerThreadSetup.initializeWorkerStdio();
setupProcessStdio(getStdout, getStdin, getStderr);
}

if (global.__coverage__)
@@ -318,8 +322,14 @@ function startup() {
function startExecution() {
// This means we are in a Worker context, and any script execution
// will be directed by the worker module.
if (internalBinding('worker').getEnvMessagePort() !== undefined) {
NativeModule.require('internal/worker').setupChild();
if (!isMainThread) {
const workerThreadSetup = NativeModule.require(
'internal/process/worker_thread_only'
);
// Set up the message port and start listening
const { workerFatalExeception } = workerThreadSetup.setup();
// Overwrite fatalException
process._fatalException = workerFatalExeception;
return;
}

@@ -511,6 +521,31 @@ function setupProcessObject() {
EventEmitter.call(process);
}

function setupProcessStdio(getStdout, getStdin, getStderr) {
Object.defineProperty(process, 'stdout', {
configurable: true,
enumerable: true,
get: getStdout
});

Object.defineProperty(process, 'stderr', {
configurable: true,
enumerable: true,
get: getStderr
});

Object.defineProperty(process, 'stdin', {
configurable: true,
enumerable: true,
get: getStdin
});

process.openStdin = function() {
process.stdin.resume();
return process.stdin;
};
}

function setupGlobalVariables() {
Object.defineProperty(global, Symbol.toStringTag, {
value: 'global',
@@ -16,15 +16,6 @@ const {
validateString
} = require('internal/validators');

const {
setupProcessStdio,
getMainThreadStdio
} = require('internal/process/stdio');

function setupStdio() {
setupProcessStdio(getMainThreadStdio());
}

// The execution of this function itself should not cause any side effects.
function wrapProcessMethods(binding) {
function chdir(directory) {
@@ -174,7 +165,6 @@ function setupChildProcessIpcChannel() {
}

module.exports = {
setupStdio,
wrapProcessMethods,
setupSignalHandlers,
setupChildProcessIpcChannel,
@@ -1,6 +1,5 @@
'use strict';

exports.setupProcessStdio = setupProcessStdio;
exports.getMainThreadStdio = getMainThreadStdio;

function dummyDestroy(err, cb) { cb(err); }
@@ -134,31 +133,6 @@ function getMainThreadStdio() {
};
}

function setupProcessStdio({ getStdout, getStdin, getStderr }) {
Object.defineProperty(process, 'stdout', {
configurable: true,
enumerable: true,
get: getStdout
});

Object.defineProperty(process, 'stderr', {
configurable: true,
enumerable: true,
get: getStderr
});

Object.defineProperty(process, 'stdin', {
configurable: true,
enumerable: true,
get: getStdin
});

process.openStdin = function() {
process.stdin.resume();
return process.stdin;
};
}

function createWritableStdioStream(fd) {
var stream;
const tty_wrap = internalBinding('tty_wrap');
@@ -2,23 +2,54 @@

// This file contains process bootstrappers that can only be
// run in the worker thread.
const {
getEnvMessagePort,
threadId
} = internalBinding('worker');

const debug = require('util').debuglog('worker');

const {
setupProcessStdio
} = require('internal/process/stdio');
kWaitingStreams,
ReadableWorkerStdio,
WritableWorkerStdio
} = require('internal/worker/io');

const {
workerStdio
createMessageHandler,
createWorkerFatalExeception
} = require('internal/worker');

function setupStdio() {
setupProcessStdio({
getStdout: () => workerStdio.stdout,
getStderr: () => workerStdio.stderr,
getStdin: () => workerStdio.stdin
});
const workerStdio = {};

function initializeWorkerStdio() {
const port = getEnvMessagePort();
port[kWaitingStreams] = 0;
workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');

return {
getStdout() { return workerStdio.stdout; },
getStderr() { return workerStdio.stderr; },
getStdin() { return workerStdio.stdin; }
};
}

function setup() {
debug(`[${threadId}] is setting up worker child environment`);

const port = getEnvMessagePort();
const publicWorker = require('worker_threads');
port.on('message', createMessageHandler(publicWorker, port, workerStdio));
port.start();

return {
workerFatalExeception: createWorkerFatalExeception(port)
};
}

module.exports = {
setupStdio
initializeWorkerStdio,
setup
};

0 comments on commit 9480e1b

Please sign in to comment.
You can’t perform that action at this time.