feat(streamedQuery): maxChunks #9184

Merged · 1 commit · May 24, 2025
8 changes: 7 additions & 1 deletion docs/reference/streamedQuery.md
@@ -32,4 +32,10 @@ const query = queryOptions({
- Defaults to `'reset'`
- When set to `'reset'`, the query will erase all data and go back into `pending` state.
- When set to `'append'`, data will be appended to existing data.
- When set to `'replace'`, data will be written to the cache at the end of the stream.
- When set to `'replace'`, all data will be written to the cache once the stream ends.
- `maxChunks?: number`
- Optional
- The maximum number of chunks to keep in the cache.
- Defaults to `undefined`; both `undefined` and `0` mean the number of chunks is unlimited.
- When a new chunk would exceed this limit, the oldest chunk is removed (see the sketch below).
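
To make the option concrete, here is a minimal usage sketch. The `numberStream` generator and the query key are hypothetical, and the example assumes `streamedQuery` is imported from the adapter package as in the rest of these docs:

```ts
import { queryOptions, streamedQuery } from '@tanstack/react-query'

// Hypothetical chunk source: yields n numbers, one every second.
async function* numberStream(n: number): AsyncGenerator<number> {
  for (let i = 0; i < n; i++) {
    await new Promise((resolve) => setTimeout(resolve, 1_000))
    yield i
  }
}

const query = queryOptions({
  queryKey: ['numbers'],
  queryFn: streamedQuery({
    queryFn: () => numberStream(10),
    // Sliding window: once 5 chunks are cached, each new chunk
    // evicts the oldest, so data settles at [5, 6, 7, 8, 9].
    maxChunks: 5,
  }),
})
```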
125 changes: 125 additions & 0 deletions packages/query-core/src/__tests__/streamedQuery.test.tsx
@@ -349,4 +349,129 @@ describe('streamedQuery', () => {

unsubscribe()
})

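// createAsyncNumberGenerator(n) is the suite's existing helper; judging by
// the timer advances below, it appears to yield 0..n-1 at 50ms intervals.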
test('should support maxChunks', async () => {
const key = queryKey()
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
queryFn: () => createAsyncNumberGenerator(3),
maxChunks: 2,
}),
})

const unsubscribe = observer.subscribe(vi.fn())

expect(observer.getCurrentResult()).toMatchObject({
status: 'pending',
fetchStatus: 'fetching',
data: undefined,
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [1, 2],
})

unsubscribe()
})

test('maxChunks with append refetch', async () => {
const key = queryKey()
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
queryFn: () => createAsyncNumberGenerator(3),
maxChunks: 2,
refetchMode: 'append',
}),
})

const unsubscribe = observer.subscribe(vi.fn())

expect(observer.getCurrentResult()).toMatchObject({
status: 'pending',
fetchStatus: 'fetching',
data: undefined,
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [1, 2],
})

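// With refetchMode: 'append', refetched chunks are appended to the cached
// data while maxChunks: 2 keeps sliding the two-element window:
// [1, 2] -> [2, 0] -> [0, 1] -> [1, 2]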
void observer.refetch()

await vi.advanceTimersByTimeAsync(10)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [1, 2],
})

await vi.advanceTimersByTimeAsync(40)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [2, 0],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [1, 2],
})

unsubscribe()
})
})
15 changes: 11 additions & 4 deletions packages/query-core/src/streamedQuery.ts
@@ -1,3 +1,4 @@
import { addToEnd } from './utils'
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'

/**
@@ -9,19 +10,25 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
* @param refetchMode - Defines how re-fetches are handled.
* Defaults to `'reset'`, erases all data and puts the query back into `pending` state.
* Set to `'append'` to append new data to the existing data.
* Set to `'replace'` to write the data to the cache at the end of the stream.
* Set to `'replace'` to write all data to the cache once the stream ends.
* @param maxChunks - The maximum number of chunks to keep in the cache.
* Defaults to `undefined`; both `undefined` and `0` mean the number of chunks is unlimited.
* When a new chunk would exceed this limit, the oldest chunk is removed.
*/
export function streamedQuery<
TQueryFnData = unknown,
TQueryKey extends QueryKey = QueryKey,
>({
queryFn,
refetchMode = 'reset',
maxChunks,
}: {
queryFn: (
context: QueryFunctionContext<TQueryKey>,
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
refetchMode?: 'append' | 'reset' | 'replace'
maxChunks?: number
}): QueryFunction<Array<TQueryFnData>, TQueryKey> {
return async (context) => {
const query = context.client
@@ -38,7 +45,7 @@
})
}

const result: Array<TQueryFnData> = []
let result: Array<TQueryFnData> = []
const stream = await queryFn(context)

for await (const chunk of stream) {
@@ -51,11 +58,11 @@
context.client.setQueryData<Array<TQueryFnData>>(
context.queryKey,
(prev = []) => {
return prev.concat([chunk])
return addToEnd(prev, chunk, maxChunks)
},
)
}
result.push(chunk)
result = addToEnd(result, chunk, maxChunks)
}

// finalize result: replace-refetching needs to write to the cache
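
For context, the trimming behavior comes from the `addToEnd` helper in `packages/query-core/src/utils.ts`, the same utility infinite queries use for `maxPages`. A minimal sketch of its assumed semantics, not the verbatim source:

```ts
// Appends `item` to `items`; when `max` is a positive number and the
// result would exceed it, the oldest entry is dropped from the front.
// `max = 0` (the default) means unlimited, which is why
// `maxChunks: undefined` and `maxChunks: 0` both keep every chunk.
function addToEnd<T>(items: Array<T>, item: T, max = 0): Array<T> {
  const newItems = [...items, item]
  return max && newItems.length > max ? newItems.slice(1) : newItems
}
```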