|
| 1 | +--- |
| 2 | +title: Durable Event Iterator Integration |
| 3 | +description: Allows you to use Event Iterator by separating the streaming to a different service that provides durable event streams, automatic reconnections, recovery of missing events, and more. |
| 4 | +--- |
| 5 | + |
| 6 | +# Durable Event Iterator Integration |
| 7 | + |
| 8 | +Durable Event Iterator allows you to use [Event Iterator](/docs/event-iterator) by separating the streaming to a different service that provides durable event streams, automatic reconnections, recovery of missing events, and more. |
| 9 | + |
| 10 | +::: info |
| 11 | +This feature is not limited to [Cloudflare Durable Objects](https://developers.cloudflare.com/durable-objects/), but it is approachable and currently the only supported option. |
| 12 | +::: |
| 13 | + |
| 14 | +## Installation |
| 15 | + |
| 16 | +::: code-group |
| 17 | + |
| 18 | +```sh [npm] |
| 19 | +npm install @orpc/experimental-durable-event-iterator@latest |
| 20 | +``` |
| 21 | + |
| 22 | +```sh [yarn] |
| 23 | +yarn add @orpc/experimental-durable-event-iterator@latest |
| 24 | +``` |
| 25 | + |
| 26 | +```sh [pnpm] |
| 27 | +pnpm add @orpc/experimental-durable-event-iterator@latest |
| 28 | +``` |
| 29 | + |
| 30 | +```sh [bun] |
| 31 | +bun add @orpc/experimental-durable-event-iterator@latest |
| 32 | +``` |
| 33 | + |
| 34 | +```sh [deno] |
| 35 | +deno install npm:@orpc/experimental-durable-event-iterator@latest |
| 36 | +``` |
| 37 | + |
| 38 | +::: |
| 39 | + |
| 40 | +::: warning |
| 41 | +The `experimental-` prefix indicates that this feature is still in development and may change in the future. |
| 42 | +::: |
| 43 | + |
| 44 | +## Setup your Durable Object |
| 45 | + |
| 46 | +::: warning |
| 47 | +This section requires you to be familiar with [Cloudflare Durable Objects](https://developers.cloudflare.com/durable-objects/). Please learn it first before continuing. |
| 48 | +::: |
| 49 | + |
| 50 | +### Define your Durable Object |
| 51 | + |
| 52 | +Everything you need to do is extend the `DurableEventIteratorObject` class. Additionally, you can define [RPC Methods](https://developers.cloudflare.com/durable-objects/best-practices/create-durable-object-stubs-and-send-requests/) to publish events to the connected clients. |
| 53 | + |
| 54 | +```ts |
| 55 | +import { DurableEventIteratorObject } from '@orpc/experimental-durable-event-iterator/durable-object' |
| 56 | + |
| 57 | +export class ChatRoom extends DurableEventIteratorObject<{ message: string }> { |
| 58 | + constructor(ctx: DurableObjectState, env: Env) { |
| 59 | + super(ctx, env, { |
| 60 | + eventRetentionSeconds: 300, // Optional: Set the event retention duration (default is 5 minutes) |
| 61 | + customJsonSerializers: [ |
| 62 | + // Custom JSON serializers |
| 63 | + ] |
| 64 | + }) |
| 65 | + } |
| 66 | + |
| 67 | + publishMessage(message: string) { |
| 68 | + this.dei.websocketManager.publishEvent(this.ctx.getWebSockets(), { message }) |
| 69 | + } |
| 70 | +} |
| 71 | +``` |
| 72 | + |
| 73 | +::: warning |
| 74 | +Do not use [WebSocket Extended Methods](https://developers.cloudflare.com/durable-objects/best-practices/websockets/#extended-methods) like `ws.serializeAttachment` and `ws.deserializeAttachment` directly because you may interfere with the attachment that durable event iterator relies on. Instead, you should use the `serializeAttachment` and `deserializeAttachment` methods inside `dei.websocketManager`. |
| 75 | +::: |
| 76 | + |
| 77 | +### Upgrade Durable Event Iterator Request |
| 78 | + |
| 79 | +This step will upgrade and validate the WebSocket request to your Durable Object. You need to provide a signing key to validate the token and the corresponding Durable Object namespace. |
| 80 | + |
| 81 | +```ts |
| 82 | +import { upgradeDurableEventIteratorRequest } from '@orpc/experimental-durable-event-iterator/durable-object' |
| 83 | + |
| 84 | +export default { |
| 85 | + async fetch(request, env) { |
| 86 | + const url = new URL(request.url) |
| 87 | + |
| 88 | + if (url.pathname === '/chat-room') { |
| 89 | + return upgradeDurableEventIteratorRequest(request, { |
| 90 | + signingKey: 'secret-key', // Replace with your actual signing key |
| 91 | + namespace: env.CHAT_ROOM, |
| 92 | + }) |
| 93 | + } |
| 94 | + |
| 95 | + return new Response('Not Found', { status: 404 }) |
| 96 | + }, |
| 97 | +} satisfies ExportedHandler<Env> |
| 98 | + |
| 99 | +export { ChatRoom } |
| 100 | +``` |
| 101 | + |
| 102 | +## Server Side Usage |
| 103 | + |
| 104 | +Here we define two procedures: one for listening to messages in the chat room, and another for sending messages to all connected clients by invoking the `publishMessage` method on the Durable Object. |
| 105 | + |
| 106 | +::: info |
| 107 | +This example assumes your server and Durable Object are running in the same environment, but this is not required. Instead of invoking the `publishMessage` method directly, you can send a fetch request to wherever your Durable Object is running to send messages. |
| 108 | +::: |
| 109 | + |
| 110 | +```ts |
| 111 | +import { DurableEventIterator } from '@orpc/experimental-durable-event-iterator' |
| 112 | + |
| 113 | +export const router = { |
| 114 | + onMessage: base.handler(({ context }) => { |
| 115 | + return new DurableEventIterator<ChatRoom>('some-room', { |
| 116 | + signingKey: 'secret-key', // Replace with your actual signing key |
| 117 | + }) |
| 118 | + }), |
| 119 | + |
| 120 | + sendMessage: base |
| 121 | + .input(z.object({ message: z.string() })) |
| 122 | + .handler(async ({ context, input }) => { |
| 123 | + const id = context.env.CHAT_ROOM.idFromName('some-room') |
| 124 | + const stub = context.env.CHAT_ROOM.get(id) |
| 125 | + |
| 126 | + await stub.publishMessage(input.message) |
| 127 | + }), |
| 128 | +} |
| 129 | +``` |
| 130 | + |
| 131 | +After that, you need to use `DurableEventIteratorHandlerPlugin` in your handler to enable Durable Event Iterator support. |
| 132 | + |
| 133 | +```ts |
| 134 | +import { DurableEventIteratorHandlerPlugin } from '@orpc/experimental-durable-event-iterator' |
| 135 | + |
| 136 | +const handler = new RPCHandler(router, { |
| 137 | + plugins: [ |
| 138 | + new DurableEventIteratorHandlerPlugin(), |
| 139 | + ], |
| 140 | +}) |
| 141 | +``` |
| 142 | + |
| 143 | +## Client Side Usage |
| 144 | + |
| 145 | +On the client side, you only need to set up the plugin. The rest is the same as [Event Iterator](/docs/client/event-iterator). The `url` you define inside `DurableEventIteratorLinkPlugin` is the URL of your Durable Object upgrade endpoint. |
| 146 | + |
| 147 | +```ts |
| 148 | +import { DurableEventIteratorLinkPlugin } from '@orpc/experimental-durable-event-iterator/client' |
| 149 | + |
| 150 | +const link = new RPCLink({ |
| 151 | + url: 'http://localhost:3000/rpc', |
| 152 | + plugins: [ |
| 153 | + new DurableEventIteratorLinkPlugin({ |
| 154 | + url: 'ws://localhost:3000/chat-room', |
| 155 | + }), |
| 156 | + ], |
| 157 | +}) |
| 158 | +``` |
| 159 | + |
| 160 | +### Example |
| 161 | + |
| 162 | +```ts |
| 163 | +const iterator = await client.onMessage() |
| 164 | + |
| 165 | +for await (const { message } of iterator) { |
| 166 | + console.log('Received message:', message) |
| 167 | +} |
| 168 | + |
| 169 | +await client.sendMessage({ message: 'Hello, world!' }) |
| 170 | +``` |
| 171 | + |
| 172 | +## Recovery of Missing Events |
| 173 | + |
| 174 | +The Durable Event Iterator automatically persists events for 5 minutes and recovers missed events when clients connect/reconnect, ensuring reliable message delivery even during network interruptions. |
| 175 | +You can use the `eventRetentionSeconds` option to change the retention duration. |
| 176 | + |
| 177 | +## Durable Objects RPC |
| 178 | + |
| 179 | +Unlike the [Cloudflare Durable Objects RPC](https://developers.cloudflare.com/durable-objects/best-practices/create-durable-object-stubs-and-send-requests/), this RPC utilizes oRPC built-in RPC system, allowing clients to easily interact with Durable Objects directly. To use it, you need to define methods that accept a `WebSocket` instance as the first argument and return an [oRPC Client](/docs/client/server-side). |
| 180 | + |
| 181 | +```ts |
| 182 | +import { DurableEventIteratorObject } from '@orpc/experimental-durable-event-iterator/durable-object' |
| 183 | + |
| 184 | +export class ChatRoom extends DurableEventIteratorObject< |
| 185 | + { message: string }, // Event type |
| 186 | + { userId: string }, // (Optional) Token Attachment |
| 187 | + { something: string } |
| 188 | +> { |
| 189 | + publishMessage(currentWs: WebSocket) { |
| 190 | + return base |
| 191 | + .input(z.object({ message: z.string() })) |
| 192 | + .handler(({ input, context }) => { |
| 193 | + // Get attachments |
| 194 | + const wsAttachment = this.dei.websocketManager.deserializeAttachment(currentWs) |
| 195 | + const { userId } = wsAttachment['dei:token:payload'].att |
| 196 | + const something = wsAttachment.something |
| 197 | + |
| 198 | + // Set attachments |
| 199 | + this.dei.websocketManager.serializeAttachment(currentWs, { |
| 200 | + something: 'new value', |
| 201 | + }) |
| 202 | + |
| 203 | + // Publish event to all other connected clients |
| 204 | + this.dei.websocketManager.publishEvent( |
| 205 | + this.ctx.getWebSockets().filter(ws => ws !== currentWs), |
| 206 | + input, |
| 207 | + ) |
| 208 | + }) |
| 209 | + .callable() |
| 210 | + } |
| 211 | + |
| 212 | + /** |
| 213 | + * Nested Client |
| 214 | + */ |
| 215 | + router(ws: WebSocket) { |
| 216 | + return { |
| 217 | + ping: base.handler(() => 'pong').callable(), |
| 218 | + echo: base |
| 219 | + .input(z.object({ text: z.string() })) |
| 220 | + .handler(({ input }) => `Echo: ${input.text}`) |
| 221 | + .callable(), |
| 222 | + } |
| 223 | + } |
| 224 | +} |
| 225 | +``` |
| 226 | + |
| 227 | +### Server Side Usage |
| 228 | + |
| 229 | +```ts |
| 230 | +import { DurableEventIterator } from '@orpc/experimental-durable-event-iterator' |
| 231 | + |
| 232 | +export const onMessage = base.handler(({ context }) => { |
| 233 | + return new DurableEventIterator<ChatRoom>('some-room', { |
| 234 | + signingKey: 'secret-key', // Replace with your actual signing key |
| 235 | + att: { |
| 236 | + userId: 'user-123', // User-specific data |
| 237 | + }, |
| 238 | + }).rpc('publishMessage', 'router') // List of allowed methods |
| 239 | +}) |
| 240 | +``` |
| 241 | + |
| 242 | +::: info |
| 243 | +Clients only have permission to call the methods you defined in the `rpc` method. This provides fine-grained access control. |
| 244 | +::: |
| 245 | + |
| 246 | +::: warning |
| 247 | +The `att` (attachment) data can be viewed from the client side, so do not put any sensitive data in it. Use it only for non-sensitive metadata like user IDs or preferences. |
| 248 | +::: |
| 249 | + |
| 250 | +### Client Side Usage |
| 251 | + |
| 252 | +You can invoke methods defined inside `rpc` directly from the client `iterator` result. |
| 253 | + |
| 254 | +```ts |
| 255 | +const iterator = await client.onMessage() |
| 256 | + |
| 257 | +// Listen for events |
| 258 | +for await (const { message } of iterator) { |
| 259 | + console.log('Received message:', message) |
| 260 | +} |
| 261 | + |
| 262 | +// Call RPC methods |
| 263 | +await iterator.publishMessage({ message: 'Hello, world!' }) |
| 264 | + |
| 265 | +// Call nested router methods |
| 266 | +const response = await iterator.router.ping() |
| 267 | +console.log(response) // "pong" |
| 268 | + |
| 269 | +const echoResponse = await iterator.router.echo({ text: 'Hello' }) |
| 270 | +console.log(echoResponse) // "Echo: Hello" |
| 271 | +``` |
| 272 | + |
| 273 | +## Contract First |
| 274 | + |
| 275 | +This integration also supports [Contract First](/docs/contract-first/define-contract). What you need to do is define an interface that extends `DurableEventIteratorObject`. |
| 276 | + |
| 277 | +```ts |
| 278 | +import type { ContractRouterClient } from '@orpc/contract' |
| 279 | +import { oc, type } from '@orpc/contract' |
| 280 | +import type { ClientDurableEventIterator } from '@orpc/experimental-durable-event-iterator/client' |
| 281 | +import type { DurableEventIteratorObject } from '@orpc/experimental-durable-event-iterator' |
| 282 | + |
| 283 | +export const publishMessageContract = oc.input(z.object({ message: z.string() })) |
| 284 | + |
| 285 | +export interface ChatRoom extends DurableEventIteratorObject<{ message: string }> { |
| 286 | + publishMessage(...args: any[]): ContractRouterClient<typeof publishMessageContract> |
| 287 | +} |
| 288 | + |
| 289 | +export const contract = { |
| 290 | + onMessage: oc.output(type<ClientDurableEventIterator<ChatRoom, 'publishMessage'>>()), |
| 291 | +} |
| 292 | +``` |
0 commit comments