Skip to content

Commit

Permalink
Fixed a bug where the RPC coalescer would leave the application with …
Browse files Browse the repository at this point in the history
…no request even though there were consumers (#2819)

# Summary

If multiple consumers made the same RPC call, then all of them aborted, then another consumer made the same RPC call, all in the same runloop, you'd end up with zero inflight requests and the remaining callers would never recieve their response.
  • Loading branch information
steveluscher committed Jun 14, 2024
1 parent 1c3a24a commit 7ee47ae
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .changeset/early-eyes-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@solana/rpc': patch
---

Fixed a bug where coalesced RPC calls could end up aborted even though there were still interested consumers. This would happen if the consumer count fell to zero, then rose above zero again, in the same runloop.
22 changes: 21 additions & 1 deletion packages/rpc/src/__tests__/rpc-request-coalescer-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ describe('RPC request coalescer', () => {
const expectationB = expect(responsePromiseB).rejects.toBe(mockErrorB);
await Promise.all([expectationA, expectationB]);
});
it('does not abort the transport when the number of consumers increases, falls to zero, then increases again in the same runloop', async () => {
expect.assertions(2);
const abortControllerA = new AbortController();
const abortControllerB = new AbortController();
coalescedTransport({ payload: null, signal: abortControllerA.signal }).catch(() => {});
coalescedTransport({ payload: null, signal: abortControllerB.signal }).catch(() => {});
// Both abort, bringing the consumer count to zero.
abortControllerA.abort('o no A');
abortControllerB.abort('o no B');
// New request comes in at the last moment before the end of the runloop.
coalescedTransport({ payload: null });
await jest.runOnlyPendingTimersAsync();
expect(mockTransport).toHaveBeenCalledTimes(1);
const transportAbortSignal = mockTransport.mock.lastCall![0].signal!;
expect(transportAbortSignal.aborted).toBe(false);
});
describe('multiple coalesced requests each with an abort signal', () => {
let abortControllerA: AbortController;
let abortControllerB: AbortController;
Expand Down Expand Up @@ -120,9 +136,13 @@ describe('RPC request coalescer', () => {
abortControllerA.abort('o no');
await expect(responsePromiseA).rejects.toBe('o no');
});
it('aborts the transport when all of the requests abort', () => {
it('aborts the transport at the end of the runloop when all of the requests abort', async () => {
expect.assertions(1);
responsePromiseA.catch(() => {});
responsePromiseB.catch(() => {});
abortControllerA.abort('o no A');
abortControllerB.abort('o no B');
await jest.runOnlyPendingTimersAsync();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const transportAbortSignal = mockTransport.mock.lastCall![0].signal!;
expect(transportAbortSignal.aborted).toBe(true);
Expand Down
10 changes: 6 additions & 4 deletions packages/rpc/src/rpc-request-coalescer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
const handleAbort = (e: AbortSignalEventMap['abort']) => {
signal.removeEventListener('abort', handleAbort);
coalescedRequest.numConsumers -= 1;
if (coalescedRequest.numConsumers === 0) {
const abortController = coalescedRequest.abortController;
abortController.abort(EXPLICIT_ABORT_TOKEN);
}
Promise.resolve().then(() => {
if (coalescedRequest.numConsumers === 0) {
const abortController = coalescedRequest.abortController;
abortController.abort(EXPLICIT_ABORT_TOKEN);
}
});
reject((e.target as AbortSignal).reason);
};
signal.addEventListener('abort', handleAbort);
Expand Down

0 comments on commit 7ee47ae

Please sign in to comment.