diff --git a/doc/api/errors.md b/doc/api/errors.md index 4d3829f78e7b95..3e8b9dfc33043a 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -3072,6 +3072,16 @@ added: v18.1.0 The `Response` that has been passed to `WebAssembly.compileStreaming` or to `WebAssembly.instantiateStreaming` is not a valid WebAssembly response. + + +### `ERR_WORKER_CONNECTION_REFUSED` + + + +The thread requested in [`connect()`][] refused the connection or has no connections listener provided. + ### `ERR_WORKER_INIT_FAILED` @@ -3085,6 +3095,16 @@ The `Worker` initialization failed. The `execArgv` option passed to the `Worker` constructor contains invalid flags. + + +### `ERR_WORKER_INVALID_ID` + + + +The thread ID requested in [`connect()`][] is invalid. + ### `ERR_WORKER_NOT_RUNNING` @@ -3104,6 +3124,16 @@ The `Worker` instance terminated because it reached its memory limit. The path for the main script of a worker is neither an absolute path nor a relative path starting with `./` or `../`. + + +### `ERR_WORKER_SAME_THREAD` + + + +The thread id requested in [`connect()`][] is the current thread id. + ### `ERR_WORKER_UNSERIALIZABLE_ERROR` @@ -3999,6 +4029,7 @@ An error occurred trying to allocate memory. This should never happen. [`Writable`]: stream.md#class-streamwritable [`child_process`]: child_process.md [`cipher.getAuthTag()`]: crypto.md#ciphergetauthtag +[`connect()`]: worker_threads.md#workerconnecttarget-data-timeout [`crypto.getDiffieHellman()`]: crypto.md#cryptogetdiffiehellmangroupname [`crypto.scrypt()`]: crypto.md#cryptoscryptpassword-salt-keylen-options-callback [`crypto.scryptSync()`]: crypto.md#cryptoscryptsyncpassword-salt-keylen-options diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index e4e842c6d21365..a255195f202d75 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -61,6 +61,128 @@ Worker threads inherit non-process-specific options by default. Refer to [`Worker constructor options`][] to know how to customize worker thread options, specifically `argv` and `execArgv` options. +## `worker.connect(target[, data][, timeout])` + + + +> Stability: 1.1 - Active development + +* `target` {number} The target thread ID. +* `data` {any} Any arbitrary, cloneable JavaScript value. + The data will be passed as the second argument to the callback provided in the + target thread via [`worker.setConnectionsListener()`][]. +* `timeout` {number} Time to wait for the communication port to the target thread, + in milliseconds. By default it's `undefined`, which means wait forever. +* Returns: {Promise} A promise for a `MessagePort`. + +Establishes a connection to another worker thread in the same process, returning a +`MessagePort` that can be used for the communication. + +The target thread must have a connection listener setup via [`worker.setConnectionsListener()`][] +otherwise the connection request will fail. + +This method does not support transferables in the `data` argument. If there is need to +transfer objects between threads it can be accomplished via the returned port once the +connection has been established. + +This method should be used when the target thread is not the direct +parent or child of the current thread. +If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][] +and the [`worker.postMessage()`][] to let the threads communicate. + +If the messages never need transferable, use `BroadcastChannel` for the communication. Remember that +they are one to many channels so if you need to narrow it down to a one to one channel you will have to +ensure only two threads subscribe to a certain channel, for instance by using unique channel IDs. + +The example below shows the combined use of `connect` and [`worker.setConnectionsListener()`][]: +it creates 10 nested threads, the last one will try to communicate with the third one. +Since `MessagePort`s cannot be transferred in `BroadcastChannel`, the `connect` API was the only way +to let the two threads talk. + +```mjs +import { fileURLToPath } from 'node:url'; +import { + Worker, + connect, + setConnectionsListener, + threadId, + workerData, +} from 'node:worker_threads'; + +const level = workerData?.level ?? 0; +const targetThread = + workerData?.targetThread ?? (level === 2 ? threadId : undefined); + +if (level < 10) { + const worker = new Worker(fileURLToPath(import.meta.url), { + workerData: { level: level + 1, targetThread }, + }); +} + +if (level === 2) { + setConnectionsListener((sender, port, data) => { + port.on('message', (message) => { + console.log(`${sender} -> ${threadId}`, message); + port.postMessage({ message: 'pong', data }); + }); + + return true; + }); +} else if (level === 10) { + const port = await connect(targetThread, { foo: 'bar' }); + + port.on('message', (message) => { + console.log(`${targetThread} -> ${threadId}`, message); + port.close(); + }); + + port.postMessage({ message: 'ping' }); +} +``` + +```cjs +const { + Worker, + connect, + setConnectionsListener, + threadId, + workerData, +} = require('node:worker_threads'); + +const level = workerData?.level ?? 0; +const targetThread = + workerData?.targetThread ?? (level === 2 ? threadId : undefined); + +if (level < 10) { + const worker = new Worker(__filename, { + workerData: { level: level + 1, targetThread }, + }); +} + +if (level === 2) { + setConnectionsListener((sender, port, data) => { + port.on('message', (message) => { + console.log(`${sender} -> ${threadId}`, message); + port.postMessage({ message: 'pong', data }); + }); + + return true; + }); +} else if (level === 10) { + connect(targetThread, { foo: 'bar' }).then((port) => { + port.on('message', (message) => { + console.log(`${targetThread} -> ${threadId}`, message); + port.close(); + }); + + port.postMessage({ message: 'ping' }); + + }); +} +``` + ## `worker.getEnvironmentData(key)` + +> Stability: 1.1 - Active development + +* `fn` {Function} A callback to be executed when [`worker.connect()`][] is called from another thread. + The function will receive the following arguments: + + * `sender` {number} The other thread ID. + * `port` {MessagePort} The port than can be used to communicate with the other thread. + * `data` {any} The data passed as second argument to [`worker.connect()`][]. + +The function must return `true` to accept the connection or any other value to +refuse the connection. If the function returns a `Promise`, it will be awaited. + +Sets the callback that handles connection from other worker threads in the same process. +If the callback is `null` or `undefined` then the current listener is removed. + +When no listeners are present (the default) all connection requests are immediately +refused. + +See the example in [`worker.connect()`][] for more info on how to use this function and its callback. + ## `worker.setEnvironmentData(key[, value])`