Skip to content

Commit e91d2fd

Browse files
authored
feat(client, server): add eventIteratorToUnproxiedDataStream util (#1110)
Some event iterators proxy their emitted data to track event metadata. When integrating with tools that don't support proxied data (such as AI SDK), use eventIteratorToUnproxiedDataStream instead of eventIteratorToStream to ensure raw, unproxied data is emitted. Fixed: #977 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added a data-stream conversion that returns unproxied values for streamed results, exposed across client and server packages for better compatibility with AI SDKs and integrations. * **Documentation** * Updated AI SDK integration docs to recommend using the new unproxied data-stream handling for reliable structured-data processing. * **Tests** * Added tests covering normal, empty, error, and cancellation scenarios for the new data-stream behavior. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 12bec78 commit e91d2fd

5 files changed

Lines changed: 166 additions & 3 deletions

File tree

apps/content/docs/integrations/ai-sdk.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@ export const chat = os
5959
const client = { chat }
6060
// ---cut---
6161
import { useChat } from '@ai-sdk/react'
62-
import { eventIteratorToStream } from '@orpc/client'
62+
import { eventIteratorToUnproxiedDataStream } from '@orpc/client'
6363

6464
export function Example() {
6565
const { messages, sendMessage, status } = useChat({
6666
transport: {
6767
async sendMessages(options) {
68-
return eventIteratorToStream(await client.chat({
68+
return eventIteratorToUnproxiedDataStream(await client.chat({
6969
chatId: options.chatId,
7070
messages: options.messages,
7171
}, { signal: options.abortSignal }))
@@ -115,3 +115,9 @@ export function Example() {
115115
::: info
116116
The `reconnectToStream` function is not supported by default, which is fine for most use cases. If you need reconnection support, implement it similar to `sendMessages` with custom reconnection logic. See this [reconnect example](<https://github.com/vercel/ai-chatbot/blob/main/app/(chat)/api/chat/%5Bid%5D/stream/route.ts>).
117117
:::
118+
119+
::: info
120+
Prefer `eventIteratorToUnproxiedDataStream` over `eventIteratorToStream`.
121+
AI SDK internally uses `structuredClone`, which doesn't support proxied data.
122+
oRPC may proxy events for metadata, so unproxy before passing to AI SDK.
123+
:::

packages/client/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export * from './utils'
1010
export {
1111
AsyncIteratorClass,
1212
asyncIteratorToStream as eventIteratorToStream,
13+
asyncIteratorToUnproxiedDataStream as eventIteratorToUnproxiedDataStream,
1314
EventPublisher,
1415
onError,
1516
onFinish,

packages/server/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export type { IntersectPick } from '@orpc/shared'
4343
export {
4444
AsyncIteratorClass,
4545
asyncIteratorToStream as eventIteratorToStream,
46+
asyncIteratorToUnproxiedDataStream as eventIteratorToUnproxiedDataStream,
4647
EventPublisher,
4748
onError,
4849
onFinish,

packages/shared/src/stream.test.ts

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { AsyncIteratorClass } from './iterator'
2-
import { asyncIteratorToStream, streamToAsyncIteratorClass } from './stream'
2+
import { asyncIteratorToStream, asyncIteratorToUnproxiedDataStream, streamToAsyncIteratorClass } from './stream'
33

44
describe('streamToAsyncIteratorClass', () => {
55
it('should convert a ReadableStream to AsyncIteratorClass', async () => {
@@ -187,3 +187,121 @@ it('streamToAsyncIteratorClass + asyncIteratorToStream', async () => {
187187

188188
expect(results).toEqual([1, 2, 3])
189189
})
190+
191+
describe('asyncIteratorToUnproxiedDataStream', () => {
192+
const PROXY_SYMBOL = Symbol('proxy')
193+
194+
function proxy(value: object) {
195+
return new Proxy(value, {
196+
get(target, prop) {
197+
if (prop === PROXY_SYMBOL) {
198+
return true
199+
}
200+
201+
return Reflect.get(target, prop)
202+
},
203+
})
204+
}
205+
206+
function isProxied(value: any) {
207+
return Boolean(typeof value === 'object' && value && value[PROXY_SYMBOL])
208+
}
209+
210+
it('should convert an AsyncIterator to ReadableStream and unproxied data', async () => {
211+
const date = new Date()
212+
const set = new Set([date])
213+
214+
async function* generator() {
215+
yield 1
216+
yield proxy({ order: 2 })
217+
yield { order: 3 }
218+
yield [4]
219+
yield proxy([5])
220+
yield date // support native Date
221+
yield set // support native Set
222+
}
223+
224+
const asyncIterator = generator()
225+
const stream = asyncIteratorToUnproxiedDataStream(asyncIterator)
226+
227+
expect(stream).toBeInstanceOf(ReadableStream)
228+
229+
const reader = stream.getReader()
230+
const results: any[] = []
231+
232+
let result = await reader.read()
233+
while (!result.done) {
234+
results.push(result.value)
235+
result = await reader.read()
236+
}
237+
238+
expect(results).toEqual([
239+
1,
240+
{ order: 2 },
241+
{ order: 3 },
242+
[4],
243+
[5],
244+
date,
245+
set,
246+
])
247+
248+
expect(results.some(isProxied)).toBe(false)
249+
})
250+
251+
it('should handle empty async iterator', async () => {
252+
async function* emptyGenerator() {
253+
// Empty generator
254+
}
255+
256+
const asyncIterator = emptyGenerator()
257+
const stream = asyncIteratorToUnproxiedDataStream(asyncIterator)
258+
259+
const reader = stream.getReader()
260+
const result = await reader.read()
261+
262+
expect(result.done).toBe(true)
263+
expect(result.value).toBeUndefined()
264+
})
265+
266+
it('should handle async iterator errors', async () => {
267+
const error = new Error('Iterator error')
268+
async function* errorGenerator() {
269+
yield 1
270+
throw error
271+
}
272+
273+
const asyncIterator = errorGenerator()
274+
const stream = asyncIteratorToUnproxiedDataStream(asyncIterator)
275+
276+
const reader = stream.getReader()
277+
278+
// First read should succeed
279+
const firstResult = await reader.read()
280+
expect(firstResult.done).toBe(false)
281+
expect(firstResult.value).toBe(1)
282+
283+
// Second read should throw the error
284+
await expect(reader.read()).rejects.toThrow(error)
285+
})
286+
287+
it('should call iterator.return when stream is cancelled', async () => {
288+
let cleanupCalled = false
289+
290+
const stream = asyncIteratorToUnproxiedDataStream(async function* () {
291+
try {
292+
yield 1
293+
yield 2
294+
await new Promise(resolve => setTimeout(resolve, 100)) // Simulate async operation
295+
}
296+
finally {
297+
cleanupCalled = true
298+
}
299+
}())
300+
301+
const reader = stream.getReader()
302+
await reader.read()
303+
await reader.cancel()
304+
305+
expect(cleanupCalled).toBe(true)
306+
})
307+
})

packages/shared/src/stream.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { AsyncIteratorClass } from './iterator'
2+
import { isObject } from './object'
23

4+
/**
5+
* Converts a `ReadableStream` into an `AsyncIteratorClass`.
6+
*/
37
export function streamToAsyncIteratorClass<T>(
48
stream: ReadableStream<T>,
59
): AsyncIteratorClass<T> {
@@ -15,6 +19,9 @@ export function streamToAsyncIteratorClass<T>(
1519
)
1620
}
1721

22+
/**
23+
* Converts an `AsyncIterator` into a `ReadableStream`.
24+
*/
1825
export function asyncIteratorToStream<T>(
1926
iterator: AsyncIterator<T>,
2027
): ReadableStream<T> {
@@ -34,3 +41,33 @@ export function asyncIteratorToStream<T>(
3441
},
3542
})
3643
}
44+
45+
/**
46+
* Converts an `AsyncIterator` into a `ReadableStream`, ensuring that
47+
* all emitted object values are *unproxied* before enqueuing.
48+
*/
49+
export function asyncIteratorToUnproxiedDataStream<T>(
50+
iterator: AsyncIterator<T>,
51+
): ReadableStream<T> {
52+
return new ReadableStream<T>({
53+
async pull(controller) {
54+
const { done, value } = await iterator.next()
55+
56+
if (done) {
57+
controller.close()
58+
}
59+
else {
60+
const unproxied = isObject(value)
61+
? { ...value }
62+
: Array.isArray(value)
63+
? value.map(i => i) as T // use .map instead of ... to deal with sparse arrays
64+
: value
65+
66+
controller.enqueue(unproxied)
67+
}
68+
},
69+
async cancel() {
70+
await iterator.return?.()
71+
},
72+
})
73+
}

0 commit comments

Comments
 (0)