diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 507c7d7bf07..97144613d6c 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1061,7 +1061,6 @@ dependencies = [ "codex-apply-patch", "codex-async-utils", "codex-file-search", - "codex-mcp-client", "codex-otel", "codex-protocol", "codex-rmcp-client", @@ -1249,19 +1248,6 @@ dependencies = [ "wiremock", ] -[[package]] -name = "codex-mcp-client" -version = "0.0.0" -dependencies = [ - "anyhow", - "mcp-types", - "serde", - "serde_json", - "tokio", - "tracing", - "tracing-subscriber", -] - [[package]] name = "codex-mcp-server" version = "0.0.0" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 83c9e78d1b7..d72c527bc2b 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -20,7 +20,6 @@ members = [ "git-tooling", "linux-sandbox", "login", - "mcp-client", "mcp-server", "mcp-types", "ollama", @@ -68,7 +67,6 @@ codex-file-search = { path = "file-search" } codex-git-tooling = { path = "git-tooling" } codex-linux-sandbox = { path = "linux-sandbox" } codex-login = { path = "login" } -codex-mcp-client = { path = "mcp-client" } codex-mcp-server = { path = "mcp-server" } codex-ollama = { path = "ollama" } codex-otel = { path = "otel" } diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index fdc1136f08e..36d53586051 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -22,7 +22,6 @@ chrono = { workspace = true, features = ["serde"] } codex-app-server-protocol = { workspace = true } codex-apply-patch = { workspace = true } codex-file-search = { workspace = true } -codex-mcp-client = { workspace = true } codex-otel = { workspace = true, features = ["otel"] } codex-protocol = { workspace = true } codex-rmcp-client = { workspace = true } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index f0b2f61b68a..202e24b00ca 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -458,9 +458,6 @@ impl Session { let mcp_fut = McpConnectionManager::new( config.mcp_servers.clone(), - config - .features - .enabled(crate::features::Feature::RmcpClient), config.mcp_oauth_credentials_store_mode, ); let default_shell_fut = shell::default_user_shell(); diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 647aac5fa02..ead4604d547 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -31,7 +31,7 @@ pub enum Feature { UnifiedExec, /// Use the streamable exec-command/write-stdin tool pair. StreamableShell, - /// Use the official Rust MCP client (rmcp). + /// Enable experimental RMCP features such as OAuth login. RmcpClient, /// Include the freeform apply_patch tool. ApplyPatchFreeform, diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index edf612c433d..8cc2f48b72b 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -1,6 +1,6 @@ //! Connection manager for Model Context Protocol (MCP) servers. //! -//! The [`McpConnectionManager`] owns one [`codex_mcp_client::McpClient`] per +//! The [`McpConnectionManager`] owns one [`codex_rmcp_client::RmcpClient`] per //! configured server (keyed by the *server name*). It offers convenience //! helpers to query the available tools across *all* servers and returns them //! in a single aggregated map using the fully-qualified tool name @@ -10,14 +10,12 @@ use std::collections::HashMap; use std::collections::HashSet; use std::env; use std::ffi::OsString; -use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use anyhow::Context; use anyhow::Result; use anyhow::anyhow; -use codex_mcp_client::McpClient; use codex_rmcp_client::OAuthCredentialsStoreMode; use codex_rmcp_client::RmcpClient; use mcp_types::ClientCapabilities; @@ -99,134 +97,12 @@ struct ToolInfo { } struct ManagedClient { - client: McpClientAdapter, + client: Arc, startup_timeout: Duration, tool_timeout: Option, } -#[derive(Clone)] -enum McpClientAdapter { - Legacy(Arc), - Rmcp(Arc), -} - -impl McpClientAdapter { - #[allow(clippy::too_many_arguments)] - async fn new_stdio_client( - use_rmcp_client: bool, - program: OsString, - args: Vec, - env: Option>, - env_vars: Vec, - cwd: Option, - params: mcp_types::InitializeRequestParams, - startup_timeout: Duration, - ) -> Result { - if use_rmcp_client { - let client = - Arc::new(RmcpClient::new_stdio_client(program, args, env, &env_vars, cwd).await?); - client.initialize(params, Some(startup_timeout)).await?; - Ok(McpClientAdapter::Rmcp(client)) - } else { - let client = - Arc::new(McpClient::new_stdio_client(program, args, env, &env_vars, cwd).await?); - client.initialize(params, Some(startup_timeout)).await?; - Ok(McpClientAdapter::Legacy(client)) - } - } - - #[allow(clippy::too_many_arguments)] - async fn new_streamable_http_client( - server_name: String, - url: String, - bearer_token: Option, - http_headers: Option>, - env_http_headers: Option>, - params: mcp_types::InitializeRequestParams, - startup_timeout: Duration, - store_mode: OAuthCredentialsStoreMode, - ) -> Result { - let client = Arc::new( - RmcpClient::new_streamable_http_client( - &server_name, - &url, - bearer_token, - http_headers, - env_http_headers, - store_mode, - ) - .await?, - ); - client.initialize(params, Some(startup_timeout)).await?; - Ok(McpClientAdapter::Rmcp(client)) - } - - async fn list_tools( - &self, - params: Option, - timeout: Option, - ) -> Result { - match self { - McpClientAdapter::Legacy(client) => client.list_tools(params, timeout).await, - McpClientAdapter::Rmcp(client) => client.list_tools(params, timeout).await, - } - } - - async fn list_resources( - &self, - params: Option, - timeout: Option, - ) -> Result { - match self { - McpClientAdapter::Legacy(_) => Ok(ListResourcesResult { - next_cursor: None, - resources: Vec::new(), - }), - McpClientAdapter::Rmcp(client) => client.list_resources(params, timeout).await, - } - } - - async fn read_resource( - &self, - params: mcp_types::ReadResourceRequestParams, - timeout: Option, - ) -> Result { - match self { - McpClientAdapter::Legacy(_) => Err(anyhow!( - "resources/read is not supported by legacy MCP clients" - )), - McpClientAdapter::Rmcp(client) => client.read_resource(params, timeout).await, - } - } - - async fn list_resource_templates( - &self, - params: Option, - timeout: Option, - ) -> Result { - match self { - McpClientAdapter::Legacy(_) => Ok(ListResourceTemplatesResult { - next_cursor: None, - resource_templates: Vec::new(), - }), - McpClientAdapter::Rmcp(client) => client.list_resource_templates(params, timeout).await, - } - } - - async fn call_tool( - &self, - name: String, - arguments: Option, - timeout: Option, - ) -> Result { - match self { - McpClientAdapter::Legacy(client) => client.call_tool(name, arguments, timeout).await, - McpClientAdapter::Rmcp(client) => client.call_tool(name, arguments, timeout).await, - } - } -} - -/// A thin wrapper around a set of running [`McpClient`] instances. +/// A thin wrapper around a set of running [`RmcpClient`] instances. #[derive(Default)] pub(crate) struct McpConnectionManager { /// Server-name -> client instance. @@ -243,7 +119,7 @@ pub(crate) struct McpConnectionManager { } impl McpConnectionManager { - /// Spawn a [`McpClient`] for each configured server. + /// Spawn a [`RmcpClient`] for each configured server. /// /// * `mcp_servers` – Map loaded from the user configuration where *keys* /// are human-readable server identifiers and *values* are the spawn @@ -253,7 +129,6 @@ impl McpConnectionManager { /// user should be informed about these errors. pub async fn new( mcp_servers: HashMap, - use_rmcp_client: bool, store_mode: OAuthCredentialsStoreMode, ) -> Result<(Self, ClientStartErrors)> { // Early exit if no servers are configured. @@ -316,7 +191,8 @@ impl McpConnectionManager { protocol_version: mcp_types::MCP_SCHEMA_VERSION.to_owned(), }; - let client = match transport { + let resolved_bearer_token = resolved_bearer_token.unwrap_or_default(); + let client_result = match transport { McpServerTransportConfig::Stdio { command, args, @@ -326,17 +202,18 @@ impl McpConnectionManager { } => { let command_os: OsString = command.into(); let args_os: Vec = args.into_iter().map(Into::into).collect(); - McpClientAdapter::new_stdio_client( - use_rmcp_client, - command_os, - args_os, - env, - env_vars, - cwd, - params, - startup_timeout, - ) - .await + match RmcpClient::new_stdio_client(command_os, args_os, env, &env_vars, cwd) + .await + { + Ok(client) => { + let client = Arc::new(client); + client + .initialize(params.clone(), Some(startup_timeout)) + .await + .map(|_| client) + } + Err(err) => Err(err.into()), + } } McpServerTransportConfig::StreamableHttp { url, @@ -344,22 +221,32 @@ impl McpConnectionManager { env_http_headers, .. } => { - McpClientAdapter::new_streamable_http_client( - server_name.clone(), - url, - resolved_bearer_token.unwrap_or_default(), + match RmcpClient::new_streamable_http_client( + &server_name, + &url, + resolved_bearer_token.clone(), http_headers, env_http_headers, - params, - startup_timeout, store_mode, ) .await + { + Ok(client) => { + let client = Arc::new(client); + client + .initialize(params.clone(), Some(startup_timeout)) + .await + .map(|_| client) + } + Err(err) => Err(err), + } } - } - .map(|c| (c, startup_timeout)); + }; - ((server_name, tool_timeout), client) + ( + (server_name, tool_timeout), + client_result.map(|client| (client, startup_timeout)), + ) }); } diff --git a/codex-rs/mcp-client/Cargo.toml b/codex-rs/mcp-client/Cargo.toml deleted file mode 100644 index 5025063b0aa..00000000000 --- a/codex-rs/mcp-client/Cargo.toml +++ /dev/null @@ -1,23 +0,0 @@ -[package] -name = "codex-mcp-client" -version = { workspace = true } -edition = "2024" - -[lints] -workspace = true - -[dependencies] -anyhow = { workspace = true } -mcp-types = { workspace = true } -serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true } -tracing = { workspace = true, features = ["log"] } -tracing-subscriber = { workspace = true, features = ["fmt", "env-filter"] } -tokio = { workspace = true, features = [ - "io-util", - "macros", - "process", - "rt-multi-thread", - "sync", - "time", -] } diff --git a/codex-rs/mcp-client/src/lib.rs b/codex-rs/mcp-client/src/lib.rs deleted file mode 100644 index 1664dec04d2..00000000000 --- a/codex-rs/mcp-client/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod mcp_client; - -pub use mcp_client::McpClient; diff --git a/codex-rs/mcp-client/src/main.rs b/codex-rs/mcp-client/src/main.rs deleted file mode 100644 index 8e1f322dc72..00000000000 --- a/codex-rs/mcp-client/src/main.rs +++ /dev/null @@ -1,88 +0,0 @@ -//! Simple command-line utility to exercise `McpClient`. -//! -//! Example usage: -//! -//! ```bash -//! cargo run -p codex-mcp-client -- `codex-mcp-server` -//! ``` -//! -//! Any additional arguments after the first one are forwarded to the spawned -//! program. The utility connects, issues a `tools/list` request and prints the -//! server's response as pretty JSON. - -use std::ffi::OsString; -use std::time::Duration; - -use anyhow::Context; -use anyhow::Result; -use codex_mcp_client::McpClient; -use mcp_types::ClientCapabilities; -use mcp_types::Implementation; -use mcp_types::InitializeRequestParams; -use mcp_types::ListToolsRequestParams; -use mcp_types::MCP_SCHEMA_VERSION; -use tracing_subscriber::EnvFilter; - -#[tokio::main] -async fn main() -> Result<()> { - let default_level = "debug"; - let _ = tracing_subscriber::fmt() - // Fallback to the `default_level` log filter if the environment - // variable is not set _or_ contains an invalid value - .with_env_filter( - EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new(default_level)) - .unwrap_or_else(|_| EnvFilter::new(default_level)), - ) - .with_writer(std::io::stderr) - .try_init(); - - // Collect command-line arguments excluding the program name itself. - let mut args: Vec = std::env::args_os().skip(1).collect(); - - if args.is_empty() || args[0] == "--help" || args[0] == "-h" { - eprintln!("Usage: mcp-client [args..]\n\nExample: mcp-client codex-mcp-server"); - std::process::exit(1); - } - let original_args = args.clone(); - - // Spawn the subprocess and connect the client. - let program = args.remove(0); - let env = None; - let client = McpClient::new_stdio_client(program, args, env, &[], None) - .await - .with_context(|| format!("failed to spawn subprocess: {original_args:?}"))?; - - let params = InitializeRequestParams { - capabilities: ClientCapabilities { - experimental: None, - roots: None, - sampling: None, - elicitation: None, - }, - client_info: Implementation { - name: "codex-mcp-client".to_owned(), - version: env!("CARGO_PKG_VERSION").to_owned(), - title: Some("Codex".to_string()), - // This field is used by Codex when it is an MCP server: it should - // not be used when Codex is an MCP client. - user_agent: None, - }, - protocol_version: MCP_SCHEMA_VERSION.to_owned(), - }; - let timeout = Some(Duration::from_secs(10)); - let response = client.initialize(params, timeout).await?; - eprintln!("initialize response: {response:?}"); - - // Issue `tools/list` request (no params). - let timeout = None; - let tools = client - .list_tools(None::, timeout) - .await - .context("tools/list request failed")?; - - // Print the result in a human readable form. - println!("{}", serde_json::to_string_pretty(&tools)?); - - Ok(()) -} diff --git a/codex-rs/mcp-client/src/mcp_client.rs b/codex-rs/mcp-client/src/mcp_client.rs deleted file mode 100644 index 3be93f35f6e..00000000000 --- a/codex-rs/mcp-client/src/mcp_client.rs +++ /dev/null @@ -1,509 +0,0 @@ -//! A minimal async client for the Model Context Protocol (MCP). -//! -//! The client is intentionally lightweight – it is only capable of: -//! 1. Spawning a subprocess that launches a conforming MCP server that -//! communicates over stdio. -//! 2. Sending MCP requests and pairing them with their corresponding -//! responses. -//! 3. Offering a convenience helper for the common `tools/list` request. -//! -//! The crate hides all JSON‐RPC framing details behind a typed API. Users -//! interact with the [`ModelContextProtocolRequest`] trait from `mcp-types` to -//! issue requests and receive strongly-typed results. - -use std::collections::HashMap; -use std::ffi::OsString; -use std::path::PathBuf; -use std::sync::Arc; -use std::sync::atomic::AtomicI64; -use std::sync::atomic::Ordering; -use std::time::Duration; - -use anyhow::Context; -use anyhow::Result; -use anyhow::anyhow; -use mcp_types::CallToolRequest; -use mcp_types::CallToolRequestParams; -use mcp_types::InitializeRequest; -use mcp_types::InitializeRequestParams; -use mcp_types::InitializedNotification; -use mcp_types::JSONRPC_VERSION; -use mcp_types::JSONRPCMessage; -use mcp_types::JSONRPCNotification; -use mcp_types::JSONRPCRequest; -use mcp_types::JSONRPCResponse; -use mcp_types::ListToolsRequest; -use mcp_types::ListToolsRequestParams; -use mcp_types::ListToolsResult; -use mcp_types::ModelContextProtocolNotification; -use mcp_types::ModelContextProtocolRequest; -use mcp_types::RequestId; -use serde::Serialize; -use serde::de::DeserializeOwned; -use tokio::io::AsyncBufReadExt; -use tokio::io::AsyncWriteExt; -use tokio::io::BufReader; -use tokio::process::Command; -use tokio::sync::Mutex; -use tokio::sync::mpsc; -use tokio::sync::oneshot; -use tokio::time; -use tracing::debug; -use tracing::error; -use tracing::info; -use tracing::warn; - -/// Capacity of the bounded channels used for transporting messages between the -/// client API and the IO tasks. -const CHANNEL_CAPACITY: usize = 128; - -/// Internal representation of a pending request sender. -type PendingSender = oneshot::Sender; - -/// A running MCP client instance. -pub struct McpClient { - /// Retain this child process until the client is dropped. The Tokio runtime - /// will make a "best effort" to reap the process after it exits, but it is - /// not a guarantee. See the `kill_on_drop` documentation for details. - #[allow(dead_code)] - child: tokio::process::Child, - - /// Channel for sending JSON-RPC messages *to* the background writer task. - outgoing_tx: mpsc::Sender, - - /// Map of `request.id -> oneshot::Sender` used to dispatch responses back - /// to the originating caller. - pending: Arc>>, - - /// Monotonically increasing counter used to generate request IDs. - id_counter: AtomicI64, -} - -impl McpClient { - /// Spawn the given command and establish an MCP session over its STDIO. - /// Caller is responsible for sending the `initialize` request. See - /// [`initialize`](Self::initialize) for details. - pub async fn new_stdio_client( - program: OsString, - args: Vec, - env: Option>, - env_vars: &[String], - cwd: Option, - ) -> std::io::Result { - let mut command = Command::new(program); - command - .args(args) - .env_clear() - .envs(create_env_for_mcp_server(env, env_vars)) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::null()) - // As noted in the `kill_on_drop` documentation, the Tokio runtime makes - // a "best effort" to reap-after-exit to avoid zombie processes, but it - // is not a guarantee. - .kill_on_drop(true); - if let Some(cwd) = cwd { - command.current_dir(cwd); - } - - let mut child = command.spawn()?; - - let stdin = child - .stdin - .take() - .ok_or_else(|| std::io::Error::other("failed to capture child stdin"))?; - let stdout = child - .stdout - .take() - .ok_or_else(|| std::io::Error::other("failed to capture child stdout"))?; - - let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); - let pending: Arc>> = Arc::new(Mutex::new(HashMap::new())); - - // Spawn writer task. It listens on the `outgoing_rx` channel and - // writes messages to the child's STDIN. - let writer_handle = { - let mut stdin = stdin; - tokio::spawn(async move { - while let Some(msg) = outgoing_rx.recv().await { - match serde_json::to_string(&msg) { - Ok(json) => { - debug!("MCP message to server: {json}"); - if stdin.write_all(json.as_bytes()).await.is_err() { - error!("failed to write message to child stdin"); - break; - } - if stdin.write_all(b"\n").await.is_err() { - error!("failed to write newline to child stdin"); - break; - } - // No explicit flush needed on a pipe; write_all is sufficient. - } - Err(e) => error!("failed to serialize JSONRPCMessage: {e}"), - } - } - }) - }; - - // Spawn reader task. It reads line-delimited JSON from the child's - // STDOUT and dispatches responses to the pending map. - let reader_handle = { - let pending = pending.clone(); - let mut lines = BufReader::new(stdout).lines(); - - tokio::spawn(async move { - while let Ok(Some(line)) = lines.next_line().await { - debug!("MCP message from server: {line}"); - match serde_json::from_str::(&line) { - Ok(JSONRPCMessage::Response(resp)) => { - Self::dispatch_response(resp, &pending).await; - } - Ok(JSONRPCMessage::Error(err)) => { - Self::dispatch_error(err, &pending).await; - } - Ok(JSONRPCMessage::Notification(JSONRPCNotification { .. })) => { - // For now we only log server-initiated notifications. - info!("<- notification: {}", line); - } - Ok(other) => { - // Batch responses and requests are currently not - // expected from the server – log and ignore. - info!("<- unhandled message: {:?}", other); - } - Err(e) => { - error!("failed to deserialize JSONRPCMessage: {e}; line = {}", line) - } - } - } - }) - }; - - // We intentionally *detach* the tasks. They will keep running in the - // background as long as their respective resources (channels/stdin/ - // stdout) are alive. Dropping `McpClient` cancels the tasks due to - // dropped resources. - let _ = (writer_handle, reader_handle); - - Ok(Self { - child, - outgoing_tx, - pending, - id_counter: AtomicI64::new(1), - }) - } - - /// Send an arbitrary MCP request and await the typed result. - /// - /// If `timeout` is `None` the call waits indefinitely. If `Some(duration)` - /// is supplied and no response is received within the given period, a - /// timeout error is returned. - pub async fn send_request( - &self, - params: R::Params, - timeout: Option, - ) -> Result - where - R: ModelContextProtocolRequest, - R::Params: Serialize, - R::Result: DeserializeOwned, - { - // Create a new unique ID. - let id = self.id_counter.fetch_add(1, Ordering::SeqCst); - let request_id = RequestId::Integer(id); - - // Serialize params -> JSON. For many request types `Params` is - // `Option` and `None` should be encoded as *absence* of the field. - let params_json = serde_json::to_value(¶ms)?; - let params_field = if params_json.is_null() { - None - } else { - Some(params_json) - }; - - let jsonrpc_request = JSONRPCRequest { - id: request_id.clone(), - jsonrpc: JSONRPC_VERSION.to_string(), - method: R::METHOD.to_string(), - params: params_field, - }; - - let message = JSONRPCMessage::Request(jsonrpc_request); - - // oneshot channel for the response. - let (tx, rx) = oneshot::channel(); - - // Register in pending map *before* sending the message so a race where - // the response arrives immediately cannot be lost. - { - let mut guard = self.pending.lock().await; - guard.insert(id, tx); - } - - // Send to writer task. - if self.outgoing_tx.send(message).await.is_err() { - return Err(anyhow!( - "failed to send message to writer task - channel closed" - )); - } - - // Await the response, optionally bounded by a timeout. - let msg = match timeout { - Some(duration) => { - match time::timeout(duration, rx).await { - Ok(Ok(msg)) => msg, - Ok(Err(_)) => { - // Channel closed without a reply – remove the pending entry. - let mut guard = self.pending.lock().await; - guard.remove(&id); - return Err(anyhow!( - "response channel closed before a reply was received" - )); - } - Err(_) => { - // Timed out. Remove the pending entry so we don't leak. - let mut guard = self.pending.lock().await; - guard.remove(&id); - return Err(anyhow!("request timed out")); - } - } - } - None => rx - .await - .map_err(|_| anyhow!("response channel closed before a reply was received"))?, - }; - - match msg { - JSONRPCMessage::Response(JSONRPCResponse { result, .. }) => { - let typed: R::Result = serde_json::from_value(result)?; - Ok(typed) - } - JSONRPCMessage::Error(err) => Err(anyhow!(format!( - "server returned JSON-RPC error: code = {}, message = {}", - err.error.code, err.error.message - ))), - other => Err(anyhow!(format!( - "unexpected message variant received in reply path: {other:?}" - ))), - } - } - - pub async fn send_notification(&self, params: N::Params) -> Result<()> - where - N: ModelContextProtocolNotification, - N::Params: Serialize, - { - // Serialize params -> JSON. For many request types `Params` is - // `Option` and `None` should be encoded as *absence* of the field. - let params_json = serde_json::to_value(¶ms)?; - let params_field = if params_json.is_null() { - None - } else { - Some(params_json) - }; - - let method = N::METHOD.to_string(); - let jsonrpc_notification = JSONRPCNotification { - jsonrpc: JSONRPC_VERSION.to_string(), - method: method.clone(), - params: params_field, - }; - - let notification = JSONRPCMessage::Notification(jsonrpc_notification); - self.outgoing_tx - .send(notification) - .await - .with_context(|| format!("failed to send notification `{method}` to writer task")) - } - - /// Negotiates the initialization with the MCP server. Sends an `initialize` - /// request with the specified `initialize_params` and then the - /// `notifications/initialized` notification once the response has been - /// received. Returns the response to the `initialize` request. - pub async fn initialize( - &self, - initialize_params: InitializeRequestParams, - timeout: Option, - ) -> Result { - let response = self - .send_request::(initialize_params, timeout) - .await?; - self.send_notification::(None) - .await?; - Ok(response) - } - - /// Convenience wrapper around `tools/list`. - pub async fn list_tools( - &self, - params: Option, - timeout: Option, - ) -> Result { - self.send_request::(params, timeout).await - } - - /// Convenience wrapper around `tools/call`. - pub async fn call_tool( - &self, - name: String, - arguments: Option, - timeout: Option, - ) -> Result { - let params = CallToolRequestParams { name, arguments }; - debug!("MCP tool call: {params:?}"); - self.send_request::(params, timeout).await - } - - /// Internal helper: route a JSON-RPC *response* object to the pending map. - async fn dispatch_response( - resp: JSONRPCResponse, - pending: &Arc>>, - ) { - let id = match resp.id { - RequestId::Integer(i) => i, - RequestId::String(_) => { - // We only ever generate integer IDs. Receiving a string here - // means we will not find a matching entry in `pending`. - error!("response with string ID - no matching pending request"); - return; - } - }; - - let tx_opt = { - let mut guard = pending.lock().await; - guard.remove(&id) - }; - if let Some(tx) = tx_opt { - // Ignore send errors – the receiver might have been dropped. - let _ = tx.send(JSONRPCMessage::Response(resp)); - } else { - warn!(id, "no pending request found for response"); - } - } - - /// Internal helper: route a JSON-RPC *error* object to the pending map. - async fn dispatch_error( - err: mcp_types::JSONRPCError, - pending: &Arc>>, - ) { - let id = match err.id { - RequestId::Integer(i) => i, - RequestId::String(_) => return, // see comment above - }; - - let tx_opt = { - let mut guard = pending.lock().await; - guard.remove(&id) - }; - if let Some(tx) = tx_opt { - let _ = tx.send(JSONRPCMessage::Error(err)); - } - } -} - -impl Drop for McpClient { - fn drop(&mut self) { - // Even though we have already tagged this process with - // `kill_on_drop(true)` above, this extra check has the benefit of - // forcing the process to be reaped immediately if it has already exited - // instead of waiting for the Tokio runtime to reap it later. - let _ = self.child.try_wait(); - } -} - -/// Environment variables that are always included when spawning a new MCP -/// server. -#[rustfmt::skip] -#[cfg(unix)] -const DEFAULT_ENV_VARS: &[&str] = &[ - // https://modelcontextprotocol.io/docs/tools/debugging#environment-variables - // states: - // - // > MCP servers inherit only a subset of environment variables automatically, - // > like `USER`, `HOME`, and `PATH`. - // - // But it does not fully enumerate the list. Empirically, when spawning a - // an MCP server via Claude Desktop on macOS, it reports the following - // environment variables: - "HOME", - "LOGNAME", - "PATH", - "SHELL", - "USER", - "__CF_USER_TEXT_ENCODING", - - // Additional environment variables Codex chooses to include by default: - "LANG", - "LC_ALL", - "TERM", - "TMPDIR", - "TZ", -]; - -#[cfg(windows)] -const DEFAULT_ENV_VARS: &[&str] = &[ - // TODO: More research is necessary to curate this list. - "PATH", - "PATHEXT", - "USERNAME", - "USERDOMAIN", - "USERPROFILE", - "TEMP", - "TMP", -]; - -/// `extra_env` comes from the config for an entry in `mcp_servers` in -/// `config.toml`. -fn create_env_for_mcp_server( - extra_env: Option>, - env_vars: &[String], -) -> HashMap { - DEFAULT_ENV_VARS - .iter() - .copied() - .chain(env_vars.iter().map(String::as_str)) - .filter_map(|var| { - std::env::var(var) - .ok() - .map(|value| (var.to_string(), value)) - }) - .chain(extra_env.unwrap_or_default()) - .collect::>() -} - -#[cfg(test)] -mod tests { - use super::*; - - fn set_env_var(key: &str, value: &str) { - unsafe { - std::env::set_var(key, value); - } - } - - fn remove_env_var(key: &str) { - unsafe { - std::env::remove_var(key); - } - } - - #[test] - fn test_create_env_for_mcp_server() { - let env_var = "USER"; - let env_var_existing_value = std::env::var(env_var).unwrap_or_default(); - let env_var_new_value = format!("{env_var_existing_value}-extra"); - let extra_env = HashMap::from([(env_var.to_owned(), env_var_new_value.clone())]); - let mcp_server_env = create_env_for_mcp_server(Some(extra_env), &[]); - assert!(mcp_server_env.contains_key("PATH")); - assert_eq!(Some(&env_var_new_value), mcp_server_env.get(env_var)); - } - - #[test] - fn test_create_env_for_mcp_server_includes_extra_whitelisted_vars() { - let custom_var = "CUSTOM_TEST_VAR"; - let value = "value".to_string(); - set_env_var(custom_var, &value); - let mcp_server_env = create_env_for_mcp_server(None, &[custom_var.to_string()]); - assert_eq!(Some(&value), mcp_server_env.get(custom_var)); - remove_env_var(custom_var); - } -} diff --git a/docs/config.md b/docs/config.md index b5699021539..4186c4ff348 100644 --- a/docs/config.md +++ b/docs/config.md @@ -451,11 +451,7 @@ When both `enabled_tools` and `disabled_tools` are specified, Codex first restri #### Experimental RMCP client -Codex is transitioning to the [official Rust MCP SDK](https://github.com/modelcontextprotocol/rust-sdk). - -This flag enables OAuth support for streamable HTTP servers and switches STDIO servers over to the new client implementation. - -Please try and report issues with the new client. To enable it, add this to the top level of your `config.toml` +This flag enables OAuth support for streamable HTTP servers. ```toml experimental_use_rmcp_client = true