From c84284e583a73947ed4a1b8351887de973f94815 Mon Sep 17 00:00:00 2001 From: Albin Cassirer Date: Tue, 21 Apr 2026 11:59:09 -0700 Subject: [PATCH 1/6] Add core rollout trace recorder --- codex-rs/Cargo.lock | 1 + codex-rs/Cargo.toml | 1 + codex-rs/core/Cargo.toml | 1 + codex-rs/core/src/client.rs | 120 +++++++-- codex-rs/core/src/codex_delegate.rs | 1 + codex-rs/core/src/compact_remote.rs | 42 ++- codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/rollout_trace.rs | 250 ++++++++++++++++++ codex-rs/core/src/rollout_trace_tests.rs | 129 +++++++++ codex-rs/core/src/session/mod.rs | 5 + codex-rs/core/src/session/session.rs | 36 +++ codex-rs/core/src/session/tests.rs | 4 + .../core/src/session/tests/guardian_tests.rs | 1 + codex-rs/core/src/session/turn.rs | 14 +- codex-rs/core/src/state/service.rs | 2 + codex-rs/core/src/thread_manager.rs | 1 + 16 files changed, 591 insertions(+), 18 deletions(-) create mode 100644 codex-rs/core/src/rollout_trace.rs create mode 100644 codex-rs/core/src/rollout_trace_tests.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index de908f05fce3..74114b63081c 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1961,6 +1961,7 @@ dependencies = [ "codex-response-debug-context", "codex-rmcp-client", "codex-rollout", + "codex-rollout-trace", "codex-sandboxing", "codex-secrets", "codex-shell-command", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index d70af322a162..3b28d3dcb6a2 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -165,6 +165,7 @@ codex-responses-api-proxy = { path = "responses-api-proxy" } codex-response-debug-context = { path = "response-debug-context" } codex-rmcp-client = { path = "rmcp-client" } codex-rollout = { path = "rollout" } +codex-rollout-trace = { path = "rollout-trace" } codex-sandboxing = { path = "sandboxing" } codex-secrets = { path = "secrets" } codex-shell-command = { path = "shell-command" } diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 86d6f8b0f3ec..dccd1f9cf000 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -53,6 +53,7 @@ codex-model-provider = { workspace = true } codex-protocol = { workspace = true } codex-response-debug-context = { workspace = true } codex-rollout = { workspace = true } +codex-rollout-trace = { workspace = true } codex-rmcp-client = { workspace = true } codex-sandboxing = { workspace = true } codex-state = { workspace = true } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 601b8106df8f..7edd8da5490d 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -80,6 +80,9 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::W3cTraceContext; +use codex_rollout_trace::CompactionTraceContext; +use codex_rollout_trace::InferenceTraceAttempt; +use codex_rollout_trace::InferenceTraceContext; use codex_tools::create_tools_json_for_responses_api; use eventsource_stream::Event; use eventsource_stream::EventStreamError; @@ -411,6 +414,7 @@ impl ModelClient { effort: Option, summary: ReasoningSummaryConfig, session_telemetry: &SessionTelemetry, + compaction_trace: &CompactionTraceContext, ) -> Result> { if prompt.input.is_empty() { return Ok(Vec::new()); @@ -465,10 +469,18 @@ impl ModelClient { extra_headers.extend(build_conversation_headers(Some( self.state.conversation_id.to_string(), ))); - client - .compact_input(&payload, extra_headers) - .await - .map_err(map_api_error) + let trace_attempt = compaction_trace.start_attempt(&payload); + match client.compact_input(&payload, extra_headers).await { + Ok(output) => { + trace_attempt.record_completed(&output); + Ok(output) + } + Err(err) => { + let err = map_api_error(err); + trace_attempt.record_failed(&err); + Err(err) + } + } } pub(crate) async fn create_realtime_call_with_headers( @@ -1148,6 +1160,7 @@ impl ModelClientSession { summary: ReasoningSummaryConfig, service_tier: Option, turn_metadata_header: Option<&str>, + inference_trace: &InferenceTraceContext, ) -> Result { if let Some(path) = &*CODEX_RS_SSE_FIXTURE { warn!(path, "Streaming from fixture"); @@ -1156,7 +1169,11 @@ impl ModelClientSession { self.client.state.provider.info().stream_idle_timeout(), ) .map_err(map_api_error)?; - let (stream, _last_request_rx) = map_response_stream(stream, session_telemetry.clone()); + let (stream, _last_request_rx) = map_response_stream( + stream, + session_telemetry.clone(), + InferenceTraceAttempt::disabled(), + ); return Ok(stream); } @@ -1190,6 +1207,8 @@ impl ModelClientSession { summary, service_tier, )?; + let inference_trace_attempt = inference_trace.start_attempt(); + inference_trace_attempt.record_started(&request); let client = ApiResponsesClient::new( transport, client_setup.api_provider, @@ -1200,12 +1219,17 @@ impl ModelClientSession { match stream_result { Ok(stream) => { - let (stream, _) = map_response_stream(stream, session_telemetry.clone()); + let (stream, _) = map_response_stream( + stream, + session_telemetry.clone(), + inference_trace_attempt, + ); return Ok(stream); } Err(ApiError::Transport( unauthorized_transport @ TransportError::Http { status, .. }, )) if status == StatusCode::UNAUTHORIZED => { + inference_trace_attempt.record_failed(&unauthorized_transport); pending_retry = PendingUnauthorizedRetry::from_recovery( handle_unauthorized( unauthorized_transport, @@ -1216,7 +1240,11 @@ impl ModelClientSession { ); continue; } - Err(err) => return Err(map_api_error(err)), + Err(err) => { + let err = map_api_error(err); + inference_trace_attempt.record_failed(&err); + return Err(err); + } } } } @@ -1247,6 +1275,7 @@ impl ModelClientSession { turn_metadata_header: Option<&str>, warmup: bool, request_trace: Option, + inference_trace: &InferenceTraceContext, ) -> Result { let auth_manager = self.client.state.provider.auth_manager(); @@ -1321,17 +1350,35 @@ impl ModelClientSession { let ws_request = self.prepare_websocket_request(ws_payload, &request); self.websocket_session.last_request = Some(request); + let inference_trace_attempt = if warmup { + // Prewarm sends `generate=false`; it is connection setup, not a + // model inference attempt that should appear in rollout traces. + InferenceTraceAttempt::disabled() + } else { + inference_trace.start_attempt() + }; + inference_trace_attempt.record_started(&ws_request); let stream_result = self.websocket_session.connection.as_ref().ok_or_else(|| { map_api_error(ApiError::Stream( "websocket connection is unavailable".to_string(), )) })?; - let stream_result = stream_result + let stream_result = match stream_result .stream_request(ws_request, self.websocket_session.connection_reused()) .await - .map_err(map_api_error)?; - let (stream, last_request_rx) = - map_response_stream(stream_result, session_telemetry.clone()); + { + Ok(stream_result) => stream_result, + Err(err) => { + let err = map_api_error(err); + inference_trace_attempt.record_failed(&err); + return Err(err); + } + }; + let (stream, last_request_rx) = map_response_stream( + stream_result, + session_telemetry.clone(), + inference_trace_attempt, + ); self.websocket_session.last_response_rx = Some(last_request_rx); return Ok(WebsocketStreamOutcome::Stream(stream)); } @@ -1390,6 +1437,7 @@ impl ModelClientSession { return Ok(()); } + let disabled_trace = InferenceTraceContext::disabled(); match self .stream_responses_websocket( prompt, @@ -1401,6 +1449,7 @@ impl ModelClientSession { turn_metadata_header, /*warmup*/ true, current_span_w3c_trace_context(), + &disabled_trace, ) .await { @@ -1424,12 +1473,11 @@ impl ModelClientSession { } #[allow(clippy::too_many_arguments)] - /// Streams a single model request within the current turn. + /// Streams a single model request without rollout tracing. /// - /// The caller is responsible for passing per-turn settings explicitly (model selection, - /// reasoning settings, telemetry context, and turn metadata). This method will prefer the - /// Responses WebSocket transport when the provider supports it and it remains healthy, and will - /// fall back to the HTTP Responses API transport otherwise. + /// This is the public client API. It routes through the same transport code + /// as traced Codex turns, but supplies a disabled trace context so tracing + /// does not leak into callers that only need model streaming. pub async fn stream( &mut self, prompt: &Prompt, @@ -1439,6 +1487,37 @@ impl ModelClientSession { summary: ReasoningSummaryConfig, service_tier: Option, turn_metadata_header: Option<&str>, + ) -> Result { + let disabled_trace = InferenceTraceContext::disabled(); + self.stream_with_trace( + prompt, + model_info, + session_telemetry, + effort, + summary, + service_tier, + turn_metadata_header, + &disabled_trace, + ) + .await + } + + #[allow(clippy::too_many_arguments)] + /// Streams a model request with an explicit rollout trace context. + /// + /// The context may be enabled or disabled. Transport code records against it + /// unconditionally so HTTP, WebSocket, retry, and fallback paths do not need + /// separate trace/no-trace branches. + pub(crate) async fn stream_with_trace( + &mut self, + prompt: &Prompt, + model_info: &ModelInfo, + session_telemetry: &SessionTelemetry, + effort: Option, + summary: ReasoningSummaryConfig, + service_tier: Option, + turn_metadata_header: Option<&str>, + inference_trace: &InferenceTraceContext, ) -> Result { let wire_api = self.client.state.provider.info().wire_api; match wire_api { @@ -1456,6 +1535,7 @@ impl ModelClientSession { turn_metadata_header, /*warmup*/ false, request_trace, + inference_trace, ) .await? { @@ -1474,6 +1554,7 @@ impl ModelClientSession { summary, service_tier, turn_metadata_header, + inference_trace, ) .await } @@ -1569,6 +1650,7 @@ fn parent_thread_id_header_value(session_source: &SessionSource) -> Option( api_stream: S, session_telemetry: SessionTelemetry, + inference_trace_attempt: InferenceTraceAttempt, ) -> (ResponseStream, oneshot::Receiver) where S: futures::Stream> @@ -1609,6 +1691,11 @@ where usage.total_tokens, ); } + inference_trace_attempt.record_completed( + &response_id, + &token_usage, + &items_added, + ); if let Some(sender) = tx_last_response.take() { let _ = sender.send(LastResponse { response_id: response_id.clone(), @@ -1633,6 +1720,7 @@ where } Err(err) => { let mapped = map_api_error(err); + inference_trace_attempt.record_failed(&mapped); if !logged_error { session_telemetry.see_event_completed_failed(&mapped); logged_error = true; diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 7247c601f46e..a06698590ba5 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -94,6 +94,7 @@ pub(crate) async fn run_codex_thread_interactive( inherited_shell_snapshot: None, user_shell_override: None, inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)), + inherited_rollout_trace: None, parent_trace: None, analytics_events_client: Some(parent_session.services.analytics_events_client.clone()), })) diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index e56812eca305..bc147b7f7ead 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -10,6 +10,7 @@ use crate::context_manager::ContextManager; use crate::context_manager::TotalTokenUsageBreakdown; use crate::context_manager::estimate_response_item_model_visible_bytes; use crate::context_manager::is_codex_generated_item; +use crate::rollout_trace::CompactionCheckpointTracePayload; use crate::session::session::Session; use crate::session::turn::built_tools; use crate::session::turn_context::TurnContext; @@ -26,6 +27,7 @@ use codex_protocol::models::ResponseItem; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TurnStartedEvent; +use codex_rollout_trace::CompactionTraceContext; use futures::TryFutureExt; use tokio_util::sync::CancellationToken; use tracing::error; @@ -114,7 +116,11 @@ async fn run_remote_compact_task_inner_impl( turn_context: &Arc, initial_context_injection: InitialContextInjection, ) -> CodexResult<()> { - let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); + let context_compaction_item = ContextCompactionItem::new(); + // Use the UI compaction item ID as the trace compaction ID so protocol lifecycle events, + // endpoint attempts, and the installed history checkpoint all have one join key. + let compaction_id = context_compaction_item.id.clone(); + let compaction_item = TurnItem::ContextCompaction(context_compaction_item); sess.emit_turn_item_started(turn_context, &compaction_item) .await; let mut history = sess.clone_history().await; @@ -131,6 +137,10 @@ async fn run_remote_compact_task_inner_impl( "trimmed history items before remote compaction" ); } + // This is the history selected for remote compaction, after any trimming required to fit the + // compact endpoint. The checkpoint below records it separately from the next sampling request, + // whose prompt will repeat current developer/context prefix items. + let trace_input_history = history.raw_items().to_vec(); // Required to keep `/undo` available after compaction let ghost_snapshots: Vec = history .raw_items() @@ -157,6 +167,21 @@ async fn run_remote_compact_task_inner_impl( personality: turn_context.personality, output_schema: None, }; + // Remote compaction is the only compaction shape rollout tracing supports. The trace context + // records the exact `/responses/compact` request and response; normal sampling requests remain + // traced through the inference path. + let compaction_trace = sess.services.rollout_trace.as_ref().map_or_else( + CompactionTraceContext::disabled, + |trace| { + trace.compaction_trace_context( + sess.conversation_id.to_string(), + turn_context.sub_id.clone(), + compaction_id.clone(), + turn_context.model_info.slug.clone(), + turn_context.provider.info().name.clone(), + ) + }, + ); let mut new_history = sess .services @@ -167,6 +192,7 @@ async fn run_remote_compact_task_inner_impl( turn_context.reasoning_effort, turn_context.reasoning_summary, &turn_context.session_telemetry, + &compaction_trace, ) .or_else(|err| async { let total_usage_breakdown = sess.get_total_token_usage_breakdown().await; @@ -200,6 +226,20 @@ async fn run_remote_compact_task_inner_impl( message: String::new(), replacement_history: Some(new_history.clone()), }; + if let Some(trace) = sess.services.rollout_trace.as_ref() { + // Install is the semantic boundary where the compact endpoint's output becomes live + // thread history. Keep it distinct from the later inference request so the reducer can + // still represent repeated developer/context prefix items exactly as the model saw them. + trace.record_compaction_installed( + sess.conversation_id.to_string(), + turn_context.sub_id.clone(), + compaction_id, + &CompactionCheckpointTracePayload { + input_history: &trace_input_history, + replacement_history: &new_history, + }, + ); + } sess.replace_compacted_history(new_history, reference_context_item, compacted_item) .await; sess.recompute_token_usage(turn_context).await; diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 373e57737be0..00ef56f20f73 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -133,6 +133,7 @@ pub use agents_md::AgentsMdManager; pub use agents_md::DEFAULT_AGENTS_MD_FILENAME; pub use agents_md::LOCAL_AGENTS_MD_FILENAME; mod rollout; +mod rollout_trace; pub(crate) mod safety; mod session_rollout_init_error; pub mod shell; diff --git a/codex-rs/core/src/rollout_trace.rs b/codex-rs/core/src/rollout_trace.rs new file mode 100644 index 000000000000..b66f61f93390 --- /dev/null +++ b/codex-rs/core/src/rollout_trace.rs @@ -0,0 +1,250 @@ +//! Opt-in producer for the rollout trace bundle. +//! +//! This module is the deliberately thin bridge from `codex-core` into +//! `codex-rollout-trace`. Core emits raw observations; the trace crate's +//! offline reducer owns the semantic graph. + +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; + +use codex_protocol::ThreadId; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::SessionSource; +use codex_rollout_trace::AgentThreadId; +use codex_rollout_trace::CompactionTraceContext; +use codex_rollout_trace::InferenceTraceContext; +use codex_rollout_trace::RawPayloadKind; +use codex_rollout_trace::RawTraceEventContext; +use codex_rollout_trace::RawTraceEventPayload; +use codex_rollout_trace::TraceWriter; +use serde::Serialize; +use tracing::debug; +use tracing::warn; +use uuid::Uuid; + +/// Environment variable that enables local trace-bundle recording. +/// +/// The value is a root directory. Each independent root session gets one child +/// bundle directory. Spawned child threads share their root session's bundle so +/// one reduced `state.json` describes the whole multi-agent rollout tree. +pub(crate) const CODEX_ROLLOUT_TRACE_ROOT_ENV: &str = "CODEX_ROLLOUT_TRACE_ROOT"; + +/// Lightweight handle stored in `SessionServices`. +/// +/// Cloning the handle is cheap; all sequencing and file ownership remains +/// inside `TraceWriter`. +#[derive(Clone, Debug)] +pub(crate) struct RolloutTraceRecorder { + writer: Arc, +} + +/// Metadata captured once at thread/session start. +/// +/// This payload is intentionally operational rather than reduced: it is a raw +/// payload that later reducers can mine as the reduced thread model evolves. +#[derive(Serialize)] +pub(crate) struct ThreadStartedTraceMetadata { + pub(crate) thread_id: String, + pub(crate) agent_path: String, + pub(crate) task_name: Option, + pub(crate) nickname: Option, + pub(crate) agent_role: Option, + pub(crate) session_source: SessionSource, + pub(crate) cwd: PathBuf, + pub(crate) rollout_path: Option, + pub(crate) model: String, + pub(crate) provider_name: String, + pub(crate) approval_policy: String, + pub(crate) sandbox_policy: String, +} + +/// History replacement checkpoint persisted when compaction installs new live history. +/// +/// The checkpoint keeps compaction separate from ordinary sampling snapshots: +/// `input_history` is the live thread history selected for compaction, while +/// `replacement_history` is what future prompts may carry after the checkpoint. +#[derive(Serialize)] +pub(crate) struct CompactionCheckpointTracePayload<'a> { + pub(crate) input_history: &'a [ResponseItem], + pub(crate) replacement_history: &'a [ResponseItem], +} + +impl RolloutTraceRecorder { + /// Creates and starts a trace bundle if `CODEX_ROLLOUT_TRACE_ROOT` is set. + /// + /// Trace startup is best-effort. A tracing failure must not make the Codex + /// session unusable, because traces are diagnostic and can be enabled while + /// debugging unrelated production failures. + pub(crate) fn maybe_create( + thread_id: ThreadId, + metadata: ThreadStartedTraceMetadata, + ) -> Option { + let root = std::env::var_os(CODEX_ROLLOUT_TRACE_ROOT_ENV)?; + let root = PathBuf::from(root); + match Self::create_in_root(root.as_path(), thread_id, metadata) { + Ok(recorder) => Some(recorder), + Err(err) => { + warn!("failed to initialize rollout trace recorder: {err:#}"); + None + } + } + } + + fn create_in_root( + root: &Path, + thread_id: ThreadId, + metadata: ThreadStartedTraceMetadata, + ) -> anyhow::Result { + let trace_id = Uuid::new_v4().to_string(); + let thread_id = thread_id.to_string(); + let bundle_dir = root.join(format!("trace-{trace_id}-{thread_id}")); + let writer = TraceWriter::create( + &bundle_dir, + trace_id.clone(), + thread_id.clone(), + thread_id.clone(), + )?; + let recorder = Self { + writer: Arc::new(writer), + }; + + recorder.append_best_effort(RawTraceEventPayload::RolloutStarted { + trace_id, + root_thread_id: thread_id, + }); + + recorder.record_thread_started(metadata); + + debug!("recording rollout trace at {}", bundle_dir.display()); + Ok(recorder) + } + + /// Emits the lifecycle event and metadata for one thread in this rollout tree. + /// + /// Root sessions call this immediately after `RolloutStarted`; spawned + /// child sessions call it on the inherited recorder. Keeping children in + /// the root bundle preserves one raw payload namespace and one reduced + /// `RolloutTrace` for the whole multi-agent task. + pub(crate) fn record_thread_started(&self, metadata: ThreadStartedTraceMetadata) { + let metadata_payload = + self.write_json_payload_best_effort(RawPayloadKind::SessionMetadata, &metadata); + self.append_best_effort(RawTraceEventPayload::ThreadStarted { + thread_id: metadata.thread_id, + agent_path: metadata.agent_path, + metadata_payload, + }); + } + + /// Builds reusable inference trace context for one Codex turn. + /// + /// The returned context is intentionally not "an inference call" yet. + /// Transport code owns retry/fallback attempts and calls `start_attempt` + /// only after it has built the concrete request payload for that attempt. + pub(crate) fn inference_trace_context( + &self, + thread_id: AgentThreadId, + codex_turn_id: String, + model: String, + provider_name: String, + ) -> InferenceTraceContext { + InferenceTraceContext::enabled( + Arc::clone(&self.writer), + thread_id, + codex_turn_id, + model, + provider_name, + ) + } + + /// Builds remote-compaction trace context for one checkpoint. + /// + /// Rollout tracing currently has a first-class checkpoint model only for remote compaction. + /// The compact endpoint is a model-facing request whose output replaces live history, so it + /// needs both request/response attempt events and a later checkpoint event when processed + /// replacement history is installed. + pub(crate) fn compaction_trace_context( + &self, + thread_id: AgentThreadId, + codex_turn_id: String, + compaction_id: String, + model: String, + provider_name: String, + ) -> CompactionTraceContext { + CompactionTraceContext::enabled( + Arc::clone(&self.writer), + thread_id, + codex_turn_id, + compaction_id, + model, + provider_name, + ) + } + + /// Emits the checkpoint where remote-compacted history replaces live thread history. + /// + /// This checkpoint is deliberately separate from the compact endpoint response: Codex filters + /// and reinjects context before replacement history becomes live. The reducer uses this event + /// to connect the pre-compaction history to the processed replacement items without treating + /// repeated developer/context prefix items as part of the replacement itself. + pub(crate) fn record_compaction_installed( + &self, + thread_id: AgentThreadId, + codex_turn_id: String, + compaction_id: String, + checkpoint: &CompactionCheckpointTracePayload<'_>, + ) { + let Some(checkpoint_payload) = + self.write_json_payload_best_effort(RawPayloadKind::CompactionCheckpoint, checkpoint) + else { + return; + }; + self.append_with_context_best_effort( + thread_id, + codex_turn_id, + RawTraceEventPayload::CompactionInstalled { + compaction_id, + checkpoint_payload, + }, + ); + } + + fn write_json_payload_best_effort( + &self, + kind: RawPayloadKind, + payload: &impl Serialize, + ) -> Option { + match self.writer.write_json_payload(kind, payload) { + Ok(payload_ref) => Some(payload_ref), + Err(err) => { + warn!("failed to write rollout trace payload: {err:#}"); + None + } + } + } + + fn append_best_effort(&self, payload: RawTraceEventPayload) { + if let Err(err) = self.writer.append(payload) { + warn!("failed to append rollout trace event: {err:#}"); + } + } + + fn append_with_context_best_effort( + &self, + thread_id: AgentThreadId, + codex_turn_id: String, + payload: RawTraceEventPayload, + ) { + let context = RawTraceEventContext { + thread_id: Some(thread_id), + codex_turn_id: Some(codex_turn_id), + }; + if let Err(err) = self.writer.append_with_context(context, payload) { + warn!("failed to append rollout trace event: {err:#}"); + } + } +} + +#[cfg(test)] +#[path = "rollout_trace_tests.rs"] +mod tests; diff --git a/codex-rs/core/src/rollout_trace_tests.rs b/codex-rs/core/src/rollout_trace_tests.rs new file mode 100644 index 000000000000..e69cc88c9ba5 --- /dev/null +++ b/codex-rs/core/src/rollout_trace_tests.rs @@ -0,0 +1,129 @@ +use std::fs; +use std::path::Path; +use std::path::PathBuf; + +use codex_protocol::AgentPath; +use codex_protocol::ThreadId; +use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; +use codex_rollout_trace::RolloutStatus; +use tempfile::TempDir; + +use super::*; + +#[test] +fn create_in_root_writes_replayable_lifecycle_events() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let thread_id = ThreadId::new(); + RolloutTraceRecorder::create_in_root( + temp.path(), + thread_id, + ThreadStartedTraceMetadata { + thread_id: thread_id.to_string(), + agent_path: "/root".to_string(), + task_name: None, + nickname: None, + agent_role: None, + session_source: SessionSource::Exec, + cwd: PathBuf::from("/workspace"), + rollout_path: Some(PathBuf::from("/tmp/rollout.jsonl")), + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess), + }, + ) + .expect("trace recorder"); + + let bundle_dir = single_bundle_dir(temp.path())?; + let replayed = codex_rollout_trace::replay_bundle(&bundle_dir)?; + + assert_eq!(replayed.status, RolloutStatus::Running); + assert_eq!(replayed.root_thread_id, thread_id.to_string()); + assert_eq!(replayed.threads[&thread_id.to_string()].agent_path, "/root"); + assert_eq!(replayed.raw_payloads.len(), 1); + + Ok(()) +} + +#[test] +fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let root_thread_id = ThreadId::new(); + let child_thread_id = ThreadId::new(); + let recorder = RolloutTraceRecorder::create_in_root( + temp.path(), + root_thread_id, + minimal_metadata(root_thread_id), + ) + .expect("trace recorder"); + + recorder.record_thread_started(ThreadStartedTraceMetadata { + thread_id: child_thread_id.to_string(), + agent_path: "/root/repo_file_counter".to_string(), + task_name: Some("repo_file_counter".to_string()), + nickname: Some("Kepler".to_string()), + agent_role: Some("worker".to_string()), + session_source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: root_thread_id, + depth: 1, + agent_path: Some( + AgentPath::try_from("/root/repo_file_counter").map_err(anyhow::Error::msg)?, + ), + agent_nickname: Some("Kepler".to_string()), + agent_role: Some("worker".to_string()), + }), + cwd: PathBuf::from("/workspace"), + rollout_path: Some(PathBuf::from("/tmp/child-rollout.jsonl")), + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess), + }); + let bundle_dir = single_bundle_dir(temp.path())?; + let replayed = codex_rollout_trace::replay_bundle(&bundle_dir)?; + + assert_eq!(fs::read_dir(temp.path())?.count(), 1); + assert_eq!(replayed.threads.len(), 2); + assert_eq!( + replayed.threads[&child_thread_id.to_string()].agent_path, + "/root/repo_file_counter" + ); + assert_eq!(replayed.status, RolloutStatus::Running); + assert_eq!( + replayed.threads[&child_thread_id.to_string()] + .execution + .status, + codex_rollout_trace::ExecutionStatus::Running + ); + assert_eq!(replayed.raw_payloads.len(), 2); + + Ok(()) +} + +fn minimal_metadata(thread_id: ThreadId) -> ThreadStartedTraceMetadata { + ThreadStartedTraceMetadata { + thread_id: thread_id.to_string(), + agent_path: "/root".to_string(), + task_name: None, + nickname: None, + agent_role: None, + session_source: SessionSource::Exec, + cwd: PathBuf::from("/workspace"), + rollout_path: None, + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: "danger-full-access".to_string(), + } +} + +fn single_bundle_dir(root: &Path) -> anyhow::Result { + let mut entries = fs::read_dir(root)? + .map(|entry| entry.map(|entry| entry.path())) + .collect::, _>>()?; + entries.sort(); + assert_eq!(entries.len(), 1); + Ok(entries.remove(0)) +} diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 80084f197930..05fdeba33d35 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -35,6 +35,8 @@ use crate::parse_turn_item; use crate::path_utils::normalize_for_native_workdir; use crate::realtime_conversation::RealtimeConversationManager; use crate::rollout::find_thread_name_by_id; +use crate::rollout_trace::RolloutTraceRecorder; +use crate::rollout_trace::ThreadStartedTraceMetadata; use crate::session_prefix::format_subagent_notification_message; use crate::skills::SkillRenderSideEffects; use crate::skills_load_input_from_config; @@ -397,6 +399,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) metrics_service_name: Option, pub(crate) inherited_shell_snapshot: Option>, pub(crate) inherited_exec_policy: Option>, + pub(crate) inherited_rollout_trace: Option, pub(crate) user_shell_override: Option, pub(crate) parent_trace: Option, pub(crate) analytics_events_client: Option, @@ -452,6 +455,7 @@ impl Codex { inherited_shell_snapshot, user_shell_override, inherited_exec_policy, + inherited_rollout_trace, parent_trace: _, analytics_events_client, } = args; @@ -650,6 +654,7 @@ impl Codex { agent_control, environment, analytics_events_client, + inherited_rollout_trace, ) .await .map_err(|e| { diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 640e59b5b712..3a3f43af5547 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -229,6 +229,7 @@ impl Session { agent_control: AgentControl, environment: Option>, analytics_events_client: Option, + inherited_rollout_trace: Option, ) -> anyhow::Result> { debug!( "Configuring session: model={}; provider={:?}", @@ -365,6 +366,40 @@ impl Session { let rollout_path = rollout_recorder .as_ref() .map(|rec| rec.rollout_path().to_path_buf()); + let trace_agent_path = session_configuration + .session_source + .get_agent_path() + .unwrap_or_else(codex_protocol::AgentPath::root); + let trace_task_name = + (!trace_agent_path.is_root()).then(|| trace_agent_path.name().to_string()); + let trace_metadata = ThreadStartedTraceMetadata { + thread_id: conversation_id.to_string(), + agent_path: trace_agent_path.to_string(), + task_name: trace_task_name, + nickname: session_configuration.session_source.get_nickname(), + agent_role: session_configuration.session_source.get_agent_role(), + session_source: session_configuration.session_source.clone(), + cwd: session_configuration.cwd.to_path_buf(), + rollout_path: rollout_path.clone(), + model: session_configuration.collaboration_mode.model().to_string(), + provider_name: config.model_provider_id.clone(), + approval_policy: session_configuration.approval_policy.value().to_string(), + sandbox_policy: format!("{:?}", session_configuration.sandbox_policy.get()), + }; + let rollout_trace = if let Some(rollout_trace) = inherited_rollout_trace { + rollout_trace.record_thread_started(trace_metadata); + Some(rollout_trace) + } else if matches!( + session_configuration.session_source, + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. }) + ) { + // Spawned child threads are part of their root rollout tree. If + // the parent had no trace recorder, do not create an orphan child + // bundle that looks like an independent rollout. + None + } else { + RolloutTraceRecorder::maybe_create(conversation_id, trace_metadata) + }; let mut post_session_configured_events = Vec::::new(); @@ -644,6 +679,7 @@ impl Session { analytics_events_client, hooks, rollout: Mutex::new(rollout_recorder), + rollout_trace, user_shell: Arc::new(default_shell), shell_snapshot_tx, show_raw_agent_reasoning: config.show_raw_agent_reasoning, diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index ee72769bda6a..632408e03e65 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -2951,6 +2951,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { .expect("create environment"), )), /*analytics_events_client*/ None, + /*inherited_rollout_trace*/ None, ) .await; @@ -3073,6 +3074,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { ..HooksConfig::default() }), rollout: Mutex::new(None), + rollout_trace: None, user_shell: Arc::new(default_user_shell()), shell_snapshot_tx: watch::channel(None).0, show_raw_agent_reasoning: config.show_raw_agent_reasoning, @@ -3268,6 +3270,7 @@ async fn make_session_with_config_and_rx( .expect("create environment"), )), /*analytics_events_client*/ None, + /*inherited_rollout_trace*/ None, ) .await?; @@ -4164,6 +4167,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( ..HooksConfig::default() }), rollout: Mutex::new(None), + rollout_trace: None, user_shell: Arc::new(default_user_shell()), shell_snapshot_tx: watch::channel(None).0, show_raw_agent_reasoning: config.show_raw_agent_reasoning, diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index 73a0de285a0f..8445651ab74a 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -650,6 +650,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { metrics_service_name: None, inherited_shell_snapshot: None, inherited_exec_policy: Some(Arc::new(parent_exec_policy)), + inherited_rollout_trace: None, user_shell_override: None, parent_trace: None, analytics_events_client: None, diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 98f572243a51..25e97c0e2e74 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1874,8 +1874,19 @@ async fn try_run_sampling_request( auth_mode = sess.services.auth_manager.auth_mode(), features = sess.features.enabled_features(), ); + let inference_trace = sess.services.rollout_trace.as_ref().map_or_else( + codex_rollout_trace::InferenceTraceContext::disabled, + |trace| { + trace.inference_trace_context( + sess.conversation_id.to_string(), + turn_context.sub_id.clone(), + turn_context.model_info.slug.clone(), + turn_context.provider.info().name.clone(), + ) + }, + ); let mut stream = client_session - .stream( + .stream_with_trace( prompt, &turn_context.model_info, &turn_context.session_telemetry, @@ -1883,6 +1894,7 @@ async fn try_run_sampling_request( turn_context.reasoning_summary, turn_context.config.service_tier, turn_metadata_header, + &inference_trace, ) .instrument(trace_span!("stream_request")) .or_cancel(&cancellation_token) diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index aae02d61bdc6..62b21fe3f822 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -10,6 +10,7 @@ use crate::exec_policy::ExecPolicyManager; use crate::guardian::GuardianRejection; use crate::mcp::McpManager; use crate::plugins::PluginsManager; +use crate::rollout_trace::RolloutTraceRecorder; use crate::skills_watcher::SkillsWatcher; use crate::tools::code_mode::CodeModeService; use crate::tools::network_approval::NetworkApprovalService; @@ -41,6 +42,7 @@ pub(crate) struct SessionServices { pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) hooks: Hooks, pub(crate) rollout: Mutex>, + pub(crate) rollout_trace: Option, pub(crate) user_shell: Arc, pub(crate) shell_snapshot_tx: watch::Sender>>, pub(crate) show_raw_agent_reasoning: bool, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index e4da99bb55a0..d66999a90672 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -957,6 +957,7 @@ impl ThreadManagerState { metrics_service_name, inherited_shell_snapshot, inherited_exec_policy, + inherited_rollout_trace: None, user_shell_override, parent_trace, analytics_events_client: self.analytics_events_client.clone(), From c45ca665d996907ecc45ed8f53586c4fec7901b8 Mon Sep 17 00:00:00 2001 From: Albin Cassirer Date: Tue, 21 Apr 2026 14:57:39 -0700 Subject: [PATCH 2/6] Move rollout trace recorder into trace crate --- codex-rs/Cargo.lock | 2 + codex-rs/core/src/compact_remote.rs | 2 +- codex-rs/core/src/lib.rs | 1 - codex-rs/core/src/session/mod.rs | 4 +- codex-rs/core/src/state/service.rs | 2 +- codex-rs/rollout-trace/Cargo.toml | 2 + codex-rs/rollout-trace/src/lib.rs | 9 +++ .../src/recorder.rs} | 75 +++++++++---------- .../src/recorder_tests.rs} | 9 ++- 9 files changed, 57 insertions(+), 49 deletions(-) rename codex-rs/{core/src/rollout_trace.rs => rollout-trace/src/recorder.rs} (80%) rename codex-rs/{core/src/rollout_trace_tests.rs => rollout-trace/src/recorder_tests.rs} (94%) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 74114b63081c..4716ac790573 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2799,6 +2799,8 @@ dependencies = [ "serde", "serde_json", "tempfile", + "tracing", + "uuid", ] [[package]] diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index bc147b7f7ead..6e0383efd032 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -10,7 +10,6 @@ use crate::context_manager::ContextManager; use crate::context_manager::TotalTokenUsageBreakdown; use crate::context_manager::estimate_response_item_model_visible_bytes; use crate::context_manager::is_codex_generated_item; -use crate::rollout_trace::CompactionCheckpointTracePayload; use crate::session::session::Session; use crate::session::turn::built_tools; use crate::session::turn_context::TurnContext; @@ -27,6 +26,7 @@ use codex_protocol::models::ResponseItem; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TurnStartedEvent; +use codex_rollout_trace::CompactionCheckpointTracePayload; use codex_rollout_trace::CompactionTraceContext; use futures::TryFutureExt; use tokio_util::sync::CancellationToken; diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 00ef56f20f73..373e57737be0 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -133,7 +133,6 @@ pub use agents_md::AgentsMdManager; pub use agents_md::DEFAULT_AGENTS_MD_FILENAME; pub use agents_md::LOCAL_AGENTS_MD_FILENAME; mod rollout; -mod rollout_trace; pub(crate) mod safety; mod session_rollout_init_error; pub mod shell; diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 05fdeba33d35..d84bf698c4e4 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -35,8 +35,6 @@ use crate::parse_turn_item; use crate::path_utils::normalize_for_native_workdir; use crate::realtime_conversation::RealtimeConversationManager; use crate::rollout::find_thread_name_by_id; -use crate::rollout_trace::RolloutTraceRecorder; -use crate::rollout_trace::ThreadStartedTraceMetadata; use crate::session_prefix::format_subagent_notification_message; use crate::skills::SkillRenderSideEffects; use crate::skills_load_input_from_config; @@ -122,6 +120,8 @@ use codex_protocol::request_user_input::RequestUserInputResponse; use codex_rmcp_client::ElicitationResponse; use codex_rollout::RolloutConfig; use codex_rollout::state_db; +use codex_rollout_trace::RolloutTraceRecorder; +use codex_rollout_trace::ThreadStartedTraceMetadata; use codex_sandboxing::policy_transforms::intersect_permission_profiles; use codex_shell_command::parse_command::parse_command; use codex_terminal_detection::user_agent; diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 62b21fe3f822..b9e3dc3c7996 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -10,7 +10,6 @@ use crate::exec_policy::ExecPolicyManager; use crate::guardian::GuardianRejection; use crate::mcp::McpManager; use crate::plugins::PluginsManager; -use crate::rollout_trace::RolloutTraceRecorder; use crate::skills_watcher::SkillsWatcher; use crate::tools::code_mode::CodeModeService; use crate::tools::network_approval::NetworkApprovalService; @@ -24,6 +23,7 @@ use codex_mcp::McpConnectionManager; use codex_models_manager::manager::ModelsManager; use codex_otel::SessionTelemetry; use codex_rollout::state_db::StateDbHandle; +use codex_rollout_trace::RolloutTraceRecorder; use codex_thread_store::LocalThreadStore; use std::path::PathBuf; use tokio::sync::Mutex; diff --git a/codex-rs/rollout-trace/Cargo.toml b/codex-rs/rollout-trace/Cargo.toml index e67b35953a5b..540f794fb7a2 100644 --- a/codex-rs/rollout-trace/Cargo.toml +++ b/codex-rs/rollout-trace/Cargo.toml @@ -17,6 +17,8 @@ anyhow = { workspace = true } codex-protocol = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/codex-rs/rollout-trace/src/lib.rs b/codex-rs/rollout-trace/src/lib.rs index 5c8022d6d711..b03c6ca3447e 100644 --- a/codex-rs/rollout-trace/src/lib.rs +++ b/codex-rs/rollout-trace/src/lib.rs @@ -12,6 +12,7 @@ mod inference; mod model; mod payload; mod raw_event; +mod recorder; mod reducer; mod writer; @@ -43,6 +44,14 @@ pub use raw_event::RawTraceEvent; pub use raw_event::RawTraceEventContext; /// Typed payload for one raw trace event. pub use raw_event::RawTraceEventPayload; +/// Environment variable that enables local trace-bundle recording. +pub use recorder::CODEX_ROLLOUT_TRACE_ROOT_ENV; +/// Raw checkpoint payload for a remote compaction install event. +pub use recorder::CompactionCheckpointTracePayload; +/// Best-effort hot-path recorder for one rollout trace bundle. +pub use recorder::RolloutTraceRecorder; +/// Raw metadata captured when a thread starts. +pub use recorder::ThreadStartedTraceMetadata; /// Replay a raw trace bundle and write/read its reduced `RolloutTrace`. pub use reducer::replay_bundle; /// Append-only writer used by hot-path Codex instrumentation. diff --git a/codex-rs/core/src/rollout_trace.rs b/codex-rs/rollout-trace/src/recorder.rs similarity index 80% rename from codex-rs/core/src/rollout_trace.rs rename to codex-rs/rollout-trace/src/recorder.rs index b66f61f93390..08b9d1023c36 100644 --- a/codex-rs/core/src/rollout_trace.rs +++ b/codex-rs/rollout-trace/src/recorder.rs @@ -1,8 +1,4 @@ -//! Opt-in producer for the rollout trace bundle. -//! -//! This module is the deliberately thin bridge from `codex-core` into -//! `codex-rollout-trace`. Core emits raw observations; the trace crate's -//! offline reducer owns the semantic graph. +//! Opt-in hot-path producer for rollout trace bundles. use std::path::Path; use std::path::PathBuf; @@ -11,31 +7,33 @@ use std::sync::Arc; use codex_protocol::ThreadId; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::SessionSource; -use codex_rollout_trace::AgentThreadId; -use codex_rollout_trace::CompactionTraceContext; -use codex_rollout_trace::InferenceTraceContext; -use codex_rollout_trace::RawPayloadKind; -use codex_rollout_trace::RawTraceEventContext; -use codex_rollout_trace::RawTraceEventPayload; -use codex_rollout_trace::TraceWriter; use serde::Serialize; use tracing::debug; use tracing::warn; use uuid::Uuid; +use crate::AgentThreadId; +use crate::CompactionTraceContext; +use crate::InferenceTraceContext; +use crate::RawPayloadKind; +use crate::RawPayloadRef; +use crate::RawTraceEventContext; +use crate::RawTraceEventPayload; +use crate::TraceWriter; + /// Environment variable that enables local trace-bundle recording. /// /// The value is a root directory. Each independent root session gets one child /// bundle directory. Spawned child threads share their root session's bundle so /// one reduced `state.json` describes the whole multi-agent rollout tree. -pub(crate) const CODEX_ROLLOUT_TRACE_ROOT_ENV: &str = "CODEX_ROLLOUT_TRACE_ROOT"; +pub const CODEX_ROLLOUT_TRACE_ROOT_ENV: &str = "CODEX_ROLLOUT_TRACE_ROOT"; /// Lightweight handle stored in `SessionServices`. /// /// Cloning the handle is cheap; all sequencing and file ownership remains /// inside `TraceWriter`. #[derive(Clone, Debug)] -pub(crate) struct RolloutTraceRecorder { +pub struct RolloutTraceRecorder { writer: Arc, } @@ -44,19 +42,19 @@ pub(crate) struct RolloutTraceRecorder { /// This payload is intentionally operational rather than reduced: it is a raw /// payload that later reducers can mine as the reduced thread model evolves. #[derive(Serialize)] -pub(crate) struct ThreadStartedTraceMetadata { - pub(crate) thread_id: String, - pub(crate) agent_path: String, - pub(crate) task_name: Option, - pub(crate) nickname: Option, - pub(crate) agent_role: Option, - pub(crate) session_source: SessionSource, - pub(crate) cwd: PathBuf, - pub(crate) rollout_path: Option, - pub(crate) model: String, - pub(crate) provider_name: String, - pub(crate) approval_policy: String, - pub(crate) sandbox_policy: String, +pub struct ThreadStartedTraceMetadata { + pub thread_id: String, + pub agent_path: String, + pub task_name: Option, + pub nickname: Option, + pub agent_role: Option, + pub session_source: SessionSource, + pub cwd: PathBuf, + pub rollout_path: Option, + pub model: String, + pub provider_name: String, + pub approval_policy: String, + pub sandbox_policy: String, } /// History replacement checkpoint persisted when compaction installs new live history. @@ -65,9 +63,9 @@ pub(crate) struct ThreadStartedTraceMetadata { /// `input_history` is the live thread history selected for compaction, while /// `replacement_history` is what future prompts may carry after the checkpoint. #[derive(Serialize)] -pub(crate) struct CompactionCheckpointTracePayload<'a> { - pub(crate) input_history: &'a [ResponseItem], - pub(crate) replacement_history: &'a [ResponseItem], +pub struct CompactionCheckpointTracePayload<'a> { + pub input_history: &'a [ResponseItem], + pub replacement_history: &'a [ResponseItem], } impl RolloutTraceRecorder { @@ -76,10 +74,7 @@ impl RolloutTraceRecorder { /// Trace startup is best-effort. A tracing failure must not make the Codex /// session unusable, because traces are diagnostic and can be enabled while /// debugging unrelated production failures. - pub(crate) fn maybe_create( - thread_id: ThreadId, - metadata: ThreadStartedTraceMetadata, - ) -> Option { + pub fn maybe_create(thread_id: ThreadId, metadata: ThreadStartedTraceMetadata) -> Option { let root = std::env::var_os(CODEX_ROLLOUT_TRACE_ROOT_ENV)?; let root = PathBuf::from(root); match Self::create_in_root(root.as_path(), thread_id, metadata) { @@ -126,7 +121,7 @@ impl RolloutTraceRecorder { /// child sessions call it on the inherited recorder. Keeping children in /// the root bundle preserves one raw payload namespace and one reduced /// `RolloutTrace` for the whole multi-agent task. - pub(crate) fn record_thread_started(&self, metadata: ThreadStartedTraceMetadata) { + pub fn record_thread_started(&self, metadata: ThreadStartedTraceMetadata) { let metadata_payload = self.write_json_payload_best_effort(RawPayloadKind::SessionMetadata, &metadata); self.append_best_effort(RawTraceEventPayload::ThreadStarted { @@ -141,7 +136,7 @@ impl RolloutTraceRecorder { /// The returned context is intentionally not "an inference call" yet. /// Transport code owns retry/fallback attempts and calls `start_attempt` /// only after it has built the concrete request payload for that attempt. - pub(crate) fn inference_trace_context( + pub fn inference_trace_context( &self, thread_id: AgentThreadId, codex_turn_id: String, @@ -163,7 +158,7 @@ impl RolloutTraceRecorder { /// The compact endpoint is a model-facing request whose output replaces live history, so it /// needs both request/response attempt events and a later checkpoint event when processed /// replacement history is installed. - pub(crate) fn compaction_trace_context( + pub fn compaction_trace_context( &self, thread_id: AgentThreadId, codex_turn_id: String, @@ -187,7 +182,7 @@ impl RolloutTraceRecorder { /// and reinjects context before replacement history becomes live. The reducer uses this event /// to connect the pre-compaction history to the processed replacement items without treating /// repeated developer/context prefix items as part of the replacement itself. - pub(crate) fn record_compaction_installed( + pub fn record_compaction_installed( &self, thread_id: AgentThreadId, codex_turn_id: String, @@ -213,7 +208,7 @@ impl RolloutTraceRecorder { &self, kind: RawPayloadKind, payload: &impl Serialize, - ) -> Option { + ) -> Option { match self.writer.write_json_payload(kind, payload) { Ok(payload_ref) => Some(payload_ref), Err(err) => { @@ -246,5 +241,5 @@ impl RolloutTraceRecorder { } #[cfg(test)] -#[path = "rollout_trace_tests.rs"] +#[path = "recorder_tests.rs"] mod tests; diff --git a/codex-rs/core/src/rollout_trace_tests.rs b/codex-rs/rollout-trace/src/recorder_tests.rs similarity index 94% rename from codex-rs/core/src/rollout_trace_tests.rs rename to codex-rs/rollout-trace/src/recorder_tests.rs index e69cc88c9ba5..08b0cfbbd086 100644 --- a/codex-rs/core/src/rollout_trace_tests.rs +++ b/codex-rs/rollout-trace/src/recorder_tests.rs @@ -7,10 +7,11 @@ use codex_protocol::ThreadId; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; -use codex_rollout_trace::RolloutStatus; use tempfile::TempDir; use super::*; +use crate::RolloutStatus; +use crate::replay_bundle; #[test] fn create_in_root_writes_replayable_lifecycle_events() -> anyhow::Result<()> { @@ -37,7 +38,7 @@ fn create_in_root_writes_replayable_lifecycle_events() -> anyhow::Result<()> { .expect("trace recorder"); let bundle_dir = single_bundle_dir(temp.path())?; - let replayed = codex_rollout_trace::replay_bundle(&bundle_dir)?; + let replayed = replay_bundle(&bundle_dir)?; assert_eq!(replayed.status, RolloutStatus::Running); assert_eq!(replayed.root_thread_id, thread_id.to_string()); @@ -82,7 +83,7 @@ fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess), }); let bundle_dir = single_bundle_dir(temp.path())?; - let replayed = codex_rollout_trace::replay_bundle(&bundle_dir)?; + let replayed = replay_bundle(&bundle_dir)?; assert_eq!(fs::read_dir(temp.path())?.count(), 1); assert_eq!(replayed.threads.len(), 2); @@ -95,7 +96,7 @@ fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { replayed.threads[&child_thread_id.to_string()] .execution .status, - codex_rollout_trace::ExecutionStatus::Running + crate::ExecutionStatus::Running ); assert_eq!(replayed.raw_payloads.len(), 2); From d47f44116790a4e32ef9223d40c12920bfe2f2c3 Mon Sep 17 00:00:00 2001 From: Albin Cassirer Date: Tue, 21 Apr 2026 15:23:12 -0700 Subject: [PATCH 3/6] Make rollout trace recorder no-op capable --- codex-rs/core/src/codex_delegate.rs | 2 +- codex-rs/core/src/compact_remote.rs | 44 ++++----- codex-rs/core/src/session/mod.rs | 3 +- codex-rs/core/src/session/session.rs | 12 ++- codex-rs/core/src/session/tests.rs | 8 +- .../core/src/session/tests/guardian_tests.rs | 2 +- codex-rs/core/src/session/turn.rs | 15 ++-- codex-rs/core/src/state/service.rs | 2 +- codex-rs/core/src/thread_manager.rs | 2 +- codex-rs/rollout-trace/src/recorder.rs | 88 +++++++++++++----- codex-rs/rollout-trace/src/recorder_tests.rs | 90 +++++++++++++------ 11 files changed, 167 insertions(+), 101 deletions(-) diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index a06698590ba5..8871228d7ea3 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -94,7 +94,7 @@ pub(crate) async fn run_codex_thread_interactive( inherited_shell_snapshot: None, user_shell_override: None, inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)), - inherited_rollout_trace: None, + inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(), parent_trace: None, analytics_events_client: Some(parent_session.services.analytics_events_client.clone()), })) diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 6e0383efd032..105c81ddfe7d 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -27,7 +27,6 @@ use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TurnStartedEvent; use codex_rollout_trace::CompactionCheckpointTracePayload; -use codex_rollout_trace::CompactionTraceContext; use futures::TryFutureExt; use tokio_util::sync::CancellationToken; use tracing::error; @@ -170,17 +169,12 @@ async fn run_remote_compact_task_inner_impl( // Remote compaction is the only compaction shape rollout tracing supports. The trace context // records the exact `/responses/compact` request and response; normal sampling requests remain // traced through the inference path. - let compaction_trace = sess.services.rollout_trace.as_ref().map_or_else( - CompactionTraceContext::disabled, - |trace| { - trace.compaction_trace_context( - sess.conversation_id.to_string(), - turn_context.sub_id.clone(), - compaction_id.clone(), - turn_context.model_info.slug.clone(), - turn_context.provider.info().name.clone(), - ) - }, + let compaction_trace = sess.services.rollout_trace.compaction_trace_context( + sess.conversation_id.to_string(), + turn_context.sub_id.clone(), + compaction_id.clone(), + turn_context.model_info.slug.clone(), + turn_context.provider.info().name.clone(), ); let mut new_history = sess @@ -226,20 +220,18 @@ async fn run_remote_compact_task_inner_impl( message: String::new(), replacement_history: Some(new_history.clone()), }; - if let Some(trace) = sess.services.rollout_trace.as_ref() { - // Install is the semantic boundary where the compact endpoint's output becomes live - // thread history. Keep it distinct from the later inference request so the reducer can - // still represent repeated developer/context prefix items exactly as the model saw them. - trace.record_compaction_installed( - sess.conversation_id.to_string(), - turn_context.sub_id.clone(), - compaction_id, - &CompactionCheckpointTracePayload { - input_history: &trace_input_history, - replacement_history: &new_history, - }, - ); - } + // Install is the semantic boundary where the compact endpoint's output becomes live + // thread history. Keep it distinct from the later inference request so the reducer can + // still represent repeated developer/context prefix items exactly as the model saw them. + sess.services.rollout_trace.record_compaction_installed( + sess.conversation_id.to_string(), + turn_context.sub_id.clone(), + compaction_id, + &CompactionCheckpointTracePayload { + input_history: &trace_input_history, + replacement_history: &new_history, + }, + ); sess.replace_compacted_history(new_history, reference_context_item, compacted_item) .await; sess.recompute_token_usage(turn_context).await; diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index d84bf698c4e4..93f283ee66e0 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -399,7 +399,8 @@ pub(crate) struct CodexSpawnArgs { pub(crate) metrics_service_name: Option, pub(crate) inherited_shell_snapshot: Option>, pub(crate) inherited_exec_policy: Option>, - pub(crate) inherited_rollout_trace: Option, + /// Parent rollout-tree recorder, or a disabled recorder when this spawn has no parent trace. + pub(crate) inherited_rollout_trace: RolloutTraceRecorder, pub(crate) user_shell_override: Option, pub(crate) parent_trace: Option, pub(crate) analytics_events_client: Option, diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 3a3f43af5547..28760f539d1c 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -229,7 +229,7 @@ impl Session { agent_control: AgentControl, environment: Option>, analytics_events_client: Option, - inherited_rollout_trace: Option, + inherited_rollout_trace: RolloutTraceRecorder, ) -> anyhow::Result> { debug!( "Configuring session: model={}; provider={:?}", @@ -386,20 +386,18 @@ impl Session { approval_policy: session_configuration.approval_policy.value().to_string(), sandbox_policy: format!("{:?}", session_configuration.sandbox_policy.get()), }; - let rollout_trace = if let Some(rollout_trace) = inherited_rollout_trace { - rollout_trace.record_thread_started(trace_metadata); - Some(rollout_trace) - } else if matches!( + let rollout_trace = if matches!( session_configuration.session_source, SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. }) ) { // Spawned child threads are part of their root rollout tree. If // the parent had no trace recorder, do not create an orphan child // bundle that looks like an independent rollout. - None + inherited_rollout_trace } else { - RolloutTraceRecorder::maybe_create(conversation_id, trace_metadata) + RolloutTraceRecorder::create_root_or_disabled(conversation_id) }; + rollout_trace.record_thread_started(trace_metadata); let mut post_session_configured_events = Vec::::new(); diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 632408e03e65..d3a7fac8d659 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -2951,7 +2951,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { .expect("create environment"), )), /*analytics_events_client*/ None, - /*inherited_rollout_trace*/ None, + RolloutTraceRecorder::disabled(), ) .await; @@ -3074,7 +3074,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { ..HooksConfig::default() }), rollout: Mutex::new(None), - rollout_trace: None, + rollout_trace: RolloutTraceRecorder::disabled(), user_shell: Arc::new(default_user_shell()), shell_snapshot_tx: watch::channel(None).0, show_raw_agent_reasoning: config.show_raw_agent_reasoning, @@ -3270,7 +3270,7 @@ async fn make_session_with_config_and_rx( .expect("create environment"), )), /*analytics_events_client*/ None, - /*inherited_rollout_trace*/ None, + RolloutTraceRecorder::disabled(), ) .await?; @@ -4167,7 +4167,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( ..HooksConfig::default() }), rollout: Mutex::new(None), - rollout_trace: None, + rollout_trace: RolloutTraceRecorder::disabled(), user_shell: Arc::new(default_user_shell()), shell_snapshot_tx: watch::channel(None).0, show_raw_agent_reasoning: config.show_raw_agent_reasoning, diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index 8445651ab74a..d7b354454f1a 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -650,7 +650,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { metrics_service_name: None, inherited_shell_snapshot: None, inherited_exec_policy: Some(Arc::new(parent_exec_policy)), - inherited_rollout_trace: None, + inherited_rollout_trace: RolloutTraceRecorder::disabled(), user_shell_override: None, parent_trace: None, analytics_events_client: None, diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 25e97c0e2e74..49447a860470 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1874,16 +1874,11 @@ async fn try_run_sampling_request( auth_mode = sess.services.auth_manager.auth_mode(), features = sess.features.enabled_features(), ); - let inference_trace = sess.services.rollout_trace.as_ref().map_or_else( - codex_rollout_trace::InferenceTraceContext::disabled, - |trace| { - trace.inference_trace_context( - sess.conversation_id.to_string(), - turn_context.sub_id.clone(), - turn_context.model_info.slug.clone(), - turn_context.provider.info().name.clone(), - ) - }, + let inference_trace = sess.services.rollout_trace.inference_trace_context( + sess.conversation_id.to_string(), + turn_context.sub_id.clone(), + turn_context.model_info.slug.clone(), + turn_context.provider.info().name.clone(), ); let mut stream = client_session .stream_with_trace( diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index b9e3dc3c7996..b7e2aeb3c9e2 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -42,7 +42,7 @@ pub(crate) struct SessionServices { pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) hooks: Hooks, pub(crate) rollout: Mutex>, - pub(crate) rollout_trace: Option, + pub(crate) rollout_trace: RolloutTraceRecorder, pub(crate) user_shell: Arc, pub(crate) shell_snapshot_tx: watch::Sender>>, pub(crate) show_raw_agent_reasoning: bool, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index d66999a90672..7b6a4730473b 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -957,7 +957,7 @@ impl ThreadManagerState { metrics_service_name, inherited_shell_snapshot, inherited_exec_policy, - inherited_rollout_trace: None, + inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(), user_shell_override, parent_trace, analytics_events_client: self.analytics_events_client.clone(), diff --git a/codex-rs/rollout-trace/src/recorder.rs b/codex-rs/rollout-trace/src/recorder.rs index 08b9d1023c36..faceb5ce5c4e 100644 --- a/codex-rs/rollout-trace/src/recorder.rs +++ b/codex-rs/rollout-trace/src/recorder.rs @@ -31,9 +31,22 @@ pub const CODEX_ROLLOUT_TRACE_ROOT_ENV: &str = "CODEX_ROLLOUT_TRACE_ROOT"; /// Lightweight handle stored in `SessionServices`. /// /// Cloning the handle is cheap; all sequencing and file ownership remains -/// inside `TraceWriter`. +/// inside `TraceWriter`. Disabled handles intentionally accept the same calls +/// as enabled handles so hot-path session code can describe traceable events +/// without repeatedly branching on whether diagnostic recording is enabled. #[derive(Clone, Debug)] pub struct RolloutTraceRecorder { + state: RolloutTraceRecorderState, +} + +#[derive(Clone, Debug)] +enum RolloutTraceRecorderState { + Disabled, + Enabled(EnabledRolloutTraceRecorder), +} + +#[derive(Clone, Debug)] +struct EnabledRolloutTraceRecorder { writer: Arc, } @@ -69,28 +82,35 @@ pub struct CompactionCheckpointTracePayload<'a> { } impl RolloutTraceRecorder { - /// Creates and starts a trace bundle if `CODEX_ROLLOUT_TRACE_ROOT` is set. + /// Builds a recorder handle that accepts trace calls and records nothing. + pub fn disabled() -> Self { + Self { + state: RolloutTraceRecorderState::Disabled, + } + } + + /// Creates and starts a root trace bundle, or returns a disabled recorder. /// /// Trace startup is best-effort. A tracing failure must not make the Codex /// session unusable, because traces are diagnostic and can be enabled while - /// debugging unrelated production failures. - pub fn maybe_create(thread_id: ThreadId, metadata: ThreadStartedTraceMetadata) -> Option { - let root = std::env::var_os(CODEX_ROLLOUT_TRACE_ROOT_ENV)?; + /// debugging unrelated production failures. The returned recorder has not + /// emitted `ThreadStarted`; session setup records that event uniformly for + /// root and inherited child recorders. + pub fn create_root_or_disabled(thread_id: ThreadId) -> Self { + let Some(root) = std::env::var_os(CODEX_ROLLOUT_TRACE_ROOT_ENV) else { + return Self::disabled(); + }; let root = PathBuf::from(root); - match Self::create_in_root(root.as_path(), thread_id, metadata) { - Ok(recorder) => Some(recorder), + match Self::create_in_root(root.as_path(), thread_id) { + Ok(recorder) => recorder, Err(err) => { warn!("failed to initialize rollout trace recorder: {err:#}"); - None + Self::disabled() } } } - fn create_in_root( - root: &Path, - thread_id: ThreadId, - metadata: ThreadStartedTraceMetadata, - ) -> anyhow::Result { + fn create_in_root(root: &Path, thread_id: ThreadId) -> anyhow::Result { let trace_id = Uuid::new_v4().to_string(); let thread_id = thread_id.to_string(); let bundle_dir = root.join(format!("trace-{trace_id}-{thread_id}")); @@ -100,7 +120,7 @@ impl RolloutTraceRecorder { thread_id.clone(), thread_id.clone(), )?; - let recorder = Self { + let recorder = EnabledRolloutTraceRecorder { writer: Arc::new(writer), }; @@ -109,10 +129,14 @@ impl RolloutTraceRecorder { root_thread_id: thread_id, }); - recorder.record_thread_started(metadata); - debug!("recording rollout trace at {}", bundle_dir.display()); - Ok(recorder) + Ok(Self::enabled(recorder)) + } + + fn enabled(inner: EnabledRolloutTraceRecorder) -> Self { + Self { + state: RolloutTraceRecorderState::Enabled(inner), + } } /// Emits the lifecycle event and metadata for one thread in this rollout tree. @@ -122,9 +146,12 @@ impl RolloutTraceRecorder { /// the root bundle preserves one raw payload namespace and one reduced /// `RolloutTrace` for the whole multi-agent task. pub fn record_thread_started(&self, metadata: ThreadStartedTraceMetadata) { + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return; + }; let metadata_payload = - self.write_json_payload_best_effort(RawPayloadKind::SessionMetadata, &metadata); - self.append_best_effort(RawTraceEventPayload::ThreadStarted { + recorder.write_json_payload_best_effort(RawPayloadKind::SessionMetadata, &metadata); + recorder.append_best_effort(RawTraceEventPayload::ThreadStarted { thread_id: metadata.thread_id, agent_path: metadata.agent_path, metadata_payload, @@ -143,8 +170,12 @@ impl RolloutTraceRecorder { model: String, provider_name: String, ) -> InferenceTraceContext { + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return InferenceTraceContext::disabled(); + }; + InferenceTraceContext::enabled( - Arc::clone(&self.writer), + Arc::clone(&recorder.writer), thread_id, codex_turn_id, model, @@ -166,8 +197,12 @@ impl RolloutTraceRecorder { model: String, provider_name: String, ) -> CompactionTraceContext { + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return CompactionTraceContext::disabled(); + }; + CompactionTraceContext::enabled( - Arc::clone(&self.writer), + Arc::clone(&recorder.writer), thread_id, codex_turn_id, compaction_id, @@ -189,12 +224,15 @@ impl RolloutTraceRecorder { compaction_id: String, checkpoint: &CompactionCheckpointTracePayload<'_>, ) { - let Some(checkpoint_payload) = - self.write_json_payload_best_effort(RawPayloadKind::CompactionCheckpoint, checkpoint) + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return; + }; + let Some(checkpoint_payload) = recorder + .write_json_payload_best_effort(RawPayloadKind::CompactionCheckpoint, checkpoint) else { return; }; - self.append_with_context_best_effort( + recorder.append_with_context_best_effort( thread_id, codex_turn_id, RawTraceEventPayload::CompactionInstalled { @@ -203,7 +241,9 @@ impl RolloutTraceRecorder { }, ); } +} +impl EnabledRolloutTraceRecorder { fn write_json_payload_best_effort( &self, kind: RawPayloadKind, diff --git a/codex-rs/rollout-trace/src/recorder_tests.rs b/codex-rs/rollout-trace/src/recorder_tests.rs index 08b0cfbbd086..d5c18efba028 100644 --- a/codex-rs/rollout-trace/src/recorder_tests.rs +++ b/codex-rs/rollout-trace/src/recorder_tests.rs @@ -17,25 +17,22 @@ use crate::replay_bundle; fn create_in_root_writes_replayable_lifecycle_events() -> anyhow::Result<()> { let temp = TempDir::new()?; let thread_id = ThreadId::new(); - RolloutTraceRecorder::create_in_root( - temp.path(), - thread_id, - ThreadStartedTraceMetadata { - thread_id: thread_id.to_string(), - agent_path: "/root".to_string(), - task_name: None, - nickname: None, - agent_role: None, - session_source: SessionSource::Exec, - cwd: PathBuf::from("/workspace"), - rollout_path: Some(PathBuf::from("/tmp/rollout.jsonl")), - model: "gpt-test".to_string(), - provider_name: "test-provider".to_string(), - approval_policy: "never".to_string(), - sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess), - }, - ) - .expect("trace recorder"); + let recorder = + RolloutTraceRecorder::create_in_root(temp.path(), thread_id).expect("trace recorder"); + recorder.record_thread_started(ThreadStartedTraceMetadata { + thread_id: thread_id.to_string(), + agent_path: "/root".to_string(), + task_name: None, + nickname: None, + agent_role: None, + session_source: SessionSource::Exec, + cwd: PathBuf::from("/workspace"), + rollout_path: Some(PathBuf::from("/tmp/rollout.jsonl")), + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess), + }); let bundle_dir = single_bundle_dir(temp.path())?; let replayed = replay_bundle(&bundle_dir)?; @@ -53,12 +50,9 @@ fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { let temp = TempDir::new()?; let root_thread_id = ThreadId::new(); let child_thread_id = ThreadId::new(); - let recorder = RolloutTraceRecorder::create_in_root( - temp.path(), - root_thread_id, - minimal_metadata(root_thread_id), - ) - .expect("trace recorder"); + let recorder = + RolloutTraceRecorder::create_in_root(temp.path(), root_thread_id).expect("trace recorder"); + recorder.record_thread_started(minimal_metadata(root_thread_id)); recorder.record_thread_started(ThreadStartedTraceMetadata { thread_id: child_thread_id.to_string(), @@ -103,6 +97,52 @@ fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { Ok(()) } +#[test] +fn disabled_recorder_accepts_trace_calls_without_writing() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let thread_id = ThreadId::new(); + let recorder = RolloutTraceRecorder::disabled(); + + recorder.record_thread_started(minimal_metadata(thread_id)); + + let inference_trace = recorder.inference_trace_context( + thread_id.to_string(), + "turn-1".to_string(), + "gpt-test".to_string(), + "test-provider".to_string(), + ); + let inference_attempt = inference_trace.start_attempt(); + inference_attempt.record_started(&serde_json::json!({ "kind": "inference" })); + let token_usage: Option = None; + inference_attempt.record_completed("response-1", &token_usage, &[]); + inference_attempt.record_failed("inference failed"); + + let compaction_trace = recorder.compaction_trace_context( + thread_id.to_string(), + "turn-1".to_string(), + "compaction-1".to_string(), + "gpt-test".to_string(), + "test-provider".to_string(), + ); + let compaction_attempt = + compaction_trace.start_attempt(&serde_json::json!({ "kind": "compaction" })); + compaction_attempt.record_completed(&[]); + compaction_attempt.record_failed("compaction failed"); + recorder.record_compaction_installed( + thread_id.to_string(), + "turn-1".to_string(), + "compaction-1".to_string(), + &CompactionCheckpointTracePayload { + input_history: &[], + replacement_history: &[], + }, + ); + + assert_eq!(fs::read_dir(temp.path())?.count(), 0); + + Ok(()) +} + fn minimal_metadata(thread_id: ThreadId) -> ThreadStartedTraceMetadata { ThreadStartedTraceMetadata { thread_id: thread_id.to_string(), From 899bb9939848c060efcdcfb39d5c80455ea33fb9 Mon Sep 17 00:00:00 2001 From: Albin Cassirer Date: Tue, 21 Apr 2026 15:39:51 -0700 Subject: [PATCH 4/6] Clean up rollout trace recorder call sites --- codex-rs/core/src/compact_remote.rs | 32 +++---- codex-rs/core/src/session/turn.rs | 8 +- codex-rs/rollout-trace/src/compaction.rs | 46 ++++++++++ codex-rs/rollout-trace/src/lib.rs | 4 +- codex-rs/rollout-trace/src/recorder.rs | 97 ++++---------------- codex-rs/rollout-trace/src/recorder_tests.rs | 32 +++---- 6 files changed, 95 insertions(+), 124 deletions(-) diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 105c81ddfe7d..77c8834a8581 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -118,7 +118,13 @@ async fn run_remote_compact_task_inner_impl( let context_compaction_item = ContextCompactionItem::new(); // Use the UI compaction item ID as the trace compaction ID so protocol lifecycle events, // endpoint attempts, and the installed history checkpoint all have one join key. - let compaction_id = context_compaction_item.id.clone(); + let compaction_trace = sess.services.rollout_trace.compaction_trace_context( + sess.conversation_id, + turn_context.sub_id.as_str(), + context_compaction_item.id.as_str(), + turn_context.model_info.slug.as_str(), + turn_context.provider.info().name.as_str(), + ); let compaction_item = TurnItem::ContextCompaction(context_compaction_item); sess.emit_turn_item_started(turn_context, &compaction_item) .await; @@ -166,17 +172,6 @@ async fn run_remote_compact_task_inner_impl( personality: turn_context.personality, output_schema: None, }; - // Remote compaction is the only compaction shape rollout tracing supports. The trace context - // records the exact `/responses/compact` request and response; normal sampling requests remain - // traced through the inference path. - let compaction_trace = sess.services.rollout_trace.compaction_trace_context( - sess.conversation_id.to_string(), - turn_context.sub_id.clone(), - compaction_id.clone(), - turn_context.model_info.slug.clone(), - turn_context.provider.info().name.clone(), - ); - let mut new_history = sess .services .model_client @@ -223,15 +218,10 @@ async fn run_remote_compact_task_inner_impl( // Install is the semantic boundary where the compact endpoint's output becomes live // thread history. Keep it distinct from the later inference request so the reducer can // still represent repeated developer/context prefix items exactly as the model saw them. - sess.services.rollout_trace.record_compaction_installed( - sess.conversation_id.to_string(), - turn_context.sub_id.clone(), - compaction_id, - &CompactionCheckpointTracePayload { - input_history: &trace_input_history, - replacement_history: &new_history, - }, - ); + compaction_trace.record_installed(&CompactionCheckpointTracePayload { + input_history: &trace_input_history, + replacement_history: &new_history, + }); sess.replace_compacted_history(new_history, reference_context_item, compacted_item) .await; sess.recompute_token_usage(turn_context).await; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 49447a860470..67a40f01af0d 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1875,10 +1875,10 @@ async fn try_run_sampling_request( features = sess.features.enabled_features(), ); let inference_trace = sess.services.rollout_trace.inference_trace_context( - sess.conversation_id.to_string(), - turn_context.sub_id.clone(), - turn_context.model_info.slug.clone(), - turn_context.provider.info().name.clone(), + sess.conversation_id, + turn_context.sub_id.as_str(), + turn_context.model_info.slug.as_str(), + turn_context.provider.info().name.as_str(), ); let mut stream = client_session .stream_with_trace( diff --git a/codex-rs/rollout-trace/src/compaction.rs b/codex-rs/rollout-trace/src/compaction.rs index 6464f5781fef..8bc2eae7677e 100644 --- a/codex-rs/rollout-trace/src/compaction.rs +++ b/codex-rs/rollout-trace/src/compaction.rs @@ -13,6 +13,7 @@ use std::sync::atomic::Ordering; use codex_protocol::models::ResponseItem; use serde::Serialize; use serde_json::Value as JsonValue; +use tracing::warn; use crate::inference::trace_response_item_json; use crate::model::AgentThreadId; @@ -74,6 +75,17 @@ struct TracedCompactionCompleted { output_items: Vec, } +/// History replacement checkpoint persisted when compaction installs new live history. +/// +/// The checkpoint keeps compaction separate from ordinary sampling snapshots: +/// `input_history` is the live thread history selected for compaction, while +/// `replacement_history` is what future prompts may carry after the checkpoint. +#[derive(Serialize)] +pub struct CompactionCheckpointTracePayload<'a> { + pub input_history: &'a [ResponseItem], + pub replacement_history: &'a [ResponseItem], +} + impl CompactionTraceContext { /// Builds a context that accepts trace calls and records nothing. pub fn disabled() -> Self { @@ -118,6 +130,40 @@ impl CompactionTraceContext { attempt.record_started(request); attempt } + + /// Records the point where compacted history becomes the live thread history. + /// + /// The checkpoint belongs to the same semantic compaction lifecycle as the + /// compact endpoint attempts, so the context reuses its stable compaction ID. + pub fn record_installed(&self, checkpoint: &CompactionCheckpointTracePayload<'_>) { + let CompactionTraceContextState::Enabled(context) = &self.state else { + return; + }; + let checkpoint_payload = match context + .writer + .write_json_payload(RawPayloadKind::CompactionCheckpoint, checkpoint) + { + Ok(payload_ref) => payload_ref, + Err(err) => { + warn!("failed to write rollout trace payload: {err:#}"); + return; + } + }; + + let event_context = RawTraceEventContext { + thread_id: Some(context.thread_id.clone()), + codex_turn_id: Some(context.codex_turn_id.clone()), + }; + if let Err(err) = context.writer.append_with_context( + event_context, + RawTraceEventPayload::CompactionInstalled { + compaction_id: context.compaction_id.clone(), + checkpoint_payload, + }, + ) { + warn!("failed to append rollout trace event: {err:#}"); + } + } } impl CompactionTraceAttempt { diff --git a/codex-rs/rollout-trace/src/lib.rs b/codex-rs/rollout-trace/src/lib.rs index b03c6ca3447e..0ca522c9a204 100644 --- a/codex-rs/rollout-trace/src/lib.rs +++ b/codex-rs/rollout-trace/src/lib.rs @@ -18,6 +18,8 @@ mod writer; /// Conventional reduced-state cache name written next to a raw trace bundle. pub use bundle::REDUCED_STATE_FILE_NAME; +/// Raw checkpoint payload for a remote compaction install event. +pub use compaction::CompactionCheckpointTracePayload; /// No-op-capable handle for recording remote-compaction requests. pub use compaction::CompactionTraceAttempt; /// Shared recorder context for a compaction checkpoint. @@ -46,8 +48,6 @@ pub use raw_event::RawTraceEventContext; pub use raw_event::RawTraceEventPayload; /// Environment variable that enables local trace-bundle recording. pub use recorder::CODEX_ROLLOUT_TRACE_ROOT_ENV; -/// Raw checkpoint payload for a remote compaction install event. -pub use recorder::CompactionCheckpointTracePayload; /// Best-effort hot-path recorder for one rollout trace bundle. pub use recorder::RolloutTraceRecorder; /// Raw metadata captured when a thread starts. diff --git a/codex-rs/rollout-trace/src/recorder.rs b/codex-rs/rollout-trace/src/recorder.rs index faceb5ce5c4e..d17951fda59b 100644 --- a/codex-rs/rollout-trace/src/recorder.rs +++ b/codex-rs/rollout-trace/src/recorder.rs @@ -5,7 +5,6 @@ use std::path::PathBuf; use std::sync::Arc; use codex_protocol::ThreadId; -use codex_protocol::models::ResponseItem; use codex_protocol::protocol::SessionSource; use serde::Serialize; use tracing::debug; @@ -13,11 +12,12 @@ use tracing::warn; use uuid::Uuid; use crate::AgentThreadId; +use crate::CodexTurnId; +use crate::CompactionId; use crate::CompactionTraceContext; use crate::InferenceTraceContext; use crate::RawPayloadKind; use crate::RawPayloadRef; -use crate::RawTraceEventContext; use crate::RawTraceEventPayload; use crate::TraceWriter; @@ -70,17 +70,6 @@ pub struct ThreadStartedTraceMetadata { pub sandbox_policy: String, } -/// History replacement checkpoint persisted when compaction installs new live history. -/// -/// The checkpoint keeps compaction separate from ordinary sampling snapshots: -/// `input_history` is the live thread history selected for compaction, while -/// `replacement_history` is what future prompts may carry after the checkpoint. -#[derive(Serialize)] -pub struct CompactionCheckpointTracePayload<'a> { - pub input_history: &'a [ResponseItem], - pub replacement_history: &'a [ResponseItem], -} - impl RolloutTraceRecorder { /// Builds a recorder handle that accepts trace calls and records nothing. pub fn disabled() -> Self { @@ -165,10 +154,10 @@ impl RolloutTraceRecorder { /// only after it has built the concrete request payload for that attempt. pub fn inference_trace_context( &self, - thread_id: AgentThreadId, - codex_turn_id: String, - model: String, - provider_name: String, + thread_id: impl Into, + codex_turn_id: impl Into, + model: impl Into, + provider_name: impl Into, ) -> InferenceTraceContext { let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { return InferenceTraceContext::disabled(); @@ -176,10 +165,10 @@ impl RolloutTraceRecorder { InferenceTraceContext::enabled( Arc::clone(&recorder.writer), - thread_id, - codex_turn_id, - model, - provider_name, + thread_id.into(), + codex_turn_id.into(), + model.into(), + provider_name.into(), ) } @@ -191,11 +180,11 @@ impl RolloutTraceRecorder { /// replacement history is installed. pub fn compaction_trace_context( &self, - thread_id: AgentThreadId, - codex_turn_id: String, - compaction_id: String, - model: String, - provider_name: String, + thread_id: impl Into, + codex_turn_id: impl Into, + compaction_id: impl Into, + model: impl Into, + provider_name: impl Into, ) -> CompactionTraceContext { let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { return CompactionTraceContext::disabled(); @@ -203,44 +192,13 @@ impl RolloutTraceRecorder { CompactionTraceContext::enabled( Arc::clone(&recorder.writer), - thread_id, - codex_turn_id, - compaction_id, - model, - provider_name, + thread_id.into(), + codex_turn_id.into(), + compaction_id.into(), + model.into(), + provider_name.into(), ) } - - /// Emits the checkpoint where remote-compacted history replaces live thread history. - /// - /// This checkpoint is deliberately separate from the compact endpoint response: Codex filters - /// and reinjects context before replacement history becomes live. The reducer uses this event - /// to connect the pre-compaction history to the processed replacement items without treating - /// repeated developer/context prefix items as part of the replacement itself. - pub fn record_compaction_installed( - &self, - thread_id: AgentThreadId, - codex_turn_id: String, - compaction_id: String, - checkpoint: &CompactionCheckpointTracePayload<'_>, - ) { - let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { - return; - }; - let Some(checkpoint_payload) = recorder - .write_json_payload_best_effort(RawPayloadKind::CompactionCheckpoint, checkpoint) - else { - return; - }; - recorder.append_with_context_best_effort( - thread_id, - codex_turn_id, - RawTraceEventPayload::CompactionInstalled { - compaction_id, - checkpoint_payload, - }, - ); - } } impl EnabledRolloutTraceRecorder { @@ -263,21 +221,6 @@ impl EnabledRolloutTraceRecorder { warn!("failed to append rollout trace event: {err:#}"); } } - - fn append_with_context_best_effort( - &self, - thread_id: AgentThreadId, - codex_turn_id: String, - payload: RawTraceEventPayload, - ) { - let context = RawTraceEventContext { - thread_id: Some(thread_id), - codex_turn_id: Some(codex_turn_id), - }; - if let Err(err) = self.writer.append_with_context(context, payload) { - warn!("failed to append rollout trace event: {err:#}"); - } - } } #[cfg(test)] diff --git a/codex-rs/rollout-trace/src/recorder_tests.rs b/codex-rs/rollout-trace/src/recorder_tests.rs index d5c18efba028..2bd05c77479c 100644 --- a/codex-rs/rollout-trace/src/recorder_tests.rs +++ b/codex-rs/rollout-trace/src/recorder_tests.rs @@ -10,6 +10,7 @@ use codex_protocol::protocol::SubAgentSource; use tempfile::TempDir; use super::*; +use crate::CompactionCheckpointTracePayload; use crate::RolloutStatus; use crate::replay_bundle; @@ -105,12 +106,8 @@ fn disabled_recorder_accepts_trace_calls_without_writing() -> anyhow::Result<()> recorder.record_thread_started(minimal_metadata(thread_id)); - let inference_trace = recorder.inference_trace_context( - thread_id.to_string(), - "turn-1".to_string(), - "gpt-test".to_string(), - "test-provider".to_string(), - ); + let inference_trace = + recorder.inference_trace_context(thread_id, "turn-1", "gpt-test", "test-provider"); let inference_attempt = inference_trace.start_attempt(); inference_attempt.record_started(&serde_json::json!({ "kind": "inference" })); let token_usage: Option = None; @@ -118,25 +115,20 @@ fn disabled_recorder_accepts_trace_calls_without_writing() -> anyhow::Result<()> inference_attempt.record_failed("inference failed"); let compaction_trace = recorder.compaction_trace_context( - thread_id.to_string(), - "turn-1".to_string(), - "compaction-1".to_string(), - "gpt-test".to_string(), - "test-provider".to_string(), + thread_id, + "turn-1", + "compaction-1", + "gpt-test", + "test-provider", ); let compaction_attempt = compaction_trace.start_attempt(&serde_json::json!({ "kind": "compaction" })); compaction_attempt.record_completed(&[]); compaction_attempt.record_failed("compaction failed"); - recorder.record_compaction_installed( - thread_id.to_string(), - "turn-1".to_string(), - "compaction-1".to_string(), - &CompactionCheckpointTracePayload { - input_history: &[], - replacement_history: &[], - }, - ); + compaction_trace.record_installed(&CompactionCheckpointTracePayload { + input_history: &[], + replacement_history: &[], + }); assert_eq!(fs::read_dir(temp.path())?.count(), 0); From 17cdf07c0370bd86b1ae5e77a23d8102ed525f4a Mon Sep 17 00:00:00 2001 From: Albin Cassirer Date: Tue, 21 Apr 2026 16:14:20 -0700 Subject: [PATCH 5/6] Simplify model streaming trace context --- codex-rs/core/src/client.rs | 41 ++++--------------- codex-rs/core/src/compact.rs | 4 ++ codex-rs/core/src/memories/phase1.rs | 2 + codex-rs/core/src/session/turn.rs | 2 +- codex-rs/core/tests/responses_headers.rs | 3 ++ codex-rs/core/tests/suite/client.rs | 2 + .../core/tests/suite/client_websockets.rs | 7 ++++ 7 files changed, 26 insertions(+), 35 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 7edd8da5490d..57e499848f96 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -1473,11 +1473,14 @@ impl ModelClientSession { } #[allow(clippy::too_many_arguments)] - /// Streams a single model request without rollout tracing. + /// Streams a single model request within the current turn. /// - /// This is the public client API. It routes through the same transport code - /// as traced Codex turns, but supplies a disabled trace context so tracing - /// does not leak into callers that only need model streaming. + /// The caller is responsible for passing per-turn settings explicitly (model selection, + /// reasoning settings, telemetry context, and turn metadata). This method will prefer the + /// Responses WebSocket transport when the provider supports it and it remains healthy, and will + /// fall back to the HTTP Responses API transport otherwise. The trace context may be enabled or + /// disabled, but is always explicit so transport paths do not need separate trace/no-trace + /// branches. pub async fn stream( &mut self, prompt: &Prompt, @@ -1487,36 +1490,6 @@ impl ModelClientSession { summary: ReasoningSummaryConfig, service_tier: Option, turn_metadata_header: Option<&str>, - ) -> Result { - let disabled_trace = InferenceTraceContext::disabled(); - self.stream_with_trace( - prompt, - model_info, - session_telemetry, - effort, - summary, - service_tier, - turn_metadata_header, - &disabled_trace, - ) - .await - } - - #[allow(clippy::too_many_arguments)] - /// Streams a model request with an explicit rollout trace context. - /// - /// The context may be enabled or disabled. Transport code records against it - /// unconditionally so HTTP, WebSocket, retry, and fallback paths do not need - /// separate trace/no-trace branches. - pub(crate) async fn stream_with_trace( - &mut self, - prompt: &Prompt, - model_info: &ModelInfo, - session_telemetry: &SessionTelemetry, - effort: Option, - summary: ReasoningSummaryConfig, - service_tier: Option, - turn_metadata_header: Option<&str>, inference_trace: &InferenceTraceContext, ) -> Result { let wire_api = self.client.state.provider.info().wire_api; diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index c997e92bcfde..4ae9e9fcdc2a 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -31,6 +31,7 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::WarningEvent; use codex_protocol::user_input::UserInput; +use codex_rollout_trace::InferenceTraceContext; use codex_utils_output_truncation::TruncationPolicy; use codex_utils_output_truncation::approx_token_count; use codex_utils_output_truncation::truncate_text; @@ -546,6 +547,9 @@ async fn drain_to_completed( turn_context.reasoning_summary, turn_context.config.service_tier, turn_metadata_header, + // Rollout tracing currently models remote compaction only; local compaction streams + // are left untraced until the reducer has a first-class local compaction lifecycle. + &InferenceTraceContext::disabled(), ) .await?; loop { diff --git a/codex-rs/core/src/memories/phase1.rs b/codex-rs/core/src/memories/phase1.rs index 4f19caa5c5d0..f3e680265cd0 100644 --- a/codex-rs/core/src/memories/phase1.rs +++ b/codex-rs/core/src/memories/phase1.rs @@ -23,6 +23,7 @@ use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::TokenUsage; +use codex_rollout_trace::InferenceTraceContext; use codex_secrets::redact_secrets; use futures::StreamExt; use serde::Deserialize; @@ -353,6 +354,7 @@ mod job { stage_one_context.reasoning_summary, stage_one_context.service_tier, stage_one_context.turn_metadata_header.as_deref(), + &InferenceTraceContext::disabled(), ) .await?; diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 67a40f01af0d..ad6bed544d51 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1881,7 +1881,7 @@ async fn try_run_sampling_request( turn_context.provider.info().name.as_str(), ); let mut stream = client_session - .stream_with_trace( + .stream( prompt, &turn_context.model_info, &turn_context.session_telemetry, diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 85b23af31ff2..2cdcaf448c44 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -131,6 +131,7 @@ async fn responses_stream_includes_subagent_header_on_review() { summary.unwrap_or(model_info.default_reasoning_summary), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("stream failed"); @@ -257,6 +258,7 @@ async fn responses_stream_includes_subagent_header_on_other() { summary.unwrap_or(model_info.default_reasoning_summary), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("stream failed"); @@ -372,6 +374,7 @@ async fn responses_respects_model_info_overrides_from_config() { summary.unwrap_or(model_info.default_reasoning_summary), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("stream failed"); diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 2086367b21e7..a38a9e6289ce 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -912,6 +912,7 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth summary.unwrap_or(ReasoningSummary::Auto), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("responses stream to start"); @@ -2259,6 +2260,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { summary.unwrap_or(ReasoningSummary::Auto), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("responses stream to start"); diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index f6cfd0d913e1..f1d273aa0ccc 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -390,6 +390,7 @@ async fn responses_websocket_preconnect_is_reused_even_with_header_changes() { harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -440,6 +441,7 @@ async fn responses_websocket_request_prewarm_is_reused_even_with_header_changes( harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -842,6 +844,7 @@ async fn responses_websocket_emits_reasoning_included_event() { harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -915,6 +918,7 @@ async fn responses_websocket_emits_rate_limit_events() { harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -1551,6 +1555,7 @@ async fn responses_websocket_v2_after_error_uses_full_create_without_previous_re harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -1638,6 +1643,7 @@ async fn responses_websocket_v2_surfaces_terminal_error_without_close_handshake( harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -1901,6 +1907,7 @@ async fn stream_until_complete_with_request_metadata( harness.summary, service_tier, turn_metadata_header, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); From ba1c59e9cfc693a6660a076d9be978a3fa722965 Mon Sep 17 00:00:00 2001 From: Albin Cassirer Date: Wed, 22 Apr 2026 09:36:50 -0700 Subject: [PATCH 6/6] Small refactors to address comments. --- codex-rs/core/src/client.rs | 39 ++++++++++-------------- codex-rs/rollout-trace/src/compaction.rs | 8 +++++ 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 57e499848f96..1ff16a97aaca 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -470,17 +470,12 @@ impl ModelClient { self.state.conversation_id.to_string(), ))); let trace_attempt = compaction_trace.start_attempt(&payload); - match client.compact_input(&payload, extra_headers).await { - Ok(output) => { - trace_attempt.record_completed(&output); - Ok(output) - } - Err(err) => { - let err = map_api_error(err); - trace_attempt.record_failed(&err); - Err(err) - } - } + let result = client + .compact_input(&payload, extra_headers) + .await + .map_err(map_api_error); + trace_attempt.record_result(result.as_deref()); + result } pub(crate) async fn create_realtime_call_with_headers( @@ -1358,22 +1353,20 @@ impl ModelClientSession { inference_trace.start_attempt() }; inference_trace_attempt.record_started(&ws_request); - let stream_result = self.websocket_session.connection.as_ref().ok_or_else(|| { - map_api_error(ApiError::Stream( - "websocket connection is unavailable".to_string(), - )) - })?; - let stream_result = match stream_result + let websocket_connection = + self.websocket_session.connection.as_ref().ok_or_else(|| { + map_api_error(ApiError::Stream( + "websocket connection is unavailable".to_string(), + )) + })?; + let stream_result = websocket_connection .stream_request(ws_request, self.websocket_session.connection_reused()) .await - { - Ok(stream_result) => stream_result, - Err(err) => { + .map_err(|err| { let err = map_api_error(err); inference_trace_attempt.record_failed(&err); - return Err(err); - } - }; + err + })?; let (stream, last_request_rx) = map_response_stream( stream_result, session_telemetry.clone(), diff --git a/codex-rs/rollout-trace/src/compaction.rs b/codex-rs/rollout-trace/src/compaction.rs index 8bc2eae7677e..01608974ac6c 100644 --- a/codex-rs/rollout-trace/src/compaction.rs +++ b/codex-rs/rollout-trace/src/compaction.rs @@ -230,6 +230,14 @@ impl CompactionTraceAttempt { ); } + /// Records the compact endpoint result without forcing callers to branch on trace events. + pub fn record_result(&self, result: Result<&[ResponseItem], E>) { + match result { + Ok(output_items) => self.record_completed(output_items), + Err(err) => self.record_failed(err), + } + } + /// Records pre-response failures from the compact endpoint. pub fn record_failed(&self, error: impl Display) { let CompactionTraceAttemptState::Enabled(attempt) = &self.state else {