Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions apps/content/docs/tanstack-query/basic.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ const query = useQuery(orpc.planet.find.queryOptions({
}))
```

## Streamed Query Options Utility

Use `.streamedOptions` to configure queries for [Event Iterator](/docs/event-iterator), which is built on top of [streamedQuery](https://tanstack.com/query/latest/docs/reference/streamedQuery). Use it with hooks like `useQuery`, `useSuspenseQuery`, or `prefetchQuery`.

```ts
const query = useQuery(orpc.streamed.experimental_streamedOptions({
input: { id: 123 }, // Specify input if needed
context: { cache: true }, // Provide client context if needed
refetchMode: 'reset', // Specify refetch mode if needed
// additional options...
}))
```

## Infinite Query Options Utility

Use `.infiniteOptions` to configure infinite queries. Use it with hooks like `useInfiniteQuery`, `useSuspenseInfiniteQuery`, or `prefetchInfiniteQuery`.
Expand Down
15 changes: 14 additions & 1 deletion packages/client/tests/shared.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { RouterClient } from '@orpc/server'
import { RPCHandler } from '@orpc/server/fetch'
import { router } from '../../server/tests/shared'
import { router, streamed } from '../../server/tests/shared'
import { createORPCClient } from '../src'
import { RPCLink } from '../src/adapters/fetch'

Expand Down Expand Up @@ -31,6 +31,19 @@ const rpcLink = new RPCLink<ClientContext>({

export const orpc: RouterClient<typeof router, ClientContext> = createORPCClient(rpcLink)

const streamedHandler = new RPCHandler({ streamed })

export const streamedOrpc: RouterClient<{ streamed: typeof streamed }, ClientContext> = createORPCClient(new RPCLink({
url: 'http://localhost:3000',
fetch: async (url, init) => {
const { response } = await streamedHandler.handle(new Request(url, init), {
context: { db: 'postgres' },
})

return response ?? new Response('not found', { status: 404 })
},
}))

enum Test {
A = 1,
B = 2,
Expand Down
17 changes: 16 additions & 1 deletion packages/contract/tests/shared.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Schema } from '../src'
import type { Meta } from '../src/meta'
import { z } from 'zod'
import { ContractProcedure } from '../src'
import { ContractProcedure, eventIterator } from '../src'

export const inputSchema = z.object({ input: z.number().transform(n => `${n}`) })

Expand Down Expand Up @@ -56,3 +56,18 @@ export const router = {
pong,
},
}

export const streamedOutputSchema = eventIterator(outputSchema)

export const streamed = new ContractProcedure<
typeof inputSchema,
typeof streamedOutputSchema,
typeof baseErrorMap,
Meta
>({
errorMap: baseErrorMap,
meta: {},
route: {},
inputSchema,
outputSchema: streamedOutputSchema,
})
8 changes: 4 additions & 4 deletions packages/react-query/src/key.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { PartialDeep } from '@orpc/shared'
import type { QueryKey } from '@tanstack/react-query'

export type KeyType = 'query' | 'infinite' | 'mutation' | undefined
export type KeyType = 'query' | 'streamed' | 'infinite' | 'mutation' | undefined

export interface BuildKeyOptions<TType extends KeyType, TInput> {
type?: TType
Expand All @@ -10,10 +10,10 @@ export interface BuildKeyOptions<TType extends KeyType, TInput> {

export function buildKey<TType extends KeyType, TInput>(
path: string[],
options?: BuildKeyOptions<TType, TInput>,
options: BuildKeyOptions<TType, TInput> = {},
): QueryKey {
const withInput = options?.input !== undefined ? { input: options?.input } : {}
const withType = options?.type !== undefined ? { type: options?.type } : {}
const withInput = options.input !== undefined ? { input: options?.input } : {}
const withType = options.type !== undefined ? { type: options?.type } : {}

return [path, {
...withInput,
Expand Down
90 changes: 90 additions & 0 deletions packages/react-query/src/procedure-utils.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,96 @@ describe('ProcedureUtils', () => {
})
})

describe('.streamedOptions', () => {
const utils = {} as ProcedureUtils<{ batch?: boolean }, UtilsInput, AsyncIterable<UtilsOutput[number]>, ErrorFromErrorMap<typeof baseErrorMap>>

it('can optional options', () => {
const requiredUtils = {} as ProcedureUtils<{ batch: boolean }, 'input', UtilsOutput, Error>

utils.experimental_streamedOptions()
utils.experimental_streamedOptions({ context: { batch: true } })
utils.experimental_streamedOptions({ input: { search: 'search' } })
utils.experimental_streamedOptions({ input: condition ? skipToken : { search: 'search' } })

requiredUtils.experimental_streamedOptions({
context: { batch: true },
input: 'input',
})
requiredUtils.experimental_streamedOptions({
context: { batch: true },
input: condition ? skipToken : 'input',
})
// @ts-expect-error input and context is required
requiredUtils.experimental_streamedOptions()
// @ts-expect-error input and context is required
requiredUtils.experimental_streamedOptions({})
// @ts-expect-error input is required
requiredUtils.experimental_streamedOptions({ context: { batch: true } })
// @ts-expect-error context is required
requiredUtils.experimental_streamedOptions({ input: 'input' })
// @ts-expect-error context is required
requiredUtils.experimental_streamedOptions({ input: condition ? skipToken : 'input' })
})

it('infer correct input type', () => {
utils.experimental_streamedOptions({ input: { cursor: 1 }, context: { batch: true } })
utils.experimental_streamedOptions({ input: condition ? { cursor: 2 } : skipToken, context: { batch: true } })
// @ts-expect-error invalid input
utils.experimental_streamedOptions({ input: { cursor: 'invalid' }, context: { batch: true } })
// @ts-expect-error invalid input
utils.experimental_streamedOptions({ input: condition ? { cursor: 'invalid' } : skipToken, context: { batch: true } })
})

it('infer correct context type', () => {
utils.experimental_streamedOptions({ context: { batch: true } })
// @ts-expect-error invalid context
utils.experimental_streamedOptions({ context: { batch: 'invalid' } })
})

it('not usable in non event iterator output', () => {
const utils = {} as ProcedureUtils<{ batch?: boolean }, UtilsInput, UtilsOutput, ErrorFromErrorMap<typeof baseErrorMap>>
const query = useQuery(utils.experimental_streamedOptions())
expectTypeOf(query.data).toExtend<undefined>()
})

describe('works with useQuery', () => {
it('without initial data', () => {
const query = useQuery(utils.experimental_streamedOptions({
select: data => ({ mapped: data }),
throwOnError(error) {
expectTypeOf(error).toEqualTypeOf<ErrorFromErrorMap<typeof baseErrorMap>>()
return false
},
}))

expectTypeOf(query.data).toEqualTypeOf<{ mapped: UtilsOutput } | undefined>()
expectTypeOf(query.error).toEqualTypeOf<ErrorFromErrorMap<typeof baseErrorMap> | null>()
})

it('with initial data', () => {
const query = useQuery(utils.experimental_streamedOptions({
select: data => ({ mapped: data }),
initialData: [{ title: 'title' }],
throwOnError(error) {
expectTypeOf(error).toEqualTypeOf<ErrorFromErrorMap<typeof baseErrorMap>>()
return false
},
}))

expectTypeOf(query.data).toEqualTypeOf<{ mapped: UtilsOutput }>()
expectTypeOf(query.error).toEqualTypeOf<ErrorFromErrorMap<typeof baseErrorMap> | null>()
})
})

it('works with fetchQuery', () => {
expectTypeOf(
queryClient.fetchQuery(utils.experimental_streamedOptions()),
).toEqualTypeOf<
Promise<UtilsOutput>
>()
})
})

describe('.infiniteOptions', () => {
const getNextPageParam = {} as GetNextPageParamFunction<number, UtilsOutput>
const initialPageParam = 1
Expand Down
70 changes: 69 additions & 1 deletion packages/react-query/src/procedure-utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
import { skipToken } from '@tanstack/react-query'
import { skipToken, experimental_streamedQuery as streamedQuery } from '@tanstack/react-query'
import { queryClient } from '../tests/shared'
import * as Key from './key'
import { createProcedureUtils } from './procedure-utils'

vi.mock('@tanstack/react-query', async (origin) => {
const original = await origin() as any

return {
...original,
experimental_streamedQuery: vi.fn(original.experimental_streamedQuery),
}
})

const buildKeySpy = vi.spyOn(Key, 'buildKey')

beforeEach(() => {
queryClient.clear()
vi.clearAllMocks()
})

Expand Down Expand Up @@ -47,6 +58,63 @@ describe('createProcedureUtils', () => {
})
})

describe('.streamedOptions', () => {
it('without skipToken', async () => {
client.mockImplementationOnce(async function* (input) {
yield '__1__'
yield '__2__'
return '__3__'
})

const options = utils.experimental_streamedOptions({
input: { search: '__search__' },
context: { batch: '__batch__' },
refetchMode: 'replace',
})

expect(options.enabled).toBe(true)

expect(options.queryKey).toBe(buildKeySpy.mock.results[0]!.value)
expect(buildKeySpy).toHaveBeenCalledTimes(1)
expect(buildKeySpy).toHaveBeenCalledWith(['ping'], { type: 'streamed', input: { search: '__search__' } })

expect(options.queryFn).toBe(vi.mocked(streamedQuery).mock.results[0]!.value)
expect(streamedQuery).toHaveBeenCalledTimes(1)
expect(streamedQuery).toHaveBeenCalledWith(expect.objectContaining({
refetchMode: 'replace',
queryFn: expect.any(Function),
}))

await expect(options.queryFn!({ signal, client: queryClient, queryKey: options.queryKey } as any)).resolves.toEqual(['__1__', '__2__'])
expect(queryClient.getQueryData(options.queryKey)).toEqual(['__1__', '__2__'])

expect(client).toHaveBeenCalledTimes(1)
expect(client).toBeCalledWith({ search: '__search__' }, { signal, context: { batch: '__batch__' } })
})

it('with skipToken', async () => {
const options = utils.experimental_streamedOptions({ input: skipToken, context: { batch: '__batch__' } })

expect(options.enabled).toBe(false)

expect(options.queryKey).toBe(buildKeySpy.mock.results[0]!.value)
expect(buildKeySpy).toHaveBeenCalledTimes(1)
expect(buildKeySpy).toHaveBeenCalledWith(['ping'], { type: 'streamed', input: skipToken })

await expect(options.queryFn!({ signal, client: queryClient } as any)).rejects.toThrow('queryFn should not be called with skipToken used as input')
expect(client).toHaveBeenCalledTimes(0)
})

it('with unsupported output', async () => {
client.mockResolvedValueOnce('__1__')
const options = utils.experimental_streamedOptions({ input: { search: '__search__' }, context: { batch: '__batch__' } })

await expect(options.queryFn!({ signal, client: queryClient } as any)).rejects.toThrow('streamedQuery requires an event iterator output')
expect(client).toHaveBeenCalledTimes(1)
expect(client).toBeCalledWith({ search: '__search__' }, { signal, context: { batch: '__batch__' } })
})
})

describe('.infiniteOptions', () => {
it('without skipToken', async () => {
const getNextPageParam = vi.fn()
Expand Down
51 changes: 49 additions & 2 deletions packages/react-query/src/procedure-utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
import type { Client, ClientContext } from '@orpc/client'
import type { MaybeOptionalOptions } from '@orpc/shared'
import type { InfiniteData } from '@tanstack/react-query'
import type { InfiniteOptionsBase, InfiniteOptionsIn, MutationOptions, MutationOptionsIn, QueryOptionsBase, QueryOptionsIn } from './types'
import { skipToken } from '@tanstack/react-query'
import type {
experimental_InferStreamedOutput,
InfiniteOptionsBase,
InfiniteOptionsIn,
MutationOptions,
MutationOptionsIn,
QueryOptionsBase,
QueryOptionsIn,
experimental_StreamedOptionsBase as StreamedOptionsBase,
experimental_StreamedOptionsIn as StreamedOptionsIn,
} from './types'
import { isAsyncIteratorObject } from '@orpc/shared'
import { skipToken, experimental_streamedQuery as streamedQuery } from '@tanstack/react-query'
import { buildKey } from './key'

export interface ProcedureUtils<TClientContext extends ClientContext, TInput, TOutput, TError> {
Expand All @@ -24,6 +35,18 @@ export interface ProcedureUtils<TClientContext extends ClientContext, TInput, TO
>
): NoInfer<U & Omit<QueryOptionsBase<TOutput, TError>, keyof U>>

/**
* Generate [Event Iterator](https://orpc.unnoq.com/docs/event-iterator) options used for useQuery/useSuspenseQuery/prefetchQuery/...
* Built on top of [steamedQuery](https://tanstack.com/query/latest/docs/reference/streamedQuery)
*
* @see {@link https://orpc.unnoq.com/docs/tanstack-query/basic#streamed-query-options-utility Tanstack Streamed Query Options Utility Docs}
*/
experimental_streamedOptions<U, USelectData = experimental_InferStreamedOutput<TOutput>>(
...rest: MaybeOptionalOptions<
U & StreamedOptionsIn<TClientContext, TInput, experimental_InferStreamedOutput<TOutput>, TError, USelectData>
>
): NoInfer<U & Omit<StreamedOptionsBase<experimental_InferStreamedOutput<TOutput>, TError>, keyof U>>

/**
* Generate options used for useInfiniteQuery/useSuspenseInfiniteQuery/prefetchInfiniteQuery/...
*
Expand Down Expand Up @@ -69,6 +92,30 @@ export function createProcedureUtils<TClientContext extends ClientContext, TInpu
}
},

experimental_streamedOptions(...[optionsIn = {} as any]) {
return {
enabled: optionsIn.input !== skipToken,
queryKey: buildKey(options.path, { type: 'streamed', input: optionsIn.input }),
queryFn: streamedQuery({
...optionsIn,
queryFn: async ({ signal }) => {
if (optionsIn.input === skipToken) {
throw new Error('queryFn should not be called with skipToken used as input')
}

const output = await client(optionsIn.input, { signal, context: optionsIn.context })

if (!isAsyncIteratorObject(output)) {
throw new Error('streamedQuery requires an event iterator output')
}

return output
},
}),
...optionsIn,
}
},

infiniteOptions(optionsIn) {
return {
queryKey: buildKey(options.path, {
Expand Down
13 changes: 12 additions & 1 deletion packages/react-query/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ClientContext } from '@orpc/client'
import type { SetOptional } from '@orpc/shared'
import type { QueryFunctionContext, QueryKey, SkipToken, UseInfiniteQueryOptions, UseMutationOptions, UseQueryOptions } from '@tanstack/react-query'
import type { experimental_streamedQuery, QueryFunctionContext, QueryKey, SkipToken, UseInfiniteQueryOptions, UseMutationOptions, UseQueryOptions } from '@tanstack/react-query'

export type QueryOptionsIn<TClientContext extends ClientContext, TInput, TOutput, TError, TSelectData> =
& (undefined extends TInput ? { input?: TInput | SkipToken } : { input: TInput | SkipToken })
Expand All @@ -14,6 +14,17 @@ export interface QueryOptionsBase<TOutput, TError> {
enabled: boolean
}

type experimental_StreamedQueryOptions = Omit<Parameters<typeof experimental_streamedQuery>[0], 'queryFn'>

export type experimental_InferStreamedOutput<TOutput> = TOutput extends AsyncIterable<infer U> ? U[] : never

export type experimental_StreamedOptionsIn<TClientContext extends ClientContext, TInput, TOutput, TError, TSelectData> =
& QueryOptionsIn<TClientContext, TInput, TOutput, TError, TSelectData>
& experimental_StreamedQueryOptions

export interface experimental_StreamedOptionsBase<TOutput, TError> extends QueryOptionsBase<TOutput, TError> {
}

export type InfiniteOptionsIn<TClientContext extends ClientContext, TInput, TOutput, TError, TSelectData, TPageParam> =
& { input: ((pageParam: TPageParam) => TInput) | SkipToken }
& (Record<never, never> extends TClientContext ? { context?: TClientContext } : { context: TClientContext })
Expand Down
Loading