Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: reduce MessagePort prototype to documented API #23037

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ const {

const { internalBinding } = require('internal/bootstrap/loaders');
const { MessagePort, MessageChannel } = internalBinding('messaging');
const { handle_onclose } = internalBinding('symbols');
const {
handle_onclose: handleOnCloseSymbol,
oninit: onInitSymbol
} = internalBinding('symbols');
const { clearAsyncIdStack } = require('internal/async_hooks');
const { serializeError, deserializeError } = require('internal/error-serdes');
const { pathToFileURL } = require('url');

util.inherits(MessagePort, EventEmitter);

const {
Worker: WorkerImpl,
getEnvMessagePort,
Expand Down Expand Up @@ -57,6 +58,23 @@ const messageTypes = {
LOAD_SCRIPT: 'loadScript'
};

// We have to mess with the MessagePort prototype a bit, so that a) we can make
// it inherit from EventEmitter, even though it is a C++ class, and b) we do
// not provide methods that are not present in the Browser and not documented
// on our side (e.g. hasRef).
// Save a copy of the original set of methods as a shallow clone.
const MessagePortPrototype = Object.create(
Object.getPrototypeOf(MessagePort.prototype),
Object.getOwnPropertyDescriptors(MessagePort.prototype));
// Set up the new inheritance chain.
Object.setPrototypeOf(MessagePort, EventEmitter);
Object.setPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
// Finally, purge methods we don't want to be public.
delete MessagePort.prototype.stop;
delete MessagePort.prototype.drain;
delete MessagePort.prototype.hasRef;
delete MessagePort.prototype.getAsyncId;

// A communication channel consisting of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
Expand All @@ -80,10 +98,10 @@ Object.defineProperty(MessagePort.prototype, 'onmessage', {
this[kOnMessageListener] = value;
if (typeof value === 'function') {
this.ref();
this.start();
MessagePortPrototype.start.call(this);
} else {
this.unref();
this.stop();
MessagePortPrototype.stop.call(this);
}
}
});
Expand All @@ -93,7 +111,7 @@ function oninit() {
setupPortReferencing(this, this, 'message');
}

Object.defineProperty(MessagePort.prototype, 'oninit', {
Object.defineProperty(MessagePort.prototype, onInitSymbol, {
enumerable: true,
writable: false,
value: oninit
Expand All @@ -110,22 +128,18 @@ function onclose() {
this.emit('close');
}

Object.defineProperty(MessagePort.prototype, handle_onclose, {
Object.defineProperty(MessagePort.prototype, handleOnCloseSymbol, {
enumerable: false,
writable: false,
value: onclose
});

const originalClose = MessagePort.prototype.close;
MessagePort.prototype.close = function(cb) {
if (typeof cb === 'function')
this.once('close', cb);
originalClose.call(this);
MessagePortPrototype.close.call(this);
};

const drainMessagePort = MessagePort.prototype.drain;
delete MessagePort.prototype.drain;

Object.defineProperty(MessagePort.prototype, util.inspect.custom, {
enumerable: false,
writable: false,
Expand All @@ -134,7 +148,7 @@ Object.defineProperty(MessagePort.prototype, util.inspect.custom, {
try {
// This may throw when `this` does not refer to a native object,
// e.g. when accessing the prototype directly.
ref = this.hasRef();
ref = MessagePortPrototype.hasRef.call(this);
} catch { return this; }
return Object.assign(Object.create(MessagePort.prototype),
ref === undefined ? {
Expand All @@ -156,12 +170,12 @@ function setupPortReferencing(port, eventEmitter, eventName) {
eventEmitter.on('newListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
port.start();
MessagePortPrototype.start.call(port);
}
});
eventEmitter.on('removeListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.stop();
MessagePortPrototype.stop.call(port);
port.unref();
}
});
Expand Down Expand Up @@ -303,7 +317,7 @@ class Worker extends EventEmitter {

[kOnExit](code) {
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
drainMessagePort.call(this[kPublicPort]);
MessagePortPrototype.drain.call(this[kPublicPort]);
this[kDispose]();
this.emit('exit', code);
this.removeAllListeners();
Expand Down
2 changes: 1 addition & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ struct PackageConfig {
#define PER_ISOLATE_SYMBOL_PROPERTIES(V) \
V(handle_onclose_symbol, "handle_onclose") \
V(owner_symbol, "owner") \
V(oninit_symbol, "oninit") \

// Strings are per-isolate primitives but Environment proxies them
// for the sake of convenience. Strings should be ASCII-only.
Expand Down Expand Up @@ -219,7 +220,6 @@ struct PackageConfig {
V(onhandshakedone_string, "onhandshakedone") \
V(onhandshakestart_string, "onhandshakestart") \
V(onheaders_string, "onheaders") \
V(oninit_string, "oninit") \
V(onmessage_string, "onmessage") \
V(onnewsession_string, "onnewsession") \
V(onocspresponse_string, "onocspresponse") \
Expand Down
2 changes: 1 addition & 1 deletion src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ MessagePort::MessagePort(Environment* env,
async()->data = static_cast<void*>(this);

Local<Value> fn;
if (!wrap->Get(context, env->oninit_string()).ToLocal(&fn))
if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
return;

if (fn->IsFunction()) {
Expand Down
3 changes: 1 addition & 2 deletions test/parallel/test-heapdump-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ validateSnapshotNodes('Worker', [
validateSnapshotNodes('MessagePort', [
{
children: [
{ name: 'MessagePortData' },
{ name: 'MessagePort' }
{ name: 'MessagePortData' }
]
}
], { loose: true });
Expand Down
9 changes: 9 additions & 0 deletions test/parallel/test-worker-message-port.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,12 @@ const { MessageChannel, MessagePort } = require('worker_threads');
});
});
}

{
assert.deepStrictEqual(
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
[
'close', 'constructor', 'onmessage', 'postMessage', 'ref', 'start',
'unref'
]);
}