3 changes: 3 additions & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions codex-rs/Cargo.toml
@@ -165,6 +165,7 @@ codex-responses-api-proxy = { path = "responses-api-proxy" }
codex-response-debug-context = { path = "response-debug-context" }
codex-rmcp-client = { path = "rmcp-client" }
codex-rollout = { path = "rollout" }
codex-rollout-trace = { path = "rollout-trace" }
codex-sandboxing = { path = "sandboxing" }
codex-secrets = { path = "secrets" }
codex-shell-command = { path = "shell-command" }
1 change: 1 addition & 0 deletions codex-rs/core/Cargo.toml
@@ -53,6 +53,7 @@ codex-model-provider = { workspace = true }
codex-protocol = { workspace = true }
codex-response-debug-context = { workspace = true }
codex-rollout = { workspace = true }
codex-rollout-trace = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-sandboxing = { workspace = true }
codex-state = { workspace = true }
84 changes: 69 additions & 15 deletions codex-rs/core/src/client.rs
@@ -80,6 +80,9 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::W3cTraceContext;
use codex_rollout_trace::CompactionTraceContext;
use codex_rollout_trace::InferenceTraceAttempt;
use codex_rollout_trace::InferenceTraceContext;
use codex_tools::create_tools_json_for_responses_api;
use eventsource_stream::Event;
use eventsource_stream::EventStreamError;
@@ -411,6 +414,7 @@ impl ModelClient {
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
session_telemetry: &SessionTelemetry,
compaction_trace: &CompactionTraceContext,
) -> Result<Vec<ResponseItem>> {
if prompt.input.is_empty() {
return Ok(Vec::new());
@@ -465,10 +469,13 @@
extra_headers.extend(build_conversation_headers(Some(
self.state.conversation_id.to_string(),
)));
client
let trace_attempt = compaction_trace.start_attempt(&payload);
let result = client
.compact_input(&payload, extra_headers)
.await
.map_err(map_api_error)
.map_err(map_api_error);
trace_attempt.record_result(result.as_deref());
result
}

pub(crate) async fn create_realtime_call_with_headers(
@@ -1148,6 +1155,7 @@ impl ModelClientSession {
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
inference_trace: &InferenceTraceContext,
) -> Result<ResponseStream> {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
warn!(path, "Streaming from fixture");
@@ -1156,7 +1164,11 @@
self.client.state.provider.info().stream_idle_timeout(),
)
.map_err(map_api_error)?;
let (stream, _last_request_rx) = map_response_stream(stream, session_telemetry.clone());
let (stream, _last_request_rx) = map_response_stream(
stream,
session_telemetry.clone(),
InferenceTraceAttempt::disabled(),
);
return Ok(stream);
}

@@ -1190,6 +1202,8 @@
summary,
service_tier,
)?;
let inference_trace_attempt = inference_trace.start_attempt();
inference_trace_attempt.record_started(&request);
let client = ApiResponsesClient::new(
transport,
client_setup.api_provider,
@@ -1200,12 +1214,17 @@

match stream_result {
Ok(stream) => {
let (stream, _) = map_response_stream(stream, session_telemetry.clone());
let (stream, _) = map_response_stream(
stream,
session_telemetry.clone(),
inference_trace_attempt,
);
return Ok(stream);
}
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
inference_trace_attempt.record_failed(&unauthorized_transport);
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
Expand All @@ -1216,7 +1235,11 @@ impl ModelClientSession {
);
continue;
}
Err(err) => return Err(map_api_error(err)),
Err(err) => {
let err = map_api_error(err);
inference_trace_attempt.record_failed(&err);
return Err(err);
}
}
}
}
@@ -1247,6 +1270,7 @@
turn_metadata_header: Option<&str>,
warmup: bool,
request_trace: Option<W3cTraceContext>,
inference_trace: &InferenceTraceContext,
) -> Result<WebsocketStreamOutcome> {
let auth_manager = self.client.state.provider.auth_manager();

@@ -1321,17 +1345,33 @@

let ws_request = self.prepare_websocket_request(ws_payload, &request);
self.websocket_session.last_request = Some(request);
let stream_result = self.websocket_session.connection.as_ref().ok_or_else(|| {
map_api_error(ApiError::Stream(
"websocket connection is unavailable".to_string(),
))
})?;
let stream_result = stream_result
let inference_trace_attempt = if warmup {
// Prewarm sends `generate=false`; it is connection setup, not a
// model inference attempt that should appear in rollout traces.
InferenceTraceAttempt::disabled()
} else {
inference_trace.start_attempt()
};
inference_trace_attempt.record_started(&ws_request);
let websocket_connection =
self.websocket_session.connection.as_ref().ok_or_else(|| {
map_api_error(ApiError::Stream(
"websocket connection is unavailable".to_string(),
))
})?;
let stream_result = websocket_connection
.stream_request(ws_request, self.websocket_session.connection_reused())
.await
.map_err(map_api_error)?;
let (stream, last_request_rx) =
map_response_stream(stream_result, session_telemetry.clone());
.map_err(|err| {
let err = map_api_error(err);
inference_trace_attempt.record_failed(&err);
err
})?;
let (stream, last_request_rx) = map_response_stream(
stream_result,
session_telemetry.clone(),
inference_trace_attempt,
);
self.websocket_session.last_response_rx = Some(last_request_rx);
return Ok(WebsocketStreamOutcome::Stream(stream));
}
@@ -1390,6 +1430,7 @@ impl ModelClientSession {
return Ok(());
}

let disabled_trace = InferenceTraceContext::disabled();
match self
.stream_responses_websocket(
prompt,
@@ -1401,6 +1442,7 @@
turn_metadata_header,
/*warmup*/ true,
current_span_w3c_trace_context(),
&disabled_trace,
)
.await
{
@@ -1429,7 +1471,9 @@
/// The caller is responsible for passing per-turn settings explicitly (model selection,
/// reasoning settings, telemetry context, and turn metadata). This method will prefer the
/// Responses WebSocket transport when the provider supports it and it remains healthy, and will
/// fall back to the HTTP Responses API transport otherwise.
/// fall back to the HTTP Responses API transport otherwise. The trace context may be enabled or
/// disabled, but is always explicit so transport paths do not need separate trace/no-trace
/// branches.
pub async fn stream(
&mut self,
prompt: &Prompt,
Expand All @@ -1439,6 +1483,7 @@ impl ModelClientSession {
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
inference_trace: &InferenceTraceContext,
) -> Result<ResponseStream> {
let wire_api = self.client.state.provider.info().wire_api;
match wire_api {
@@ -1456,6 +1501,7 @@
turn_metadata_header,
/*warmup*/ false,
request_trace,
inference_trace,
)
.await?
{
@@ -1474,6 +1520,7 @@
summary,
service_tier,
turn_metadata_header,
inference_trace,
)
.await
}
@@ -1569,6 +1616,7 @@ fn parent_thread_id_header_value(session_source: &SessionSource) -> Option<String>
fn map_response_stream<S>(
api_stream: S,
session_telemetry: SessionTelemetry,
inference_trace_attempt: InferenceTraceAttempt,
) -> (ResponseStream, oneshot::Receiver<LastResponse>)
where
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>
@@ -1609,6 +1657,11 @@
usage.total_tokens,
);
}
inference_trace_attempt.record_completed(
&response_id,
&token_usage,
&items_added,
);
if let Some(sender) = tx_last_response.take() {
let _ = sender.send(LastResponse {
response_id: response_id.clone(),
Expand All @@ -1633,6 +1686,7 @@ where
}
Err(err) => {
let mapped = map_api_error(err);
inference_trace_attempt.record_failed(&mapped);
if !logged_error {
session_telemetry.see_event_completed_failed(&mapped);
logged_error = true;
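
Reviewer note: the `codex-rollout-trace` crate's source is not included in this diff, so the shape of `InferenceTraceContext` and `InferenceTraceAttempt` can only be inferred from the call sites above. The following is a minimal stand-in sketch under that assumption; every name, field, and signature here is a guess from usage, not the crate's actual definition.

```rust
// Stand-in sketch (not the real crate): one trace context per request, one
// attempt per try, and record_* calls that become no-ops when disabled.
pub struct InferenceTraceContext {
    enabled: bool,
}

pub struct InferenceTraceAttempt {
    enabled: bool,
}

impl InferenceTraceContext {
    /// A context that records nothing; used above for SSE fixtures and for
    /// paths that are intentionally untraced.
    pub fn disabled() -> Self {
        Self { enabled: false }
    }

    /// Each retry starts its own attempt, so one logical request can leave
    /// several started/failed records before a single completion.
    pub fn start_attempt(&self) -> InferenceTraceAttempt {
        InferenceTraceAttempt {
            enabled: self.enabled,
        }
    }
}

impl InferenceTraceAttempt {
    /// Attempt-level opt-out, used above for websocket warmup requests.
    pub fn disabled() -> Self {
        Self { enabled: false }
    }

    pub fn record_started(&self, request_summary: &str) {
        if self.enabled {
            eprintln!("trace: attempt started: {request_summary}");
        }
    }

    pub fn record_failed(&self, error: &str) {
        if self.enabled {
            eprintln!("trace: attempt failed: {error}");
        }
    }

    pub fn record_completed(&self, response_id: &str) {
        if self.enabled {
            eprintln!("trace: attempt completed: {response_id}");
        }
    }
}
```

Whatever the real definitions are, the doc comment on `stream` makes the design intent explicit: the context is always passed and possibly disabled, so `map_response_stream` and both transports keep a single code path with no trace/no-trace branching.
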
1 change: 1 addition & 0 deletions codex-rs/core/src/codex_delegate.rs
@@ -94,6 +94,7 @@ pub(crate) async fn run_codex_thread_interactive(
inherited_shell_snapshot: None,
user_shell_override: None,
inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)),
inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(),
parent_trace: None,
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
}))
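
The delegate change, together with `sess.services.rollout_trace.compaction_trace_context(...)` in compact_remote.rs below, suggests a two-level hierarchy: a session-wide recorder from which per-operation trace contexts are derived. A sketch under that assumption follows; `RolloutTraceRecorder` and the method signature are stand-ins inferred from these two call sites only.

```rust
// Inferred hierarchy (stand-in types, not the crate's real API): a session
// owns one RolloutTraceRecorder; per-compaction contexts hang off it.
pub struct RolloutTraceRecorder {
    enabled: bool,
}

pub struct CompactionTraceContext {
    enabled: bool,
    compaction_id: String,
}

impl RolloutTraceRecorder {
    /// What delegated sub-threads receive above: a child thread's requests
    /// never interleave with the parent's rollout trace.
    pub fn disabled() -> Self {
        Self { enabled: false }
    }

    /// Derive a per-compaction context keyed by the UI compaction item's ID,
    /// the join key described in compact_remote.rs below.
    pub fn compaction_trace_context(&self, compaction_id: &str) -> CompactionTraceContext {
        CompactionTraceContext {
            enabled: self.enabled,
            compaction_id: compaction_id.to_string(),
        }
    }
}

impl CompactionTraceContext {
    pub fn record_installed(&self, items_in: usize, items_out: usize) {
        if self.enabled {
            eprintln!(
                "compaction {}: checkpoint installed ({items_in} in, {items_out} out)",
                self.compaction_id
            );
        }
    }
}
```
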
4 changes: 4 additions & 0 deletions codex-rs/core/src/compact.rs
@@ -31,6 +31,7 @@ use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::WarningEvent;
use codex_protocol::user_input::UserInput;
use codex_rollout_trace::InferenceTraceContext;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::approx_token_count;
use codex_utils_output_truncation::truncate_text;
@@ -546,6 +547,9 @@ async fn drain_to_completed(
turn_context.reasoning_summary,
turn_context.config.service_tier,
turn_metadata_header,
// Rollout tracing currently models remote compaction only; local compaction streams
// are left untraced until the reducer has a first-class local compaction lifecycle.
&InferenceTraceContext::disabled(),
)
.await?;
loop {
26 changes: 24 additions & 2 deletions codex-rs/core/src/compact_remote.rs
@@ -26,6 +26,7 @@ use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_rollout_trace::CompactionCheckpointTracePayload;
use futures::TryFutureExt;
use tokio_util::sync::CancellationToken;
use tracing::error;
@@ -114,7 +115,17 @@
turn_context: &Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
let context_compaction_item = ContextCompactionItem::new();
// Use the UI compaction item ID as the trace compaction ID so protocol lifecycle events,
// endpoint attempts, and the installed history checkpoint all have one join key.
let compaction_trace = sess.services.rollout_trace.compaction_trace_context(
sess.conversation_id,
turn_context.sub_id.as_str(),
context_compaction_item.id.as_str(),
turn_context.model_info.slug.as_str(),
turn_context.provider.info().name.as_str(),
);
let compaction_item = TurnItem::ContextCompaction(context_compaction_item);
sess.emit_turn_item_started(turn_context, &compaction_item)
.await;
let mut history = sess.clone_history().await;
@@ -131,6 +142,10 @@
"trimmed history items before remote compaction"
);
}
// This is the history selected for remote compaction, after any trimming required to fit the
// compact endpoint. The checkpoint below records it separately from the next sampling request,
// whose prompt will repeat current developer/context prefix items.
let trace_input_history = history.raw_items().to_vec();
// Required to keep `/undo` available after compaction
let ghost_snapshots: Vec<ResponseItem> = history
.raw_items()
@@ -157,7 +172,6 @@
personality: turn_context.personality,
output_schema: None,
};

let mut new_history = sess
.services
.model_client
@@ -167,6 +181,7 @@
turn_context.reasoning_effort,
turn_context.reasoning_summary,
&turn_context.session_telemetry,
&compaction_trace,
)
.or_else(|err| async {
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
@@ -200,6 +215,13 @@
message: String::new(),
replacement_history: Some(new_history.clone()),
};
// Install is the semantic boundary where the compact endpoint's output becomes live
// thread history. Keep it distinct from the later inference request so the reducer can
// still represent repeated developer/context prefix items exactly as the model saw them.
compaction_trace.record_installed(&CompactionCheckpointTracePayload {
input_history: &trace_input_history,
replacement_history: &new_history,
});
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
.await;
sess.recompute_token_usage(turn_context).await;
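
One detail worth noting at the `record_installed` call site above: both histories are passed by reference (`&trace_input_history`, `&new_history`), which implies the checkpoint payload borrows rather than clones. A hypothetical shape is sketched below, assuming lifetime-parameterized slices; `Item` stands in for `ResponseItem`, and none of this is the crate's actual definition.

```rust
// Hypothetical CompactionCheckpointTracePayload, inferred from its single
// use above: borrowed views over both histories, so recording the install
// checkpoint never copies the thread's items.
pub struct CompactionCheckpointTracePayload<'a, Item> {
    /// History handed to the compact endpoint, after any trimming to fit.
    pub input_history: &'a [Item],
    /// Endpoint output that becomes the live thread history on install.
    pub replacement_history: &'a [Item],
}

/// Example consumer: summarize a checkpoint without taking ownership.
pub fn checkpoint_summary<Item>(p: &CompactionCheckpointTracePayload<'_, Item>) -> String {
    format!(
        "installed: {} items in, {} items out",
        p.input_history.len(),
        p.replacement_history.len()
    )
}
```
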
2 changes: 2 additions & 0 deletions codex-rs/core/src/memories/phase1.rs
@@ -23,6 +23,7 @@ use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::TokenUsage;
use codex_rollout_trace::InferenceTraceContext;
use codex_secrets::redact_secrets;
use futures::StreamExt;
use serde::Deserialize;
@@ -353,6 +354,7 @@
stage_one_context.reasoning_summary,
stage_one_context.service_tier,
stage_one_context.turn_metadata_header.as_deref(),
&InferenceTraceContext::disabled(),
)
.await?;

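
Across compact.rs, phase1.rs, and the fixture path in client.rs, every untraced caller passes `InferenceTraceContext::disabled()` rather than an `Option`. A self-contained sketch of why that convention keeps transports branch-free, with the same caveat as above: stand-in types, not the real crate.

```rust
// Null-object convention: disabled tracing is a real value whose record
// calls do nothing, so the transport has exactly one code path.
struct InferenceTraceContext {
    enabled: bool,
}

impl InferenceTraceContext {
    fn disabled() -> Self {
        Self { enabled: false }
    }

    fn record(&self, event: &str) {
        if self.enabled {
            eprintln!("trace: {event}");
        }
    }
}

// No `if let Some(trace)` anywhere: tracing on and off share this body.
fn stream_once(trace: &InferenceTraceContext) {
    trace.record("request started");
    // ... issue the request and map the response stream ...
    trace.record("request completed");
}

fn main() {
    // e.g. the memories phase-1 job above, which is intentionally untraced.
    stream_once(&InferenceTraceContext::disabled());
}
```
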