diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index 5dd16acb9a05..8855aae62ee6 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -576,6 +576,11 @@ async fn execute_mcp_tool_call( ) .await .map_err(|e| format!("failed to build MCP tool request metadata: {e:#}"))?; + let mcp_call_trace = sess + .services + .rollout_thread_trace + .start_mcp_call_trace(call_id); + let request_meta = mcp_call_trace.add_request_meta(request_meta); let result = sess .call_tool( &invocation.server, diff --git a/codex-rs/core/src/mcp_tool_call_tests.rs b/codex-rs/core/src/mcp_tool_call_tests.rs index ecad2091c5ef..65f89772831e 100644 --- a/codex-rs/core/src/mcp_tool_call_tests.rs +++ b/codex-rs/core/src/mcp_tool_call_tests.rs @@ -23,6 +23,13 @@ use codex_protocol::models::PermissionProfile; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::GranularApprovalConfig; +use codex_protocol::protocol::McpInvocation; +use codex_protocol::protocol::SessionSource; +use codex_rollout_trace::ThreadStartedTraceMetadata; +use codex_rollout_trace::ToolDispatchInvocation; +use codex_rollout_trace::ToolDispatchPayload; +use codex_rollout_trace::ToolDispatchRequester; +use codex_rollout_trace::replay_bundle; use core_test_support::PathExt; use core_test_support::hooks::trusted_config_layer_stack; use core_test_support::responses::ev_assistant_message; @@ -34,6 +41,9 @@ use core_test_support::responses::start_mock_server; use pretty_assertions::assert_eq; use serde::Deserialize; use std::collections::HashMap; +use std::fs; +use std::path::Path; +use std::path::PathBuf; use std::sync::Arc; use tempfile::tempdir; use tracing::Instrument; @@ -116,6 +126,60 @@ fn prompt_options( } } +#[tokio::test] +async fn execute_mcp_tool_call_records_replayable_correlation() -> anyhow::Result<()> { + let temp = tempdir()?; + let (mut session, turn_context) = make_session_and_context().await; + attach_trace_bundle(&mut session, &turn_context, temp.path())?; + + let dispatch_trace = session + .services + .rollout_thread_trace + .start_tool_dispatch_trace(|| { + Some(ToolDispatchInvocation { + thread_id: session.conversation_id.to_string(), + codex_turn_id: turn_context.sub_id.clone(), + tool_call_id: "mcp-call".to_string(), + tool_name: "search".to_string(), + tool_namespace: Some("mcp__docs__".to_string()), + requester: ToolDispatchRequester::Model { + model_visible_call_id: "mcp-call".to_string(), + }, + payload: ToolDispatchPayload::Function { + arguments: r#"{"query":"trace"}"#.to_string(), + }, + }) + }); + assert!(dispatch_trace.is_enabled()); + + let result = execute_mcp_tool_call( + &session, + &turn_context, + "mcp-call", + &McpInvocation { + server: "docs".to_string(), + tool: "search".to_string(), + arguments: Some(serde_json::json!({ "query": "trace" })), + }, + /*rewritten_arguments*/ None, + /*metadata*/ None, + /*request_meta*/ None, + ) + .await; + assert!( + result.is_err(), + "the synthetic backend is absent; only trace emission matters", + ); + + let replayed = replay_bundle(single_bundle_dir(temp.path())?)?; + assert!( + replayed.tool_calls["mcp-call"].mcp_call_id.is_some(), + "the real MCP execution path should emit a reducer-visible correlation ID", + ); + + Ok(()) +} + fn install_mcp_permission_request_hook( session: &mut Session, turn_context: &TurnContext, @@ -205,6 +269,45 @@ print({hook_output:?}) log_path.to_path_buf() } +/// Attaches a replayable rollout bundle to one synthetic session under test. +fn attach_trace_bundle( + session: &mut Session, + turn_context: &TurnContext, + root: &Path, +) -> anyhow::Result<()> { + let rollout_thread_trace = + codex_rollout_trace::ThreadTraceContext::start_root_in_root_for_test( + root, + ThreadStartedTraceMetadata { + thread_id: session.conversation_id.to_string(), + agent_path: "/root".to_string(), + task_name: None, + nickname: None, + agent_role: None, + session_source: SessionSource::Exec, + cwd: PathBuf::from("/workspace"), + rollout_path: None, + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: "danger-full-access".to_string(), + }, + )?; + rollout_thread_trace.record_codex_turn_started(turn_context.sub_id.as_str()); + session.services.rollout_thread_trace = rollout_thread_trace; + Ok(()) +} + +/// Returns the sole bundle emitted under a temporary rollout trace root. +fn single_bundle_dir(root: &Path) -> anyhow::Result { + let mut entries = fs::read_dir(root)? + .map(|entry| entry.map(|entry| entry.path())) + .collect::, _>>()?; + entries.sort(); + assert_eq!(entries.len(), 1); + Ok(entries.remove(0)) +} + #[test] fn mcp_app_resource_uri_reads_known_tool_meta_keys() { let nested = serde_json::json!({ diff --git a/codex-rs/rollout-trace/src/lib.rs b/codex-rs/rollout-trace/src/lib.rs index 3d9e04f36b4e..54094f0525b3 100644 --- a/codex-rs/rollout-trace/src/lib.rs +++ b/codex-rs/rollout-trace/src/lib.rs @@ -10,6 +10,7 @@ mod bundle; mod code_cell; mod compaction; mod inference; +mod mcp; mod model; mod payload; mod protocol_event; @@ -33,6 +34,8 @@ pub use compaction::CompactionTraceContext; pub use inference::InferenceTraceAttempt; /// Shared recorder context for inference attempts within one Codex turn. pub use inference::InferenceTraceContext; +/// Trace-owned MCP execution correlation propagated to bridge request metadata. +pub use mcp::McpCallTraceContext; /// Public reduced trace model returned by replay. pub use model::*; /// Stable identifier for one raw payload inside a rollout bundle. diff --git a/codex-rs/rollout-trace/src/mcp.rs b/codex-rs/rollout-trace/src/mcp.rs new file mode 100644 index 000000000000..b8564e1579bf --- /dev/null +++ b/codex-rs/rollout-trace/src/mcp.rs @@ -0,0 +1,99 @@ +//! Hot-path helpers for correlating concrete MCP executions with rollout traces. +//! +//! Core decides when an MCP request is actually going to execute. The trace +//! crate owns the globally unique ID, the trace event that preserves it in the +//! reduced artifact, and the bridge-private MCP request metadata key. + +use crate::McpCallId; +use serde_json::Value as JsonValue; + +const MCP_CALL_ID_META_KEY: &str = "codex_bridge_mcp_call_id"; + +/// No-op capable handle for one concrete MCP backend call. +#[derive(Clone, Debug)] +pub struct McpCallTraceContext { + mcp_call_id: Option, +} + +impl McpCallTraceContext { + /// Builds a context that records nothing and leaves request metadata unchanged. + pub fn disabled() -> Self { + Self { mcp_call_id: None } + } + + /// Builds the trace handle for one concrete MCP execution. + pub(crate) fn enabled(mcp_call_id: McpCallId) -> Self { + Self { + mcp_call_id: Some(mcp_call_id), + } + } + + /// Returns the trace-owned MCP call ID when rollout tracing is enabled. + pub(crate) fn mcp_call_id(&self) -> Option<&str> { + self.mcp_call_id.as_deref() + } + + /// Adds bridge-private MCP correlation metadata to one outgoing request. + pub fn add_request_meta(&self, meta: Option) -> Option { + let Some(mcp_call_id) = self.mcp_call_id() else { + return meta; + }; + + match meta { + Some(JsonValue::Object(mut map)) => { + map.insert( + MCP_CALL_ID_META_KEY.to_string(), + JsonValue::String(mcp_call_id.to_string()), + ); + Some(JsonValue::Object(map)) + } + None => { + let mut map = serde_json::Map::new(); + map.insert( + MCP_CALL_ID_META_KEY.to_string(), + JsonValue::String(mcp_call_id.to_string()), + ); + Some(JsonValue::Object(map)) + } + // This should never happen but if it does then we'll fallback to + // a noop rather than any breaking behavior. The tracing is best + // effort after all. + Some(_) => meta, + } + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::MCP_CALL_ID_META_KEY; + use super::McpCallTraceContext; + + #[test] + fn disabled_mcp_trace_leaves_request_meta_unchanged() { + let meta = Some(json!({"source": "test"})); + + assert_eq!( + McpCallTraceContext::disabled().add_request_meta(meta.clone()), + meta + ); + } + + #[test] + fn enabled_mcp_trace_adds_bridge_correlation_meta() { + let trace = McpCallTraceContext::enabled("mcp-call-id".to_string()); + let meta = trace + .add_request_meta(Some(json!({"source": "test"}))) + .expect("enabled trace keeps request metadata"); + let object = meta + .as_object() + .expect("MCP request metadata remains an object"); + + assert_eq!(object["source"], json!("test")); + assert_eq!( + object[MCP_CALL_ID_META_KEY], + json!(trace.mcp_call_id().expect("enabled trace has an ID")) + ); + } +} diff --git a/codex-rs/rollout-trace/src/model/mod.rs b/codex-rs/rollout-trace/src/model/mod.rs index e265e2910028..0166121da9f4 100644 --- a/codex-rs/rollout-trace/src/model/mod.rs +++ b/codex-rs/rollout-trace/src/model/mod.rs @@ -28,6 +28,8 @@ pub type CodexTurnId = String; pub type ConversationItemId = String; /// Local ID for one outbound upstream inference request. pub type InferenceCallId = String; +/// Globally unique ID for one concrete MCP backend request. +pub type McpCallId = String; /// Reducer-owned ID for one runtime tool-call object. pub type ToolCallId = String; /// Responses `call_id` / custom-tool call ID visible in inference payloads. diff --git a/codex-rs/rollout-trace/src/model/runtime.rs b/codex-rs/rollout-trace/src/model/runtime.rs index a2afcb0636dc..f0d1d1caf1d1 100644 --- a/codex-rs/rollout-trace/src/model/runtime.rs +++ b/codex-rs/rollout-trace/src/model/runtime.rs @@ -13,6 +13,7 @@ use super::CompactionId; use super::CompactionRequestId; use super::ConversationItemId; use super::EdgeId; +use super::McpCallId; use super::ModelVisibleCallId; use super::TerminalId; use super::TerminalOperationId; @@ -113,6 +114,8 @@ pub struct CompactionRequest { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ToolCall { pub tool_call_id: ToolCallId, + /// Globally unique MCP execution ID, when this tool reached an MCP backend. + pub mcp_call_id: Option, /// Model-visible protocol call ID, if the model directly requested this tool. pub model_visible_call_id: Option, /// Code-mode runtime's internal tool invocation ID, if this call came from JS. diff --git a/codex-rs/rollout-trace/src/raw_event.rs b/codex-rs/rollout-trace/src/raw_event.rs index d60a2b6c4851..bd9010d7bbee 100644 --- a/codex-rs/rollout-trace/src/raw_event.rs +++ b/codex-rs/rollout-trace/src/raw_event.rs @@ -8,6 +8,7 @@ use crate::model::CompactionRequestId; use crate::model::EdgeId; use crate::model::ExecutionStatus; use crate::model::InferenceCallId; +use crate::model::McpCallId; use crate::model::ModelVisibleCallId; use crate::model::RolloutStatus; use crate::model::ToolCallId; @@ -138,6 +139,11 @@ pub enum RawTraceEventPayload { summary: ToolCallSummary, invocation_payload: Option, }, + /// Bridge correlation UUID assigned only when a tool reaches an MCP backend. + McpToolCallCorrelationAssigned { + tool_call_id: ToolCallId, + mcp_call_id: McpCallId, + }, ToolCallRuntimeStarted { tool_call_id: ToolCallId, /// Runtime/protocol observation for how Codex began executing the tool. @@ -236,6 +242,7 @@ impl RawTraceEventPayload { | RawTraceEventPayload::CodexTurnEnded { .. } | RawTraceEventPayload::CompactionRequestFailed { .. } | RawTraceEventPayload::CodeCellStarted { .. } + | RawTraceEventPayload::McpToolCallCorrelationAssigned { .. } | RawTraceEventPayload::AgentResultObserved { carried_payload: None, .. diff --git a/codex-rs/rollout-trace/src/reducer/mod.rs b/codex-rs/rollout-trace/src/reducer/mod.rs index 3e8f6f13537b..4a908b6e57ed 100644 --- a/codex-rs/rollout-trace/src/reducer/mod.rs +++ b/codex-rs/rollout-trace/src/reducer/mod.rs @@ -261,6 +261,12 @@ impl TraceReducer { }, )?; } + RawTraceEventPayload::McpToolCallCorrelationAssigned { + tool_call_id, + mcp_call_id, + } => { + self.assign_mcp_tool_call_correlation(tool_call_id, mcp_call_id)?; + } RawTraceEventPayload::ToolCallRuntimeStarted { tool_call_id, runtime_payload, diff --git a/codex-rs/rollout-trace/src/reducer/tool.rs b/codex-rs/rollout-trace/src/reducer/tool.rs index d481bb6acdfb..43845ea93918 100644 --- a/codex-rs/rollout-trace/src/reducer/tool.rs +++ b/codex-rs/rollout-trace/src/reducer/tool.rs @@ -7,6 +7,7 @@ use crate::model::CodeModeRuntimeToolId; use crate::model::ConversationItemKind; use crate::model::ExecutionStatus; use crate::model::ExecutionWindow; +use crate::model::McpCallId; use crate::model::ModelVisibleCallId; use crate::model::ProducerRef; use crate::model::ToolCall; @@ -128,6 +129,7 @@ impl TraceReducer { tool_call_id.clone(), ToolCall { tool_call_id: tool_call_id.clone(), + mcp_call_id: None, model_visible_call_id, code_mode_runtime_tool_id: started.code_mode_runtime_tool_id, thread_id, @@ -166,6 +168,21 @@ impl TraceReducer { Ok(()) } + /// Attaches the bridge-visible MCP UUID after the generic tool call exists. + pub(super) fn assign_mcp_tool_call_correlation( + &mut self, + tool_call_id: ToolCallId, + mcp_call_id: McpCallId, + ) -> Result<()> { + let Some(tool_call) = self.rollout.tool_calls.get_mut(&tool_call_id) else { + bail!("MCP correlation referenced unknown tool call {tool_call_id}"); + }; + if tool_call.mcp_call_id.replace(mcp_call_id).is_some() { + bail!("duplicate MCP correlation for tool call {tool_call_id}"); + } + Ok(()) + } + /// Completes the canonical tool call and any terminal operation driven by dispatch output. /// /// Protocol-backed terminal tools end from runtime events; direct tools diff --git a/codex-rs/rollout-trace/src/thread.rs b/codex-rs/rollout-trace/src/thread.rs index 47526807a17c..514b8d382bc0 100644 --- a/codex-rs/rollout-trace/src/thread.rs +++ b/codex-rs/rollout-trace/src/thread.rs @@ -22,11 +22,13 @@ use crate::CodexTurnId; use crate::CompactionId; use crate::CompactionTraceContext; use crate::InferenceTraceContext; +use crate::McpCallTraceContext; use crate::RawPayloadKind; use crate::RawPayloadRef; use crate::RawTraceEventContext; use crate::RawTraceEventPayload; use crate::RolloutStatus; +use crate::ToolCallId; use crate::ToolDispatchInvocation; use crate::ToolDispatchTraceContext; use crate::TraceWriter; @@ -395,6 +397,24 @@ impl ThreadTraceContext { provider_name.into(), ) } + + /// Starts bridge correlation for one concrete MCP backend request. + /// + /// Dispatch-level tool IDs remain compact and UI-friendly. This UUID is + /// deliberately separate: it is only for cross-process log joins where a + /// rollout-local counter would collide across samples. + pub fn start_mcp_call_trace(&self, tool_call_id: impl Into) -> McpCallTraceContext { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return McpCallTraceContext::disabled(); + }; + let mcp_call_id = Uuid::new_v4().to_string(); + let trace = McpCallTraceContext::enabled(mcp_call_id.clone()); + context.append_best_effort(RawTraceEventPayload::McpToolCallCorrelationAssigned { + tool_call_id: tool_call_id.into(), + mcp_call_id, + }); + trace + } } fn start_root_in_root(