diff --git a/.changeset/cloudflare-fetch-upgrade.md b/.changeset/cloudflare-fetch-upgrade.md new file mode 100644 index 00000000..f41d502c --- /dev/null +++ b/.changeset/cloudflare-fetch-upgrade.md @@ -0,0 +1,11 @@ +--- +'@openai/agents-realtime': patch +'@openai/agents-extensions': patch +--- + +feat: add factory-based Cloudflare support. + +- Realtime (WebSocket): add `createWebSocket` and `skipOpenEventListeners` options to enable + custom socket creation and connection state control for specialized runtimes. +- Extensions: add `CloudflareRealtimeTransportLayer`, which performs a `fetch()`-based WebSocket + upgrade on Cloudflare/workerd and integrates via the WebSocket factory. diff --git a/packages/agents-extensions/src/CloudflareRealtimeTransport.ts b/packages/agents-extensions/src/CloudflareRealtimeTransport.ts new file mode 100644 index 00000000..7279a6c6 --- /dev/null +++ b/packages/agents-extensions/src/CloudflareRealtimeTransport.ts @@ -0,0 +1,76 @@ +import { + RealtimeTransportLayer, + OpenAIRealtimeWebSocket, + OpenAIRealtimeWebSocketOptions, +} from '@openai/agents/realtime'; + +/** + * An adapter transport for Cloudflare Workers (workerd) environments. + * + * Cloudflare Workers cannot open outbound client WebSockets using the global `WebSocket` + * constructor. Instead, a `fetch()` request with `Upgrade: websocket` must be performed and the + * returned `response.webSocket` must be `accept()`ed. This transport encapsulates that pattern and + * plugs into the Realtime SDK via the factory-based `createWebSocket` option. + * + * It behaves like `OpenAIRealtimeWebSocket`, but establishes the connection using `fetch()` and + * sets `skipOpenEventListeners: true` since workerd sockets do not emit a traditional `open` + * event after acceptance. + * + * Reference: Response API — `response.webSocket` (Cloudflare Workers). + * https://developers.cloudflare.com/workers/runtime-apis/response/. + */ +export class CloudflareRealtimeTransportLayer + extends OpenAIRealtimeWebSocket + implements RealtimeTransportLayer +{ + protected _audioLengthMs: number = 0; + + constructor(options: OpenAIRealtimeWebSocketOptions) { + super({ + ...options, + createWebSocket: async ({ url, apiKey }) => { + return await this.#buildCloudflareWebSocket({ url, apiKey }); + }, + skipOpenEventListeners: true, + }); + } + + /** + * Builds a WebSocket using Cloudflare's `fetch()` + `Upgrade: websocket` flow and accepts it. + * Transforms `ws(s)` to `http(s)` for the upgrade request and forwards standard headers. + */ + async #buildCloudflareWebSocket({ + url, + apiKey, + }: { + url: string; + apiKey: string; + }): Promise { + const transformedUrl = url.replace(/^ws/i, 'http'); + if (!transformedUrl) { + throw new Error('Realtime URL is not defined'); + } + + const response = await fetch(transformedUrl, { + method: 'GET', + headers: { + Authorization: `Bearer ${apiKey}`, + 'Sec-WebSocket-Protocol': 'realtime', + Connection: 'Upgrade', + Upgrade: 'websocket', + ...this.getCommonRequestHeaders(), + }, + }); + + const upgradedSocket = (response as any).webSocket; + if (!upgradedSocket) { + const body = await response.text().catch(() => ''); + throw new Error( + `Failed to upgrade websocket: ${response.status} ${body}`, + ); + } + + upgradedSocket.accept(); + return upgradedSocket as unknown as WebSocket; + } +} diff --git a/packages/agents-extensions/src/index.ts b/packages/agents-extensions/src/index.ts index b1cafe0d..3b571f4c 100644 --- a/packages/agents-extensions/src/index.ts +++ b/packages/agents-extensions/src/index.ts @@ -1,2 +1,3 @@ -export * from './TwilioRealtimeTransport'; export * from './aiSdk'; +export * from './CloudflareRealtimeTransport'; +export * from './TwilioRealtimeTransport'; diff --git a/packages/agents-extensions/test/CloudflareRealtimeTransport.test.ts b/packages/agents-extensions/test/CloudflareRealtimeTransport.test.ts new file mode 100644 index 00000000..6924689e --- /dev/null +++ b/packages/agents-extensions/test/CloudflareRealtimeTransport.test.ts @@ -0,0 +1,87 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { CloudflareRealtimeTransportLayer } from '../src/CloudflareRealtimeTransport'; + +class FakeWorkersWebSocket { + url: string; + listeners: Record void)[]> = {}; + accepted = false; + constructor(url: string) { + this.url = url; + } + addEventListener(type: string, listener: (ev: any) => void) { + this.listeners[type] = this.listeners[type] || []; + this.listeners[type].push(listener); + } + accept() { + this.accepted = true; + } + send(_data: any) {} + close() { + this.emit('close', {}); + } + emit(type: string, ev: any) { + (this.listeners[type] || []).forEach((fn) => fn(ev)); + } +} + +describe('CloudflareRealtimeTransportLayer', () => { + let savedFetch: any; + + beforeEach(() => { + savedFetch = (globalThis as any).fetch; + }); + + afterEach(() => { + (globalThis as any).fetch = savedFetch; + }); + + it('connects via fetch upgrade and emits connection changes', async () => { + const fakeSocket = new FakeWorkersWebSocket('ws://example'); + const fetchSpy = vi.fn().mockResolvedValue({ + webSocket: fakeSocket, + status: 101, + text: vi.fn().mockResolvedValue(''), + }); + (globalThis as any).fetch = fetchSpy; + + const transport = new CloudflareRealtimeTransportLayer({ + url: 'wss://api.openai.com/v1/realtime?model=foo', + }); + + const statuses: string[] = []; + transport.on('connection_change', (s) => statuses.push(s)); + + await transport.connect({ apiKey: 'ek_test', model: 'foo' }); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + // wss -> https + expect(fetchSpy.mock.calls[0][0]).toBe( + 'https://api.openai.com/v1/realtime?model=foo', + ); + const init = fetchSpy.mock.calls[0][1]; + expect(init.method).toBe('GET'); + expect(init.headers['Authorization']).toBe('Bearer ek_test'); + expect(init.headers['Upgrade']).toBe('websocket'); + expect(init.headers['Connection']).toBe('Upgrade'); + expect(init.headers['Sec-WebSocket-Protocol']).toBe('realtime'); + + // connected without relying on 'open' listener. + expect(statuses).toEqual(['connecting', 'connected']); + }); + + it('propagates fetch-upgrade failures with detailed error', async () => { + const fetchSpy = vi.fn().mockResolvedValue({ + status: 400, + text: vi.fn().mockResolvedValue('No upgrade'), + }); + (globalThis as any).fetch = fetchSpy; + + const transport = new CloudflareRealtimeTransportLayer({ + url: 'wss://api.openai.com/v1/realtime?model=bar', + }); + + await expect( + transport.connect({ apiKey: 'ek_x', model: 'bar' }), + ).rejects.toThrow('Failed to upgrade websocket: 400 No upgrade'); + }); +}); diff --git a/packages/agents-realtime/src/openaiRealtimeWebsocket.ts b/packages/agents-realtime/src/openaiRealtimeWebsocket.ts index 5def1de2..de1e9dcc 100644 --- a/packages/agents-realtime/src/openaiRealtimeWebsocket.ts +++ b/packages/agents-realtime/src/openaiRealtimeWebsocket.ts @@ -35,6 +35,11 @@ export type WebSocketState = websocket: WebSocket; }; +export interface CreateWebSocketOptions { + url: string; + apiKey: string; +} + /** * The options for the OpenAI Realtime WebSocket transport layer. */ @@ -51,6 +56,22 @@ export type OpenAIRealtimeWebSocketOptions = { * The URL to use for the WebSocket connection. */ url?: string; + /** + * Builds a new WebSocket connection. + * @param options - The options for the WebSocket connection. + * @returns The WebSocket connection. + */ + createWebSocket?: (options: CreateWebSocketOptions) => Promise; + /** + * When you pass your own createWebSocket function, which completes the connection state transition, + * you can set this to true to skip registering the `open` event listener for the same purpose. + * If this flag is set to true, the constructor will immediately call the internal operation + * to mark the internal connection state to `connected`. Otherwise, the constructor will register + * the `open` event listener and wait for it to be triggered. + * + * By default (meaning if this property is absent), this is set to false. + */ + skipOpenEventListeners?: boolean; } & OpenAIRealtimeBaseOptions; /** @@ -81,11 +102,19 @@ export class OpenAIRealtimeWebSocket protected _firstAudioTimestamp: number | undefined; protected _audioLengthMs: number = 0; #ongoingResponse: boolean = false; + #createWebSocket?: (options: CreateWebSocketOptions) => Promise; + #skipOpenEventListeners?: boolean; constructor(options: OpenAIRealtimeWebSocketOptions = {}) { super(options); this.#url = options.url; this.#useInsecureApiKey = options.useInsecureApiKey ?? false; + this.#createWebSocket = options.createWebSocket; + this.#skipOpenEventListeners = options.skipOpenEventListeners ?? false; + } + + protected getCommonRequestHeaders() { + return HEADERS; } /** @@ -128,7 +157,7 @@ export class OpenAIRealtimeWebSocket this.emit('audio', audioEvent); } - #setupWebSocket( + async #setupWebSocket( resolve: (value: void) => void, reject: (reason?: any) => void, sessionConfig: Partial, @@ -154,30 +183,39 @@ export class OpenAIRealtimeWebSocket ); } - // browsers and workerd should use the protocols argument, node should use the headers argument - const websocketArguments = useWebSocketProtocols - ? [ - 'realtime', - // Auth - 'openai-insecure-api-key.' + this.#apiKey, - // Version header - WEBSOCKET_META, - ] - : { - headers: { - Authorization: `Bearer ${this.#apiKey}`, - ...HEADERS, - }, - }; + let ws: WebSocket | null = null; - const ws = new WebSocket(this.#url!, websocketArguments as any); + if (this.#createWebSocket) { + ws = await this.#createWebSocket({ + url: this.#url!, + apiKey: this.#apiKey, + }); + } else { + // browsers and workerd should use the protocols argument, node should use the headers argument + const websocketArguments = useWebSocketProtocols + ? [ + 'realtime', + // Auth + 'openai-insecure-api-key.' + this.#apiKey, + // Version header + WEBSOCKET_META, + ] + : { + headers: { + Authorization: `Bearer ${this.#apiKey}`, + ...this.getCommonRequestHeaders(), + }, + }; + + ws = new WebSocket(this.#url!, websocketArguments as any); + } this.#state = { status: 'connecting', websocket: ws, }; this.emit('connection_change', this.#state.status); - ws.addEventListener('open', () => { + const onSocketOpenReady = () => { this.#state = { status: 'connected', websocket: ws, @@ -185,7 +223,13 @@ export class OpenAIRealtimeWebSocket this.emit('connection_change', this.#state.status); this._onOpen(); resolve(); - }); + }; + + if (this.#skipOpenEventListeners === true) { + onSocketOpenReady(); + } else { + ws.addEventListener('open', onSocketOpenReady); + } ws.addEventListener('error', (error) => { this._onError(error); @@ -292,11 +336,7 @@ export class OpenAIRealtimeWebSocket }; await new Promise((resolve, reject) => { - try { - this.#setupWebSocket(resolve, reject, sessionConfig); - } catch (error) { - reject(error); - } + this.#setupWebSocket(resolve, reject, sessionConfig).catch(reject); }); await this.updateSessionConfig(sessionConfig);