diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index e7a58d448a7f..fda2dd1f62f5 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -30,7 +30,6 @@ //! here. That split lets the composer stage a recall entry before clearing input while this module //! records the attempted slash command after dispatch just like ordinary submitted text. use std::collections::BTreeMap; -use std::collections::BTreeSet; use std::collections::HashMap; use std::collections::HashSet; use std::collections::VecDeque; @@ -95,9 +94,7 @@ use codex_app_server_protocol::FileChangeRequestApprovalParams; use codex_app_server_protocol::GuardianApprovalReviewAction; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; -use codex_app_server_protocol::McpServerStartupState; use codex_app_server_protocol::McpServerStatusDetail; -use codex_app_server_protocol::McpServerStatusUpdatedNotification; use codex_app_server_protocol::ModelVerification as AppServerModelVerification; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; @@ -191,11 +188,7 @@ use codex_protocol::protocol::ImageGenerationEndEvent; use codex_protocol::protocol::ListSkillsResponseEvent; #[cfg(test)] use codex_protocol::protocol::McpListToolsResponseEvent; -#[cfg(test)] -use codex_protocol::protocol::McpStartupCompleteEvent; use codex_protocol::protocol::McpStartupStatus; -#[cfg(test)] -use codex_protocol::protocol::McpStartupUpdateEvent; use codex_protocol::protocol::McpToolCallBeginEvent; use codex_protocol::protocol::McpToolCallEndEvent; #[cfg(test)] @@ -393,6 +386,7 @@ use self::goal_status::GoalStatusState; use self::goal_status::goal_status_indicator_from_app_goal; mod goal_menu; mod interrupts; +mod mcp_startup; use self::interrupts::InterruptManager; mod keymap_picker; mod session_header; @@ -3471,249 +3465,6 @@ impl ChatWidget { } } - /// Record one MCP startup update, promoting it into either the active startup - /// round or a buffered "next" round. - /// - /// This path has to deal with lossy app-server delivery. After - /// `finish_mcp_startup()` or `finish_mcp_startup_after_lag()`, we briefly - /// ignore incoming updates so stale events from the just-finished round do not - /// reopen startup. While that guard is active we buffer updates for a possible - /// next round, and only reactivate once the buffered set is coherent enough to - /// treat as a fresh startup round. - fn update_mcp_startup_status( - &mut self, - server: String, - status: McpStartupStatus, - complete_when_settled: bool, - ) { - let mut activated_pending_round = false; - let startup_status = if self.mcp_startup_ignore_updates_until_next_start { - // Ignore-mode buffers the next plausible round so stale post-finish - // updates cannot immediately reopen startup. A fresh `Starting` - // update resets the buffer only if we have not already seen a - // pending-round `Starting`; this preserves valid interleavings like - // `alpha: Starting -> alpha: Ready -> beta: Starting`. - if matches!(status, McpStartupStatus::Starting) - && !self.mcp_startup_pending_next_round_saw_starting - { - self.mcp_startup_pending_next_round.clear(); - self.mcp_startup_allow_terminal_only_next_round = false; - } - self.mcp_startup_pending_next_round_saw_starting |= - matches!(status, McpStartupStatus::Starting); - self.mcp_startup_pending_next_round.insert(server, status); - let Some(expected_servers) = &self.mcp_startup_expected_servers else { - return; - }; - let saw_full_round = expected_servers.is_empty() - || expected_servers - .iter() - .all(|name| self.mcp_startup_pending_next_round.contains_key(name)); - let saw_starting = self - .mcp_startup_pending_next_round - .values() - .any(|state| matches!(state, McpStartupStatus::Starting)); - if !(saw_full_round - && (saw_starting || self.mcp_startup_allow_terminal_only_next_round)) - { - return; - } - - // The buffered map now looks like a complete next round, so promote it - // to the active round and resume normal completion tracking. - self.mcp_startup_ignore_updates_until_next_start = false; - self.mcp_startup_allow_terminal_only_next_round = false; - self.mcp_startup_pending_next_round_saw_starting = false; - activated_pending_round = true; - std::mem::take(&mut self.mcp_startup_pending_next_round) - } else { - // Normal path: fold the update into the active round and surface - // per-server failures immediately. - let mut startup_status = self.mcp_startup_status.take().unwrap_or_default(); - if let McpStartupStatus::Failed { error } = &status { - self.on_warning(error); - } - startup_status.insert(server, status); - startup_status - }; - if activated_pending_round { - // A promoted buffered round may already contain terminal failures. - for state in startup_status.values() { - if let McpStartupStatus::Failed { error } = state { - self.on_warning(error); - } - } - } - self.mcp_startup_status = Some(startup_status); - self.update_task_running_state(); - - // App-server-backed startup completes when every expected server has - // reported a non-Starting status. Lag handling can force an earlier - // settle via `finish_mcp_startup_after_lag()`. - if complete_when_settled - && let Some(current) = &self.mcp_startup_status - && let Some(expected_servers) = &self.mcp_startup_expected_servers - && !current.is_empty() - && expected_servers - .iter() - .all(|name| current.contains_key(name)) - && current - .values() - .all(|state| !matches!(state, McpStartupStatus::Starting)) - { - let mut failed = Vec::new(); - let mut cancelled = Vec::new(); - for (name, state) in current { - match state { - McpStartupStatus::Ready => {} - McpStartupStatus::Failed { .. } => failed.push(name.clone()), - McpStartupStatus::Cancelled => cancelled.push(name.clone()), - McpStartupStatus::Starting => {} - } - } - failed.sort(); - cancelled.sort(); - self.finish_mcp_startup(failed, cancelled); - return; - } - if let Some(current) = &self.mcp_startup_status { - // Otherwise keep the status header focused on the remaining - // in-progress servers for the active round. - let total = current.len(); - let mut starting: Vec<_> = current - .iter() - .filter_map(|(name, state)| { - if matches!(state, McpStartupStatus::Starting) { - Some(name) - } else { - None - } - }) - .collect(); - starting.sort(); - if let Some(first) = starting.first() { - let completed = total.saturating_sub(starting.len()); - let max_to_show = 3; - let mut to_show: Vec = starting - .iter() - .take(max_to_show) - .map(ToString::to_string) - .collect(); - if starting.len() > max_to_show { - to_show.push("…".to_string()); - } - let header = if total > 1 { - format!( - "Starting MCP servers ({completed}/{total}): {}", - to_show.join(", ") - ) - } else { - format!("Booting MCP server: {first}") - }; - self.set_status_header(header); - } - } - self.request_redraw(); - } - - pub(crate) fn set_mcp_startup_expected_servers(&mut self, server_names: I) - where - I: IntoIterator, - { - self.mcp_startup_expected_servers = Some(server_names.into_iter().collect()); - } - - #[cfg(test)] - fn on_mcp_startup_update(&mut self, ev: McpStartupUpdateEvent) { - self.update_mcp_startup_status(ev.server, ev.status, /*complete_when_settled*/ false); - } - - fn finish_mcp_startup(&mut self, failed: Vec, cancelled: Vec) { - if !cancelled.is_empty() { - self.on_warning(format!( - "MCP startup interrupted. The following servers were not initialized: {}", - cancelled.join(", ") - )); - } - let mut parts = Vec::new(); - if !failed.is_empty() { - parts.push(format!("failed: {}", failed.join(", "))); - } - if !parts.is_empty() { - self.on_warning(format!("MCP startup incomplete ({})", parts.join("; "))); - } - - self.mcp_startup_status = None; - self.mcp_startup_ignore_updates_until_next_start = true; - self.mcp_startup_allow_terminal_only_next_round = false; - self.mcp_startup_pending_next_round.clear(); - self.mcp_startup_pending_next_round_saw_starting = false; - self.update_task_running_state(); - self.maybe_send_next_queued_input(); - self.request_redraw(); - } - - pub(crate) fn finish_mcp_startup_after_lag(&mut self) { - if self.mcp_startup_ignore_updates_until_next_start { - if self.mcp_startup_pending_next_round.is_empty() { - self.mcp_startup_pending_next_round_saw_starting = false; - } - self.mcp_startup_allow_terminal_only_next_round = true; - } - - let Some(current) = &self.mcp_startup_status else { - return; - }; - - let mut failed = Vec::new(); - let mut cancelled = Vec::new(); - - let mut server_names: BTreeSet = current.keys().cloned().collect(); - if let Some(expected_servers) = &self.mcp_startup_expected_servers { - server_names.extend(expected_servers.iter().cloned()); - } - - for name in server_names { - match current.get(&name) { - Some(McpStartupStatus::Ready) => {} - Some(McpStartupStatus::Failed { .. }) => failed.push(name), - Some(McpStartupStatus::Cancelled | McpStartupStatus::Starting) | None => { - cancelled.push(name); - } - } - } - - failed.sort(); - failed.dedup(); - cancelled.sort(); - cancelled.dedup(); - self.finish_mcp_startup(failed, cancelled); - } - - #[cfg(test)] - fn on_mcp_startup_complete(&mut self, ev: McpStartupCompleteEvent) { - let failed = ev.failed.into_iter().map(|f| f.server).collect(); - self.finish_mcp_startup(failed, ev.cancelled); - } - - fn on_mcp_server_status_updated(&mut self, notification: McpServerStatusUpdatedNotification) { - let status = match notification.status { - McpServerStartupState::Starting => McpStartupStatus::Starting, - McpServerStartupState::Ready => McpStartupStatus::Ready, - McpServerStartupState::Failed => McpStartupStatus::Failed { - error: notification.error.unwrap_or_else(|| { - format!("MCP client for `{}` failed to start", notification.name) - }), - }, - McpServerStartupState::Cancelled => McpStartupStatus::Cancelled, - }; - self.update_mcp_startup_status( - notification.name, - status, - /*complete_when_settled*/ true, - ); - } - /// Handle a turn aborted due to user interrupt (Esc), budget exhaustion, /// or review completion. /// When there are queued user messages, restore them into the composer diff --git a/codex-rs/tui/src/chatwidget/mcp_startup.rs b/codex-rs/tui/src/chatwidget/mcp_startup.rs new file mode 100644 index 000000000000..d2dc0fc7277f --- /dev/null +++ b/codex-rs/tui/src/chatwidget/mcp_startup.rs @@ -0,0 +1,265 @@ +//! MCP startup state and status handling for the chat widget. +//! +//! This module is a mechanical extraction from `chatwidget.rs`: it keeps the +//! buffered startup-round bookkeeping together while preserving the existing +//! event shapes and rendering behavior. + +use std::collections::BTreeSet; + +use codex_app_server_protocol::McpServerStartupState; +use codex_app_server_protocol::McpServerStatusUpdatedNotification; +#[cfg(test)] +use codex_protocol::protocol::McpStartupCompleteEvent; +use codex_protocol::protocol::McpStartupStatus; +#[cfg(test)] +use codex_protocol::protocol::McpStartupUpdateEvent; + +use super::ChatWidget; + +impl ChatWidget { + /// Record one MCP startup update, promoting it into either the active startup + /// round or a buffered "next" round. + /// + /// This path has to deal with lossy app-server delivery. After + /// `finish_mcp_startup()` or `finish_mcp_startup_after_lag()`, we briefly + /// ignore incoming updates so stale events from the just-finished round do not + /// reopen startup. While that guard is active we buffer updates for a possible + /// next round, and only reactivate once the buffered set is coherent enough to + /// treat as a fresh startup round. + fn update_mcp_startup_status( + &mut self, + server: String, + status: McpStartupStatus, + complete_when_settled: bool, + ) { + let mut activated_pending_round = false; + let startup_status = if self.mcp_startup_ignore_updates_until_next_start { + // Ignore-mode buffers the next plausible round so stale post-finish + // updates cannot immediately reopen startup. A fresh `Starting` + // update resets the buffer only if we have not already seen a + // pending-round `Starting`; this preserves valid interleavings like + // `alpha: Starting -> alpha: Ready -> beta: Starting`. + if matches!(status, McpStartupStatus::Starting) + && !self.mcp_startup_pending_next_round_saw_starting + { + self.mcp_startup_pending_next_round.clear(); + self.mcp_startup_allow_terminal_only_next_round = false; + } + self.mcp_startup_pending_next_round_saw_starting |= + matches!(status, McpStartupStatus::Starting); + self.mcp_startup_pending_next_round.insert(server, status); + let Some(expected_servers) = &self.mcp_startup_expected_servers else { + return; + }; + let saw_full_round = expected_servers.is_empty() + || expected_servers + .iter() + .all(|name| self.mcp_startup_pending_next_round.contains_key(name)); + let saw_starting = self + .mcp_startup_pending_next_round + .values() + .any(|state| matches!(state, McpStartupStatus::Starting)); + if !(saw_full_round + && (saw_starting || self.mcp_startup_allow_terminal_only_next_round)) + { + return; + } + + // The buffered map now looks like a complete next round, so promote it + // to the active round and resume normal completion tracking. + self.mcp_startup_ignore_updates_until_next_start = false; + self.mcp_startup_allow_terminal_only_next_round = false; + self.mcp_startup_pending_next_round_saw_starting = false; + activated_pending_round = true; + std::mem::take(&mut self.mcp_startup_pending_next_round) + } else { + // Normal path: fold the update into the active round and surface + // per-server failures immediately. + let mut startup_status = self.mcp_startup_status.take().unwrap_or_default(); + if let McpStartupStatus::Failed { error } = &status { + self.on_warning(error); + } + startup_status.insert(server, status); + startup_status + }; + if activated_pending_round { + // A promoted buffered round may already contain terminal failures. + for state in startup_status.values() { + if let McpStartupStatus::Failed { error } = state { + self.on_warning(error); + } + } + } + self.mcp_startup_status = Some(startup_status); + self.update_task_running_state(); + + // App-server-backed startup completes when every expected server has + // reported a non-Starting status. Lag handling can force an earlier + // settle via `finish_mcp_startup_after_lag()`. + if complete_when_settled + && let Some(current) = &self.mcp_startup_status + && let Some(expected_servers) = &self.mcp_startup_expected_servers + && !current.is_empty() + && expected_servers + .iter() + .all(|name| current.contains_key(name)) + && current + .values() + .all(|state| !matches!(state, McpStartupStatus::Starting)) + { + let mut failed = Vec::new(); + let mut cancelled = Vec::new(); + for (name, state) in current { + match state { + McpStartupStatus::Ready => {} + McpStartupStatus::Failed { .. } => failed.push(name.clone()), + McpStartupStatus::Cancelled => cancelled.push(name.clone()), + McpStartupStatus::Starting => {} + } + } + failed.sort(); + cancelled.sort(); + self.finish_mcp_startup(failed, cancelled); + return; + } + if let Some(current) = &self.mcp_startup_status { + // Otherwise keep the status header focused on the remaining + // in-progress servers for the active round. + let total = current.len(); + let mut starting: Vec<_> = current + .iter() + .filter_map(|(name, state)| { + if matches!(state, McpStartupStatus::Starting) { + Some(name) + } else { + None + } + }) + .collect(); + starting.sort(); + if let Some(first) = starting.first() { + let completed = total.saturating_sub(starting.len()); + let max_to_show = 3; + let mut to_show: Vec = starting + .iter() + .take(max_to_show) + .map(ToString::to_string) + .collect(); + if starting.len() > max_to_show { + to_show.push("…".to_string()); + } + let header = if total > 1 { + format!( + "Starting MCP servers ({completed}/{total}): {}", + to_show.join(", ") + ) + } else { + format!("Booting MCP server: {first}") + }; + self.set_status_header(header); + } + } + self.request_redraw(); + } + + pub(crate) fn set_mcp_startup_expected_servers(&mut self, server_names: I) + where + I: IntoIterator, + { + self.mcp_startup_expected_servers = Some(server_names.into_iter().collect()); + } + + #[cfg(test)] + pub(super) fn on_mcp_startup_update(&mut self, ev: McpStartupUpdateEvent) { + self.update_mcp_startup_status(ev.server, ev.status, /*complete_when_settled*/ false); + } + + pub(super) fn finish_mcp_startup(&mut self, failed: Vec, cancelled: Vec) { + if !cancelled.is_empty() { + self.on_warning(format!( + "MCP startup interrupted. The following servers were not initialized: {}", + cancelled.join(", ") + )); + } + let mut parts = Vec::new(); + if !failed.is_empty() { + parts.push(format!("failed: {}", failed.join(", "))); + } + if !parts.is_empty() { + self.on_warning(format!("MCP startup incomplete ({})", parts.join("; "))); + } + + self.mcp_startup_status = None; + self.mcp_startup_ignore_updates_until_next_start = true; + self.mcp_startup_allow_terminal_only_next_round = false; + self.mcp_startup_pending_next_round.clear(); + self.mcp_startup_pending_next_round_saw_starting = false; + self.update_task_running_state(); + self.maybe_send_next_queued_input(); + self.request_redraw(); + } + + pub(crate) fn finish_mcp_startup_after_lag(&mut self) { + if self.mcp_startup_ignore_updates_until_next_start { + if self.mcp_startup_pending_next_round.is_empty() { + self.mcp_startup_pending_next_round_saw_starting = false; + } + self.mcp_startup_allow_terminal_only_next_round = true; + } + + let Some(current) = &self.mcp_startup_status else { + return; + }; + + let mut failed = Vec::new(); + let mut cancelled = Vec::new(); + + let mut server_names: BTreeSet = current.keys().cloned().collect(); + if let Some(expected_servers) = &self.mcp_startup_expected_servers { + server_names.extend(expected_servers.iter().cloned()); + } + + for name in server_names { + match current.get(&name) { + Some(McpStartupStatus::Ready) => {} + Some(McpStartupStatus::Failed { .. }) => failed.push(name), + Some(McpStartupStatus::Cancelled | McpStartupStatus::Starting) | None => { + cancelled.push(name); + } + } + } + + failed.sort(); + failed.dedup(); + cancelled.sort(); + cancelled.dedup(); + self.finish_mcp_startup(failed, cancelled); + } + + #[cfg(test)] + pub(super) fn on_mcp_startup_complete(&mut self, ev: McpStartupCompleteEvent) { + let failed = ev.failed.into_iter().map(|f| f.server).collect(); + self.finish_mcp_startup(failed, ev.cancelled); + } + + pub(super) fn on_mcp_server_status_updated( + &mut self, + notification: McpServerStatusUpdatedNotification, + ) { + let status = match notification.status { + McpServerStartupState::Starting => McpStartupStatus::Starting, + McpServerStartupState::Ready => McpStartupStatus::Ready, + McpServerStartupState::Failed => McpStartupStatus::Failed { + error: notification.error.unwrap_or_else(|| { + format!("MCP client for `{}` failed to start", notification.name) + }), + }, + McpServerStartupState::Cancelled => McpStartupStatus::Cancelled, + }; + self.update_mcp_startup_status( + notification.name, + status, + /*complete_when_settled*/ true, + ); + } +} diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 1641dc7e325c..eb7a9f1248f2 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -170,9 +170,6 @@ pub(super) use codex_protocol::protocol::GuardianRiskLevel; pub(super) use codex_protocol::protocol::GuardianUserAuthorization; pub(super) use codex_protocol::protocol::ImageGenerationEndEvent; pub(super) use codex_protocol::protocol::ItemCompletedEvent; -pub(super) use codex_protocol::protocol::McpStartupCompleteEvent; -pub(super) use codex_protocol::protocol::McpStartupStatus; -pub(super) use codex_protocol::protocol::McpStartupUpdateEvent; pub(super) use codex_protocol::protocol::ModelVerification as CoreModelVerification; pub(super) use codex_protocol::protocol::ModelVerificationEvent; pub(super) use codex_protocol::protocol::NonSteerableTurnKind; diff --git a/codex-rs/tui/src/chatwidget/tests/mcp_startup.rs b/codex-rs/tui/src/chatwidget/tests/mcp_startup.rs index 8b1180cfa35e..6baed81a43cf 100644 --- a/codex-rs/tui/src/chatwidget/tests/mcp_startup.rs +++ b/codex-rs/tui/src/chatwidget/tests/mcp_startup.rs @@ -1,4 +1,7 @@ use super::*; +use codex_protocol::protocol::McpStartupCompleteEvent; +use codex_protocol::protocol::McpStartupStatus; +use codex_protocol::protocol::McpStartupUpdateEvent; use pretty_assertions::assert_eq; #[tokio::test]