diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index de908f05fce3..4716ac790573 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1961,6 +1961,7 @@ dependencies = [ "codex-response-debug-context", "codex-rmcp-client", "codex-rollout", + "codex-rollout-trace", "codex-sandboxing", "codex-secrets", "codex-shell-command", @@ -2798,6 +2799,8 @@ dependencies = [ "serde", "serde_json", "tempfile", + "tracing", + "uuid", ] [[package]] diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index d70af322a162..3b28d3dcb6a2 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -165,6 +165,7 @@ codex-responses-api-proxy = { path = "responses-api-proxy" } codex-response-debug-context = { path = "response-debug-context" } codex-rmcp-client = { path = "rmcp-client" } codex-rollout = { path = "rollout" } +codex-rollout-trace = { path = "rollout-trace" } codex-sandboxing = { path = "sandboxing" } codex-secrets = { path = "secrets" } codex-shell-command = { path = "shell-command" } diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 86d6f8b0f3ec..dccd1f9cf000 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -53,6 +53,7 @@ codex-model-provider = { workspace = true } codex-protocol = { workspace = true } codex-response-debug-context = { workspace = true } codex-rollout = { workspace = true } +codex-rollout-trace = { workspace = true } codex-rmcp-client = { workspace = true } codex-sandboxing = { workspace = true } codex-state = { workspace = true } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 601b8106df8f..1ff16a97aaca 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -80,6 +80,9 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::W3cTraceContext; +use codex_rollout_trace::CompactionTraceContext; +use codex_rollout_trace::InferenceTraceAttempt; +use codex_rollout_trace::InferenceTraceContext; use codex_tools::create_tools_json_for_responses_api; use eventsource_stream::Event; use eventsource_stream::EventStreamError; @@ -411,6 +414,7 @@ impl ModelClient { effort: Option, summary: ReasoningSummaryConfig, session_telemetry: &SessionTelemetry, + compaction_trace: &CompactionTraceContext, ) -> Result> { if prompt.input.is_empty() { return Ok(Vec::new()); @@ -465,10 +469,13 @@ impl ModelClient { extra_headers.extend(build_conversation_headers(Some( self.state.conversation_id.to_string(), ))); - client + let trace_attempt = compaction_trace.start_attempt(&payload); + let result = client .compact_input(&payload, extra_headers) .await - .map_err(map_api_error) + .map_err(map_api_error); + trace_attempt.record_result(result.as_deref()); + result } pub(crate) async fn create_realtime_call_with_headers( @@ -1148,6 +1155,7 @@ impl ModelClientSession { summary: ReasoningSummaryConfig, service_tier: Option, turn_metadata_header: Option<&str>, + inference_trace: &InferenceTraceContext, ) -> Result { if let Some(path) = &*CODEX_RS_SSE_FIXTURE { warn!(path, "Streaming from fixture"); @@ -1156,7 +1164,11 @@ impl ModelClientSession { self.client.state.provider.info().stream_idle_timeout(), ) .map_err(map_api_error)?; - let (stream, _last_request_rx) = map_response_stream(stream, session_telemetry.clone()); + let (stream, _last_request_rx) = map_response_stream( + stream, + session_telemetry.clone(), + InferenceTraceAttempt::disabled(), + ); return Ok(stream); } @@ -1190,6 +1202,8 @@ impl ModelClientSession { summary, service_tier, )?; + let inference_trace_attempt = inference_trace.start_attempt(); + inference_trace_attempt.record_started(&request); let client = ApiResponsesClient::new( transport, client_setup.api_provider, @@ -1200,12 +1214,17 @@ impl ModelClientSession { match stream_result { Ok(stream) => { - let (stream, _) = map_response_stream(stream, session_telemetry.clone()); + let (stream, _) = map_response_stream( + stream, + session_telemetry.clone(), + inference_trace_attempt, + ); return Ok(stream); } Err(ApiError::Transport( unauthorized_transport @ TransportError::Http { status, .. }, )) if status == StatusCode::UNAUTHORIZED => { + inference_trace_attempt.record_failed(&unauthorized_transport); pending_retry = PendingUnauthorizedRetry::from_recovery( handle_unauthorized( unauthorized_transport, @@ -1216,7 +1235,11 @@ impl ModelClientSession { ); continue; } - Err(err) => return Err(map_api_error(err)), + Err(err) => { + let err = map_api_error(err); + inference_trace_attempt.record_failed(&err); + return Err(err); + } } } } @@ -1247,6 +1270,7 @@ impl ModelClientSession { turn_metadata_header: Option<&str>, warmup: bool, request_trace: Option, + inference_trace: &InferenceTraceContext, ) -> Result { let auth_manager = self.client.state.provider.auth_manager(); @@ -1321,17 +1345,33 @@ impl ModelClientSession { let ws_request = self.prepare_websocket_request(ws_payload, &request); self.websocket_session.last_request = Some(request); - let stream_result = self.websocket_session.connection.as_ref().ok_or_else(|| { - map_api_error(ApiError::Stream( - "websocket connection is unavailable".to_string(), - )) - })?; - let stream_result = stream_result + let inference_trace_attempt = if warmup { + // Prewarm sends `generate=false`; it is connection setup, not a + // model inference attempt that should appear in rollout traces. + InferenceTraceAttempt::disabled() + } else { + inference_trace.start_attempt() + }; + inference_trace_attempt.record_started(&ws_request); + let websocket_connection = + self.websocket_session.connection.as_ref().ok_or_else(|| { + map_api_error(ApiError::Stream( + "websocket connection is unavailable".to_string(), + )) + })?; + let stream_result = websocket_connection .stream_request(ws_request, self.websocket_session.connection_reused()) .await - .map_err(map_api_error)?; - let (stream, last_request_rx) = - map_response_stream(stream_result, session_telemetry.clone()); + .map_err(|err| { + let err = map_api_error(err); + inference_trace_attempt.record_failed(&err); + err + })?; + let (stream, last_request_rx) = map_response_stream( + stream_result, + session_telemetry.clone(), + inference_trace_attempt, + ); self.websocket_session.last_response_rx = Some(last_request_rx); return Ok(WebsocketStreamOutcome::Stream(stream)); } @@ -1390,6 +1430,7 @@ impl ModelClientSession { return Ok(()); } + let disabled_trace = InferenceTraceContext::disabled(); match self .stream_responses_websocket( prompt, @@ -1401,6 +1442,7 @@ impl ModelClientSession { turn_metadata_header, /*warmup*/ true, current_span_w3c_trace_context(), + &disabled_trace, ) .await { @@ -1429,7 +1471,9 @@ impl ModelClientSession { /// The caller is responsible for passing per-turn settings explicitly (model selection, /// reasoning settings, telemetry context, and turn metadata). This method will prefer the /// Responses WebSocket transport when the provider supports it and it remains healthy, and will - /// fall back to the HTTP Responses API transport otherwise. + /// fall back to the HTTP Responses API transport otherwise. The trace context may be enabled or + /// disabled, but is always explicit so transport paths do not need separate trace/no-trace + /// branches. pub async fn stream( &mut self, prompt: &Prompt, @@ -1439,6 +1483,7 @@ impl ModelClientSession { summary: ReasoningSummaryConfig, service_tier: Option, turn_metadata_header: Option<&str>, + inference_trace: &InferenceTraceContext, ) -> Result { let wire_api = self.client.state.provider.info().wire_api; match wire_api { @@ -1456,6 +1501,7 @@ impl ModelClientSession { turn_metadata_header, /*warmup*/ false, request_trace, + inference_trace, ) .await? { @@ -1474,6 +1520,7 @@ impl ModelClientSession { summary, service_tier, turn_metadata_header, + inference_trace, ) .await } @@ -1569,6 +1616,7 @@ fn parent_thread_id_header_value(session_source: &SessionSource) -> Option( api_stream: S, session_telemetry: SessionTelemetry, + inference_trace_attempt: InferenceTraceAttempt, ) -> (ResponseStream, oneshot::Receiver) where S: futures::Stream> @@ -1609,6 +1657,11 @@ where usage.total_tokens, ); } + inference_trace_attempt.record_completed( + &response_id, + &token_usage, + &items_added, + ); if let Some(sender) = tx_last_response.take() { let _ = sender.send(LastResponse { response_id: response_id.clone(), @@ -1633,6 +1686,7 @@ where } Err(err) => { let mapped = map_api_error(err); + inference_trace_attempt.record_failed(&mapped); if !logged_error { session_telemetry.see_event_completed_failed(&mapped); logged_error = true; diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 7247c601f46e..8871228d7ea3 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -94,6 +94,7 @@ pub(crate) async fn run_codex_thread_interactive( inherited_shell_snapshot: None, user_shell_override: None, inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)), + inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(), parent_trace: None, analytics_events_client: Some(parent_session.services.analytics_events_client.clone()), })) diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index c997e92bcfde..4ae9e9fcdc2a 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -31,6 +31,7 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::WarningEvent; use codex_protocol::user_input::UserInput; +use codex_rollout_trace::InferenceTraceContext; use codex_utils_output_truncation::TruncationPolicy; use codex_utils_output_truncation::approx_token_count; use codex_utils_output_truncation::truncate_text; @@ -546,6 +547,9 @@ async fn drain_to_completed( turn_context.reasoning_summary, turn_context.config.service_tier, turn_metadata_header, + // Rollout tracing currently models remote compaction only; local compaction streams + // are left untraced until the reducer has a first-class local compaction lifecycle. + &InferenceTraceContext::disabled(), ) .await?; loop { diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index e56812eca305..77c8834a8581 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -26,6 +26,7 @@ use codex_protocol::models::ResponseItem; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TurnStartedEvent; +use codex_rollout_trace::CompactionCheckpointTracePayload; use futures::TryFutureExt; use tokio_util::sync::CancellationToken; use tracing::error; @@ -114,7 +115,17 @@ async fn run_remote_compact_task_inner_impl( turn_context: &Arc, initial_context_injection: InitialContextInjection, ) -> CodexResult<()> { - let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); + let context_compaction_item = ContextCompactionItem::new(); + // Use the UI compaction item ID as the trace compaction ID so protocol lifecycle events, + // endpoint attempts, and the installed history checkpoint all have one join key. + let compaction_trace = sess.services.rollout_trace.compaction_trace_context( + sess.conversation_id, + turn_context.sub_id.as_str(), + context_compaction_item.id.as_str(), + turn_context.model_info.slug.as_str(), + turn_context.provider.info().name.as_str(), + ); + let compaction_item = TurnItem::ContextCompaction(context_compaction_item); sess.emit_turn_item_started(turn_context, &compaction_item) .await; let mut history = sess.clone_history().await; @@ -131,6 +142,10 @@ async fn run_remote_compact_task_inner_impl( "trimmed history items before remote compaction" ); } + // This is the history selected for remote compaction, after any trimming required to fit the + // compact endpoint. The checkpoint below records it separately from the next sampling request, + // whose prompt will repeat current developer/context prefix items. + let trace_input_history = history.raw_items().to_vec(); // Required to keep `/undo` available after compaction let ghost_snapshots: Vec = history .raw_items() @@ -157,7 +172,6 @@ async fn run_remote_compact_task_inner_impl( personality: turn_context.personality, output_schema: None, }; - let mut new_history = sess .services .model_client @@ -167,6 +181,7 @@ async fn run_remote_compact_task_inner_impl( turn_context.reasoning_effort, turn_context.reasoning_summary, &turn_context.session_telemetry, + &compaction_trace, ) .or_else(|err| async { let total_usage_breakdown = sess.get_total_token_usage_breakdown().await; @@ -200,6 +215,13 @@ async fn run_remote_compact_task_inner_impl( message: String::new(), replacement_history: Some(new_history.clone()), }; + // Install is the semantic boundary where the compact endpoint's output becomes live + // thread history. Keep it distinct from the later inference request so the reducer can + // still represent repeated developer/context prefix items exactly as the model saw them. + compaction_trace.record_installed(&CompactionCheckpointTracePayload { + input_history: &trace_input_history, + replacement_history: &new_history, + }); sess.replace_compacted_history(new_history, reference_context_item, compacted_item) .await; sess.recompute_token_usage(turn_context).await; diff --git a/codex-rs/core/src/memories/phase1.rs b/codex-rs/core/src/memories/phase1.rs index 4f19caa5c5d0..f3e680265cd0 100644 --- a/codex-rs/core/src/memories/phase1.rs +++ b/codex-rs/core/src/memories/phase1.rs @@ -23,6 +23,7 @@ use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::TokenUsage; +use codex_rollout_trace::InferenceTraceContext; use codex_secrets::redact_secrets; use futures::StreamExt; use serde::Deserialize; @@ -353,6 +354,7 @@ mod job { stage_one_context.reasoning_summary, stage_one_context.service_tier, stage_one_context.turn_metadata_header.as_deref(), + &InferenceTraceContext::disabled(), ) .await?; diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 80084f197930..93f283ee66e0 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -120,6 +120,8 @@ use codex_protocol::request_user_input::RequestUserInputResponse; use codex_rmcp_client::ElicitationResponse; use codex_rollout::RolloutConfig; use codex_rollout::state_db; +use codex_rollout_trace::RolloutTraceRecorder; +use codex_rollout_trace::ThreadStartedTraceMetadata; use codex_sandboxing::policy_transforms::intersect_permission_profiles; use codex_shell_command::parse_command::parse_command; use codex_terminal_detection::user_agent; @@ -397,6 +399,8 @@ pub(crate) struct CodexSpawnArgs { pub(crate) metrics_service_name: Option, pub(crate) inherited_shell_snapshot: Option>, pub(crate) inherited_exec_policy: Option>, + /// Parent rollout-tree recorder, or a disabled recorder when this spawn has no parent trace. + pub(crate) inherited_rollout_trace: RolloutTraceRecorder, pub(crate) user_shell_override: Option, pub(crate) parent_trace: Option, pub(crate) analytics_events_client: Option, @@ -452,6 +456,7 @@ impl Codex { inherited_shell_snapshot, user_shell_override, inherited_exec_policy, + inherited_rollout_trace, parent_trace: _, analytics_events_client, } = args; @@ -650,6 +655,7 @@ impl Codex { agent_control, environment, analytics_events_client, + inherited_rollout_trace, ) .await .map_err(|e| { diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 640e59b5b712..28760f539d1c 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -229,6 +229,7 @@ impl Session { agent_control: AgentControl, environment: Option>, analytics_events_client: Option, + inherited_rollout_trace: RolloutTraceRecorder, ) -> anyhow::Result> { debug!( "Configuring session: model={}; provider={:?}", @@ -365,6 +366,38 @@ impl Session { let rollout_path = rollout_recorder .as_ref() .map(|rec| rec.rollout_path().to_path_buf()); + let trace_agent_path = session_configuration + .session_source + .get_agent_path() + .unwrap_or_else(codex_protocol::AgentPath::root); + let trace_task_name = + (!trace_agent_path.is_root()).then(|| trace_agent_path.name().to_string()); + let trace_metadata = ThreadStartedTraceMetadata { + thread_id: conversation_id.to_string(), + agent_path: trace_agent_path.to_string(), + task_name: trace_task_name, + nickname: session_configuration.session_source.get_nickname(), + agent_role: session_configuration.session_source.get_agent_role(), + session_source: session_configuration.session_source.clone(), + cwd: session_configuration.cwd.to_path_buf(), + rollout_path: rollout_path.clone(), + model: session_configuration.collaboration_mode.model().to_string(), + provider_name: config.model_provider_id.clone(), + approval_policy: session_configuration.approval_policy.value().to_string(), + sandbox_policy: format!("{:?}", session_configuration.sandbox_policy.get()), + }; + let rollout_trace = if matches!( + session_configuration.session_source, + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. }) + ) { + // Spawned child threads are part of their root rollout tree. If + // the parent had no trace recorder, do not create an orphan child + // bundle that looks like an independent rollout. + inherited_rollout_trace + } else { + RolloutTraceRecorder::create_root_or_disabled(conversation_id) + }; + rollout_trace.record_thread_started(trace_metadata); let mut post_session_configured_events = Vec::::new(); @@ -644,6 +677,7 @@ impl Session { analytics_events_client, hooks, rollout: Mutex::new(rollout_recorder), + rollout_trace, user_shell: Arc::new(default_shell), shell_snapshot_tx, show_raw_agent_reasoning: config.show_raw_agent_reasoning, diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index ee72769bda6a..d3a7fac8d659 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -2951,6 +2951,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { .expect("create environment"), )), /*analytics_events_client*/ None, + RolloutTraceRecorder::disabled(), ) .await; @@ -3073,6 +3074,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { ..HooksConfig::default() }), rollout: Mutex::new(None), + rollout_trace: RolloutTraceRecorder::disabled(), user_shell: Arc::new(default_user_shell()), shell_snapshot_tx: watch::channel(None).0, show_raw_agent_reasoning: config.show_raw_agent_reasoning, @@ -3268,6 +3270,7 @@ async fn make_session_with_config_and_rx( .expect("create environment"), )), /*analytics_events_client*/ None, + RolloutTraceRecorder::disabled(), ) .await?; @@ -4164,6 +4167,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( ..HooksConfig::default() }), rollout: Mutex::new(None), + rollout_trace: RolloutTraceRecorder::disabled(), user_shell: Arc::new(default_user_shell()), shell_snapshot_tx: watch::channel(None).0, show_raw_agent_reasoning: config.show_raw_agent_reasoning, diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index 73a0de285a0f..d7b354454f1a 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -650,6 +650,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { metrics_service_name: None, inherited_shell_snapshot: None, inherited_exec_policy: Some(Arc::new(parent_exec_policy)), + inherited_rollout_trace: RolloutTraceRecorder::disabled(), user_shell_override: None, parent_trace: None, analytics_events_client: None, diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 98f572243a51..ad6bed544d51 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1874,6 +1874,12 @@ async fn try_run_sampling_request( auth_mode = sess.services.auth_manager.auth_mode(), features = sess.features.enabled_features(), ); + let inference_trace = sess.services.rollout_trace.inference_trace_context( + sess.conversation_id, + turn_context.sub_id.as_str(), + turn_context.model_info.slug.as_str(), + turn_context.provider.info().name.as_str(), + ); let mut stream = client_session .stream( prompt, @@ -1883,6 +1889,7 @@ async fn try_run_sampling_request( turn_context.reasoning_summary, turn_context.config.service_tier, turn_metadata_header, + &inference_trace, ) .instrument(trace_span!("stream_request")) .or_cancel(&cancellation_token) diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index aae02d61bdc6..b7e2aeb3c9e2 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -23,6 +23,7 @@ use codex_mcp::McpConnectionManager; use codex_models_manager::manager::ModelsManager; use codex_otel::SessionTelemetry; use codex_rollout::state_db::StateDbHandle; +use codex_rollout_trace::RolloutTraceRecorder; use codex_thread_store::LocalThreadStore; use std::path::PathBuf; use tokio::sync::Mutex; @@ -41,6 +42,7 @@ pub(crate) struct SessionServices { pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) hooks: Hooks, pub(crate) rollout: Mutex>, + pub(crate) rollout_trace: RolloutTraceRecorder, pub(crate) user_shell: Arc, pub(crate) shell_snapshot_tx: watch::Sender>>, pub(crate) show_raw_agent_reasoning: bool, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index e4da99bb55a0..7b6a4730473b 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -957,6 +957,7 @@ impl ThreadManagerState { metrics_service_name, inherited_shell_snapshot, inherited_exec_policy, + inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(), user_shell_override, parent_trace, analytics_events_client: self.analytics_events_client.clone(), diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 85b23af31ff2..2cdcaf448c44 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -131,6 +131,7 @@ async fn responses_stream_includes_subagent_header_on_review() { summary.unwrap_or(model_info.default_reasoning_summary), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("stream failed"); @@ -257,6 +258,7 @@ async fn responses_stream_includes_subagent_header_on_other() { summary.unwrap_or(model_info.default_reasoning_summary), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("stream failed"); @@ -372,6 +374,7 @@ async fn responses_respects_model_info_overrides_from_config() { summary.unwrap_or(model_info.default_reasoning_summary), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("stream failed"); diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 2086367b21e7..a38a9e6289ce 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -912,6 +912,7 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth summary.unwrap_or(ReasoningSummary::Auto), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("responses stream to start"); @@ -2259,6 +2260,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { summary.unwrap_or(ReasoningSummary::Auto), /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("responses stream to start"); diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index f6cfd0d913e1..f1d273aa0ccc 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -390,6 +390,7 @@ async fn responses_websocket_preconnect_is_reused_even_with_header_changes() { harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -440,6 +441,7 @@ async fn responses_websocket_request_prewarm_is_reused_even_with_header_changes( harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -842,6 +844,7 @@ async fn responses_websocket_emits_reasoning_included_event() { harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -915,6 +918,7 @@ async fn responses_websocket_emits_rate_limit_events() { harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -1551,6 +1555,7 @@ async fn responses_websocket_v2_after_error_uses_full_create_without_previous_re harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -1638,6 +1643,7 @@ async fn responses_websocket_v2_surfaces_terminal_error_without_close_handshake( harness.summary, /*service_tier*/ None, /*turn_metadata_header*/ None, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); @@ -1901,6 +1907,7 @@ async fn stream_until_complete_with_request_metadata( harness.summary, service_tier, turn_metadata_header, + &codex_rollout_trace::InferenceTraceContext::disabled(), ) .await .expect("websocket stream failed"); diff --git a/codex-rs/rollout-trace/Cargo.toml b/codex-rs/rollout-trace/Cargo.toml index e67b35953a5b..540f794fb7a2 100644 --- a/codex-rs/rollout-trace/Cargo.toml +++ b/codex-rs/rollout-trace/Cargo.toml @@ -17,6 +17,8 @@ anyhow = { workspace = true } codex-protocol = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/codex-rs/rollout-trace/src/compaction.rs b/codex-rs/rollout-trace/src/compaction.rs index 6464f5781fef..01608974ac6c 100644 --- a/codex-rs/rollout-trace/src/compaction.rs +++ b/codex-rs/rollout-trace/src/compaction.rs @@ -13,6 +13,7 @@ use std::sync::atomic::Ordering; use codex_protocol::models::ResponseItem; use serde::Serialize; use serde_json::Value as JsonValue; +use tracing::warn; use crate::inference::trace_response_item_json; use crate::model::AgentThreadId; @@ -74,6 +75,17 @@ struct TracedCompactionCompleted { output_items: Vec, } +/// History replacement checkpoint persisted when compaction installs new live history. +/// +/// The checkpoint keeps compaction separate from ordinary sampling snapshots: +/// `input_history` is the live thread history selected for compaction, while +/// `replacement_history` is what future prompts may carry after the checkpoint. +#[derive(Serialize)] +pub struct CompactionCheckpointTracePayload<'a> { + pub input_history: &'a [ResponseItem], + pub replacement_history: &'a [ResponseItem], +} + impl CompactionTraceContext { /// Builds a context that accepts trace calls and records nothing. pub fn disabled() -> Self { @@ -118,6 +130,40 @@ impl CompactionTraceContext { attempt.record_started(request); attempt } + + /// Records the point where compacted history becomes the live thread history. + /// + /// The checkpoint belongs to the same semantic compaction lifecycle as the + /// compact endpoint attempts, so the context reuses its stable compaction ID. + pub fn record_installed(&self, checkpoint: &CompactionCheckpointTracePayload<'_>) { + let CompactionTraceContextState::Enabled(context) = &self.state else { + return; + }; + let checkpoint_payload = match context + .writer + .write_json_payload(RawPayloadKind::CompactionCheckpoint, checkpoint) + { + Ok(payload_ref) => payload_ref, + Err(err) => { + warn!("failed to write rollout trace payload: {err:#}"); + return; + } + }; + + let event_context = RawTraceEventContext { + thread_id: Some(context.thread_id.clone()), + codex_turn_id: Some(context.codex_turn_id.clone()), + }; + if let Err(err) = context.writer.append_with_context( + event_context, + RawTraceEventPayload::CompactionInstalled { + compaction_id: context.compaction_id.clone(), + checkpoint_payload, + }, + ) { + warn!("failed to append rollout trace event: {err:#}"); + } + } } impl CompactionTraceAttempt { @@ -184,6 +230,14 @@ impl CompactionTraceAttempt { ); } + /// Records the compact endpoint result without forcing callers to branch on trace events. + pub fn record_result(&self, result: Result<&[ResponseItem], E>) { + match result { + Ok(output_items) => self.record_completed(output_items), + Err(err) => self.record_failed(err), + } + } + /// Records pre-response failures from the compact endpoint. pub fn record_failed(&self, error: impl Display) { let CompactionTraceAttemptState::Enabled(attempt) = &self.state else { diff --git a/codex-rs/rollout-trace/src/lib.rs b/codex-rs/rollout-trace/src/lib.rs index 5c8022d6d711..0ca522c9a204 100644 --- a/codex-rs/rollout-trace/src/lib.rs +++ b/codex-rs/rollout-trace/src/lib.rs @@ -12,11 +12,14 @@ mod inference; mod model; mod payload; mod raw_event; +mod recorder; mod reducer; mod writer; /// Conventional reduced-state cache name written next to a raw trace bundle. pub use bundle::REDUCED_STATE_FILE_NAME; +/// Raw checkpoint payload for a remote compaction install event. +pub use compaction::CompactionCheckpointTracePayload; /// No-op-capable handle for recording remote-compaction requests. pub use compaction::CompactionTraceAttempt; /// Shared recorder context for a compaction checkpoint. @@ -43,6 +46,12 @@ pub use raw_event::RawTraceEvent; pub use raw_event::RawTraceEventContext; /// Typed payload for one raw trace event. pub use raw_event::RawTraceEventPayload; +/// Environment variable that enables local trace-bundle recording. +pub use recorder::CODEX_ROLLOUT_TRACE_ROOT_ENV; +/// Best-effort hot-path recorder for one rollout trace bundle. +pub use recorder::RolloutTraceRecorder; +/// Raw metadata captured when a thread starts. +pub use recorder::ThreadStartedTraceMetadata; /// Replay a raw trace bundle and write/read its reduced `RolloutTrace`. pub use reducer::replay_bundle; /// Append-only writer used by hot-path Codex instrumentation. diff --git a/codex-rs/rollout-trace/src/recorder.rs b/codex-rs/rollout-trace/src/recorder.rs new file mode 100644 index 000000000000..d17951fda59b --- /dev/null +++ b/codex-rs/rollout-trace/src/recorder.rs @@ -0,0 +1,228 @@ +//! Opt-in hot-path producer for rollout trace bundles. + +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; + +use codex_protocol::ThreadId; +use codex_protocol::protocol::SessionSource; +use serde::Serialize; +use tracing::debug; +use tracing::warn; +use uuid::Uuid; + +use crate::AgentThreadId; +use crate::CodexTurnId; +use crate::CompactionId; +use crate::CompactionTraceContext; +use crate::InferenceTraceContext; +use crate::RawPayloadKind; +use crate::RawPayloadRef; +use crate::RawTraceEventPayload; +use crate::TraceWriter; + +/// Environment variable that enables local trace-bundle recording. +/// +/// The value is a root directory. Each independent root session gets one child +/// bundle directory. Spawned child threads share their root session's bundle so +/// one reduced `state.json` describes the whole multi-agent rollout tree. +pub const CODEX_ROLLOUT_TRACE_ROOT_ENV: &str = "CODEX_ROLLOUT_TRACE_ROOT"; + +/// Lightweight handle stored in `SessionServices`. +/// +/// Cloning the handle is cheap; all sequencing and file ownership remains +/// inside `TraceWriter`. Disabled handles intentionally accept the same calls +/// as enabled handles so hot-path session code can describe traceable events +/// without repeatedly branching on whether diagnostic recording is enabled. +#[derive(Clone, Debug)] +pub struct RolloutTraceRecorder { + state: RolloutTraceRecorderState, +} + +#[derive(Clone, Debug)] +enum RolloutTraceRecorderState { + Disabled, + Enabled(EnabledRolloutTraceRecorder), +} + +#[derive(Clone, Debug)] +struct EnabledRolloutTraceRecorder { + writer: Arc, +} + +/// Metadata captured once at thread/session start. +/// +/// This payload is intentionally operational rather than reduced: it is a raw +/// payload that later reducers can mine as the reduced thread model evolves. +#[derive(Serialize)] +pub struct ThreadStartedTraceMetadata { + pub thread_id: String, + pub agent_path: String, + pub task_name: Option, + pub nickname: Option, + pub agent_role: Option, + pub session_source: SessionSource, + pub cwd: PathBuf, + pub rollout_path: Option, + pub model: String, + pub provider_name: String, + pub approval_policy: String, + pub sandbox_policy: String, +} + +impl RolloutTraceRecorder { + /// Builds a recorder handle that accepts trace calls and records nothing. + pub fn disabled() -> Self { + Self { + state: RolloutTraceRecorderState::Disabled, + } + } + + /// Creates and starts a root trace bundle, or returns a disabled recorder. + /// + /// Trace startup is best-effort. A tracing failure must not make the Codex + /// session unusable, because traces are diagnostic and can be enabled while + /// debugging unrelated production failures. The returned recorder has not + /// emitted `ThreadStarted`; session setup records that event uniformly for + /// root and inherited child recorders. + pub fn create_root_or_disabled(thread_id: ThreadId) -> Self { + let Some(root) = std::env::var_os(CODEX_ROLLOUT_TRACE_ROOT_ENV) else { + return Self::disabled(); + }; + let root = PathBuf::from(root); + match Self::create_in_root(root.as_path(), thread_id) { + Ok(recorder) => recorder, + Err(err) => { + warn!("failed to initialize rollout trace recorder: {err:#}"); + Self::disabled() + } + } + } + + fn create_in_root(root: &Path, thread_id: ThreadId) -> anyhow::Result { + let trace_id = Uuid::new_v4().to_string(); + let thread_id = thread_id.to_string(); + let bundle_dir = root.join(format!("trace-{trace_id}-{thread_id}")); + let writer = TraceWriter::create( + &bundle_dir, + trace_id.clone(), + thread_id.clone(), + thread_id.clone(), + )?; + let recorder = EnabledRolloutTraceRecorder { + writer: Arc::new(writer), + }; + + recorder.append_best_effort(RawTraceEventPayload::RolloutStarted { + trace_id, + root_thread_id: thread_id, + }); + + debug!("recording rollout trace at {}", bundle_dir.display()); + Ok(Self::enabled(recorder)) + } + + fn enabled(inner: EnabledRolloutTraceRecorder) -> Self { + Self { + state: RolloutTraceRecorderState::Enabled(inner), + } + } + + /// Emits the lifecycle event and metadata for one thread in this rollout tree. + /// + /// Root sessions call this immediately after `RolloutStarted`; spawned + /// child sessions call it on the inherited recorder. Keeping children in + /// the root bundle preserves one raw payload namespace and one reduced + /// `RolloutTrace` for the whole multi-agent task. + pub fn record_thread_started(&self, metadata: ThreadStartedTraceMetadata) { + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return; + }; + let metadata_payload = + recorder.write_json_payload_best_effort(RawPayloadKind::SessionMetadata, &metadata); + recorder.append_best_effort(RawTraceEventPayload::ThreadStarted { + thread_id: metadata.thread_id, + agent_path: metadata.agent_path, + metadata_payload, + }); + } + + /// Builds reusable inference trace context for one Codex turn. + /// + /// The returned context is intentionally not "an inference call" yet. + /// Transport code owns retry/fallback attempts and calls `start_attempt` + /// only after it has built the concrete request payload for that attempt. + pub fn inference_trace_context( + &self, + thread_id: impl Into, + codex_turn_id: impl Into, + model: impl Into, + provider_name: impl Into, + ) -> InferenceTraceContext { + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return InferenceTraceContext::disabled(); + }; + + InferenceTraceContext::enabled( + Arc::clone(&recorder.writer), + thread_id.into(), + codex_turn_id.into(), + model.into(), + provider_name.into(), + ) + } + + /// Builds remote-compaction trace context for one checkpoint. + /// + /// Rollout tracing currently has a first-class checkpoint model only for remote compaction. + /// The compact endpoint is a model-facing request whose output replaces live history, so it + /// needs both request/response attempt events and a later checkpoint event when processed + /// replacement history is installed. + pub fn compaction_trace_context( + &self, + thread_id: impl Into, + codex_turn_id: impl Into, + compaction_id: impl Into, + model: impl Into, + provider_name: impl Into, + ) -> CompactionTraceContext { + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return CompactionTraceContext::disabled(); + }; + + CompactionTraceContext::enabled( + Arc::clone(&recorder.writer), + thread_id.into(), + codex_turn_id.into(), + compaction_id.into(), + model.into(), + provider_name.into(), + ) + } +} + +impl EnabledRolloutTraceRecorder { + fn write_json_payload_best_effort( + &self, + kind: RawPayloadKind, + payload: &impl Serialize, + ) -> Option { + match self.writer.write_json_payload(kind, payload) { + Ok(payload_ref) => Some(payload_ref), + Err(err) => { + warn!("failed to write rollout trace payload: {err:#}"); + None + } + } + } + + fn append_best_effort(&self, payload: RawTraceEventPayload) { + if let Err(err) = self.writer.append(payload) { + warn!("failed to append rollout trace event: {err:#}"); + } + } +} + +#[cfg(test)] +#[path = "recorder_tests.rs"] +mod tests; diff --git a/codex-rs/rollout-trace/src/recorder_tests.rs b/codex-rs/rollout-trace/src/recorder_tests.rs new file mode 100644 index 000000000000..2bd05c77479c --- /dev/null +++ b/codex-rs/rollout-trace/src/recorder_tests.rs @@ -0,0 +1,162 @@ +use std::fs; +use std::path::Path; +use std::path::PathBuf; + +use codex_protocol::AgentPath; +use codex_protocol::ThreadId; +use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; +use tempfile::TempDir; + +use super::*; +use crate::CompactionCheckpointTracePayload; +use crate::RolloutStatus; +use crate::replay_bundle; + +#[test] +fn create_in_root_writes_replayable_lifecycle_events() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let thread_id = ThreadId::new(); + let recorder = + RolloutTraceRecorder::create_in_root(temp.path(), thread_id).expect("trace recorder"); + recorder.record_thread_started(ThreadStartedTraceMetadata { + thread_id: thread_id.to_string(), + agent_path: "/root".to_string(), + task_name: None, + nickname: None, + agent_role: None, + session_source: SessionSource::Exec, + cwd: PathBuf::from("/workspace"), + rollout_path: Some(PathBuf::from("/tmp/rollout.jsonl")), + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess), + }); + + let bundle_dir = single_bundle_dir(temp.path())?; + let replayed = replay_bundle(&bundle_dir)?; + + assert_eq!(replayed.status, RolloutStatus::Running); + assert_eq!(replayed.root_thread_id, thread_id.to_string()); + assert_eq!(replayed.threads[&thread_id.to_string()].agent_path, "/root"); + assert_eq!(replayed.raw_payloads.len(), 1); + + Ok(()) +} + +#[test] +fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let root_thread_id = ThreadId::new(); + let child_thread_id = ThreadId::new(); + let recorder = + RolloutTraceRecorder::create_in_root(temp.path(), root_thread_id).expect("trace recorder"); + recorder.record_thread_started(minimal_metadata(root_thread_id)); + + recorder.record_thread_started(ThreadStartedTraceMetadata { + thread_id: child_thread_id.to_string(), + agent_path: "/root/repo_file_counter".to_string(), + task_name: Some("repo_file_counter".to_string()), + nickname: Some("Kepler".to_string()), + agent_role: Some("worker".to_string()), + session_source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: root_thread_id, + depth: 1, + agent_path: Some( + AgentPath::try_from("/root/repo_file_counter").map_err(anyhow::Error::msg)?, + ), + agent_nickname: Some("Kepler".to_string()), + agent_role: Some("worker".to_string()), + }), + cwd: PathBuf::from("/workspace"), + rollout_path: Some(PathBuf::from("/tmp/child-rollout.jsonl")), + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess), + }); + let bundle_dir = single_bundle_dir(temp.path())?; + let replayed = replay_bundle(&bundle_dir)?; + + assert_eq!(fs::read_dir(temp.path())?.count(), 1); + assert_eq!(replayed.threads.len(), 2); + assert_eq!( + replayed.threads[&child_thread_id.to_string()].agent_path, + "/root/repo_file_counter" + ); + assert_eq!(replayed.status, RolloutStatus::Running); + assert_eq!( + replayed.threads[&child_thread_id.to_string()] + .execution + .status, + crate::ExecutionStatus::Running + ); + assert_eq!(replayed.raw_payloads.len(), 2); + + Ok(()) +} + +#[test] +fn disabled_recorder_accepts_trace_calls_without_writing() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let thread_id = ThreadId::new(); + let recorder = RolloutTraceRecorder::disabled(); + + recorder.record_thread_started(minimal_metadata(thread_id)); + + let inference_trace = + recorder.inference_trace_context(thread_id, "turn-1", "gpt-test", "test-provider"); + let inference_attempt = inference_trace.start_attempt(); + inference_attempt.record_started(&serde_json::json!({ "kind": "inference" })); + let token_usage: Option = None; + inference_attempt.record_completed("response-1", &token_usage, &[]); + inference_attempt.record_failed("inference failed"); + + let compaction_trace = recorder.compaction_trace_context( + thread_id, + "turn-1", + "compaction-1", + "gpt-test", + "test-provider", + ); + let compaction_attempt = + compaction_trace.start_attempt(&serde_json::json!({ "kind": "compaction" })); + compaction_attempt.record_completed(&[]); + compaction_attempt.record_failed("compaction failed"); + compaction_trace.record_installed(&CompactionCheckpointTracePayload { + input_history: &[], + replacement_history: &[], + }); + + assert_eq!(fs::read_dir(temp.path())?.count(), 0); + + Ok(()) +} + +fn minimal_metadata(thread_id: ThreadId) -> ThreadStartedTraceMetadata { + ThreadStartedTraceMetadata { + thread_id: thread_id.to_string(), + agent_path: "/root".to_string(), + task_name: None, + nickname: None, + agent_role: None, + session_source: SessionSource::Exec, + cwd: PathBuf::from("/workspace"), + rollout_path: None, + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: "danger-full-access".to_string(), + } +} + +fn single_bundle_dir(root: &Path) -> anyhow::Result { + let mut entries = fs::read_dir(root)? + .map(|entry| entry.map(|entry| entry.path())) + .collect::, _>>()?; + entries.sort(); + assert_eq!(entries.len(), 1); + Ok(entries.remove(0)) +}