Skip to content

Commit

Permalink
Debounce message events (#1087)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed May 23, 2024
1 parent d6f5d9a commit e8bab97
Show file tree
Hide file tree
Showing 63 changed files with 855 additions and 773 deletions.
26 changes: 1 addition & 25 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,6 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#listening-to-messages)

### exchangeMessage(message, getOneMessageOptions?)

`message`: [`Message`](ipc.md#message-type)\
_getOneMessageOptions_: [`GetOneMessageOptions`](#getonemessageoptions)\
_Returns_: [`Promise<Message>`](ipc.md#message-type)

Send a `message` to the parent process, then receive a response from it.

This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#message-type) of `message` depends on the [`serialization`](#optionsserialization) option.

[More info.](ipc.md#exchanging-messages)

## Return value

_TypeScript:_ [`ResultPromise`](typescript.md)\
Expand Down Expand Up @@ -305,18 +293,6 @@ This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#me

[More info.](ipc.md#listening-to-messages)

### subprocess.exchangeMessage(message, getOneMessageOptions?)

`message`: [`Message`](ipc.md#message-type)\
_getOneMessageOptions_: [`GetOneMessageOptions`](#getonemessageoptions)\
_Returns_: [`Promise<Message>`](ipc.md#message-type)

Send a `message` to the subprocess, then receive a response from it.

This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#message-type) of `message` depends on the [`serialization`](#optionsserialization) option.

[More info.](ipc.md#exchanging-messages)

### subprocess.stdin

_Type:_ [`Writable | null`](https://nodejs.org/api/stream.html#class-streamwritable)
Expand Down Expand Up @@ -938,7 +914,7 @@ By default, this applies to both `stdout` and `stderr`, but [different values ca
_Type:_ `boolean`\
_Default:_ `true` if either the [`node`](#optionsnode) option or the [`ipcInput`](#optionsipcinput) is set, `false` otherwise

Enables exchanging messages with the subprocess using [`subprocess.sendMessage(message)`](#subprocesssendmessagemessage), [`subprocess.getOneMessage()`](#subprocessgetonemessagegetonemessageoptions), [`subprocess.exchangeMessage(message)`](#subprocessexchangemessagemessage-getonemessageoptions) and [`subprocess.getEachMessage()`](#subprocessgeteachmessage).
Enables exchanging messages with the subprocess using [`subprocess.sendMessage(message)`](#subprocesssendmessagemessage), [`subprocess.getOneMessage()`](#subprocessgetonemessagegetonemessageoptions) and [`subprocess.getEachMessage()`](#subprocessgeteachmessage).

The subprocess must be a Node.js file.

Expand Down
2 changes: 1 addition & 1 deletion docs/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Synchronous execution is generally discouraged as it holds the CPU and prevents
- Signal termination: [`subprocess.kill()`](api.md#subprocesskillerror), [`subprocess.pid`](api.md#subprocesspid), [`cleanup`](api.md#optionscleanup) option, [`cancelSignal`](api.md#optionscancelsignal) option, [`forceKillAfterDelay`](api.md#optionsforcekillafterdelay) option.
- Piping multiple subprocesses: [`subprocess.pipe()`](api.md#subprocesspipefile-arguments-options).
- [`subprocess.iterable()`](lines.md#progressive-splitting).
- [IPC](ipc.md): [`sendMessage()`](api.md#sendmessagemessage), [`getOneMessage()`](api.md#getonemessagegetonemessageoptions), [`exchangeMessage()`](api.md#exchangemessagemessage-getonemessageoptions), [`getEachMessage()`](api.md#geteachmessage), [`result.ipcOutput`](output.md#any-output-type), [`ipc`](api.md#optionsipc) option, [`serialization`](api.md#optionsserialization) option, [`ipcInput`](input.md#any-input-type) option.
- [IPC](ipc.md): [`sendMessage()`](api.md#sendmessagemessage), [`getOneMessage()`](api.md#getonemessagegetonemessageoptions), [`getEachMessage()`](api.md#geteachmessage), [`result.ipcOutput`](output.md#any-output-type), [`ipc`](api.md#optionsipc) option, [`serialization`](api.md#optionsserialization) option, [`ipcInput`](input.md#any-input-type) option.
- [`result.all`](api.md#resultall) is not interleaved.
- [`detached`](api.md#optionsdetached) option.
- The [`maxBuffer`](api.md#optionsmaxbuffer) option is always measured in bytes, not in characters, [lines](api.md#optionslines) nor [objects](transform.md#object-mode). Also, it ignores transforms and the [`encoding`](api.md#optionsencoding) option.
Expand Down
100 changes: 15 additions & 85 deletions docs/ipc.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ When the [`ipc`](api.md#optionsipc) option is `true`, the current process and su

The `ipc` option defaults to `true` when using [`execaNode()`](node.md#run-nodejs-files) or the [`node`](node.md#run-nodejs-files) option.

The current process sends messages with [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage) and receives them with [`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions). [`subprocess.exchangeMessage(message)`](api.md#subprocessexchangemessagemessage-getonemessageoptions) combines both: first it sends a message, then it returns the response.
The current process sends messages with [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage) and receives them with [`subprocess.getOneMessage()`](api.md#subprocessgetonemessagegetonemessageoptions).

The subprocess uses [`sendMessage(message)`](api.md#sendmessagemessage), [`getOneMessage()`](api.md#getonemessagegetonemessageoptions) and [`exchangeMessage(message)`](api.md#exchangemessagemessage-getonemessageoptions) instead. Those are the same methods, but imported directly from the `'execa'` module.
The subprocess uses [`sendMessage(message)`](api.md#sendmessagemessage) and [`getOneMessage()`](api.md#getonemessagegetonemessageoptions). Those are the same methods, but imported directly from the `'execa'` module.

```js
// parent.js
import {execaNode} from 'execa';

const subprocess = execaNode`child.js`;
const message = await subprocess.exchangeMessage('Hello from parent');
await subprocess.sendMessage('Hello from parent');
const message = await subprocess.getOneMessage();
console.log(message); // 'Hello from child'
```

Expand All @@ -36,7 +37,7 @@ await sendMessage(newMessage);

## Listening to messages

The methods described above read a single message. On the other hand, [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) and [`getEachMessage()`](api.md#geteachmessage) return an [async iterable](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols). This should be [preferred](#prefer-geteachmessage-over-multiple-getonemessage) when listening to multiple messages.
The methods described above read a single message. On the other hand, [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) and [`getEachMessage()`](api.md#geteachmessage) return an [async iterable](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols). This should be preferred when listening to multiple messages.

[`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) waits for the subprocess to end (even when using [`break`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/break) or [`return`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/return)). It throws if the subprocess [fails](api.md#result). This means you do not need to `await` the subprocess' [promise](execution.md#result).

Expand Down Expand Up @@ -148,94 +149,23 @@ const subprocess = execaNode({serialization: 'json'})`child.js`;

## Messages order

The messages are always received in the same order they were sent.

## Debugging

When the [`verbose`](api.md#optionsverbose) option is `'full'`, the IPC messages sent by the subprocess to the current process are [printed on the console](debugging.md#full-mode).

Also, when the subprocess [failed](errors.md#subprocess-failure), [`error.ipcOutput`](api.md) contains all the messages sent by the subprocess. Those are also shown at the end of the [error message](errors.md#error-message).

## Best practices

### Call `getOneMessage()`/`getEachMessage()` early

If a process sends a message but the other process is not listening/receiving it, that message is silently ignored.

This means if [`getOneMessage()`](api.md#getonemessagegetonemessageoptions) or [`getEachMessage()`](api.md#geteachmessage) is called too late (i.e. after the other process called [`sendMessage(message)`](api.md#sendmessagemessage)), it will miss the message. Also, it will keep waiting for the missed message, which might either make it hang forever, or throw when the other process exits.

Therefore, listening to messages should always be done early, before the other process sends any message.

### First `getOneMessage()` call

On the other hand, the _very first_ call to [`getOneMessage()`](api.md#getonemessagegetonemessageoptions), [`getEachMessage()`](api.md#geteachmessage) or [`exchangeMessage(message)`](api.md#exchangemessagemessage-getonemessageoptions) never misses any message. That's because, when a subprocess is just starting, any message sent to it is buffered.

This allows the current process to call [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage) right after spawning the subprocess, even if it is still initializing.

```js
// parent.js
import {execaNode} from 'execa';

const subprocess = execaNode`child.js`;
await subprocess.sendMessage('Hello');
await subprocess.sendMessage('world');
```
The messages are always received in the same order they were sent. Even when sent all at once.

```js
// child.js
import {getOneMessage} from 'execa';

// This always retrieves the first message.
// Even if `sendMessage()` was called while the subprocess was still initializing.
const helloMessage = await getOneMessage();

// But this might miss the second message, and hang forever.
// That's because it is not the first call to `getOneMessage()`.
const worldMessage = await getOneMessage();
```

### Prefer `getEachMessage()` over multiple `getOneMessage()`

If you call [`getOneMessage()`](api.md#getonemessagegetonemessageoptions) multiple times (like the example above) and the other process sends any message in between those calls, that message [will be missed](#call-getonemessagegeteachmessage-early).

This can be prevented by using [`getEachMessage()`](api.md#geteachmessage) instead.

### Prefer `exchangeMessage(message)` over `sendMessage(message)` + `getOneMessage()`

When _first_ sending a message, _then_ receiving a response, the following code should be avoided.

```js
import {sendMessage, getOneMessage} from 'execa';

await sendMessage(message);

// The other process might respond between those two calls

const response = await getOneMessage();
```

Indeed, when [`getOneMessage()`](api.md#getonemessagegetonemessageoptions) is called, the other process might have already sent a response. This only happens when the other process is very fast. However, when it does happen, `getOneMessage()` will miss that response.

Using [`exchangeMessage(message)`](api.md#exchangemessagemessage-getonemessageoptions) prevents this race condition.

```js
import {exchangeMessage} from 'execa';
import {sendMessage} from 'execa';

const response = await exchangeMessage(message);
await Promise.all([
sendMessage('first'),
sendMessage('second'),
sendMessage('third'),
]);
```

However please note that, when doing the reverse (_first_ receiving a message, _then_ sending a response), the following code is correct.

```js
import {sendMessage, getOneMessage} from 'execa';

const message = await getOneMessage();
## Debugging

// The other process is just waiting for a response between those two calls.
// So there is no race condition here.
When the [`verbose`](api.md#optionsverbose) option is `'full'`, the IPC messages sent by the subprocess to the current process are [printed on the console](debugging.md#full-mode).

await sendMessage('response');
```
Also, when the subprocess [failed](errors.md#subprocess-failure), [`error.ipcOutput`](api.md) contains all the messages sent by the subprocess. Those are also shown at the end of the [error message](errors.md#error-message).

<hr>

Expand Down
1 change: 0 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ export {execaNode} from './types/methods/node.js';
export {
sendMessage,
getOneMessage,
exchangeMessage,
getEachMessage,
type Message,
} from './types/ipc.js';
2 changes: 0 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ export const $ = createExeca(mapScriptAsync, {}, deepScriptOptions, setScriptSyn
const {
sendMessage,
getOneMessage,
exchangeMessage,
getEachMessage,
} = getIpcExport();
export {
sendMessage,
getOneMessage,
exchangeMessage,
getEachMessage,
};
43 changes: 0 additions & 43 deletions lib/ipc/exchange.js

This file was deleted.

59 changes: 17 additions & 42 deletions lib/ipc/forward.js
Original file line number Diff line number Diff line change
@@ -1,42 +1,21 @@
import {EventEmitter} from 'node:events';

// By default, Node.js keeps the subprocess alive while it has a `message` or `disconnect` listener.
// This is implemented by calling `.channel.ref()` and `.channel.unref()` automatically.
// However, this prevents forwarding those events to our proxy, since that requires setting up additional listeners.
// Therefore, we need to do manual referencing counting.
// See https://github.com/nodejs/node/blob/2aaeaa863c35befa2ebaa98fb7737ec84df4d8e9/lib/internal/child_process.js#L547
export const addReference = anyProcess => {
const referencesCount = IPC_REFERENCES.get(anyProcess) ?? 0;
if (referencesCount === 0) {
anyProcess.channel?.ref();
}

IPC_REFERENCES.set(anyProcess, referencesCount + 1);
};

export const removeReference = anyProcess => {
const referencesCount = IPC_REFERENCES.get(anyProcess);
if (referencesCount === 1) {
anyProcess.channel?.unref();
}

IPC_REFERENCES.set(anyProcess, referencesCount - 1);
};

const IPC_REFERENCES = new WeakMap();
import {onMessage, onDisconnect} from './incoming.js';
import {undoAddedReferences} from './reference.js';

// Forward the `message` and `disconnect` events from the process and subprocess to a proxy emitter.
// This prevents the `error` event from stopping IPC.
export const getIpcEmitter = anyProcess => {
// This also allows debouncing the `message` event.
export const getIpcEmitter = (anyProcess, isSubprocess) => {
if (IPC_EMITTERS.has(anyProcess)) {
return IPC_EMITTERS.get(anyProcess);
}

// Use an `EventEmitter`, like the `process` that is being proxied
// eslint-disable-next-line unicorn/prefer-event-target
const ipcEmitter = new EventEmitter();
ipcEmitter.connected = true;
IPC_EMITTERS.set(anyProcess, ipcEmitter);
forwardEvents(ipcEmitter, anyProcess);
forwardEvents(ipcEmitter, anyProcess, isSubprocess);
return ipcEmitter;
};

Expand All @@ -45,21 +24,17 @@ const IPC_EMITTERS = new WeakMap();
// The `message` and `disconnect` events are buffered in the subprocess until the first listener is setup.
// However, unbuffering happens after one tick, so this give enough time for the caller to setup the listener on the proxy emitter first.
// See https://github.com/nodejs/node/blob/2aaeaa863c35befa2ebaa98fb7737ec84df4d8e9/lib/internal/child_process.js#L721
const forwardEvents = (ipcEmitter, anyProcess) => {
forwardEvent(ipcEmitter, anyProcess, 'message');
forwardEvent(ipcEmitter, anyProcess, 'disconnect');
};

const forwardEvent = (ipcEmitter, anyProcess, eventName) => {
const eventListener = forwardListener.bind(undefined, ipcEmitter, eventName);
anyProcess.on(eventName, eventListener);
anyProcess.once('disconnect', cleanupListener.bind(undefined, anyProcess, eventName, eventListener));
};

const forwardListener = (ipcEmitter, eventName, payload) => {
ipcEmitter.emit(eventName, payload);
const forwardEvents = (ipcEmitter, anyProcess, isSubprocess) => {
const boundOnMessage = onMessage.bind(undefined, anyProcess, ipcEmitter);
anyProcess.on('message', boundOnMessage);
anyProcess.once('disconnect', onDisconnect.bind(undefined, {anyProcess, ipcEmitter, boundOnMessage}));
undoAddedReferences(anyProcess, isSubprocess);
};

const cleanupListener = (anyProcess, eventName, eventListener) => {
anyProcess.removeListener(eventName, eventListener);
// Check whether there might still be some `message` events to receive
export const isConnected = anyProcess => {
const ipcEmitter = IPC_EMITTERS.get(anyProcess);
return ipcEmitter === undefined
? anyProcess.channel !== null
: ipcEmitter.connected;
};
7 changes: 4 additions & 3 deletions lib/ipc/get-each.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {once, on} from 'node:events';
import {validateIpcMethod, disconnect} from './validation.js';
import {addReference, removeReference, getIpcEmitter} from './forward.js';
import {getIpcEmitter, isConnected} from './forward.js';
import {addReference, removeReference} from './reference.js';

// Like `[sub]process.on('message')` but promise-based
export const getEachMessage = ({anyProcess, isSubprocess, ipc}) => loopOnMessages({
Expand All @@ -16,11 +17,11 @@ export const loopOnMessages = ({anyProcess, isSubprocess, ipc, shouldAwait}) =>
methodName: 'getEachMessage',
isSubprocess,
ipc,
isConnected: anyProcess.channel !== null,
isConnected: isConnected(anyProcess),
});

addReference(anyProcess);
const ipcEmitter = getIpcEmitter(anyProcess);
const ipcEmitter = getIpcEmitter(anyProcess, isSubprocess);
const controller = new AbortController();
stopOnDisconnect(anyProcess, ipcEmitter, controller);
return iterateOnMessages({
Expand Down
Loading

0 comments on commit e8bab97

Please sign in to comment.