Skip to content

Commit e788265

Browse files
authored
feat: stream converter utils + AI SDK examples (#775)
Closes: https://github.com/unnoq/orpc/issues/768 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added documentation for integrating the AI SDK, including setup instructions and usage examples for both server and client sides. * Introduced new utilities for converting between streams and async iterators, now available in the shared package. * Expanded sidebar navigation in the documentation to include an "AI SDK" section. * **Bug Fixes** * None. * **Tests** * Added comprehensive tests to verify correct conversion between streams and async iterators, including error handling and resource cleanup. * **Chores** * Updated development dependencies to include the latest AI SDK packages. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 752b165 commit e788265

9 files changed

Lines changed: 475 additions & 5 deletions

File tree

apps/content/.vitepress/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ export default withMermaid(defineConfig({
142142
text: 'Integrations',
143143
collapsed: true,
144144
items: [
145+
{ text: 'AI SDK', link: '/docs/integrations/ai-sdk' },
145146
{ text: 'Tanstack Query', link: '/docs/integrations/tanstack-query' },
146147
{
147148
text: 'Tanstack Query (Old)',
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
---
2+
title: AI SDK Integration
3+
description: Seamlessly use AI SDK inside your oRPC projects without any extra overhead.
4+
---
5+
6+
# AI SDK Integration
7+
8+
[AI SDK](https://ai-sdk.dev/) is a free open-source library for building AI-powered products. You can seamlessly integrate it with oRPC without any extra overhead.
9+
10+
::: warning
11+
This documentation requires AI SDK v5.0.0 or later. For a refresher, review the [AI SDK documentation](https://ai-sdk.dev/docs).
12+
:::
13+
14+
## Server
15+
16+
Use `streamToEventIterator` to convert AI SDK streams to [oRPC Event Iterators](/docs/event-iterator).
17+
18+
```ts
19+
import { os, streamToEventIterator, type } from '@orpc/server'
20+
import { convertToModelMessages, streamText, UIMessage } from 'ai'
21+
22+
export const chat = os
23+
.input(type<{ chatId: string, messages: UIMessage[] }>())
24+
.handler(({ input }) => {
25+
const result = streamText({
26+
model: google('gemini-1.5-flash'),
27+
system: 'You are a helpful assistant.',
28+
messages: convertToModelMessages(input.messages),
29+
})
30+
31+
return streamToEventIterator(result.toUIMessageStream())
32+
})
33+
```
34+
35+
## Client
36+
37+
On the client side, convert the event iterator back to a stream using `eventIteratorToStream`.
38+
39+
```tsx
40+
import { useChat } from '@ai-sdk/react'
41+
import { eventIteratorToStream } from '@orpc/client'
42+
43+
export function Example() {
44+
const { messages, sendMessage, status } = useChat({
45+
transport: {
46+
async sendMessages(options) {
47+
return eventIteratorToStream(await client.chat({
48+
chatId: options.chatId,
49+
messages: options.messages,
50+
}, { signal: options.abortSignal }))
51+
},
52+
reconnectToStream(options) {
53+
throw new Error('Unsupported')
54+
},
55+
},
56+
})
57+
const [input, setInput] = useState('')
58+
59+
return (
60+
<>
61+
{messages.map(message => (
62+
<div key={message.id}>
63+
{message.role === 'user' ? 'User: ' : 'AI: '}
64+
{message.parts.map((part, index) =>
65+
part.type === 'text' ? <span key={index}>{part.text}</span> : null,
66+
)}
67+
</div>
68+
))}
69+
70+
<form
71+
onSubmit={(e) => {
72+
e.preventDefault()
73+
if (input.trim()) {
74+
sendMessage({ text: input })
75+
setInput('')
76+
}
77+
}}
78+
>
79+
<input
80+
value={input}
81+
onChange={e => setInput(e.target.value)}
82+
disabled={status !== 'ready'}
83+
placeholder="Say something..."
84+
/>
85+
<button type="submit" disabled={status !== 'ready'}>
86+
Submit
87+
</button>
88+
</form>
89+
</>
90+
)
91+
}
92+
```
93+
94+
::: info
95+
The `reconnectToStream` function is not supported by default, which is fine for most use cases. If you need reconnection support, implement it similar to `sendMessages` with custom reconnection logic. See this [reconnect example](<https://github.com/vercel/ai-chatbot/blob/main/app/(chat)/api/chat/%5Bid%5D/stream/route.ts>).
96+
:::

apps/content/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
"serve": "vitepress serve"
99
},
1010
"devDependencies": {
11+
"@ai-sdk/google": "2.0.0-beta.9",
12+
"@ai-sdk/react": "2.0.0-beta.18",
1113
"@orpc/arktype": "workspace:*",
1214
"@orpc/client": "workspace:*",
1315
"@orpc/contract": "workspace:*",
@@ -28,6 +30,7 @@
2830
"@tanstack/svelte-query": "^5.83.0",
2931
"@tanstack/vue-query": "^5.83.0",
3032
"@types/node": "^22.15.18",
33+
"ai": "5.0.0-beta.18",
3134
"mermaid": "^11.8.1",
3235
"openai": "^5.8.2",
3336
"pinia": "^3.0.2",

packages/client/src/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ export * from './event-iterator'
66
export * from './types'
77
export * from './utils'
88

9-
export { EventPublisher, onError, onFinish, onStart, onSuccess } from '@orpc/shared'
9+
export {
10+
asyncIteratorToStream as eventIteratorToStream,
11+
EventPublisher,
12+
onError,
13+
onFinish,
14+
onStart,
15+
onSuccess,
16+
streamToAsyncIteratorClass as streamToEventIterator,
17+
} from '@orpc/shared'
1018
export type { EventPublisherOptions, EventPublisherSubscribeIteratorOptions, Registry, ThrowableError } from '@orpc/shared'
1119
export { ErrorEvent } from '@orpc/standard-server'

packages/server/src/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ export type {
4040
Schema,
4141
} from '@orpc/contract'
4242
export type { IntersectPick } from '@orpc/shared'
43-
export { EventPublisher, onError, onFinish, onStart, onSuccess } from '@orpc/shared'
43+
export {
44+
asyncIteratorToStream as eventIteratorToStream,
45+
EventPublisher,
46+
onError,
47+
onFinish,
48+
onStart,
49+
onSuccess,
50+
streamToAsyncIteratorClass as streamToEventIterator,
51+
} from '@orpc/shared'
4452
export type { EventPublisherOptions, EventPublisherSubscribeIteratorOptions, Registry, ThrowableError } from '@orpc/shared'
4553
export { getEventMeta, withEventMeta } from '@orpc/standard-server'

packages/shared/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export * from './iterator'
1010
export * from './json'
1111
export * from './object'
1212
export * from './queue'
13+
export * from './stream'
1314
export * from './types'
1415
export * from './value'
1516

packages/shared/src/stream.test.ts

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import { AsyncIteratorClass } from './iterator'
2+
import { asyncIteratorToStream, streamToAsyncIteratorClass } from './stream'
3+
4+
describe('streamToAsyncIteratorClass', () => {
5+
it('should convert a ReadableStream to AsyncIteratorClass', async () => {
6+
const values = [1, 2, 3, 4, 5]
7+
const stream = new ReadableStream<number>({
8+
start(controller) {
9+
values.forEach(value => controller.enqueue(value))
10+
controller.close()
11+
},
12+
})
13+
14+
const iterator = streamToAsyncIteratorClass(stream)
15+
expect(iterator).toBeInstanceOf(AsyncIteratorClass)
16+
17+
const results: number[] = []
18+
for await (const value of iterator) {
19+
results.push(value)
20+
}
21+
22+
expect(results).toEqual(values)
23+
})
24+
25+
it('should handle empty stream', async () => {
26+
const stream = new ReadableStream<number>({
27+
start(controller) {
28+
controller.close()
29+
},
30+
})
31+
32+
const iterator = streamToAsyncIteratorClass(stream)
33+
const results: number[] = []
34+
35+
for await (const value of iterator) {
36+
results.push(value)
37+
}
38+
39+
expect(results).toEqual([])
40+
})
41+
42+
it('should handle stream errors', async () => {
43+
const error = new Error('Stream error')
44+
const stream = new ReadableStream<number>({
45+
start(controller) {
46+
controller.error(error)
47+
},
48+
})
49+
50+
const iterator = streamToAsyncIteratorClass(stream)
51+
52+
try {
53+
for await (const value of iterator) {
54+
// Should not reach here
55+
}
56+
expect.fail('Should have thrown an error')
57+
}
58+
catch (err) {
59+
expect(err).toBe(error)
60+
}
61+
})
62+
63+
it('should properly cleanup when iterator is returned early', async () => {
64+
let cleanupCalled = false
65+
const stream = new ReadableStream<number>({
66+
start(controller) {
67+
for (let i = 1; i <= 10; i++) {
68+
controller.enqueue(i)
69+
}
70+
controller.close()
71+
},
72+
cancel() {
73+
cleanupCalled = true
74+
},
75+
})
76+
77+
const iterator = streamToAsyncIteratorClass(stream)
78+
79+
await iterator.next()
80+
await iterator.return()
81+
82+
expect(cleanupCalled).toBe(true)
83+
})
84+
})
85+
86+
describe('asyncIteratorToStream', () => {
87+
it('should convert an AsyncIterator to ReadableStream', async () => {
88+
async function* generator() {
89+
yield 1
90+
yield 2
91+
yield 3
92+
}
93+
94+
const asyncIterator = generator()
95+
const stream = asyncIteratorToStream(asyncIterator)
96+
97+
expect(stream).toBeInstanceOf(ReadableStream)
98+
99+
const reader = stream.getReader()
100+
const results: number[] = []
101+
102+
let result = await reader.read()
103+
while (!result.done) {
104+
results.push(result.value)
105+
result = await reader.read()
106+
}
107+
108+
expect(results).toEqual([1, 2, 3])
109+
})
110+
111+
it('should handle empty async iterator', async () => {
112+
async function* emptyGenerator() {
113+
// Empty generator
114+
}
115+
116+
const asyncIterator = emptyGenerator()
117+
const stream = asyncIteratorToStream(asyncIterator)
118+
119+
const reader = stream.getReader()
120+
const result = await reader.read()
121+
122+
expect(result.done).toBe(true)
123+
expect(result.value).toBeUndefined()
124+
})
125+
126+
it('should handle async iterator errors', async () => {
127+
const error = new Error('Iterator error')
128+
async function* errorGenerator() {
129+
yield 1
130+
throw error
131+
}
132+
133+
const asyncIterator = errorGenerator()
134+
const stream = asyncIteratorToStream(asyncIterator)
135+
136+
const reader = stream.getReader()
137+
138+
// First read should succeed
139+
const firstResult = await reader.read()
140+
expect(firstResult.done).toBe(false)
141+
expect(firstResult.value).toBe(1)
142+
143+
// Second read should throw the error
144+
await expect(reader.read()).rejects.toThrow(error)
145+
})
146+
147+
it('should call iterator.return when stream is cancelled', async () => {
148+
let cleanupCalled = false
149+
150+
const stream = asyncIteratorToStream(async function* () {
151+
try {
152+
yield 1
153+
yield 2
154+
await new Promise(resolve => setTimeout(resolve, 100)) // Simulate async operation
155+
}
156+
finally {
157+
cleanupCalled = true
158+
}
159+
}())
160+
161+
const reader = stream.getReader()
162+
await reader.read()
163+
await reader.cancel()
164+
165+
expect(cleanupCalled).toBe(true)
166+
})
167+
})
168+
169+
it('streamToAsyncIteratorClass + asyncIteratorToStream', async () => {
170+
const stream = new ReadableStream<number>({
171+
start(controller) {
172+
controller.enqueue(1)
173+
controller.enqueue(2)
174+
controller.enqueue(3)
175+
controller.close()
176+
},
177+
})
178+
179+
const iterator = streamToAsyncIteratorClass(stream)
180+
const newStream = asyncIteratorToStream(iterator)
181+
const newIterator = streamToAsyncIteratorClass(newStream)
182+
183+
const results: number[] = []
184+
for await (const value of newIterator) {
185+
results.push(value)
186+
}
187+
188+
expect(results).toEqual([1, 2, 3])
189+
})

packages/shared/src/stream.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { AsyncIteratorClass } from './iterator'
2+
3+
export function streamToAsyncIteratorClass<T>(
4+
stream: ReadableStream<T>,
5+
): AsyncIteratorClass<T> {
6+
const reader = stream.getReader()
7+
8+
return new AsyncIteratorClass<T>(
9+
async () => {
10+
return reader.read() as Promise<IteratorResult<T>>
11+
},
12+
async () => {
13+
await reader.cancel()
14+
},
15+
)
16+
}
17+
18+
export function asyncIteratorToStream<T>(
19+
iterator: AsyncIterator<T>,
20+
): ReadableStream<T> {
21+
return new ReadableStream<T>({
22+
async pull(controller) {
23+
const { done, value } = await iterator.next()
24+
25+
if (done) {
26+
controller.close()
27+
}
28+
else {
29+
controller.enqueue(value)
30+
}
31+
},
32+
async cancel() {
33+
await iterator.return?.()
34+
},
35+
})
36+
}

0 commit comments

Comments
 (0)