diff --git a/codex-rs/core/src/goals.rs b/codex-rs/core/src/goals.rs index f1805bb750fa..6e7688097f0e 100644 --- a/codex-rs/core/src/goals.rs +++ b/codex-rs/core/src/goals.rs @@ -12,6 +12,11 @@ use crate::state::TurnState; use crate::tasks::RegularTask; use anyhow::Context; use codex_features::Feature; +use codex_otel::GOAL_BUDGET_LIMITED_METRIC; +use codex_otel::GOAL_COMPLETED_METRIC; +use codex_otel::GOAL_CREATED_METRIC; +use codex_otel::GOAL_DURATION_SECONDS_METRIC; +use codex_otel::GOAL_TOKEN_COUNT_METRIC; use codex_protocol::config_types::ModeKind; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; @@ -453,6 +458,15 @@ impl Session { let goal_status = goal.status; let goal_id = goal.goal_id.clone(); + let previous_status_for_goal = if replacing_goal { + None + } else { + previous_status + }; + if replacing_goal { + self.emit_goal_created_metric(); + } + self.emit_goal_terminal_metrics_if_status_changed(previous_status_for_goal, &goal); let goal = protocol_goal_from_state(goal); *self.goal_runtime.budget_limit_reported_goal_id.lock().await = None; let newly_active_goal = goal_status == codex_state::ThreadGoalStatus::Active @@ -521,6 +535,7 @@ impl Session { })?; let goal_id = goal.goal_id.clone(); + self.emit_goal_created_metric(); let goal = protocol_goal_from_state(goal); *self.goal_runtime.budget_limit_reported_goal_id.lock().await = None; @@ -637,6 +652,57 @@ impl Session { accounting.wall_clock.mark_active_goal(goal_id); } + fn emit_goal_created_metric(&self) { + self.services + .session_telemetry + .counter(GOAL_CREATED_METRIC, /*inc*/ 1, &[]); + } + + fn emit_goal_terminal_metrics_if_status_changed( + &self, + previous_status: Option, + goal: &codex_state::ThreadGoal, + ) { + if previous_status == Some(goal.status) { + return; + } + + let counter = match goal.status { + codex_state::ThreadGoalStatus::BudgetLimited => GOAL_BUDGET_LIMITED_METRIC, + codex_state::ThreadGoalStatus::Complete => GOAL_COMPLETED_METRIC, + codex_state::ThreadGoalStatus::Active | codex_state::ThreadGoalStatus::Paused => { + return; + } + }; + let status_tag = [("status", goal.status.as_str())]; + self.services + .session_telemetry + .counter(counter, /*inc*/ 1, &[]); + self.services.session_telemetry.histogram( + GOAL_TOKEN_COUNT_METRIC, + goal.tokens_used, + &status_tag, + ); + self.services.session_telemetry.histogram( + GOAL_DURATION_SECONDS_METRIC, + goal.time_used_seconds, + &status_tag, + ); + } + + async fn current_goal_status_for_metrics( + &self, + state_db: &StateDbHandle, + expected_goal_id: Option<&str>, + ) -> anyhow::Result> { + let goal = state_db.get_thread_goal(self.conversation_id).await?; + Ok(goal.and_then(|goal| { + expected_goal_id + .is_none_or(|expected_goal_id| goal.goal_id == expected_goal_id) + .then_some(goal.status) + })) + } + async fn active_turn_context(&self) -> Option> { let active = self.active_turn.lock().await; active @@ -819,6 +885,9 @@ impl Session { if time_delta_seconds == 0 && token_delta <= 0 { return Ok(()); } + let previous_status = self + .current_goal_status_for_metrics(&state_db, expected_goal_id.as_deref()) + .await?; let outcome = state_db .account_thread_goal_usage( self.conversation_id, @@ -861,6 +930,7 @@ impl Session { accounting.wall_clock.clear_active_goal(); } } + self.emit_goal_terminal_metrics_if_status_changed(previous_status, &goal); goal } codex_state::ThreadGoalAccountingOutcome::Unchanged(_) => return Ok(()), @@ -931,6 +1001,9 @@ impl Session { if time_delta_seconds == 0 { return Ok(None); } + let previous_status = self + .current_goal_status_for_metrics(state_db, expected_goal_id.as_deref()) + .await?; match state_db .account_thread_goal_usage( @@ -943,6 +1016,7 @@ impl Session { .await? { codex_state::ThreadGoalAccountingOutcome::Updated(goal) => { + self.emit_goal_terminal_metrics_if_status_changed(previous_status, &goal); self.goal_runtime .accounting .lock() diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 89633daf35d7..c3cb397ec257 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -77,6 +77,11 @@ use codex_execpolicy::Decision; use codex_execpolicy::NetworkRuleProtocol; use codex_execpolicy::Policy; use codex_network_proxy::NetworkProxyConfig; +use codex_otel::GOAL_BUDGET_LIMITED_METRIC; +use codex_otel::GOAL_COMPLETED_METRIC; +use codex_otel::GOAL_CREATED_METRIC; +use codex_otel::GOAL_DURATION_SECONDS_METRIC; +use codex_otel::GOAL_TOKEN_COUNT_METRIC; use codex_otel::MetricsClient; use codex_otel::MetricsConfig; use codex_otel::THREAD_SKILLS_DESCRIPTION_TRUNCATED_CHARS_METRIC; @@ -243,6 +248,20 @@ fn histogram_sum(resource_metrics: &ResourceMetrics, name: &str) -> u64 { } } +fn counter_sum(resource_metrics: &ResourceMetrics, name: &str) -> u64 { + let metric = find_metric(resource_metrics, name); + match metric.data() { + AggregatedMetrics::U64(data) => match data { + MetricData::Sum(sum) => sum + .data_points() + .map(opentelemetry_sdk::metrics::data::SumDataPoint::value) + .sum(), + _ => panic!("unexpected counter aggregation"), + }, + _ => panic!("unexpected counter data type"), + } +} + fn skill_message(text: &str) -> ResponseItem { ResponseItem::Message { id: None, @@ -7309,6 +7328,116 @@ async fn goal_test_state_db(sess: &Session) -> anyhow::Result) -> SessionTelemetry { + let session_telemetry = test_session_telemetry_without_metadata(); + Arc::get_mut(session) + .expect("session should not be shared") + .services + .session_telemetry = session_telemetry.clone(); + session_telemetry +} + +#[tokio::test] +async fn goal_created_and_completed_metrics_are_emitted() -> anyhow::Result<()> { + let (mut sess, tc, _rx) = make_goal_session_and_context_with_rx().await; + let session_telemetry = install_goal_metric_test_telemetry(&mut sess); + + sess.create_thread_goal( + tc.as_ref(), + crate::goals::CreateGoalRequest { + objective: "Keep the watcher alive".to_string(), + token_budget: Some(500), + }, + ) + .await?; + set_total_token_usage(&sess, post_goal_token_usage()).await; + sess.goal_runtime_apply(GoalRuntimeEvent::ToolCompletedGoal { + turn_context: tc.as_ref(), + }) + .await?; + sess.set_thread_goal( + tc.as_ref(), + SetGoalRequest { + objective: None, + status: Some(ThreadGoalStatus::Complete), + token_budget: None, + }, + ) + .await?; + + let snapshot = session_telemetry + .snapshot_metrics() + .expect("runtime metrics snapshot"); + assert_eq!(1, counter_sum(&snapshot, GOAL_CREATED_METRIC)); + assert_eq!(1, counter_sum(&snapshot, GOAL_COMPLETED_METRIC)); + assert_eq!(70, histogram_sum(&snapshot, GOAL_TOKEN_COUNT_METRIC)); + assert_eq!(0, histogram_sum(&snapshot, GOAL_DURATION_SECONDS_METRIC)); + + Ok(()) +} + +#[tokio::test] +async fn goal_budget_limited_metrics_emit_once_at_transition() -> anyhow::Result<()> { + let (mut sess, tc, _rx) = make_goal_session_and_context_with_rx().await; + let session_telemetry = install_goal_metric_test_telemetry(&mut sess); + + sess.create_thread_goal( + tc.as_ref(), + crate::goals::CreateGoalRequest { + objective: "Keep the watcher alive".to_string(), + token_budget: Some(10), + }, + ) + .await?; + sess.goal_runtime_apply(GoalRuntimeEvent::TurnStarted { + turn_context: tc.as_ref(), + token_usage: TokenUsage::default(), + }) + .await?; + set_total_token_usage( + &sess, + TokenUsage { + input_tokens: 20, + cached_input_tokens: 0, + output_tokens: 5, + reasoning_output_tokens: 0, + total_tokens: 25, + }, + ) + .await; + sess.goal_runtime_apply(GoalRuntimeEvent::ToolCompleted { + turn_context: tc.as_ref(), + tool_name: "shell", + }) + .await?; + + set_total_token_usage( + &sess, + TokenUsage { + input_tokens: 30, + cached_input_tokens: 0, + output_tokens: 10, + reasoning_output_tokens: 0, + total_tokens: 40, + }, + ) + .await; + sess.goal_runtime_apply(GoalRuntimeEvent::ToolCompletedGoal { + turn_context: tc.as_ref(), + }) + .await?; + + let snapshot = session_telemetry + .snapshot_metrics() + .expect("runtime metrics snapshot"); + assert_eq!(1, counter_sum(&snapshot, GOAL_CREATED_METRIC)); + assert_eq!(1, counter_sum(&snapshot, GOAL_BUDGET_LIMITED_METRIC)); + assert_eq!(25, histogram_sum(&snapshot, GOAL_TOKEN_COUNT_METRIC)); + assert_eq!(0, histogram_sum(&snapshot, GOAL_DURATION_SECONDS_METRIC)); + + Ok(()) +} + #[tokio::test] async fn budget_limited_accounting_steers_active_turn_without_aborting() -> anyhow::Result<()> { let (sess, tc, rx) = make_goal_session_and_context_with_rx().await; diff --git a/codex-rs/otel/src/metrics/names.rs b/codex-rs/otel/src/metrics/names.rs index aca120f1e185..dc493721685b 100644 --- a/codex-rs/otel/src/metrics/names.rs +++ b/codex-rs/otel/src/metrics/names.rs @@ -27,6 +27,11 @@ pub const TURN_NETWORK_PROXY_METRIC: &str = "codex.turn.network_proxy"; pub const TURN_MEMORY_METRIC: &str = "codex.turn.memory"; pub const TURN_TOOL_CALL_METRIC: &str = "codex.turn.tool.call"; pub const TURN_TOKEN_USAGE_METRIC: &str = "codex.turn.token_usage"; +pub const GOAL_CREATED_METRIC: &str = "codex.goal.created"; +pub const GOAL_COMPLETED_METRIC: &str = "codex.goal.completed"; +pub const GOAL_BUDGET_LIMITED_METRIC: &str = "codex.goal.budget_limited"; +pub const GOAL_TOKEN_COUNT_METRIC: &str = "codex.goal.token_count"; +pub const GOAL_DURATION_SECONDS_METRIC: &str = "codex.goal.duration_s"; pub const PROFILE_USAGE_METRIC: &str = "codex.profile.usage"; pub const CURATED_PLUGINS_STARTUP_SYNC_METRIC: &str = "codex.plugins.startup_sync"; pub const CURATED_PLUGINS_STARTUP_SYNC_FINAL_METRIC: &str = "codex.plugins.startup_sync.final";