Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions codex-rs/core/src/mcp_tool_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,11 @@ async fn execute_mcp_tool_call(
)
.await
.map_err(|e| format!("failed to build MCP tool request metadata: {e:#}"))?;
let mcp_call_trace = sess
Comment thread
cassirer-openai marked this conversation as resolved.
.services
.rollout_thread_trace
.start_mcp_call_trace(call_id);
let request_meta = mcp_call_trace.add_request_meta(request_meta);
let result = sess
.call_tool(
&invocation.server,
Expand Down
103 changes: 103 additions & 0 deletions codex-rs/core/src/mcp_tool_call_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::McpInvocation;
use codex_protocol::protocol::SessionSource;
use codex_rollout_trace::ThreadStartedTraceMetadata;
use codex_rollout_trace::ToolDispatchInvocation;
use codex_rollout_trace::ToolDispatchPayload;
use codex_rollout_trace::ToolDispatchRequester;
use codex_rollout_trace::replay_bundle;
use core_test_support::PathExt;
use core_test_support::hooks::trusted_config_layer_stack;
use core_test_support::responses::ev_assistant_message;
Expand All @@ -34,6 +41,9 @@ use core_test_support::responses::start_mock_server;
use pretty_assertions::assert_eq;
use serde::Deserialize;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::tempdir;
use tracing::Instrument;
Expand Down Expand Up @@ -116,6 +126,60 @@ fn prompt_options(
}
}

/// Drives `execute_mcp_tool_call` on a session with a rollout trace attached
/// and checks that replaying the emitted bundle shows an MCP call ID on the
/// dispatched tool call.
#[tokio::test]
async fn execute_mcp_tool_call_records_replayable_correlation() -> anyhow::Result<()> {
    // One ID is shared by the dispatch record, the MCP execution, and the
    // lookup in the replayed rollout.
    let call_id = "mcp-call";

    let trace_root = tempdir()?;
    let (mut session, turn_context) = make_session_and_context().await;
    attach_trace_bundle(&mut session, &turn_context, trace_root.path())?;

    // Record the generic tool dispatch before running the MCP call.
    let dispatch_trace = session
        .services
        .rollout_thread_trace
        .start_tool_dispatch_trace(|| {
            Some(ToolDispatchInvocation {
                thread_id: session.conversation_id.to_string(),
                codex_turn_id: turn_context.sub_id.clone(),
                tool_call_id: call_id.to_string(),
                tool_name: "search".to_string(),
                tool_namespace: Some("mcp__docs__".to_string()),
                requester: ToolDispatchRequester::Model {
                    model_visible_call_id: call_id.to_string(),
                },
                payload: ToolDispatchPayload::Function {
                    arguments: r#"{"query":"trace"}"#.to_string(),
                },
            })
        });
    assert!(dispatch_trace.is_enabled());

    let invocation = McpInvocation {
        server: "docs".to_string(),
        tool: "search".to_string(),
        arguments: Some(serde_json::json!({ "query": "trace" })),
    };
    let result = execute_mcp_tool_call(
        &session,
        &turn_context,
        call_id,
        &invocation,
        /*rewritten_arguments*/ None,
        /*metadata*/ None,
        /*request_meta*/ None,
    )
    .await;
    // The call is expected to fail; the assertions below only care about the
    // trace events written along the way.
    assert!(
        result.is_err(),
        "the synthetic backend is absent; only trace emission matters",
    );

    let bundle_dir = single_bundle_dir(trace_root.path())?;
    let replayed = replay_bundle(bundle_dir)?;
    assert!(
        replayed.tool_calls[call_id].mcp_call_id.is_some(),
        "the real MCP execution path should emit a reducer-visible correlation ID",
    );

    Ok(())
}

fn install_mcp_permission_request_hook(
session: &mut Session,
turn_context: &TurnContext,
Expand Down Expand Up @@ -205,6 +269,45 @@ print({hook_output:?})
log_path.to_path_buf()
}

/// Starts a root rollout trace under `root` and installs it on `session`.
///
/// The trace is told that the turn identified by `turn_context.sub_id` has
/// started, then it replaces `session.services.rollout_thread_trace`.
/// Returns an error if the trace cannot be started.
fn attach_trace_bundle(
    session: &mut Session,
    turn_context: &TurnContext,
    root: &Path,
) -> anyhow::Result<()> {
    // Only the thread ID comes from the session; every other field is a fixed
    // placeholder value.
    let metadata = ThreadStartedTraceMetadata {
        thread_id: session.conversation_id.to_string(),
        agent_path: "/root".to_string(),
        task_name: None,
        nickname: None,
        agent_role: None,
        session_source: SessionSource::Exec,
        cwd: PathBuf::from("/workspace"),
        rollout_path: None,
        model: "gpt-test".to_string(),
        provider_name: "test-provider".to_string(),
        approval_policy: "never".to_string(),
        sandbox_policy: "danger-full-access".to_string(),
    };

    let trace =
        codex_rollout_trace::ThreadTraceContext::start_root_in_root_for_test(root, metadata)?;
    trace.record_codex_turn_started(turn_context.sub_id.as_str());

    session.services.rollout_thread_trace = trace;
    Ok(())
}

/// Returns the path of the only directory entry found directly under `root`.
///
/// Propagates any I/O error hit while listing `root`, and panics if the number
/// of entries is not exactly one.
fn single_bundle_dir(root: &Path) -> anyhow::Result<PathBuf> {
    let mut bundle_paths = Vec::new();
    for entry in fs::read_dir(root)? {
        bundle_paths.push(entry?.path());
    }
    bundle_paths.sort();
    assert_eq!(bundle_paths.len(), 1);
    Ok(bundle_paths.remove(0))
}

#[test]
fn mcp_app_resource_uri_reads_known_tool_meta_keys() {
let nested = serde_json::json!({
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/rollout-trace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod bundle;
mod code_cell;
mod compaction;
mod inference;
mod mcp;
mod model;
mod payload;
mod protocol_event;
Expand All @@ -33,6 +34,8 @@ pub use compaction::CompactionTraceContext;
pub use inference::InferenceTraceAttempt;
/// Shared recorder context for inference attempts within one Codex turn.
pub use inference::InferenceTraceContext;
/// Trace-owned MCP execution correlation propagated to bridge request metadata.
pub use mcp::McpCallTraceContext;
/// Public reduced trace model returned by replay.
pub use model::*;
/// Stable identifier for one raw payload inside a rollout bundle.
Expand Down
99 changes: 99 additions & 0 deletions codex-rs/rollout-trace/src/mcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Hot-path helpers for correlating concrete MCP executions with rollout traces.
//!
//! Core decides when an MCP request is actually going to execute. The trace
//! crate owns the globally unique ID, the trace event that preserves it in the
//! reduced artifact, and the bridge-private MCP request metadata key.

use crate::McpCallId;
use serde_json::Value as JsonValue;

/// Request-metadata key under which the MCP call ID is inserted.
const MCP_CALL_ID_META_KEY: &str = "codex_bridge_mcp_call_id";

/// No-op capable handle for one concrete MCP backend call.
///
/// A handle either carries an MCP call ID or carries none; a handle without an
/// ID leaves request metadata unchanged.
#[derive(Clone, Debug)]
pub struct McpCallTraceContext {
/// `Some` for handles built with `enabled`, `None` for handles built with `disabled`.
mcp_call_id: Option<McpCallId>,
}

impl McpCallTraceContext {
    /// Creates a handle with no MCP call ID; `add_request_meta` returns its
    /// input untouched for such a handle.
    pub fn disabled() -> Self {
        Self { mcp_call_id: None }
    }

    /// Creates a handle carrying `mcp_call_id` for one concrete MCP execution.
    pub(crate) fn enabled(mcp_call_id: McpCallId) -> Self {
        let mcp_call_id = Some(mcp_call_id);
        Self { mcp_call_id }
    }

    /// Borrows the MCP call ID, or returns `None` for a disabled handle.
    pub(crate) fn mcp_call_id(&self) -> Option<&str> {
        self.mcp_call_id.as_deref()
    }

    /// Stamps one outgoing request's metadata with this call's correlation ID.
    ///
    /// - Disabled handle: `meta` is returned unchanged.
    /// - `meta` is a JSON object: the ID is inserted under the correlation key
    ///   and the object is returned.
    /// - `meta` is `None`: a new object containing only the ID is returned.
    /// - `meta` is any other JSON value: it is returned unchanged.
    pub fn add_request_meta(&self, meta: Option<JsonValue>) -> Option<JsonValue> {
        let Some(mcp_call_id) = self.mcp_call_id() else {
            return meta;
        };

        // Pick the object to write into; the insertion below is shared by the
        // "existing object" and "no metadata" cases.
        let mut fields = match meta {
            Some(JsonValue::Object(existing)) => existing,
            None => serde_json::Map::new(),
            // Not expected in practice. Tracing is best effort, so fall back
            // to a no-op instead of altering a non-object metadata value.
            Some(other) => return Some(other),
        };
        fields.insert(
            MCP_CALL_ID_META_KEY.to_string(),
            JsonValue::String(mcp_call_id.to_string()),
        );
        Some(JsonValue::Object(fields))
    }
}

#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::MCP_CALL_ID_META_KEY;
    use super::McpCallTraceContext;

    /// A disabled handle must hand the caller's metadata back as-is.
    #[test]
    fn disabled_mcp_trace_leaves_request_meta_unchanged() {
        let original = Some(json!({"source": "test"}));
        let trace = McpCallTraceContext::disabled();

        let returned = trace.add_request_meta(original.clone());

        assert_eq!(returned, original);
    }

    /// An enabled handle keeps existing keys and adds the correlation key.
    #[test]
    fn enabled_mcp_trace_adds_bridge_correlation_meta() {
        let trace = McpCallTraceContext::enabled("mcp-call-id".to_string());
        let expected_id = trace.mcp_call_id().expect("enabled trace has an ID");

        let meta = trace
            .add_request_meta(Some(json!({"source": "test"})))
            .expect("enabled trace keeps request metadata");
        let object = meta
            .as_object()
            .expect("MCP request metadata remains an object");

        assert_eq!(object["source"], json!("test"));
        assert_eq!(object[MCP_CALL_ID_META_KEY], json!(expected_id));
    }
}
2 changes: 2 additions & 0 deletions codex-rs/rollout-trace/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub type CodexTurnId = String;
pub type ConversationItemId = String;
/// Local ID for one outbound upstream inference request.
pub type InferenceCallId = String;
/// Globally unique ID for one concrete MCP backend request.
pub type McpCallId = String;
/// Reducer-owned ID for one runtime tool-call object.
pub type ToolCallId = String;
/// Responses `call_id` / custom-tool call ID visible in inference payloads.
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/rollout-trace/src/model/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::CompactionId;
use super::CompactionRequestId;
use super::ConversationItemId;
use super::EdgeId;
use super::McpCallId;
use super::ModelVisibleCallId;
use super::TerminalId;
use super::TerminalOperationId;
Expand Down Expand Up @@ -113,6 +114,8 @@ pub struct CompactionRequest {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ToolCall {
pub tool_call_id: ToolCallId,
/// Globally unique MCP execution ID, when this tool reached an MCP backend.
pub mcp_call_id: Option<McpCallId>,
/// Model-visible protocol call ID, if the model directly requested this tool.
pub model_visible_call_id: Option<ModelVisibleCallId>,
/// Code-mode runtime's internal tool invocation ID, if this call came from JS.
Expand Down
7 changes: 7 additions & 0 deletions codex-rs/rollout-trace/src/raw_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::model::CompactionRequestId;
use crate::model::EdgeId;
use crate::model::ExecutionStatus;
use crate::model::InferenceCallId;
use crate::model::McpCallId;
use crate::model::ModelVisibleCallId;
use crate::model::RolloutStatus;
use crate::model::ToolCallId;
Expand Down Expand Up @@ -138,6 +139,11 @@ pub enum RawTraceEventPayload {
summary: ToolCallSummary,
invocation_payload: Option<RawPayloadRef>,
},
/// Bridge correlation UUID assigned only when a tool reaches an MCP backend.
McpToolCallCorrelationAssigned {
tool_call_id: ToolCallId,
mcp_call_id: McpCallId,
},
ToolCallRuntimeStarted {
tool_call_id: ToolCallId,
/// Runtime/protocol observation for how Codex began executing the tool.
Expand Down Expand Up @@ -236,6 +242,7 @@ impl RawTraceEventPayload {
| RawTraceEventPayload::CodexTurnEnded { .. }
| RawTraceEventPayload::CompactionRequestFailed { .. }
| RawTraceEventPayload::CodeCellStarted { .. }
| RawTraceEventPayload::McpToolCallCorrelationAssigned { .. }
| RawTraceEventPayload::AgentResultObserved {
carried_payload: None,
..
Expand Down
6 changes: 6 additions & 0 deletions codex-rs/rollout-trace/src/reducer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ impl TraceReducer {
},
)?;
}
RawTraceEventPayload::McpToolCallCorrelationAssigned {
tool_call_id,
mcp_call_id,
} => {
self.assign_mcp_tool_call_correlation(tool_call_id, mcp_call_id)?;
}
RawTraceEventPayload::ToolCallRuntimeStarted {
tool_call_id,
runtime_payload,
Expand Down
17 changes: 17 additions & 0 deletions codex-rs/rollout-trace/src/reducer/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::model::CodeModeRuntimeToolId;
use crate::model::ConversationItemKind;
use crate::model::ExecutionStatus;
use crate::model::ExecutionWindow;
use crate::model::McpCallId;
use crate::model::ModelVisibleCallId;
use crate::model::ProducerRef;
use crate::model::ToolCall;
Expand Down Expand Up @@ -128,6 +129,7 @@ impl TraceReducer {
tool_call_id.clone(),
ToolCall {
tool_call_id: tool_call_id.clone(),
mcp_call_id: None,
model_visible_call_id,
code_mode_runtime_tool_id: started.code_mode_runtime_tool_id,
thread_id,
Expand Down Expand Up @@ -166,6 +168,21 @@ impl TraceReducer {
Ok(())
}

/// Records `mcp_call_id` on the already-reduced tool call `tool_call_id`.
///
/// Fails when no tool call with that ID exists, or when the tool call already
/// had an MCP call ID.
pub(super) fn assign_mcp_tool_call_correlation(
    &mut self,
    tool_call_id: ToolCallId,
    mcp_call_id: McpCallId,
) -> Result<()> {
    match self.rollout.tool_calls.get_mut(&tool_call_id) {
        None => bail!("MCP correlation referenced unknown tool call {tool_call_id}"),
        Some(tool_call) => {
            // `replace` stores the new ID before the duplicate check, so the
            // error path below leaves the newer ID in place.
            let previous = tool_call.mcp_call_id.replace(mcp_call_id);
            if previous.is_some() {
                bail!("duplicate MCP correlation for tool call {tool_call_id}");
            }
            Ok(())
        }
    }
}

/// Completes the canonical tool call and any terminal operation driven by dispatch output.
///
/// Protocol-backed terminal tools end from runtime events; direct tools
Expand Down
Loading
Loading