Skip to content

Commit

Permalink
worker: add connect and setConnectionsListener
Browse files Browse the repository at this point in the history
  • Loading branch information
ShogunPanda committed Jun 22, 2024
1 parent 6cb940a commit 728f5e7
Show file tree
Hide file tree
Showing 15 changed files with 667 additions and 12 deletions.
31 changes: 31 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -3072,6 +3072,16 @@ added: v18.1.0
The `Response` that has been passed to `WebAssembly.compileStreaming` or to
`WebAssembly.instantiateStreaming` is not a valid WebAssembly response.

<a id="ERR_WORKER_CONNECTION_REFUSED"></a>

### `ERR_WORKER_CONNECTION_REFUSED`

<!-- YAML
added: REPLACEME
-->

The thread requested in [`connect()`][] refused the connection or has no connections listener provided.

<a id="ERR_WORKER_INIT_FAILED"></a>

### `ERR_WORKER_INIT_FAILED`
Expand All @@ -3085,6 +3095,16 @@ The `Worker` initialization failed.
The `execArgv` option passed to the `Worker` constructor contains
invalid flags.

<a id="ERR_WORKER_INVALID_ID"></a>

### `ERR_WORKER_INVALID_ID`

<!-- YAML
added: REPLACEME
-->

The thread id requested in [`connect()`][] is invalid.

<a id="ERR_WORKER_NOT_RUNNING"></a>

### `ERR_WORKER_NOT_RUNNING`
Expand All @@ -3104,6 +3124,16 @@ The `Worker` instance terminated because it reached its memory limit.
The path for the main script of a worker is neither an absolute path
nor a relative path starting with `./` or `../`.

<a id="ERR_WORKER_SAME_THREAD"></a>

### `ERR_WORKER_SAME_THREAD`

<!-- YAML
added: REPLACEME
-->

The thread id requested in [`connect()`][] is the current thread id.

<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>

### `ERR_WORKER_UNSERIALIZABLE_ERROR`
Expand Down Expand Up @@ -3999,6 +4029,7 @@ An error occurred trying to allocate memory. This should never happen.
[`Writable`]: stream.md#class-streamwritable
[`child_process`]: child_process.md
[`cipher.getAuthTag()`]: crypto.md#ciphergetauthtag
[`connect()`]: worker_threads.md#workerconnecttarget-data-timeout
[`crypto.getDiffieHellman()`]: crypto.md#cryptogetdiffiehellmangroupname
[`crypto.scrypt()`]: crypto.md#cryptoscryptpassword-salt-keylen-options-callback
[`crypto.scryptSync()`]: crypto.md#cryptoscryptsyncpassword-salt-keylen-options
Expand Down
150 changes: 150 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,128 @@ Worker threads inherit non-process-specific options by default. Refer to
[`Worker constructor options`][] to know how to customize worker thread options,
specifically `argv` and `execArgv` options.

## `worker.connect(target[, data][, timeout])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development
* `target` {number} The target thread id.
* `data` {any} Any arbitrary, cloneable JavaScript value.
The data will be passed as the second argument to the callback provided in the
target thread via [`worker.setConnectionsListener()`][].
* `timeout` {number} Time to wait for the communication port to the target thread,
in milliseconds. By default it's `undefined`, which means wait forever.
* Returns: {Promise} A promise for a `MessagePort`.

Establishes a connection to another worker thread in the same process, returning a
`MessagePort` that can be used for the communication.

The target thread must have a connection listener setup via [`worker.setConnectionsListener()`][]
otherwise the connection request will fail.

This method does not support transferables in the `data` argument. If there is need to
transfer objects between threads it can be accomplished via the returned port once the
connection has been established.

This method should be used when the target thread is not the direct
parent or child of the current thread.
If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][]
and the [`worker.postMessage()`][] to let the threads communicate.

If the messages never need transferable, use `BroadcastChannel` for the communication. Remember that
they are one to many channels so if you need to narrow it down to a one to one channel you will have to
ensure only two threads subscribe to a certain channel, for instance by using unique channel ids.

The example below shows the combined use of `connect` and [`worker.setConnectionsListener()`][]:
it creates 10 nested threads, the last one will try to communicate with the third one.
Since `MessagePort`s cannot be transferred in `BroadcastChannel`, the `connect` API was the only way
to let the two threads talk.

```mjs
import { fileURLToPath } from 'node:url';
import {
Worker,
connect,
setConnectionsListener,
threadId,
workerData,
} from 'node:worker_threads';

const level = workerData?.level ?? 0;
const targetThread =
workerData?.targetThread ?? (level === 2 ? threadId : undefined);

if (level < 10) {
const worker = new Worker(fileURLToPath(import.meta.url), {
workerData: { level: level + 1, targetThread },
});
}

if (level === 2) {
setConnectionsListener((sender, port, data) => {
port.on('message', (message) => {
console.log(`${sender} -> ${threadId}`, message);
port.postMessage({ message: 'pong', data });
});

return true;
});
} else if (level === 10) {
const port = await connect(targetThread, { foo: 'bar' });

port.on('message', (message) => {
console.log(`${targetThread} -> ${threadId}`, message);
port.close();
});

port.postMessage({ message: 'ping' });
}
```
```cjs
const {
Worker,
connect,
setConnectionsListener,
threadId,
workerData,
} = require('node:worker_threads');

const level = workerData?.level ?? 0;
const targetThread =
workerData?.targetThread ?? (level === 2 ? threadId : undefined);

if (level < 10) {
const worker = new Worker(__filename, {
workerData: { level: level + 1, targetThread },
});
}

if (level === 2) {
setConnectionsListener((sender, port, data) => {
port.on('message', (message) => {
console.log(`${sender} -> ${threadId}`, message);
port.postMessage({ message: 'pong', data });
});

return true;
});
} else if (level === 10) {
connect(targetThread, { foo: 'bar' }).then((port) => {
port.on('message', (message) => {
console.log(`${targetThread} -> ${threadId}`, message);
port.close();
});

port.postMessage({ message: 'ping' });

});
}
```
## `worker.getEnvironmentData(key)`
<!-- YAML
Expand Down Expand Up @@ -325,6 +447,32 @@ new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
});
```
## `worker.setConnectionsListener(fn)`
<!-- YAML
added: REPLACEME
-->
> Stability: 1.1 - Active development
* `fn` {Function} A callback to be executed when [`worker.connect()`][] is called from another thread.
The function will receive the following arguments:
* `sender` {number} The other thread id.
* `port` {MessagePort} The port than can be used to communicate with the other thread.
* `data` {any} The data passed as second argument to [`worker.connect()`][].
The function must return `true` to accept the connection or any other value to
refuse the connection. If the function returns a `Promise`, it will be awaited.
Sets the callback that handles connection from other worker threads in the same process.
If the callback is `null` or `undefined` then the current listener is removed.
When no listeners are present (the default) all connection requests are immediately
refused.
See the example in [`worker.connect()`][] for more info on how to use this function and its callback.
## `worker.setEnvironmentData(key[, value])`
<!-- YAML
Expand Down Expand Up @@ -1437,8 +1585,10 @@ thread spawned will spawn another until the application crashes.
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
[`vm`]: vm.md
[`worker.SHARE_ENV`]: #workershare_env
[`worker.connect()`]: #workerconnecttarget-data-timeout
[`worker.on('message')`]: #event-message_1
[`worker.postMessage()`]: #workerpostmessagevalue-transferlist
[`worker.setConnectionsListener()`]: #workersetconnectionslistenerfn
[`worker.terminate()`]: #workerterminate
[`worker.threadId`]: #workerthreadid_1
[async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool
Expand Down
3 changes: 3 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1858,10 +1858,12 @@ E('ERR_VM_MODULE_NOT_MODULE',
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
E('ERR_WASI_ALREADY_STARTED', 'WASI instance has already started', Error);
E('ERR_WEBASSEMBLY_RESPONSE', 'WebAssembly response %s', TypeError);
E('ERR_WORKER_CONNECTION_REFUSED', 'Connection refused from worker', Error);
E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
Error);
E('ERR_WORKER_INVALID_ID', 'Invalid worker id %d', Error);
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
E('ERR_WORKER_OUT_OF_MEMORY',
'Worker terminated due to reaching memory limit: %s', Error);
Expand All @@ -1876,6 +1878,7 @@ E('ERR_WORKER_PATH', (filename) =>
) +
` Received "${filename}"`,
TypeError);
E('ERR_WORKER_SAME_THREAD', 'Cannot connect to the same thread', Error);
E('ERR_WORKER_UNSERIALIZABLE_ERROR',
'Serializing an uncaught exception failed', Error);
E('ERR_WORKER_UNSUPPORTED_OPERATION',
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const {
getEnvMessagePort,
} = internalBinding('worker');

const { processConnectionRequest } = require('internal/worker');
const workerIo = require('internal/worker/io');
const {
messageTypes: {
Expand All @@ -40,6 +41,7 @@ const {
// Messages that may be either received or posted
STDIO_PAYLOAD,
STDIO_WANTS_MORE_DATA,
CONNECT,
},
kStdioWantsMoreDataCallback,
} = workerIo;
Expand Down Expand Up @@ -182,6 +184,8 @@ port.on('message', (message) => {
break;
}
}
} else if (message.type === CONNECT) {
processConnectionRequest(message);
} else if (message.type === STDIO_PAYLOAD) {
const { stream, chunks } = message;
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
Expand Down
Loading

0 comments on commit 728f5e7

Please sign in to comment.