diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index fe9320b12e92..f423c2ea76d4 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1916,6 +1916,11 @@ async fn try_run_sampling_request( otel.name = field::Empty, tool_name = field::Empty, from = field::Empty, + gen_ai.usage.input_tokens = field::Empty, + gen_ai.usage.cache_read.input_tokens = field::Empty, + gen_ai.usage.output_tokens = field::Empty, + codex.usage.reasoning_output_tokens = field::Empty, + codex.usage.total_tokens = field::Empty, ); let event = match stream diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 1439e28bb3f4..0dd0c0937959 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -79,17 +79,25 @@ pub(crate) struct RunningTask { pub(crate) _timer: Option, } +pub(crate) struct RemovedTask { + pub(crate) records_turn_token_usage_on_span: bool, + pub(crate) active_turn_is_empty: bool, +} + impl ActiveTurn { pub(crate) fn add_task(&mut self, task: RunningTask) { let sub_id = task.turn_context.sub_id.clone(); self.tasks.insert(sub_id, task); } - pub(crate) fn remove_task(&mut self, sub_id: &str) -> bool { - if let Some(task) = self.tasks.swap_remove(sub_id) { - task.handle.detach(); - } - self.tasks.is_empty() + pub(crate) fn remove_task(&mut self, sub_id: &str) -> Option { + let task = self.tasks.swap_remove(sub_id)?; + let records_turn_token_usage_on_span = task.task.records_turn_token_usage_on_span(); + task.handle.detach(); + Some(RemovedTask { + records_turn_token_usage_on_span, + active_turn_is_empty: self.tasks.is_empty(), + }) } pub(crate) fn drain_tasks(&mut self) -> Vec { diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index fd9aaf2e33f1..94adc6d45bc6 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -15,6 +15,8 @@ use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use tokio_util::task::AbortOnDropHandle; use tracing::Instrument; +use tracing::Span; +use tracing::field; use tracing::info_span; use tracing::trace; use tracing::warn; @@ -191,6 +193,11 @@ pub(crate) trait SessionTask: Send + Sync + 'static { /// Returns the tracing name for a spawned task span. fn span_name(&self) -> &'static str; + /// Returns whether turn token usage should be recorded on this task's turn span. + fn records_turn_token_usage_on_span(&self) -> bool { + false + } + /// Executes the task until completion or cancellation. /// /// Implementations typically stream protocol events using `session` and @@ -228,6 +235,8 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static { fn span_name(&self) -> &'static str; + fn records_turn_token_usage_on_span(&self) -> bool; + fn run( self: Arc, session: Arc, @@ -255,6 +264,10 @@ where SessionTask::span_name(self) } + fn records_turn_token_usage_on_span(&self) -> bool { + SessionTask::records_turn_token_usage_on_span(self) + } + fn run( self: Arc, session: Arc, @@ -361,6 +374,12 @@ impl Session { thread.id = %self.conversation_id, turn.id = %turn_context.sub_id, model = %turn_context.model_info.slug, + codex.turn.token_usage.input_tokens = field::Empty, + codex.turn.token_usage.cached_input_tokens = field::Empty, + codex.turn.token_usage.non_cached_input_tokens = field::Empty, + codex.turn.token_usage.output_tokens = field::Empty, + codex.turn.token_usage.reasoning_output_tokens = field::Empty, + codex.turn.token_usage.total_tokens = field::Empty, ); let handle = tokio::spawn( async move { @@ -548,14 +567,20 @@ impl Session { let mut token_usage_at_turn_start = None; let mut turn_had_memory_citation = false; let mut turn_tool_calls = 0_u64; + let mut records_turn_token_usage_on_span = false; let turn_state = { let mut active = self.active_turn.lock().await; if let Some(at) = active.as_mut() - && at.remove_task(&turn_context.sub_id) + && let Some(removed_task) = at.remove_task(&turn_context.sub_id) { - should_clear_active_turn = true; - let turn_state = Arc::clone(&at.turn_state); - Some(turn_state) + records_turn_token_usage_on_span = removed_task.records_turn_token_usage_on_span; + if removed_task.active_turn_is_empty { + should_clear_active_turn = true; + let turn_state = Arc::clone(&at.turn_state); + Some(turn_state) + } else { + None + } } else { None } @@ -634,6 +659,33 @@ impl Session { - token_usage_at_turn_start.total_tokens) .max(0), }; + if records_turn_token_usage_on_span { + let current_span = Span::current(); + current_span.record( + "codex.turn.token_usage.input_tokens", + turn_token_usage.input_tokens, + ); + current_span.record( + "codex.turn.token_usage.cached_input_tokens", + turn_token_usage.cached_input(), + ); + current_span.record( + "codex.turn.token_usage.non_cached_input_tokens", + turn_token_usage.non_cached_input(), + ); + current_span.record( + "codex.turn.token_usage.output_tokens", + turn_token_usage.output_tokens, + ); + current_span.record( + "codex.turn.token_usage.reasoning_output_tokens", + turn_token_usage.reasoning_output_tokens, + ); + current_span.record( + "codex.turn.token_usage.total_tokens", + turn_token_usage.total_tokens, + ); + } self.services .analytics_events_client .track_turn_token_usage(TurnTokenUsageFact { diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index d64b83f01b93..08c493348896 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -33,6 +33,10 @@ impl SessionTask for RegularTask { "session_task.turn" } + fn records_turn_token_usage_on_span(&self) -> bool { + true + } + async fn run( self: Arc, session: Arc, diff --git a/codex-rs/core/tests/suite/otel.rs b/codex-rs/core/tests/suite/otel.rs index 6407ec2702f5..fe5905a0a7f3 100644 --- a/codex-rs/core/tests/suite/otel.rs +++ b/codex-rs/core/tests/suite/otel.rs @@ -563,6 +563,91 @@ async fn process_sse_emits_completed_telemetry() { }); } +#[tokio::test(flavor = "current_thread")] +async fn turn_and_completed_response_spans_record_token_usage() { + let buffer: &'static Mutex> = Box::leak(Box::new(Mutex::new(Vec::new()))); + let subscriber = tracing_subscriber::fmt() + .with_level(true) + .with_ansi(false) + .with_max_level(Level::TRACE) + .with_span_events(FmtSpan::FULL) + .with_writer(MockWriter::new(buffer)) + .finish(); + let _guard = tracing::subscriber::set_default(subscriber); + + let server = start_mock_server().await; + + mount_sse_once( + &server, + sse(vec![serde_json::json!({ + "type": "response.completed", + "response": { + "id": "resp1", + "usage": { + "input_tokens": 3, + "input_tokens_details": { "cached_tokens": 1 }, + "output_tokens": 5, + "output_tokens_details": { "reasoning_tokens": 2 }, + "total_tokens": 9 + } + } + })]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(|config| { + config + .features + .disable(Feature::GhostCommit) + .expect("test config should allow feature update"); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: "hello".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + }) + .await + .unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let logs = String::from_utf8(buffer.lock().unwrap().clone()).unwrap(); + + assert!( + logs.lines().any(|line| { + line.contains("handle_responses{otel.name=\"completed\"") + && line.contains("gen_ai.usage.input_tokens=3") + && line.contains("gen_ai.usage.cache_read.input_tokens=1") + && line.contains("gen_ai.usage.output_tokens=5") + && line.contains("codex.usage.reasoning_output_tokens=2") + && line.contains("codex.usage.total_tokens=9") + }), + "missing completed response span token usage\nlogs:\n{logs}" + ); + assert!( + logs.lines().any(|line| { + line.contains("turn{otel.name=\"session_task.turn\"") + && line.contains("codex.turn.token_usage.input_tokens=3") + && line.contains("codex.turn.token_usage.cached_input_tokens=1") + && line.contains("codex.turn.token_usage.non_cached_input_tokens=2") + && line.contains("codex.turn.token_usage.output_tokens=5") + && line.contains("codex.turn.token_usage.reasoning_output_tokens=2") + && line.contains("codex.turn.token_usage.total_tokens=9") + }), + "missing regular turn span token usage\nlogs:\n{logs}" + ); +} + #[tokio::test] async fn handle_responses_span_records_response_kind_and_tool_name() { let buffer: &'static Mutex> = Box::leak(Box::new(Mutex::new(Vec::new()))); diff --git a/codex-rs/otel/src/events/session_telemetry.rs b/codex-rs/otel/src/events/session_telemetry.rs index 1ca4a492e657..9df0b3ef1052 100644 --- a/codex-rs/otel/src/events/session_telemetry.rs +++ b/codex-rs/otel/src/events/session_telemetry.rs @@ -305,6 +305,23 @@ impl SessionTelemetry { handle_responses_span.record("tool_name", name.as_str()); } } + ResponseEvent::Completed { + token_usage: Some(token_usage), + .. + } => { + handle_responses_span.record("gen_ai.usage.input_tokens", token_usage.input_tokens); + handle_responses_span.record( + "gen_ai.usage.cache_read.input_tokens", + token_usage.cached_input(), + ); + handle_responses_span + .record("gen_ai.usage.output_tokens", token_usage.output_tokens); + handle_responses_span.record( + "codex.usage.reasoning_output_tokens", + token_usage.reasoning_output_tokens, + ); + handle_responses_span.record("codex.usage.total_tokens", token_usage.total_tokens); + } _ => {} } }