diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index ff2ccec49cca..011bd29c4827 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -2755,7 +2755,7 @@ async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Re } #[tokio::test] -async fn thread_resume_fails_when_required_mcp_server_fails_to_initialize() -> Result<()> { +async fn thread_resume_returns_before_required_mcp_server_initializes() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?; @@ -2770,24 +2770,13 @@ async fn thread_resume_fails_when_required_mcp_server_fails_to_initialize() -> R ..Default::default() }) .await?; - let err: JSONRPCError = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_error_message(RequestId::Integer(resume_id)), - ) - .await??; - - assert!( - err.error - .message - .contains("required MCP servers failed to initialize"), - "unexpected error message: {}", - err.error.message - ); - assert!( - err.error.message.contains("required_broken"), - "unexpected error message: {}", - err.error.message - ); + let _: ThreadResumeResponse = to_response( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??, + )?; Ok(()) } diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs index 75c124e8931f..314839221f71 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_start.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -586,7 +586,7 @@ async fn thread_start_ephemeral_remains_pathless() -> Result<()> { } #[tokio::test] -async fn thread_start_fails_when_required_mcp_server_fails_to_initialize() -> Result<()> { +async fn thread_start_returns_before_required_mcp_server_initializes() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; @@ -599,24 +599,13 @@ async fn thread_start_fails_when_required_mcp_server_fails_to_initialize() -> Re .send_thread_start_request(ThreadStartParams::default()) .await?; - let err: JSONRPCError = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_error_message(RequestId::Integer(req_id)), - ) - .await??; - - assert!( - err.error - .message - .contains("required MCP servers failed to initialize"), - "unexpected error message: {}", - err.error.message - ); - assert!( - err.error.message.contains("required_broken"), - "unexpected error message: {}", - err.error.message - ); + let _: ThreadStartResponse = to_response( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id)), + ) + .await??, + )?; Ok(()) } diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index 8f6e3afaf5af..7db29c3a5619 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -320,6 +320,7 @@ async fn handle_approved_mcp_tool_call( let arguments_value = invocation.arguments.clone(); let connector_id = metadata.and_then(|metadata| metadata.connector_id.as_deref()); let connector_name = metadata.and_then(|metadata| metadata.connector_name.as_deref()); + sess.ensure_mcp_connection_manager_initialized().await; let server_origin = sess .services .mcp_connection_manager @@ -1480,6 +1481,7 @@ pub(crate) async fn lookup_mcp_tool_metadata( server: &str, tool_name: &str, ) -> Option { + sess.ensure_mcp_connection_manager_initialized().await; let tools = sess .services .mcp_connection_manager @@ -1575,6 +1577,7 @@ async fn lookup_mcp_app_usage_metadata( server: &str, tool_name: &str, ) -> Option { + sess.ensure_mcp_connection_manager_initialized().await; let tools = sess .services .mcp_connection_manager diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index a26b9bb3f6fe..f5ac52fac725 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -468,7 +468,7 @@ pub async fn dynamic_tool_response(sess: &Arc, id: String, response: Dy } pub async fn refresh_mcp_servers(sess: &Arc, refresh_config: McpServerRefreshConfig) { - let mut guard = sess.pending_mcp_server_refresh_config.lock().await; + let mut guard = sess.pending_mcp_server_refresh.lock().await; *guard = Some(refresh_config); } diff --git a/codex-rs/core/src/session/mcp.rs b/codex-rs/core/src/session/mcp.rs index 9f27751f7e04..012e27ca3c4c 100644 --- a/codex-rs/core/src/session/mcp.rs +++ b/codex-rs/core/src/session/mcp.rs @@ -1,4 +1,5 @@ use super::*; +use codex_mcp::EffectiveMcpServer; use codex_mcp::ElicitationReviewRequest; use codex_mcp::ElicitationReviewer; use codex_mcp::ElicitationReviewerHandle; @@ -19,9 +20,24 @@ use rmcp::model::CreateElicitationRequestParams; use rmcp::model::ElicitationAction; use rmcp::model::Meta; use serde_json::Map; +use std::sync::atomic::Ordering; const MCP_ELICITATION_DECLINE_MESSAGE_KEY: &str = "message"; +struct ReplaceMcpConnectionManagerArgs<'a> { + submit_id: String, + approval_policy: &'a Constrained, + permission_profile: PermissionProfile, + runtime_environment: McpRuntimeEnvironment, + config: &'a Arc, + mcp_servers: HashMap, + store_mode: OAuthCredentialsStoreMode, + auth: Option<&'a CodexAuth>, + host_owned_codex_apps_enabled: bool, + client_elicitation_capability: ElicitationCapability, + elicitation_reviewer: Option, +} + #[derive(Debug, PartialEq)] enum GuardianElicitationReview { NotRequested, @@ -61,6 +77,159 @@ impl Session { Arc::new(GuardianMcpElicitationReviewer::new(self)) } + pub(crate) fn start_mcp_connection_manager_initialization(self: &Arc) { + let session = Arc::clone(self); + drop(self.services.runtime_handle.spawn(async move { + session.ensure_mcp_connection_manager_initialized().await; + })); + } + + fn session_mcp_runtime_environment( + &self, + session_configuration: &SessionConfiguration, + ) -> McpRuntimeEnvironment { + let turn_environment = crate::environment_selection::resolve_environment_selections( + self.services.environment_manager.as_ref(), + &session_configuration.environments, + ) + .unwrap_or_else(|err| { + panic!("session MCP environment selections should remain valid: {err}") + }) + .primary() + .cloned(); + match turn_environment { + Some(turn_environment) => McpRuntimeEnvironment::new( + Arc::clone(&turn_environment.environment), + turn_environment.cwd.to_path_buf(), + ), + None => McpRuntimeEnvironment::new( + self.services + .environment_manager + .default_environment() + .unwrap_or_else(|| self.services.environment_manager.local_environment()), + session_configuration.cwd.to_path_buf(), + ), + } + } + + async fn replace_mcp_connection_manager(&self, args: ReplaceMcpConnectionManagerArgs<'_>) { + let ReplaceMcpConnectionManagerArgs { + submit_id, + approval_policy, + permission_profile, + runtime_environment, + config, + mcp_servers, + store_mode, + auth, + host_owned_codex_apps_enabled, + client_elicitation_capability, + elicitation_reviewer, + } = args; + let tool_plugin_provenance = self + .services + .mcp_manager + .tool_plugin_provenance(config.as_ref()) + .await; + let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode, auth).await; + { + let mut guard = self.services.mcp_startup_cancellation_token.lock().await; + guard.cancel(); + *guard = CancellationToken::new(); + } + let (refreshed_manager, cancel_token) = McpConnectionManager::new( + &mcp_servers, + store_mode, + auth_statuses, + approval_policy, + submit_id, + self.get_tx_event(), + permission_profile, + runtime_environment, + config.codex_home.to_path_buf(), + codex_apps_tools_cache_key(auth), + host_owned_codex_apps_enabled, + client_elicitation_capability, + tool_plugin_provenance, + auth, + elicitation_reviewer, + ) + .await; + { + let current_manager = self.services.mcp_connection_manager.read().await; + refreshed_manager.set_elicitations_auto_deny(current_manager.elicitations_auto_deny()); + } + { + let mut guard = self.services.mcp_startup_cancellation_token.lock().await; + if guard.is_cancelled() { + cancel_token.cancel(); + } + *guard = cancel_token; + } + + let mut old_manager = { + let mut manager = self.services.mcp_connection_manager.write().await; + std::mem::replace(&mut *manager, refreshed_manager) + }; + self.mcp_connection_manager_initialized + .store(true, Ordering::Release); + old_manager.shutdown().await; + } + + #[expect( + clippy::await_holding_invalid_type, + reason = "session MCP initialization must stay single-flight while the shared pool is built" + )] + pub(crate) async fn ensure_mcp_connection_manager_initialized(&self) { + if self + .mcp_connection_manager_initialized + .load(Ordering::Acquire) + { + return; + } + + let _guard = self.services.mcp_connection_manager_init_lock.lock().await; + if self + .mcp_connection_manager_initialized + .load(Ordering::Acquire) + { + return; + } + + let auth = self.services.auth_manager.auth().await; + let session_configuration = { + let state = self.state.lock().await; + state.session_configuration.clone() + }; + let config = Arc::clone(&session_configuration.original_config_do_not_use); + let mcp_servers = self + .services + .mcp_manager + .effective_servers(&config, auth.as_ref()) + .await; + let mcp_config = config + .to_mcp_config(self.services.plugins_manager.as_ref()) + .await; + + self.replace_mcp_connection_manager(ReplaceMcpConnectionManagerArgs { + submit_id: INITIAL_SUBMIT_ID.to_owned(), + approval_policy: &session_configuration.approval_policy, + permission_profile: session_configuration.permission_profile(), + runtime_environment: self.session_mcp_runtime_environment(&session_configuration), + config: &config, + mcp_servers, + store_mode: config.mcp_oauth_credentials_store_mode, + auth: auth.as_ref(), + host_owned_codex_apps_enabled: host_owned_codex_apps_enabled( + &mcp_config, + auth.as_ref(), + ), + client_elicitation_capability: mcp_config.client_elicitation_capability, + elicitation_reviewer: None, + }) + .await; + } + #[expect( clippy::await_holding_invalid_type, reason = "active turn checks and turn state updates must remain atomic" @@ -188,6 +357,7 @@ impl Session { return Ok(()); } + self.ensure_mcp_connection_manager_initialized().await; self.services .mcp_connection_manager .read() @@ -205,6 +375,7 @@ impl Session { server: &str, params: Option, ) -> anyhow::Result { + self.ensure_mcp_connection_manager_initialized().await; self.services .mcp_connection_manager .read() @@ -222,6 +393,7 @@ impl Session { server: &str, params: Option, ) -> anyhow::Result { + self.ensure_mcp_connection_manager_initialized().await; self.services .mcp_connection_manager .read() @@ -239,6 +411,7 @@ impl Session { server: &str, params: ReadResourceRequestParams, ) -> anyhow::Result { + self.ensure_mcp_connection_manager_initialized().await; self.services .mcp_connection_manager .read() @@ -258,6 +431,7 @@ impl Session { arguments: Option, meta: Option, ) -> anyhow::Result { + self.ensure_mcp_connection_manager_initialized().await; self.services .mcp_connection_manager .read() @@ -278,71 +452,39 @@ impl Session { let mcp_config = config .to_mcp_config(self.services.plugins_manager.as_ref()) .await; - let tool_plugin_provenance = self - .services - .mcp_manager - .tool_plugin_provenance(config.as_ref()) - .await; let mcp_servers = effective_mcp_servers_from_configured(mcp_servers, &mcp_config, auth.as_ref()); - let host_owned_codex_apps_enabled = - host_owned_codex_apps_enabled(&mcp_config, auth.as_ref()); - let auth_statuses = - compute_auth_statuses(mcp_servers.iter(), store_mode, auth.as_ref()).await; - let mcp_runtime_environment = match turn_context.environments.primary() { - Some(turn_environment) => McpRuntimeEnvironment::new( - Arc::clone(&turn_environment.environment), - turn_environment.cwd.to_path_buf(), - ), - None => McpRuntimeEnvironment::new( - self.services - .environment_manager - .default_environment() - .unwrap_or_else(|| self.services.environment_manager.local_environment()), - #[allow(deprecated)] - turn_context.cwd.to_path_buf(), - ), - }; - { - let mut guard = self.services.mcp_startup_cancellation_token.lock().await; - guard.cancel(); - *guard = CancellationToken::new(); - } - let (refreshed_manager, cancel_token) = McpConnectionManager::new( - &mcp_servers, + + self.replace_mcp_connection_manager(ReplaceMcpConnectionManagerArgs { + submit_id: turn_context.sub_id.clone(), + approval_policy: &turn_context.approval_policy, + permission_profile: turn_context.permission_profile(), + runtime_environment: match turn_context.environments.primary() { + Some(turn_environment) => McpRuntimeEnvironment::new( + Arc::clone(&turn_environment.environment), + turn_environment.cwd.to_path_buf(), + ), + None => McpRuntimeEnvironment::new( + self.services + .environment_manager + .default_environment() + .unwrap_or_else(|| self.services.environment_manager.local_environment()), + #[allow(deprecated)] + turn_context.cwd.to_path_buf(), + ), + }, + config: &config, + mcp_servers, store_mode, - auth_statuses, - &turn_context.approval_policy, - turn_context.sub_id.clone(), - self.get_tx_event(), - turn_context.permission_profile(), - mcp_runtime_environment, - config.codex_home.to_path_buf(), - codex_apps_tools_cache_key(auth.as_ref()), - host_owned_codex_apps_enabled, - mcp_config.client_elicitation_capability, - tool_plugin_provenance, - auth.as_ref(), + auth: auth.as_ref(), + host_owned_codex_apps_enabled: host_owned_codex_apps_enabled( + &mcp_config, + auth.as_ref(), + ), + client_elicitation_capability: mcp_config.client_elicitation_capability, elicitation_reviewer, - ) + }) .await; - { - let current_manager = self.services.mcp_connection_manager.read().await; - refreshed_manager.set_elicitations_auto_deny(current_manager.elicitations_auto_deny()); - } - { - let mut guard = self.services.mcp_startup_cancellation_token.lock().await; - if guard.is_cancelled() { - cancel_token.cancel(); - } - *guard = cancel_token; - } - - let mut old_manager = { - let mut manager = self.services.mcp_connection_manager.write().await; - std::mem::replace(&mut *manager, refreshed_manager) - }; - old_manager.shutdown().await; } pub(crate) async fn refresh_mcp_servers_if_requested( @@ -350,15 +492,15 @@ impl Session { turn_context: &TurnContext, elicitation_reviewer: Option, ) { - let refresh_config = { self.pending_mcp_server_refresh_config.lock().await.take() }; - let Some(refresh_config) = refresh_config else { + let refresh = { self.pending_mcp_server_refresh.lock().await.take() }; + let Some(refresh) = refresh else { return; }; let McpServerRefreshConfig { mcp_servers, mcp_oauth_credentials_store_mode, - } = refresh_config; + } = refresh; let mcp_servers = match serde_json::from_value::>(mcp_servers) { diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 2dd97710e837..b632b46ee3e1 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -142,14 +142,12 @@ use futures::future::BoxFuture; use futures::future::Shared; use futures::prelude::*; use rmcp::model::ElicitationCapability; -use rmcp::model::FormElicitationCapability; use rmcp::model::ListResourceTemplatesResult; use rmcp::model::ListResourcesResult; use rmcp::model::PaginatedRequestParams; use rmcp::model::ReadResourceRequestParams; use rmcp::model::ReadResourceResult; use rmcp::model::RequestId; -use rmcp::model::UrlElicitationCapability; use serde_json::Value; use tokio::sync::Mutex; use tokio::sync::RwLock; @@ -2722,6 +2720,7 @@ impl Session { } } if turn_context.config.include_apps_instructions && turn_context.apps_enabled() { + self.ensure_mcp_connection_manager_initialized().await; let mcp_connection_manager = self.services.mcp_connection_manager.read().await; let accessible_and_enabled_connectors = connectors::list_accessible_and_enabled_connectors_from_manager( diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 5b57e11895ef..148e2e295588 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -24,7 +24,8 @@ pub(crate) struct Session { /// The set of enabled features should be invariant for the lifetime of the /// session. pub(super) features: ManagedFeatures, - pub(super) pending_mcp_server_refresh_config: Mutex>, + pub(super) pending_mcp_server_refresh: Mutex>, + pub(super) mcp_connection_manager_initialized: std::sync::atomic::AtomicBool, pub(crate) conversation: Arc, pub(crate) active_turn: Mutex>, pub(super) mailbox: Mailbox, @@ -407,10 +408,6 @@ impl Session { #[instrument(name = "session_init", level = "info", skip_all)] #[allow(clippy::too_many_arguments)] - #[expect( - clippy::await_holding_invalid_type, - reason = "session initialization must serialize access through session-owned manager guards" - )] pub(crate) async fn new( mut session_configuration: SessionConfiguration, config: Arc, @@ -554,13 +551,7 @@ impl Session { let mcp_servers = mcp_manager_for_mcp .effective_servers(&config_for_mcp, auth.as_ref()) .await; - let auth_statuses = compute_auth_statuses( - mcp_servers.iter(), - config_for_mcp.mcp_oauth_credentials_store_mode, - auth.as_ref(), - ) - .await; - (auth, mcp_servers, auth_statuses) + (auth, mcp_servers) } .instrument(info_span!( "session_init.auth_mcp", @@ -568,7 +559,7 @@ impl Session { )); // Join all independent futures. - let (thread_persistence_result, state_db_ctx, (auth, mcp_servers, auth_statuses)) = + let (thread_persistence_result, state_db_ctx, (auth, mcp_servers)) = tokio::join!(thread_persistence_fut, state_db_fut, auth_and_mcp_fut); let mut live_thread_init = @@ -885,6 +876,7 @@ impl Session { config.permissions.permission_profile(), ), )), + mcp_connection_manager_init_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -951,7 +943,8 @@ impl Session { state: Mutex::new(state), managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1), features: config.features.clone(), - pending_mcp_server_refresh_config: Mutex::new(None), + pending_mcp_server_refresh: Mutex::new(None), + mcp_connection_manager_initialized: std::sync::atomic::AtomicBool::new(false), conversation: Arc::new(RealtimeConversationManager::new()), active_turn: Mutex::new(None), mailbox, @@ -1001,115 +994,7 @@ impl Session { for event in events { sess.send_event_raw(event).await; } - - let mut required_mcp_servers: Vec = mcp_servers - .iter() - .filter(|(_, server)| server.enabled() && server.required()) - .map(|(name, _)| name.clone()) - .collect(); - required_mcp_servers.sort(); - let enabled_mcp_server_count = - mcp_servers.values().filter(|server| server.enabled()).count(); - let required_mcp_server_count = required_mcp_servers.len(); - let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()).await; - let host_owned_codex_apps_enabled = config - .features - .apps_enabled_for_auth(auth.as_ref().is_some_and(|auth| auth.uses_codex_backend())); - let client_elicitation_capability = if config.features.enabled(Feature::AuthElicitation) { - ElicitationCapability { - form: Some(FormElicitationCapability::default()), - url: Some(UrlElicitationCapability::default()), - } - } else { - ElicitationCapability::default() - }; - { - let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await; - cancel_guard.cancel(); - *cancel_guard = CancellationToken::new(); - } - let turn_environment = crate::environment_selection::resolve_environment_selections( - sess.services.environment_manager.as_ref(), - &session_configuration.environments, - ) - .map_err(|err| { - CodexErr::InvalidRequest(err.to_string().replace( - "unknown turn environment id", - "unknown stored MCP environment id", - )) - })? - .primary() - .cloned(); - let mcp_runtime_environment = match turn_environment { - Some(turn_environment) => McpRuntimeEnvironment::new( - Arc::clone(&turn_environment.environment), - turn_environment.cwd.to_path_buf(), - ), - None => McpRuntimeEnvironment::new( - sess.services - .environment_manager - .default_environment() - .unwrap_or_else(|| sess.services.environment_manager.local_environment()), - session_configuration.cwd.to_path_buf(), - ), - }; - let (mcp_connection_manager, cancel_token) = McpConnectionManager::new( - &mcp_servers, - config.mcp_oauth_credentials_store_mode, - auth_statuses.clone(), - &session_configuration.approval_policy, - INITIAL_SUBMIT_ID.to_owned(), - tx_event.clone(), - session_configuration.permission_profile(), - mcp_runtime_environment, - config.codex_home.to_path_buf(), - codex_apps_tools_cache_key(auth), - host_owned_codex_apps_enabled, - client_elicitation_capability, - tool_plugin_provenance, - auth, - Some(sess.mcp_elicitation_reviewer()), - ) - .instrument(info_span!( - "session_init.mcp_manager_init", - otel.name = "session_init.mcp_manager_init", - session_init.enabled_mcp_server_count = enabled_mcp_server_count, - session_init.required_mcp_server_count = required_mcp_server_count, - )) - .await; - { - let mut manager_guard = sess.services.mcp_connection_manager.write().await; - *manager_guard = mcp_connection_manager; - } - { - let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await; - if cancel_guard.is_cancelled() { - cancel_token.cancel(); - } - *cancel_guard = cancel_token; - } - if !required_mcp_servers.is_empty() { - let failures = sess - .services - .mcp_connection_manager - .read() - .await - .required_startup_failures(&required_mcp_servers) - .instrument(info_span!( - "session_init.required_mcp_wait", - otel.name = "session_init.required_mcp_wait", - session_init.required_mcp_server_count = required_mcp_server_count, - )) - .await; - if !failures.is_empty() { - let details = failures - .iter() - .map(|failure| format!("{}: {}", failure.server, failure.error)) - .collect::>() - .join("; "); - anyhow::bail!("required MCP servers failed to initialize: {details}"); - } - } + sess.start_mcp_connection_manager_initialization(); sess.schedule_startup_prewarm(session_configuration.base_instructions.clone()) .await; let session_start_source = match &initial_history { diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index aa89efe1d490..c3f623ca8452 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -4300,6 +4300,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { config.permissions.permission_profile(), ), )), + mcp_connection_manager_init_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -4408,7 +4409,8 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { state: Mutex::new(state), managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1), features: config.features.clone(), - pending_mcp_server_refresh_config: Mutex::new(None), + pending_mcp_server_refresh: Mutex::new(None), + mcp_connection_manager_initialized: std::sync::atomic::AtomicBool::new(false), conversation: Arc::new(RealtimeConversationManager::new()), active_turn: Mutex::new(None), mailbox, @@ -6156,6 +6158,7 @@ where config.permissions.permission_profile(), ), )), + mcp_connection_manager_init_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -6264,7 +6267,8 @@ where state: Mutex::new(state), managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1), features: config.features.clone(), - pending_mcp_server_refresh_config: Mutex::new(None), + pending_mcp_server_refresh: Mutex::new(None), + mcp_connection_manager_initialized: std::sync::atomic::AtomicBool::new(false), conversation: Arc::new(RealtimeConversationManager::new()), active_turn: Mutex::new(None), mailbox, @@ -6363,35 +6367,41 @@ async fn refresh_mcp_servers_is_deferred_until_next_turn() { mcp_oauth_credentials_store_mode, }; { - let mut guard = session.pending_mcp_server_refresh_config.lock().await; + let mut guard = session.pending_mcp_server_refresh.lock().await; *guard = Some(refresh_config); } assert!(!old_token.is_cancelled()); - assert!( - session - .pending_mcp_server_refresh_config - .lock() - .await - .is_some() - ); + assert!(session.pending_mcp_server_refresh.lock().await.is_some()); session .refresh_mcp_servers_if_requested(&turn_context, /*elicitation_reviewer*/ None) .await; assert!(old_token.is_cancelled()); - assert!( - session - .pending_mcp_server_refresh_config - .lock() - .await - .is_none() - ); + assert!(session.pending_mcp_server_refresh.lock().await.is_none()); let new_token = session.mcp_startup_cancellation_token().await; assert!(!new_token.is_cancelled()); } +#[tokio::test] +async fn lazy_mcp_initialization_is_idempotent() { + let (session, _turn_context) = make_session_and_context().await; + + session.ensure_mcp_connection_manager_initialized().await; + let first_token = session.mcp_startup_cancellation_token().await; + assert!(!first_token.is_cancelled()); + + session.ensure_mcp_connection_manager_initialized().await; + + assert!( + !first_token.is_cancelled(), + "reinitializing the session-owned MCP pool should not cancel the first token", + ); + let second_token = session.mcp_startup_cancellation_token().await; + assert!(!second_token.is_cancelled()); +} + #[tokio::test] async fn spawn_task_does_not_update_previous_turn_settings_for_non_run_turn_tasks() { let (sess, tc, _rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index aee4bd360ff5..b3f7cbd1ac87 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -188,6 +188,7 @@ pub(crate) async fn run_turn( // Plugin mentions need raw MCP/app inventory even when app tools // are normally hidden so we can describe the plugin's currently // usable capabilities for this turn. + sess.ensure_mcp_connection_manager_initialized().await; match sess .services .mcp_connection_manager @@ -1165,6 +1166,7 @@ pub(crate) async fn built_tools( skills_outcome: Option<&SkillLoadOutcome>, cancellation_token: &CancellationToken, ) -> CodexResult> { + sess.ensure_mcp_connection_manager_initialized().await; let mcp_connection_manager = sess.services.mcp_connection_manager.read().await; let has_mcp_servers = mcp_connection_manager.has_servers(); let all_mcp_tools = mcp_connection_manager diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index afba409a068e..95a507d45bce 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -38,6 +38,7 @@ use tokio_util::sync::CancellationToken; pub(crate) struct SessionServices { pub(crate) mcp_connection_manager: Arc>, + pub(crate) mcp_connection_manager_init_lock: Mutex<()>, pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, #[cfg_attr(not(unix), allow(dead_code))] diff --git a/codex-rs/core/src/tools/code_mode/mod.rs b/codex-rs/core/src/tools/code_mode/mod.rs index ff9f8c889384..f9adc2711d70 100644 --- a/codex-rs/core/src/tools/code_mode/mod.rs +++ b/codex-rs/core/src/tools/code_mode/mod.rs @@ -4,6 +4,7 @@ mod response_adapter; mod wait_handler; pub(crate) mod wait_spec; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -29,9 +30,11 @@ use crate::tools::context::ToolPayload; use crate::tools::parallel::ToolCallRuntime; use crate::tools::router::ToolCall; use crate::tools::router::ToolCallSource; +use crate::tools::router::ToolRouterParams; use crate::unified_exec::resolve_max_tokens; use codex_features::Feature; use codex_tools::ToolName; +use codex_tools::collect_code_mode_tool_definitions; use codex_utils_output_truncation::TruncationPolicy; use codex_utils_output_truncation::formatted_truncate_text_content_items_with_policy; use codex_utils_output_truncation::truncate_function_output_items_with_policy; @@ -257,6 +260,64 @@ fn truncate_code_mode_result( truncate_function_output_items_with_policy(&items, policy) } +pub(super) async fn build_enabled_tools( + exec: &ExecContext, +) -> Vec { + let router = build_nested_router(exec).await; + let specs = router.model_visible_specs(); + collect_code_mode_tool_definitions(&specs) +} + +#[expect( + clippy::await_holding_invalid_type, + reason = "nested tool router construction reads through the session-owned manager guard" +)] +async fn build_nested_router(exec: &ExecContext) -> ToolRouter { + let nested_tools_config = exec.turn.tools_config.for_code_mode_nested_tools(); + exec.session + .ensure_mcp_connection_manager_initialized() + .await; + let listed_mcp_tools = exec + .session + .services + .mcp_connection_manager + .read() + .await + .list_all_tools() + .await; + let parallel_mcp_server_names = exec + .turn + .config + .mcp_servers + .get() + .iter() + .filter_map(|(server_name, server_config)| { + server_config + .supports_parallel_tool_calls + .then_some(server_name.clone()) + }) + .collect::>(); + + let listed_mcp_tools = listed_mcp_tools + .into_iter() + .map(|mut tool| { + tool.supports_parallel_tool_calls = + parallel_mcp_server_names.contains(&tool.server_name); + tool + }) + .collect(); + + ToolRouter::from_config( + &nested_tools_config, + ToolRouterParams { + deferred_mcp_tools: None, + mcp_tools: Some(listed_mcp_tools), + discoverable_tools: None, + extension_tool_executors: Vec::new(), + dynamic_tools: exec.turn.dynamic_tools.as_slice(), + }, + ) +} async fn call_nested_tool( _exec: ExecContext, tool_runtime: ToolCallRuntime, diff --git a/codex-rs/core/src/tools/handlers/mcp_resource.rs b/codex-rs/core/src/tools/handlers/mcp_resource.rs index 630a1cccb4e2..420ad255abdc 100644 --- a/codex-rs/core/src/tools/handlers/mcp_resource.rs +++ b/codex-rs/core/src/tools/handlers/mcp_resource.rs @@ -33,7 +33,6 @@ pub use read_mcp_resource::ReadMcpResourceHandler; #[derive(Debug, Deserialize, Default)] struct ListResourcesArgs { - /// Lists all resources from all servers if not specified. #[serde(default)] server: Option, #[serde(default)] @@ -42,7 +41,6 @@ struct ListResourcesArgs { #[derive(Debug, Deserialize, Default)] struct ListResourceTemplatesArgs { - /// Lists all resource templates from all servers if not specified. #[serde(default)] server: Option, #[serde(default)] diff --git a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs index 866c7a9e123f..3946c2b29388 100644 --- a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs +++ b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resource_templates.rs @@ -105,6 +105,7 @@ impl ToolExecutor for ListMcpResourceTemplatesHandler { )); } + session.ensure_mcp_connection_manager_initialized().await; let templates = session .services .mcp_connection_manager diff --git a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs index c87747eea42f..6b55ff9350a0 100644 --- a/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs +++ b/codex-rs/core/src/tools/handlers/mcp_resource/list_mcp_resources.rs @@ -103,6 +103,7 @@ impl ToolExecutor for ListMcpResourcesHandler { )); } + session.ensure_mcp_connection_manager_initialized().await; let resources = session .services .mcp_connection_manager diff --git a/codex-rs/core/src/tools/handlers/request_plugin_install.rs b/codex-rs/core/src/tools/handlers/request_plugin_install.rs index 3b53fd007ab5..db2e9c3512e6 100644 --- a/codex-rs/core/src/tools/handlers/request_plugin_install.rs +++ b/codex-rs/core/src/tools/handlers/request_plugin_install.rs @@ -112,6 +112,7 @@ impl ToolExecutor for RequestPluginInstallHandler { } let auth = session.services.auth_manager.auth().await; + session.ensure_mcp_connection_manager_initialized().await; let manager = session.services.mcp_connection_manager.read().await; let mcp_tools = manager.list_all_tools().await; drop(manager); @@ -314,6 +315,7 @@ async fn refresh_missing_requested_connectors( return Some(Vec::new()); } + session.ensure_mcp_connection_manager_initialized().await; let manager = session.services.mcp_connection_manager.read().await; let mcp_tools = manager.list_all_tools().await; let accessible_connectors = connectors::with_app_enabled_state( diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index 1cf0e00623c8..e3d073d3c09b 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -332,7 +332,6 @@ impl ToolRegistry { ), ), ]; - { let mut active = invocation.session.active_turn.lock().await; if let Some(active_turn) = active.as_mut() {