Skip to content

Commit 251560d

Browse files
authored
feat(tanstack-query): live query options (#721)
Use `.liveOptions` to configure live queries for Event Iterator. Unlike `.streamedOptions` which accumulates chunks, live queries replace the entire result with each new chunk received ```ts const query = useQuery(orpc.live.experimental_liveOptions({ input: { id: 123 }, // Specify input if needed context: { cache: true }, // Provide client context if needed // additional options... })) ``` <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Introduced experimental support for "live queries," enabling real-time data updates that replace the entire result with each new chunk. * Added new utility methods for generating live query keys and options, compatible with popular query hooks (e.g., useQuery, prefetchQuery). * Enhanced documentation with detailed explanations and examples for live queries. * **Bug Fixes** * Improved error handling for invalid or unsupported live query outputs. * **Tests** * Added comprehensive test suites for live query utilities across React, Solid, Svelte, Vue, and Angular integrations, ensuring correct behavior and type inference. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent b782987 commit 251560d

13 files changed

Lines changed: 842 additions & 6 deletions

apps/content/docs/integrations/tanstack-query.md

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ orpc.planet.find.queryOptions({ input: { id: 123 } })
5858

5959
//
6060

61+
//
62+
63+
//
64+
65+
//
66+
6167
//
6268
```
6369

@@ -91,20 +97,40 @@ const query = useQuery(orpc.planet.find.queryOptions({
9197

9298
## Streamed Query Options
9399

94-
Use `.streamedOptions` to configure queries for [Event Iterator](/docs/event-iterator), which is built on top of [streamedQuery](https://tanstack.com/query/latest/docs/reference/streamedQuery). Use it with hooks like `useQuery`, `useSuspenseQuery`, or `prefetchQuery`.
100+
Use `.streamedOptions` to configure queries for [Event Iterator](/docs/event-iterator). This is built on [TanStack Query streamedQuery](https://tanstack.com/query/latest/docs/reference/streamedQuery) and works with hooks like `useQuery`, `useSuspenseQuery`, or `prefetchQuery`.
95101

96102
```ts
97103
const query = useQuery(orpc.streamed.experimental_streamedOptions({
98104
input: { id: 123 }, // Specify input if needed
99105
context: { cache: true }, // Provide client context if needed
100-
queryFnOptions: { // Specify streamedQuery options if needed
106+
queryFnOptions: { // Configure streamedQuery behavior
101107
refetchMode: 'reset',
102108
maxChunks: 3,
103109
}
104110
// additional options...
105111
}))
106112
```
107113

114+
::: info
115+
Combine with [Client Retry](/docs/plugins/client-retry#event-iterator-sse) for more reliable streaming queries.
116+
:::
117+
118+
## Live Query Options
119+
120+
Use `.liveOptions` to configure live queries for [Event Iterator](/docs/event-iterator). Unlike `.streamedOptions` which accumulates chunks, live queries replace the entire result with each new chunk received. Works with hooks like `useQuery`, `useSuspenseQuery`, or `prefetchQuery`.
121+
122+
```ts
123+
const query = useQuery(orpc.live.experimental_liveOptions({
124+
input: { id: 123 }, // Specify input if needed
125+
context: { cache: true }, // Provide client context if needed
126+
// additional options...
127+
}))
128+
```
129+
130+
::: info
131+
Combine with [Client Retry](/docs/plugins/client-retry#event-iterator-sse) for more reliable live queries.
132+
:::
133+
108134
## Infinite Query Options
109135

110136
Use `.infiniteOptions` to configure infinite queries. Use it with hooks like `useInfiniteQuery`, `useSuspenseInfiniteQuery`, or `prefetchInfiniteQuery`.
@@ -263,12 +289,14 @@ import {
263289
interface ClientContext extends TanstackQueryOperationContext {
264290
}
265291

292+
const GET_OPERATION_TYPE = new Set(['query', 'streamed', 'live', 'infinite'])
293+
266294
const link = new RPCLink<ClientContext>({
267295
url: 'http://localhost:3000/rpc',
268296
method: ({ context }, path) => {
269297
const operationType = context[TANSTACK_QUERY_OPERATION_CONTEXT_SYMBOL]?.type
270298

271-
if (operationType === 'query' || operationType === 'streamed' || operationType === 'infinite') {
299+
if (operationType && GET_OPERATION_TYPE.has(operationType)) {
272300
return 'GET'
273301
}
274302

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import { queryClient } from '../tests/shared'
2+
import { experimental_liveQuery as liveQuery } from './live-query'
3+
4+
beforeEach(() => {
5+
queryClient.clear()
6+
})
7+
8+
describe('liveQuery', async () => {
9+
it('works', async () => {
10+
const queryFn = liveQuery(async function* () {
11+
yield 1
12+
await new Promise(resolve => setTimeout(resolve, 50))
13+
yield 2
14+
await new Promise(resolve => setTimeout(resolve, 50))
15+
yield 3
16+
})
17+
18+
const controller = new AbortController()
19+
20+
const resultPromise = expect(queryFn({
21+
queryKey: ['live-query'],
22+
signal: controller.signal,
23+
client: queryClient,
24+
} as any)).resolves.toEqual(3)
25+
26+
await vi.waitFor(() => {
27+
expect(queryClient.getQueryData(['live-query'])).toEqual(1)
28+
})
29+
30+
await vi.waitFor(() => {
31+
expect(queryClient.getQueryData(['live-query'])).toEqual(2)
32+
})
33+
34+
await resultPromise
35+
})
36+
37+
it('on abort signal', async () => {
38+
let cleanupCalled = false
39+
40+
const queryFn = liveQuery(async function* () {
41+
try {
42+
yield 1
43+
await new Promise(resolve => setTimeout(resolve, 50))
44+
yield 2
45+
await new Promise(resolve => setTimeout(resolve, 50))
46+
yield 3
47+
}
48+
finally {
49+
cleanupCalled = true
50+
}
51+
})
52+
53+
const controller = new AbortController()
54+
55+
const resultPromise = expect(queryFn({
56+
queryKey: ['live-query'],
57+
signal: controller.signal,
58+
client: queryClient,
59+
} as any)).resolves.toEqual(1)
60+
61+
await vi.waitFor(() => {
62+
expect(queryClient.getQueryData(['live-query'])).toEqual(1)
63+
})
64+
65+
controller.abort()
66+
67+
await resultPromise
68+
expect(cleanupCalled).toBe(true)
69+
})
70+
71+
it('throw if no data yielded', async () => {
72+
const queryFn = liveQuery(async function* () {
73+
// No yield
74+
})
75+
76+
await expect(queryFn({
77+
queryKey: ['live-query'],
78+
signal: new AbortController().signal,
79+
client: queryClient,
80+
} as any)).rejects.toThrowError(
81+
'Live query for ["live-query"] did not yield any data. Ensure the query function returns an AsyncIterable with at least one chunk.',
82+
)
83+
})
84+
})
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import type { Promisable } from '@orpc/shared'
2+
import type { QueryFunction, QueryFunctionContext, QueryKey } from '@tanstack/query-core'
3+
import { stringifyJSON } from '@orpc/shared'
4+
5+
export function experimental_liveQuery<
6+
TQueryFnData = unknown,
7+
TQueryKey extends QueryKey = QueryKey,
8+
>(
9+
queryFn: (
10+
context: QueryFunctionContext<TQueryKey>,
11+
) => Promisable<AsyncIterable<TQueryFnData>>,
12+
): QueryFunction<TQueryFnData, TQueryKey> {
13+
return async (context) => {
14+
const stream = await queryFn(context)
15+
let last: { chunk: TQueryFnData } | undefined
16+
17+
for await (const chunk of stream) {
18+
if (context.signal.aborted) {
19+
break
20+
}
21+
22+
last = { chunk }
23+
context.client.setQueryData<TQueryFnData>(context.queryKey, chunk)
24+
}
25+
26+
if (!last) {
27+
throw new Error(
28+
`Live query for ${stringifyJSON(context.queryKey)} did not yield any data. Ensure the query function returns an AsyncIterable with at least one chunk.`,
29+
)
30+
}
31+
32+
return last.chunk
33+
}
34+
}

packages/tanstack-query/src/procedure-utils.angular.test-d.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,72 @@ describe('ProcedureUtils', () => {
158158
})
159159
})
160160

161+
describe('.liveOptions', () => {
162+
describe('injectQuery', () => {
163+
it('without args', () => {
164+
const query = injectQuery(() => streamUtils.experimental_liveOptions())
165+
expectTypeOf(query.data()).toEqualTypeOf<UtilsOutput[number] | undefined>()
166+
expectTypeOf(query.error()).toEqualTypeOf<UtilsError | null>()
167+
})
168+
169+
it('can infer errors inside options', () => {
170+
const query = injectQuery(() => streamUtils.experimental_liveOptions({
171+
throwOnError(error) {
172+
expectTypeOf(error).toEqualTypeOf<UtilsError>()
173+
return false
174+
},
175+
}))
176+
expectTypeOf(query.data()).toEqualTypeOf<UtilsOutput[number] | undefined>()
177+
expectTypeOf(query.error()).toEqualTypeOf<UtilsError | null>()
178+
})
179+
180+
it('with initial data & select', () => {
181+
const query = injectQuery(() => streamUtils.experimental_liveOptions({
182+
select: data => ({ mapped: data }),
183+
initialData: { title: 'title' },
184+
}))
185+
186+
expectTypeOf(query.data()).toEqualTypeOf<{ mapped: UtilsOutput[number] }>()
187+
expectTypeOf(query.error()).toEqualTypeOf<UtilsError | null>()
188+
})
189+
})
190+
191+
it('injectQueries', () => {
192+
const queries = injectQueries({
193+
queries: signal([
194+
streamUtils.experimental_liveOptions(),
195+
streamUtils.experimental_liveOptions({
196+
input: { search: 'search' },
197+
context: { batch: true },
198+
}),
199+
streamUtils.experimental_liveOptions({
200+
select: data => ({ mapped: data }),
201+
}),
202+
]),
203+
})
204+
205+
// @ts-expect-error - TODO: fix this, injectQueries not working
206+
expectTypeOf(queries()[0].data).toEqualTypeOf<UtilsOutput[number] | undefined>()
207+
// @ts-expect-error - TODO: fix this, injectQueries not work at all
208+
expectTypeOf(queries()[1].data).toEqualTypeOf<UtilsOutput[number] | undefined>()
209+
expectTypeOf(queries()[2].data).toEqualTypeOf<{ mapped: UtilsOutput[number] } | undefined>()
210+
211+
// @ts-expect-error - TODO: fix this, injectQueries not work at all
212+
expectTypeOf(queries()[0].error).toEqualTypeOf<null | UtilsError>()
213+
// @ts-expect-error - TODO: fix this, injectQueries not work at all
214+
expectTypeOf(queries()[1].error).toEqualTypeOf<null | UtilsError>()
215+
expectTypeOf(queries()[2].error).toEqualTypeOf<null | UtilsError>()
216+
})
217+
218+
it('fetchQuery', () => {
219+
expectTypeOf(
220+
queryClient.fetchQuery(streamUtils.experimental_liveOptions()),
221+
).toEqualTypeOf<
222+
Promise<UtilsOutput[number]>
223+
>()
224+
})
225+
})
226+
161227
describe('.infiniteOptions', () => {
162228
const getNextPageParam: GetNextPageParamFunction<number, UtilsOutput> = () => 1
163229
const initialPageParam = 1

packages/tanstack-query/src/procedure-utils.react.test-d.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,120 @@ describe('ProcedureUtils', () => {
253253
})
254254
})
255255

256+
describe('.liveOptions', () => {
257+
describe('useQuery', () => {
258+
it('without args', () => {
259+
const query = useQuery(streamUtils.experimental_liveOptions())
260+
expectTypeOf(query.data).toEqualTypeOf<UtilsOutput[number] | undefined>()
261+
expectTypeOf(query.error).toEqualTypeOf<UtilsError | null>()
262+
})
263+
264+
it('can infer errors inside options', () => {
265+
const query = useQuery(streamUtils.experimental_liveOptions({
266+
throwOnError(error) {
267+
expectTypeOf(error).toEqualTypeOf<UtilsError>()
268+
return false
269+
},
270+
}))
271+
expectTypeOf(query.data).toEqualTypeOf<UtilsOutput[number] | undefined>()
272+
expectTypeOf(query.error).toEqualTypeOf<UtilsError | null>()
273+
})
274+
275+
it('with initial data & select', () => {
276+
const query = useQuery(streamUtils.experimental_liveOptions({
277+
select: data => ({ mapped: data }),
278+
initialData: { title: 'title' },
279+
}))
280+
281+
expectTypeOf(query.data).toEqualTypeOf<{ mapped: UtilsOutput[number] }>()
282+
expectTypeOf(query.error).toEqualTypeOf<UtilsError | null>()
283+
})
284+
})
285+
286+
describe('useSuspenseQuery', () => {
287+
it('without args', () => {
288+
const query = useSuspenseQuery(streamUtils.experimental_liveOptions())
289+
expectTypeOf(query.data).toEqualTypeOf<UtilsOutput[number]>()
290+
expectTypeOf(query.error).toEqualTypeOf<UtilsError | null>()
291+
})
292+
293+
it('can infer errors inside options', () => {
294+
const query = useSuspenseQuery(streamUtils.experimental_liveOptions({
295+
throwOnError(error) {
296+
expectTypeOf(error).toEqualTypeOf<UtilsError>()
297+
return false
298+
},
299+
}))
300+
301+
expectTypeOf(query.data).toEqualTypeOf<UtilsOutput[number]>()
302+
expectTypeOf(query.error).toEqualTypeOf<UtilsError | null>()
303+
})
304+
305+
it('with select', () => {
306+
const query = useSuspenseQuery(streamUtils.experimental_liveOptions({
307+
select: data => ({ mapped: data }),
308+
}))
309+
310+
expectTypeOf(query.data).toEqualTypeOf<{ mapped: UtilsOutput[number] }>()
311+
expectTypeOf(query.error).toEqualTypeOf<UtilsError | null>()
312+
})
313+
})
314+
315+
it('useQueries', () => {
316+
const queries = useQueries({
317+
queries: [
318+
streamUtils.experimental_liveOptions(),
319+
streamUtils.experimental_liveOptions({
320+
input: { search: 'search' },
321+
context: { batch: true },
322+
}),
323+
streamUtils.experimental_liveOptions({
324+
select: data => ({ mapped: data }),
325+
}),
326+
],
327+
})
328+
329+
expectTypeOf(queries[0].data).toEqualTypeOf<UtilsOutput[number] | undefined>()
330+
expectTypeOf(queries[1].data).toEqualTypeOf<UtilsOutput[number] | undefined>()
331+
expectTypeOf(queries[2].data).toEqualTypeOf<{ mapped: UtilsOutput[number] } | undefined>()
332+
333+
expectTypeOf(queries[0].error).toEqualTypeOf<null | UtilsError>()
334+
expectTypeOf(queries[1].error).toEqualTypeOf<null | UtilsError>()
335+
expectTypeOf(queries[2].error).toEqualTypeOf<null | UtilsError>()
336+
})
337+
338+
it('useSuspenseQueries', () => {
339+
const queries = useSuspenseQueries({
340+
queries: [
341+
streamUtils.experimental_liveOptions(),
342+
streamUtils.experimental_liveOptions({
343+
input: { search: 'search' },
344+
context: { batch: true },
345+
}),
346+
streamUtils.experimental_liveOptions({
347+
select: data => ({ mapped: data }),
348+
}),
349+
],
350+
})
351+
352+
expectTypeOf(queries[0].data).toEqualTypeOf<UtilsOutput[number]>()
353+
expectTypeOf(queries[1].data).toEqualTypeOf<UtilsOutput[number]>()
354+
expectTypeOf(queries[2].data).toEqualTypeOf<{ mapped: UtilsOutput[number] }>()
355+
356+
expectTypeOf(queries[0].error).toEqualTypeOf<null | UtilsError>()
357+
expectTypeOf(queries[1].error).toEqualTypeOf<null | UtilsError>()
358+
expectTypeOf(queries[2].error).toEqualTypeOf<null | UtilsError>()
359+
})
360+
361+
it('fetchQuery', () => {
362+
expectTypeOf(
363+
queryClient.fetchQuery(streamUtils.experimental_liveOptions()),
364+
).toEqualTypeOf<
365+
Promise<UtilsOutput[number]>
366+
>()
367+
})
368+
})
369+
256370
describe('.infiniteOptions', () => {
257371
const getNextPageParam: GetNextPageParamFunction<number, UtilsOutput> = () => 1
258372
const initialPageParam = 1

0 commit comments

Comments
 (0)