Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/crazy-coats-give.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openai/agents-core': patch
---

Add remote MCP server (Streamable HTTP) support
1 change: 1 addition & 0 deletions examples/mcp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"scripts": {
"build-check": "tsc --noEmit",
"start:stdio": "tsx filesystem-example.ts",
"start:streamable-http": "tsx streamable-http-example.ts",
"start:hosted-mcp-on-approval": "tsx hosted-mcp-on-approval.ts",
"start:hosted-mcp-human-in-the-loop": "tsx hosted-mcp-human-in-the-loop.ts",
"start:hosted-mcp-simple": "tsx hosted-mcp-simple.ts"
Expand Down
31 changes: 31 additions & 0 deletions examples/mcp/streamable-http-example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Agent, run, MCPServerStreamableHttp, withTrace } from '@openai/agents';

async function main() {
const mcpServer = new MCPServerStreamableHttp({
url: 'https://gitmcp.io/openai/codex',
name: 'GitMCP Documentation Server',
});
const agent = new Agent({
name: 'GitMCP Assistant',
instructions: 'Use the tools to respond to user requests.',
mcpServers: [mcpServer],
});

try {
await withTrace('GitMCP Documentation Server Example', async () => {
await mcpServer.connect();
const result = await run(
agent,
'Which language is this repo written in?',
);
console.log(result.finalOutput);
});
} finally {
await mcpServer.close();
}
}

main().catch((err) => {
console.error(err);
process.exit(1);
});
1 change: 1 addition & 0 deletions packages/agents-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export {
invalidateServerToolsCache,
MCPServer,
MCPServerStdio,
MCPServerStreamableHttp,
} from './mcp';
export {
Model,
Expand Down
94 changes: 93 additions & 1 deletion packages/agents-core/src/mcp.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { FunctionTool, tool, Tool } from './tool';
import { UserError } from './errors';
import { MCPServerStdio as UnderlyingMCPServerStdio } from '@openai/agents-core/_shims';
import {
MCPServerStdio as UnderlyingMCPServerStdio,
MCPServerStreamableHttp as UnderlyingMCPServerStreamableHttp,
} from '@openai/agents-core/_shims';
import { getCurrentSpan, withMCPListToolsSpan } from './tracing';
import { logger as globalLogger, getLogger, Logger } from './logger';
import debug from 'debug';
Expand All @@ -15,6 +18,9 @@ import {
export const DEFAULT_STDIO_MCP_CLIENT_LOGGER_NAME =
'openai-agents:stdio-mcp-client';

export const DEFAULT_STREAMABLE_HTTP_MCP_CLIENT_LOGGER_NAME =
'openai-agents:streamable-http-mcp-client';

/**
* Interface for MCP server implementations.
* Provides methods for connecting, listing tools, calling tools, and cleanup.
Expand Down Expand Up @@ -63,6 +69,39 @@ export abstract class BaseMCPServerStdio implements MCPServer {
}
}

export abstract class BaseMCPServerStreamableHttp implements MCPServer {
public cacheToolsList: boolean;
protected _cachedTools: any[] | undefined = undefined;

protected logger: Logger;
constructor(options: MCPServerStreamableHttpOptions) {
this.logger =
options.logger ??
getLogger(DEFAULT_STREAMABLE_HTTP_MCP_CLIENT_LOGGER_NAME);
this.cacheToolsList = options.cacheToolsList ?? true;
}

abstract get name(): string;
abstract connect(): Promise<void>;
abstract close(): Promise<void>;
abstract listTools(): Promise<any[]>;
abstract callTool(
_toolName: string,
_args: Record<string, unknown> | null,
): Promise<CallToolResultContent>;

/**
* Logs a debug message when debug logging is enabled.
* @param buildMessage A function that returns the message to log.
*/
protected debugLog(buildMessage: () => string): void {
if (debug.enabled(this.logger.namespace)) {
// only when this is true, the function to build the string is called
this.logger.debug(buildMessage());
}
}
}

/**
* Minimum MCP tool data definition.
* This type definition does not intend to cover all possible properties.
Expand Down Expand Up @@ -116,6 +155,40 @@ export class MCPServerStdio extends BaseMCPServerStdio {
return this.underlying.callTool(toolName, args);
}
}

export class MCPServerStreamableHttp extends BaseMCPServerStreamableHttp {
private underlying: UnderlyingMCPServerStreamableHttp;
constructor(options: MCPServerStreamableHttpOptions) {
super(options);
this.underlying = new UnderlyingMCPServerStreamableHttp(options);
}
get name(): string {
return this.underlying.name;
}
connect(): Promise<void> {
return this.underlying.connect();
}
close(): Promise<void> {
return this.underlying.close();
}
async listTools(): Promise<MCPTool[]> {
if (this.cacheToolsList && this._cachedTools) {
return this._cachedTools;
}
const tools = await this.underlying.listTools();
if (this.cacheToolsList) {
this._cachedTools = tools;
}
return tools;
}
callTool(
toolName: string,
args: Record<string, unknown> | null,
): Promise<CallToolResultContent> {
return this.underlying.callTool(toolName, args);
}
}

/**
* Fetches and flattens all tools from multiple MCP servers.
* Logs and skips any servers that fail to respond.
Expand Down Expand Up @@ -292,6 +365,25 @@ export type MCPServerStdioOptions =
| DefaultMCPServerStdioOptions
| FullCommandMCPServerStdioOptions;

export interface MCPServerStreamableHttpOptions {
url: string;
cacheToolsList?: boolean;
clientSessionTimeoutSeconds?: number;
name?: string;
logger?: Logger;

// ----------------------------------------------------
// OAuth
// import { OAuthClientProvider } from '@modelcontextprotocol/sdk/client/auth.js';
authProvider?: any;
// RequestInit
requestInit?: any;
// import { StreamableHTTPReconnectionOptions } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
reconnectionOptions?: any;
sessionId?: string;
// ----------------------------------------------------
}

/**
* Represents a JSON-RPC request message.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import {
BaseMCPServerStdio,
BaseMCPServerStreamableHttp,
CallToolResultContent,
MCPServerStdioOptions,
MCPServerStreamableHttpOptions,
MCPTool,
} from '../../mcp';

Expand All @@ -28,3 +30,27 @@ export class MCPServerStdio extends BaseMCPServerStdio {
throw new Error('Method not implemented.');
}
}

export class MCPServerStreamableHttp extends BaseMCPServerStreamableHttp {
constructor(params: MCPServerStreamableHttpOptions) {
super(params);
}
get name(): string {
return 'MCPServerStdio';
}
connect(): Promise<void> {
throw new Error('Method not implemented.');
}
close(): Promise<void> {
throw new Error('Method not implemented.');
}
listTools(): Promise<MCPTool[]> {
throw new Error('Method not implemented.');
}
callTool(
_toolName: string,
_args: Record<string, unknown> | null,
): Promise<CallToolResultContent> {
throw new Error('Method not implemented.');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import type { Client } from '@modelcontextprotocol/sdk/client/index.js';

import {
BaseMCPServerStdio,
BaseMCPServerStreamableHttp,
CallToolResultContent,
DefaultMCPServerStdioOptions,
InitializeResult,
MCPServerStdioOptions,
MCPServerStreamableHttpOptions,
MCPTool,
invalidateServerToolsCache,
} from '../../mcp';
Expand Down Expand Up @@ -154,3 +156,120 @@ export class NodeMCPServerStdio extends BaseMCPServerStdio {
}
}
}

export class NodeMCPServerStreamableHttp extends BaseMCPServerStreamableHttp {
protected session: Client | null = null;
protected _cacheDirty = true;
protected _toolsList: any[] = [];
protected serverInitializeResult: InitializeResult | null = null;
protected clientSessionTimeoutSeconds?: number;

params: MCPServerStreamableHttpOptions;
private _name: string;
private transport: any = null;

constructor(params: MCPServerStreamableHttpOptions) {
super(params);
this.clientSessionTimeoutSeconds = params.clientSessionTimeoutSeconds ?? 5;
this.params = params;
this._name = params.name || `streamable-http: ${this.params.url}`;
}

async connect(): Promise<void> {
try {
const { StreamableHTTPClientTransport } = await import(
'@modelcontextprotocol/sdk/client/streamableHttp.js'
).catch(failedToImport);
const { Client } = await import(
'@modelcontextprotocol/sdk/client/index.js'
).catch(failedToImport);
this.transport = new StreamableHTTPClientTransport(
new URL(this.params.url),
{
authProvider: this.params.authProvider,
requestInit: this.params.requestInit,
reconnectionOptions: this.params.reconnectionOptions,
sessionId: this.params.sessionId,
},
);
this.session = new Client({
name: this._name,
version: '1.0.0', // You may want to make this configurable
});
await this.session.connect(this.transport);
this.serverInitializeResult = {
serverInfo: { name: this._name, version: '1.0.0' },
} as InitializeResult;
} catch (e) {
this.logger.error('Error initializing MCP server:', e);
await this.close();
throw e;
}
this.debugLog(() => `Connected to MCP server: ${this._name}`);
}

invalidateToolsCache() {
invalidateServerToolsCache(this.name);
this._cacheDirty = true;
}

// The response element type is intentionally left as `any` to avoid explosing MCP SDK type dependencies.
async listTools(): Promise<MCPTool[]> {
const { ListToolsResultSchema } = await import(
'@modelcontextprotocol/sdk/types.js'
).catch(failedToImport);
if (!this.session) {
throw new Error(
'Server not initialized. Make sure you call connect() first.',
);
}
if (this.cacheToolsList && !this._cacheDirty && this._toolsList) {
return this._toolsList;
}
this._cacheDirty = false;
const response = await this.session.listTools();
this.debugLog(() => `Listed tools: ${JSON.stringify(response)}`);
this._toolsList = ListToolsResultSchema.parse(response).tools;
return this._toolsList;
}

async callTool(
toolName: string,
args: Record<string, unknown> | null,
): Promise<CallToolResultContent> {
const { CallToolResultSchema } = await import(
'@modelcontextprotocol/sdk/types.js'
).catch(failedToImport);
if (!this.session) {
throw new Error(
'Server not initialized. Make sure you call connect() first.',
);
}
const response = await this.session.callTool({
name: toolName,
arguments: args ?? {},
});
const parsed = CallToolResultSchema.parse(response);
const result = parsed.content;
this.debugLog(
() =>
`Called tool ${toolName} (args: ${JSON.stringify(args)}, result: ${JSON.stringify(result)})`,
);
return result as CallToolResultContent;
}

get name() {
return this._name;
}

async close(): Promise<void> {
if (this.transport) {
await this.transport.close();
this.transport = null;
}
if (this.session) {
await this.session.close();
this.session = null;
}
}
}
2 changes: 1 addition & 1 deletion packages/agents-core/src/shims/shims-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export function isTracingLoopRunningByDefault(): boolean {
return false;
}

export { MCPServerStdio } from './mcp-stdio/browser';
export { MCPServerStdio, MCPServerStreamableHttp } from './mcp-server/browser';

class BrowserTimer implements Timer {
constructor() {}
Expand Down
5 changes: 4 additions & 1 deletion packages/agents-core/src/shims/shims-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ export function isTracingLoopRunningByDefault(): boolean {
export function isBrowserEnvironment(): boolean {
return false;
}
export { NodeMCPServerStdio as MCPServerStdio } from './mcp-stdio/node';
export {
NodeMCPServerStdio as MCPServerStdio,
NodeMCPServerStreamableHttp as MCPServerStreamableHttp,
} from './mcp-server/node';

export { clearTimeout } from 'node:timers';

Expand Down
2 changes: 1 addition & 1 deletion packages/agents-core/src/shims/shims-workerd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export function isTracingLoopRunningByDefault(): boolean {
/**
* Right now Cloudflare Workers does not support MCP
*/
export { MCPServerStdio } from './mcp-stdio/browser';
export { MCPServerStdio, MCPServerStreamableHttp } from './mcp-server/browser';

export { clearTimeout, setTimeout } from 'node:timers';

Expand Down
Loading