Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions codex-rs/core/src/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,11 @@ async fn try_run_sampling_request(
otel.name = field::Empty,
tool_name = field::Empty,
from = field::Empty,
gen_ai.usage.input_tokens = field::Empty,
gen_ai.usage.cache_read.input_tokens = field::Empty,
gen_ai.usage.output_tokens = field::Empty,
codex.usage.reasoning_output_tokens = field::Empty,
codex.usage.total_tokens = field::Empty,
);

let event = match stream
Expand Down
18 changes: 13 additions & 5 deletions codex-rs/core/src/state/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,25 @@ pub(crate) struct RunningTask {
pub(crate) _timer: Option<codex_otel::Timer>,
}

pub(crate) struct RemovedTask {
pub(crate) records_turn_token_usage_on_span: bool,
pub(crate) active_turn_is_empty: bool,
}

impl ActiveTurn {
pub(crate) fn add_task(&mut self, task: RunningTask) {
let sub_id = task.turn_context.sub_id.clone();
self.tasks.insert(sub_id, task);
}

pub(crate) fn remove_task(&mut self, sub_id: &str) -> bool {
if let Some(task) = self.tasks.swap_remove(sub_id) {
task.handle.detach();
}
self.tasks.is_empty()
pub(crate) fn remove_task(&mut self, sub_id: &str) -> Option<RemovedTask> {
let task = self.tasks.swap_remove(sub_id)?;
let records_turn_token_usage_on_span = task.task.records_turn_token_usage_on_span();
task.handle.detach();
Some(RemovedTask {
records_turn_token_usage_on_span,
active_turn_is_empty: self.tasks.is_empty(),
})
}

pub(crate) fn drain_tasks(&mut self) -> Vec<RunningTask> {
Expand Down
60 changes: 56 additions & 4 deletions codex-rs/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::Instrument;
use tracing::Span;
use tracing::field;
use tracing::info_span;
use tracing::trace;
use tracing::warn;
Expand Down Expand Up @@ -191,6 +193,11 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
/// Returns the tracing name for a spawned task span.
fn span_name(&self) -> &'static str;

/// Returns whether turn token usage should be recorded on this task's turn span.
fn records_turn_token_usage_on_span(&self) -> bool {
false
}

/// Executes the task until completion or cancellation.
///
/// Implementations typically stream protocol events using `session` and
Expand Down Expand Up @@ -228,6 +235,8 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static {

fn span_name(&self) -> &'static str;

fn records_turn_token_usage_on_span(&self) -> bool;

fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
Expand Down Expand Up @@ -255,6 +264,10 @@ where
SessionTask::span_name(self)
}

fn records_turn_token_usage_on_span(&self) -> bool {
SessionTask::records_turn_token_usage_on_span(self)
}

fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
Expand Down Expand Up @@ -361,6 +374,12 @@ impl Session {
thread.id = %self.conversation_id,
turn.id = %turn_context.sub_id,
model = %turn_context.model_info.slug,
codex.turn.token_usage.input_tokens = field::Empty,
codex.turn.token_usage.cached_input_tokens = field::Empty,
codex.turn.token_usage.non_cached_input_tokens = field::Empty,
codex.turn.token_usage.output_tokens = field::Empty,
codex.turn.token_usage.reasoning_output_tokens = field::Empty,
codex.turn.token_usage.total_tokens = field::Empty,
);
let handle = tokio::spawn(
async move {
Expand Down Expand Up @@ -548,14 +567,20 @@ impl Session {
let mut token_usage_at_turn_start = None;
let mut turn_had_memory_citation = false;
let mut turn_tool_calls = 0_u64;
let mut records_turn_token_usage_on_span = false;
let turn_state = {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
&& let Some(removed_task) = at.remove_task(&turn_context.sub_id)
{
should_clear_active_turn = true;
let turn_state = Arc::clone(&at.turn_state);
Some(turn_state)
records_turn_token_usage_on_span = removed_task.records_turn_token_usage_on_span;
if removed_task.active_turn_is_empty {
should_clear_active_turn = true;
let turn_state = Arc::clone(&at.turn_state);
Some(turn_state)
} else {
None
}
} else {
None
}
Expand Down Expand Up @@ -634,6 +659,33 @@ impl Session {
- token_usage_at_turn_start.total_tokens)
.max(0),
};
if records_turn_token_usage_on_span {
let current_span = Span::current();
current_span.record(
"codex.turn.token_usage.input_tokens",
turn_token_usage.input_tokens,
);
current_span.record(
"codex.turn.token_usage.cached_input_tokens",
turn_token_usage.cached_input(),
);
current_span.record(
"codex.turn.token_usage.non_cached_input_tokens",
turn_token_usage.non_cached_input(),
);
current_span.record(
"codex.turn.token_usage.output_tokens",
turn_token_usage.output_tokens,
);
current_span.record(
"codex.turn.token_usage.reasoning_output_tokens",
turn_token_usage.reasoning_output_tokens,
);
current_span.record(
"codex.turn.token_usage.total_tokens",
turn_token_usage.total_tokens,
);
}
self.services
.analytics_events_client
.track_turn_token_usage(TurnTokenUsageFact {
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/core/src/tasks/regular.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl SessionTask for RegularTask {
"session_task.turn"
}

fn records_turn_token_usage_on_span(&self) -> bool {
true
}

async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
Expand Down
85 changes: 85 additions & 0 deletions codex-rs/core/tests/suite/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,91 @@ async fn process_sse_emits_completed_telemetry() {
});
}

#[tokio::test(flavor = "current_thread")]
async fn turn_and_completed_response_spans_record_token_usage() {
let buffer: &'static Mutex<Vec<u8>> = Box::leak(Box::new(Mutex::new(Vec::new())));
let subscriber = tracing_subscriber::fmt()
.with_level(true)
.with_ansi(false)
.with_max_level(Level::TRACE)
.with_span_events(FmtSpan::FULL)
.with_writer(MockWriter::new(buffer))
.finish();
let _guard = tracing::subscriber::set_default(subscriber);

let server = start_mock_server().await;

mount_sse_once(
&server,
sse(vec![serde_json::json!({
"type": "response.completed",
"response": {
"id": "resp1",
"usage": {
"input_tokens": 3,
"input_tokens_details": { "cached_tokens": 1 },
"output_tokens": 5,
"output_tokens_details": { "reasoning_tokens": 2 },
"total_tokens": 9
}
}
})]),
)
.await;

let TestCodex { codex, .. } = test_codex()
.with_config(|config| {
config
.features
.disable(Feature::GhostCommit)
.expect("test config should allow feature update");
})
.build(&server)
.await
.unwrap();

codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await
.unwrap();

wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;

let logs = String::from_utf8(buffer.lock().unwrap().clone()).unwrap();

assert!(
logs.lines().any(|line| {
line.contains("handle_responses{otel.name=\"completed\"")
&& line.contains("gen_ai.usage.input_tokens=3")
&& line.contains("gen_ai.usage.cache_read.input_tokens=1")
&& line.contains("gen_ai.usage.output_tokens=5")
&& line.contains("codex.usage.reasoning_output_tokens=2")
&& line.contains("codex.usage.total_tokens=9")
}),
"missing completed response span token usage\nlogs:\n{logs}"
);
assert!(
logs.lines().any(|line| {
line.contains("turn{otel.name=\"session_task.turn\"")
&& line.contains("codex.turn.token_usage.input_tokens=3")
&& line.contains("codex.turn.token_usage.cached_input_tokens=1")
&& line.contains("codex.turn.token_usage.non_cached_input_tokens=2")
&& line.contains("codex.turn.token_usage.output_tokens=5")
&& line.contains("codex.turn.token_usage.reasoning_output_tokens=2")
&& line.contains("codex.turn.token_usage.total_tokens=9")
}),
"missing regular turn span token usage\nlogs:\n{logs}"
);
}

#[tokio::test]
async fn handle_responses_span_records_response_kind_and_tool_name() {
let buffer: &'static Mutex<Vec<u8>> = Box::leak(Box::new(Mutex::new(Vec::new())));
Expand Down
17 changes: 17 additions & 0 deletions codex-rs/otel/src/events/session_telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,23 @@ impl SessionTelemetry {
handle_responses_span.record("tool_name", name.as_str());
}
}
ResponseEvent::Completed {
token_usage: Some(token_usage),
..
} => {
handle_responses_span.record("gen_ai.usage.input_tokens", token_usage.input_tokens);
handle_responses_span.record(
"gen_ai.usage.cache_read.input_tokens",
token_usage.cached_input(),
);
handle_responses_span
.record("gen_ai.usage.output_tokens", token_usage.output_tokens);
handle_responses_span.record(
"codex.usage.reasoning_output_tokens",
token_usage.reasoning_output_tokens,
);
handle_responses_span.record("codex.usage.total_tokens", token_usage.total_tokens);
}
_ => {}
}
}
Expand Down
Loading