Skip to content

Commit 3f40711

Browse files
authored
feat(server, openapi)!: event-source support (#147)
* serialize/deserialize * serialize/deserialize * improve mapEventSourceIterator * retry / last-event-id * sync * validation schema * openapi specs * fix * event-source state * update server-standard * improve naming * docs * improve options
1 parent 9125edb commit 3f40711

49 files changed

Lines changed: 2116 additions & 239 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/content/content/docs/client/vanilla.mdx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ type ClientContext = { cache?: RequestCache }
5252
const rpcLink = new RPCLink<ClientContext>({
5353
url: 'http://localhost:3000/rpc',
5454
// headers: provide additional headers
55-
fetch: (input, init, context) => globalThis.fetch(input, {
55+
fetch: (url, init, { context }) => globalThis.fetch(url, {
5656
...init,
5757
cache: context?.cache,
5858
}),
59-
method: (path, input, context) => {
59+
method: ({ context }, path, input) => {
6060
// if input contain file, and you return GET, oRPC will change it to POST automatically
6161

6262
if (context?.cache) {
@@ -98,7 +98,7 @@ const rpcLink2 = new RPCLink({
9898
// headers: provide additional headers
9999
})
100100

101-
const dynamicLink = new DynamicLink((path, input, context) => { // can be async
101+
const dynamicLink = new DynamicLink(({ context }, path, input) => { // can be async
102102
if (path.includes('post')) {
103103
return rpcLink1
104104
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
---
2+
title: Event Iterator
3+
description: Server-Sent Events (SSE) with out-of-the-box support in oRPC
4+
---
5+
6+
The event iterator in oRPC provides built-in support for streaming responses using Server-Sent Events (SSE) without any extra configuration. This makes it easy to build real-time, event-driven APIs.
7+
8+
### Example Usage
9+
10+
The following example demonstrates how to create a streaming endpoint that continuously sends events to the client:
11+
12+
```ts twoslash
13+
import { createORPCClient } from '@orpc/client'
14+
import { RPCLink } from '@orpc/client/fetch'
15+
import { eventIterator, os, withEventMeta } from '@orpc/server'
16+
import { z } from 'zod'
17+
18+
// Define a streaming endpoint using the event iterator
19+
const streaming = os
20+
// Validate the input with Zod
21+
.input(z.object({ prompt: z.string() }))
22+
// Use eventIterator to specify the streaming output (optional, but recommended)
23+
.output(eventIterator(z.object({ message: z.string() })))
24+
.handler(async function* ({ input, lastEventId }) {
25+
// The lastEventId (if provided) can be used to resume streaming on reconnects
26+
while (true) {
27+
// withEventMeta attaches metadata (e.g., event id, retry interval) to each event
28+
yield withEventMeta(
29+
{ message: 'Hello, world!' },
30+
{ id: 'some-id', retry: 1000 }
31+
)
32+
// Wait for 1 second before sending the next event
33+
await new Promise(resolve => setTimeout(resolve, 1000))
34+
}
35+
})
36+
37+
// Create a router with the streaming endpoint
38+
const router = { streaming }
39+
40+
// Create an ORPC client and configure the SSE behavior
41+
const client = createORPCClient<typeof router>(
42+
new RPCLink({
43+
url: 'http://localhost:3000/rpc',
44+
eventSourceMaxNumberOfRetries: 0, // Set to 0 to disable automatic retries on connection failure,
45+
// Optionally, you can configure:
46+
// - eventSourceRetryDelay: Delay between retry attempts
47+
// - eventSourceRetry: Custom retry behavior
48+
})
49+
)
50+
51+
// Invoke the streaming endpoint with an initial prompt and optional lastEventId
52+
const result = await client.streaming(
53+
{ prompt: 'Hello' },
54+
{ lastEventId: 'you also can pass initial last event id here' }
55+
)
56+
57+
// Process the incoming stream of events using async iteration
58+
for await (const event of result) {
59+
console.log(event)
60+
}
61+
62+
// To close the connection manually, call result.return()
63+
```
64+
65+
### Key Concepts
66+
67+
- **`lastEventId`**:
68+
This parameter represents the ID of the last event received by the client. When reconnecting, the client can send this ID so that the server can resume streaming from where it left off.
69+
70+
- **`withEventMeta`**:
71+
This helper function attaches additional metadata (such as `id` and `retry`) to an event. This metadata is useful for controlling the behavior of SSE, for example:
72+
- **`id`**: A unique identifier for the event. Last event id is used when reconnecting.
73+
- **`retry`**: The suggested reconnection delay (in milliseconds) if the connection is lost.

apps/content/content/docs/server/meta.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"context",
77
"contract",
88
"file-upload",
9+
"event-iterator",
910
"lazy",
1011
"server-action",
1112
"client",

packages/client/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@
4343
"dist"
4444
],
4545
"scripts": {
46-
"build": "tsup --clean --sourcemap --entry.index=src/index.ts --entry.fetch=src/adapters/fetch/index.ts --format=esm --onSuccess='tsc -b --noCheck'",
46+
"build": "tsup --onSuccess='tsc -b --noCheck'",
4747
"build:watch": "pnpm run build --watch",
4848
"type:check": "tsc -b"
4949
},
5050
"dependencies": {
5151
"@orpc/contract": "workspace:*",
5252
"@orpc/server": "workspace:*",
53-
"@orpc/server-standard": "^0.0.0",
54-
"@orpc/server-standard-fetch": "^0.0.0",
53+
"@orpc/server-standard": "^0.4.0",
54+
"@orpc/server-standard-fetch": "^0.4.0",
5555
"@orpc/shared": "workspace:*"
5656
},
5757
"devDependencies": {

0 commit comments

Comments
 (0)