Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions __tests__/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,151 @@ describe.each(testMatrix())(
});
});

test('defaultCallOptions provides signal when caller omits it', async () => {
const clientTransport = getClientTransport('client');
const serverTransport = getServerTransport();
const services = { subscribable: SubscribableServiceSchema };
const server = createServer(serverTransport, services);
const abortController = new AbortController();
const client = createClient<typeof services>(
clientTransport,
serverTransport.clientId,
{ defaultCallOptions: { signal: abortController.signal } },
);
addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

// No signal passed at the call site — comes from defaultCallOptions.
const { resReadable } = client.subscribable.value.subscribe({});
let result = await readNextResult(resReadable);
expect(result).toStrictEqual({ ok: true, payload: { result: 0 } });

abortController.abort();
result = await readNextResult(resReadable);
expect(result).toStrictEqual({
ok: false,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
payload: expect.objectContaining({ code: CANCEL_CODE }),
});
expect(await isReadableDone(resReadable)).toEqual(true);

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
server,
});
});

test('caller-supplied signal overrides defaultCallOptions', async () => {
const clientTransport = getClientTransport('client');
const serverTransport = getServerTransport();
const services = { subscribable: SubscribableServiceSchema };
const server = createServer(serverTransport, services);
const defaultAc = new AbortController();
const callerAc = new AbortController();
const client = createClient<typeof services>(
clientTransport,
serverTransport.clientId,
{ defaultCallOptions: { signal: defaultAc.signal } },
);
addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

// Caller signal is the one that should drive cancellation.
const { resReadable } = client.subscribable.value.subscribe(
{},
{ signal: callerAc.signal },
);
let result = await readNextResult(resReadable);
expect(result).toStrictEqual({ ok: true, payload: { result: 0 } });

// Aborting the default-options signal must NOT cancel — caller wins.
defaultAc.abort();
const add1 = await client.subscribable.add.rpc({ n: 1 });
expect(add1).toMatchObject({ ok: true });
result = await readNextResult(resReadable);
expect(result).toStrictEqual({ ok: true, payload: { result: 1 } });

// Aborting the caller signal cancels.
callerAc.abort();
result = await readNextResult(resReadable);
expect(result).toStrictEqual({
ok: false,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
payload: expect.objectContaining({ code: CANCEL_CODE }),
});
expect(await isReadableDone(resReadable)).toEqual(true);

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
server,
});
});

test('function-form defaultCallOptions is resolved per call', async () => {
const clientTransport = getClientTransport('client');
const serverTransport = getServerTransport();
const services = { subscribable: SubscribableServiceSchema };
const server = createServer(serverTransport, services);
let currentSignal: AbortSignal | undefined;
const client = createClient<typeof services>(
clientTransport,
serverTransport.clientId,
{ defaultCallOptions: () => ({ signal: currentSignal }) },
);
addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

// Each subscribe resolves the getter at call time, so each call
// captures whatever signal is current.
const ac1 = new AbortController();
currentSignal = ac1.signal;
const sub1 = client.subscribable.value.subscribe({});

const ac2 = new AbortController();
currentSignal = ac2.signal;
const sub2 = client.subscribable.value.subscribe({});

let r1 = await readNextResult(sub1.resReadable);
let r2 = await readNextResult(sub2.resReadable);
expect(r1).toStrictEqual({ ok: true, payload: { result: 0 } });
expect(r2).toStrictEqual({ ok: true, payload: { result: 0 } });

// ac1 cancels sub1 only — sub2 keeps streaming.
ac1.abort();
r1 = await readNextResult(sub1.resReadable);
expect(r1).toStrictEqual({
ok: false,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
payload: expect.objectContaining({ code: CANCEL_CODE }),
});
expect(await isReadableDone(sub1.resReadable)).toEqual(true);

const add1 = await client.subscribable.add.rpc({ n: 1 });
expect(add1).toMatchObject({ ok: true });
r2 = await readNextResult(sub2.resReadable);
expect(r2).toStrictEqual({ ok: true, payload: { result: 1 } });

ac2.abort();
r2 = await readNextResult(sub2.resReadable);
expect(r2).toStrictEqual({
ok: false,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
payload: expect.objectContaining({ code: CANCEL_CODE }),
});
expect(await isReadableDone(sub2.resReadable)).toEqual(true);

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
server,
});
});

test('subscription idempotent close', async () => {
// setup
const clientTransport = getClientTransport('client');
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@replit/river",
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
"version": "0.216.0",
"version": "0.216.1",
"type": "module",
"exports": {
".": {
Expand Down
29 changes: 27 additions & 2 deletions router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
UNEXPECTED_DISCONNECT_CODE,
} from './errors';

interface CallOptions {
export interface CallOptions {
signal?: AbortSignal;
}

Expand Down Expand Up @@ -205,6 +205,16 @@ function _createRecursiveProxy(
export interface ClientOptions {
connectOnInvoke: boolean;
eagerlyConnect: boolean;
/**
* Default options merged into every leaf call (`rpc`, `stream`,
* `upload`, `subscribe`). Caller-supplied `options` win field-by-field,
* so a caller can override `signal` while keeping other defaults.
*
* Pass a function form when the default needs to be re-resolved per
* call (e.g. an ambient signal that changes between invocations of
* the same client).
*/
defaultCallOptions?: CallOptions | (() => CallOptions);
}

const defaultClientOptions: ClientOptions = {
Expand Down Expand Up @@ -273,18 +283,33 @@ export function createClient<ServiceSchemaMap extends AnyServiceSchemaMap<any>>(
);
}

const merged = mergeCallOptions(
clientOptions.defaultCallOptions,
callOptions as CallOptions | undefined,
);

return handleProc(
procMethod === 'subscribe' ? 'subscription' : procMethod,
transport,
serverId,
init,
serviceName,
procName,
callOptions ? (callOptions as CallOptions).signal : undefined,
merged.signal,
);
}, []) as Client<ServiceSchemaMap>;
}

function mergeCallOptions(
defaults: ClientOptions['defaultCallOptions'],
caller: CallOptions | undefined,
): CallOptions {
const resolved = typeof defaults === 'function' ? defaults() : defaults ?? {};

// Caller fields win: spread defaults first, caller second.
return { ...resolved, ...caller };
}

type AnyProcReturn =
| ReturnType<RpcFn<AnyService, string>>
| ReturnType<UploadFn<AnyService, string>>
Expand Down
2 changes: 1 addition & 1 deletion router/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export {
BaseErrorSchemaType,
} from './errors';
export { createClient } from './client';
export type { Client } from './client';
export type { CallOptions, Client, ClientOptions } from './client';
export { createServer } from './server';
export type {
Server,
Expand Down
Loading