Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/cloudflare-fetch-upgrade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@openai/agents-realtime': patch
'@openai/agents-extensions': patch
---

feat: add factory-based Cloudflare support.

- Realtime (WebSocket): add `createWebSocket` and `skipOpenEventListeners` options to enable
custom socket creation and connection state control for specialized runtimes.
- Extensions: add `CloudflareRealtimeTransportLayer`, which performs a `fetch()`-based WebSocket
upgrade on Cloudflare/workerd and integrates via the WebSocket factory.
76 changes: 76 additions & 0 deletions packages/agents-extensions/src/CloudflareRealtimeTransport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import {
RealtimeTransportLayer,
OpenAIRealtimeWebSocket,
OpenAIRealtimeWebSocketOptions,
} from '@openai/agents/realtime';

/**
* An adapter transport for Cloudflare Workers (workerd) environments.
*
* Cloudflare Workers cannot open outbound client WebSockets using the global `WebSocket`
* constructor. Instead, a `fetch()` request with `Upgrade: websocket` must be performed and the
* returned `response.webSocket` must be `accept()`ed. This transport encapsulates that pattern and
* plugs into the Realtime SDK via the factory-based `createWebSocket` option.
*
* It behaves like `OpenAIRealtimeWebSocket`, but establishes the connection using `fetch()` and
* sets `skipOpenEventListeners: true` since workerd sockets do not emit a traditional `open`
* event after acceptance.
*
* Reference: Response API — `response.webSocket` (Cloudflare Workers).
* https://developers.cloudflare.com/workers/runtime-apis/response/.
*/
export class CloudflareRealtimeTransportLayer
extends OpenAIRealtimeWebSocket
implements RealtimeTransportLayer
{
protected _audioLengthMs: number = 0;

constructor(options: OpenAIRealtimeWebSocketOptions) {
super({
...options,
createWebSocket: async ({ url, apiKey }) => {
return await this.#buildCloudflareWebSocket({ url, apiKey });
},
skipOpenEventListeners: true,
});
}

/**
* Builds a WebSocket using Cloudflare's `fetch()` + `Upgrade: websocket` flow and accepts it.
* Transforms `ws(s)` to `http(s)` for the upgrade request and forwards standard headers.
*/
async #buildCloudflareWebSocket({
url,
apiKey,
}: {
url: string;
apiKey: string;
}): Promise<WebSocket> {
const transformedUrl = url.replace(/^ws/i, 'http');
if (!transformedUrl) {
throw new Error('Realtime URL is not defined');
}

const response = await fetch(transformedUrl, {
method: 'GET',
headers: {
Authorization: `Bearer ${apiKey}`,
'Sec-WebSocket-Protocol': 'realtime',
Connection: 'Upgrade',
Upgrade: 'websocket',
...this.getCommonRequestHeaders(),
},
});

const upgradedSocket = (response as any).webSocket;
if (!upgradedSocket) {
const body = await response.text().catch(() => '');
throw new Error(
`Failed to upgrade websocket: ${response.status} ${body}`,
);
}

upgradedSocket.accept();
return upgradedSocket as unknown as WebSocket;
}
}
3 changes: 2 additions & 1 deletion packages/agents-extensions/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './TwilioRealtimeTransport';
export * from './aiSdk';
export * from './CloudflareRealtimeTransport';
export * from './TwilioRealtimeTransport';
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { CloudflareRealtimeTransportLayer } from '../src/CloudflareRealtimeTransport';

class FakeWorkersWebSocket {
url: string;
listeners: Record<string, ((ev: any) => void)[]> = {};
accepted = false;
constructor(url: string) {
this.url = url;
}
addEventListener(type: string, listener: (ev: any) => void) {
this.listeners[type] = this.listeners[type] || [];
this.listeners[type].push(listener);
}
accept() {
this.accepted = true;
}
send(_data: any) {}
close() {
this.emit('close', {});
}
emit(type: string, ev: any) {
(this.listeners[type] || []).forEach((fn) => fn(ev));
}
}

describe('CloudflareRealtimeTransportLayer', () => {
let savedFetch: any;

beforeEach(() => {
savedFetch = (globalThis as any).fetch;
});

afterEach(() => {
(globalThis as any).fetch = savedFetch;
});

it('connects via fetch upgrade and emits connection changes', async () => {
const fakeSocket = new FakeWorkersWebSocket('ws://example');
const fetchSpy = vi.fn().mockResolvedValue({
webSocket: fakeSocket,
status: 101,
text: vi.fn().mockResolvedValue(''),
});
(globalThis as any).fetch = fetchSpy;

const transport = new CloudflareRealtimeTransportLayer({
url: 'wss://api.openai.com/v1/realtime?model=foo',
});

const statuses: string[] = [];
transport.on('connection_change', (s) => statuses.push(s));

await transport.connect({ apiKey: 'ek_test', model: 'foo' });

expect(fetchSpy).toHaveBeenCalledTimes(1);
// wss -> https
expect(fetchSpy.mock.calls[0][0]).toBe(
'https://api.openai.com/v1/realtime?model=foo',
);
const init = fetchSpy.mock.calls[0][1];
expect(init.method).toBe('GET');
expect(init.headers['Authorization']).toBe('Bearer ek_test');
expect(init.headers['Upgrade']).toBe('websocket');
expect(init.headers['Connection']).toBe('Upgrade');
expect(init.headers['Sec-WebSocket-Protocol']).toBe('realtime');

// connected without relying on 'open' listener.
expect(statuses).toEqual(['connecting', 'connected']);
});

it('propagates fetch-upgrade failures with detailed error', async () => {
const fetchSpy = vi.fn().mockResolvedValue({
status: 400,
text: vi.fn().mockResolvedValue('No upgrade'),
});
(globalThis as any).fetch = fetchSpy;

const transport = new CloudflareRealtimeTransportLayer({
url: 'wss://api.openai.com/v1/realtime?model=bar',
});

await expect(
transport.connect({ apiKey: 'ek_x', model: 'bar' }),
).rejects.toThrow('Failed to upgrade websocket: 400 No upgrade');
});
});
88 changes: 64 additions & 24 deletions packages/agents-realtime/src/openaiRealtimeWebsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ export type WebSocketState =
websocket: WebSocket;
};

export interface CreateWebSocketOptions {
url: string;
apiKey: string;
}

/**
* The options for the OpenAI Realtime WebSocket transport layer.
*/
Expand All @@ -51,6 +56,22 @@ export type OpenAIRealtimeWebSocketOptions = {
* The URL to use for the WebSocket connection.
*/
url?: string;
/**
* Builds a new WebSocket connection.
* @param options - The options for the WebSocket connection.
* @returns The WebSocket connection.
*/
createWebSocket?: (options: CreateWebSocketOptions) => Promise<WebSocket>;
/**
* When you pass your own createWebSocket function, which completes the connection state transition,
* you can set this to true to skip registering the `open` event listener for the same purpose.
* If this flag is set to true, the constructor will immediately call the internal operation
* to mark the internal connection state to `connected`. Otherwise, the constructor will register
* the `open` event listener and wait for it to be triggered.
*
* By default (meaning if this property is absent), this is set to false.
*/
skipOpenEventListeners?: boolean;
} & OpenAIRealtimeBaseOptions;

/**
Expand Down Expand Up @@ -81,11 +102,19 @@ export class OpenAIRealtimeWebSocket
protected _firstAudioTimestamp: number | undefined;
protected _audioLengthMs: number = 0;
#ongoingResponse: boolean = false;
#createWebSocket?: (options: CreateWebSocketOptions) => Promise<WebSocket>;
#skipOpenEventListeners?: boolean;

constructor(options: OpenAIRealtimeWebSocketOptions = {}) {
super(options);
this.#url = options.url;
this.#useInsecureApiKey = options.useInsecureApiKey ?? false;
this.#createWebSocket = options.createWebSocket;
this.#skipOpenEventListeners = options.skipOpenEventListeners ?? false;
}

protected getCommonRequestHeaders() {
return HEADERS;
}

/**
Expand Down Expand Up @@ -128,7 +157,7 @@ export class OpenAIRealtimeWebSocket
this.emit('audio', audioEvent);
}

#setupWebSocket(
async #setupWebSocket(
resolve: (value: void) => void,
reject: (reason?: any) => void,
sessionConfig: Partial<RealtimeSessionConfig>,
Expand All @@ -154,38 +183,53 @@ export class OpenAIRealtimeWebSocket
);
}

// browsers and workerd should use the protocols argument, node should use the headers argument
const websocketArguments = useWebSocketProtocols
? [
'realtime',
// Auth
'openai-insecure-api-key.' + this.#apiKey,
// Version header
WEBSOCKET_META,
]
: {
headers: {
Authorization: `Bearer ${this.#apiKey}`,
...HEADERS,
},
};
let ws: WebSocket | null = null;

const ws = new WebSocket(this.#url!, websocketArguments as any);
if (this.#createWebSocket) {
ws = await this.#createWebSocket({
url: this.#url!,
apiKey: this.#apiKey,
});
} else {
// browsers and workerd should use the protocols argument, node should use the headers argument
const websocketArguments = useWebSocketProtocols
? [
'realtime',
// Auth
'openai-insecure-api-key.' + this.#apiKey,
// Version header
WEBSOCKET_META,
]
: {
headers: {
Authorization: `Bearer ${this.#apiKey}`,
...this.getCommonRequestHeaders(),
},
};

ws = new WebSocket(this.#url!, websocketArguments as any);
}
this.#state = {
status: 'connecting',
websocket: ws,
};
this.emit('connection_change', this.#state.status);

ws.addEventListener('open', () => {
const onSocketOpenReady = () => {
this.#state = {
status: 'connected',
websocket: ws,
};
this.emit('connection_change', this.#state.status);
this._onOpen();
resolve();
});
};

if (this.#skipOpenEventListeners === true) {
onSocketOpenReady();
} else {
ws.addEventListener('open', onSocketOpenReady);
}

ws.addEventListener('error', (error) => {
this._onError(error);
Expand Down Expand Up @@ -292,11 +336,7 @@ export class OpenAIRealtimeWebSocket
};

await new Promise<void>((resolve, reject) => {
try {
this.#setupWebSocket(resolve, reject, sessionConfig);
} catch (error) {
reject(error);
}
this.#setupWebSocket(resolve, reject, sessionConfig).catch(reject);
});

await this.updateSessionConfig(sessionConfig);
Expand Down