-
Notifications
You must be signed in to change notification settings - Fork 11.5k
Add goal lifecycle metrics #20799
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add goal lifecycle metrics #20799
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,11 @@ use crate::state::TurnState; | |
| use crate::tasks::RegularTask; | ||
| use anyhow::Context; | ||
| use codex_features::Feature; | ||
| use codex_otel::GOAL_BUDGET_LIMITED_METRIC; | ||
| use codex_otel::GOAL_COMPLETED_METRIC; | ||
| use codex_otel::GOAL_CREATED_METRIC; | ||
| use codex_otel::GOAL_DURATION_SECONDS_METRIC; | ||
| use codex_otel::GOAL_TOKEN_COUNT_METRIC; | ||
| use codex_protocol::config_types::ModeKind; | ||
| use codex_protocol::models::ContentItem; | ||
| use codex_protocol::models::ResponseInputItem; | ||
|
|
@@ -453,6 +458,15 @@ impl Session { | |
|
|
||
| let goal_status = goal.status; | ||
| let goal_id = goal.goal_id.clone(); | ||
| let previous_status_for_goal = if replacing_goal { | ||
| None | ||
| } else { | ||
| previous_status | ||
| }; | ||
| if replacing_goal { | ||
| self.emit_goal_created_metric(); | ||
| } | ||
| self.emit_goal_terminal_metrics_if_status_changed(previous_status_for_goal, &goal); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here |
||
| let goal = protocol_goal_from_state(goal); | ||
| *self.goal_runtime.budget_limit_reported_goal_id.lock().await = None; | ||
| let newly_active_goal = goal_status == codex_state::ThreadGoalStatus::Active | ||
|
|
@@ -521,6 +535,7 @@ impl Session { | |
| })?; | ||
|
|
||
| let goal_id = goal.goal_id.clone(); | ||
| self.emit_goal_created_metric(); | ||
| let goal = protocol_goal_from_state(goal); | ||
| *self.goal_runtime.budget_limit_reported_goal_id.lock().await = None; | ||
|
|
||
|
|
@@ -637,6 +652,57 @@ impl Session { | |
| accounting.wall_clock.mark_active_goal(goal_id); | ||
| } | ||
|
|
||
| fn emit_goal_created_metric(&self) { | ||
| self.services | ||
| .session_telemetry | ||
| .counter(GOAL_CREATED_METRIC, /*inc*/ 1, &[]); | ||
| } | ||
|
|
||
| fn emit_goal_terminal_metrics_if_status_changed( | ||
| &self, | ||
| previous_status: Option<codex_state::ThreadGoalStatus>, | ||
| goal: &codex_state::ThreadGoal, | ||
| ) { | ||
| if previous_status == Some(goal.status) { | ||
| return; | ||
| } | ||
|
|
||
| let counter = match goal.status { | ||
| codex_state::ThreadGoalStatus::BudgetLimited => GOAL_BUDGET_LIMITED_METRIC, | ||
| codex_state::ThreadGoalStatus::Complete => GOAL_COMPLETED_METRIC, | ||
| codex_state::ThreadGoalStatus::Active | codex_state::ThreadGoalStatus::Paused => { | ||
| return; | ||
| } | ||
| }; | ||
| let status_tag = [("status", goal.status.as_str())]; | ||
| self.services | ||
| .session_telemetry | ||
| .counter(counter, /*inc*/ 1, &[]); | ||
| self.services.session_telemetry.histogram( | ||
| GOAL_TOKEN_COUNT_METRIC, | ||
| goal.tokens_used, | ||
| &status_tag, | ||
| ); | ||
| self.services.session_telemetry.histogram( | ||
| GOAL_DURATION_SECONDS_METRIC, | ||
| goal.time_used_seconds, | ||
| &status_tag, | ||
| ); | ||
| } | ||
|
|
||
| async fn current_goal_status_for_metrics( | ||
| &self, | ||
| state_db: &StateDbHandle, | ||
| expected_goal_id: Option<&str>, | ||
| ) -> anyhow::Result<Option<codex_state::ThreadGoalStatus>> { | ||
| let goal = state_db.get_thread_goal(self.conversation_id).await?; | ||
| Ok(goal.and_then(|goal| { | ||
| expected_goal_id | ||
| .is_none_or(|expected_goal_id| goal.goal_id == expected_goal_id) | ||
| .then_some(goal.status) | ||
| })) | ||
| } | ||
|
|
||
| async fn active_turn_context(&self) -> Option<Arc<TurnContext>> { | ||
| let active = self.active_turn.lock().await; | ||
| active | ||
|
|
@@ -819,6 +885,9 @@ impl Session { | |
| if time_delta_seconds == 0 && token_delta <= 0 { | ||
| return Ok(()); | ||
| } | ||
| let previous_status = self | ||
| .current_goal_status_for_metrics(&state_db, expected_goal_id.as_deref()) | ||
| .await?; | ||
| let outcome = state_db | ||
| .account_thread_goal_usage( | ||
| self.conversation_id, | ||
|
|
@@ -861,6 +930,7 @@ impl Session { | |
| accounting.wall_clock.clear_active_goal(); | ||
| } | ||
| } | ||
| self.emit_goal_terminal_metrics_if_status_changed(previous_status, &goal); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (found by Codex) |
||
| goal | ||
| } | ||
| codex_state::ThreadGoalAccountingOutcome::Unchanged(_) => return Ok(()), | ||
|
|
@@ -931,6 +1001,9 @@ impl Session { | |
| if time_delta_seconds == 0 { | ||
| return Ok(None); | ||
| } | ||
| let previous_status = self | ||
| .current_goal_status_for_metrics(state_db, expected_goal_id.as_deref()) | ||
| .await?; | ||
|
|
||
| match state_db | ||
| .account_thread_goal_usage( | ||
|
|
@@ -943,6 +1016,7 @@ impl Session { | |
| .await? | ||
| { | ||
| codex_state::ThreadGoalAccountingOutcome::Updated(goal) => { | ||
| self.emit_goal_terminal_metrics_if_status_changed(previous_status, &goal); | ||
| self.goal_runtime | ||
| .accounting | ||
| .lock() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,6 +77,11 @@ use codex_execpolicy::Decision; | |
| use codex_execpolicy::NetworkRuleProtocol; | ||
| use codex_execpolicy::Policy; | ||
| use codex_network_proxy::NetworkProxyConfig; | ||
| use codex_otel::GOAL_BUDGET_LIMITED_METRIC; | ||
| use codex_otel::GOAL_COMPLETED_METRIC; | ||
| use codex_otel::GOAL_CREATED_METRIC; | ||
| use codex_otel::GOAL_DURATION_SECONDS_METRIC; | ||
| use codex_otel::GOAL_TOKEN_COUNT_METRIC; | ||
| use codex_otel::MetricsClient; | ||
| use codex_otel::MetricsConfig; | ||
| use codex_otel::THREAD_SKILLS_DESCRIPTION_TRUNCATED_CHARS_METRIC; | ||
|
|
@@ -243,6 +248,20 @@ fn histogram_sum(resource_metrics: &ResourceMetrics, name: &str) -> u64 { | |
| } | ||
| } | ||
|
|
||
| fn counter_sum(resource_metrics: &ResourceMetrics, name: &str) -> u64 { | ||
| let metric = find_metric(resource_metrics, name); | ||
| match metric.data() { | ||
| AggregatedMetrics::U64(data) => match data { | ||
| MetricData::Sum(sum) => sum | ||
| .data_points() | ||
| .map(opentelemetry_sdk::metrics::data::SumDataPoint::value) | ||
| .sum(), | ||
| _ => panic!("unexpected counter aggregation"), | ||
| }, | ||
| _ => panic!("unexpected counter data type"), | ||
| } | ||
| } | ||
|
|
||
| fn skill_message(text: &str) -> ResponseItem { | ||
| ResponseItem::Message { | ||
| id: None, | ||
|
|
@@ -7309,6 +7328,116 @@ async fn goal_test_state_db(sess: &Session) -> anyhow::Result<crate::StateDbHand | |
| .await | ||
| } | ||
|
|
||
| fn install_goal_metric_test_telemetry(session: &mut Arc<Session>) -> SessionTelemetry { | ||
| let session_telemetry = test_session_telemetry_without_metadata(); | ||
| Arc::get_mut(session) | ||
| .expect("session should not be shared") | ||
| .services | ||
| .session_telemetry = session_telemetry.clone(); | ||
| session_telemetry | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn goal_created_and_completed_metrics_are_emitted() -> anyhow::Result<()> { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should test this tbh |
||
| let (mut sess, tc, _rx) = make_goal_session_and_context_with_rx().await; | ||
| let session_telemetry = install_goal_metric_test_telemetry(&mut sess); | ||
|
|
||
| sess.create_thread_goal( | ||
| tc.as_ref(), | ||
| crate::goals::CreateGoalRequest { | ||
| objective: "Keep the watcher alive".to_string(), | ||
| token_budget: Some(500), | ||
| }, | ||
| ) | ||
| .await?; | ||
| set_total_token_usage(&sess, post_goal_token_usage()).await; | ||
| sess.goal_runtime_apply(GoalRuntimeEvent::ToolCompletedGoal { | ||
| turn_context: tc.as_ref(), | ||
| }) | ||
| .await?; | ||
| sess.set_thread_goal( | ||
| tc.as_ref(), | ||
| SetGoalRequest { | ||
| objective: None, | ||
| status: Some(ThreadGoalStatus::Complete), | ||
| token_budget: None, | ||
| }, | ||
| ) | ||
| .await?; | ||
|
|
||
| let snapshot = session_telemetry | ||
| .snapshot_metrics() | ||
| .expect("runtime metrics snapshot"); | ||
| assert_eq!(1, counter_sum(&snapshot, GOAL_CREATED_METRIC)); | ||
| assert_eq!(1, counter_sum(&snapshot, GOAL_COMPLETED_METRIC)); | ||
| assert_eq!(70, histogram_sum(&snapshot, GOAL_TOKEN_COUNT_METRIC)); | ||
| assert_eq!(0, histogram_sum(&snapshot, GOAL_DURATION_SECONDS_METRIC)); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn goal_budget_limited_metrics_emit_once_at_transition() -> anyhow::Result<()> { | ||
| let (mut sess, tc, _rx) = make_goal_session_and_context_with_rx().await; | ||
| let session_telemetry = install_goal_metric_test_telemetry(&mut sess); | ||
|
|
||
| sess.create_thread_goal( | ||
| tc.as_ref(), | ||
| crate::goals::CreateGoalRequest { | ||
| objective: "Keep the watcher alive".to_string(), | ||
| token_budget: Some(10), | ||
| }, | ||
| ) | ||
| .await?; | ||
| sess.goal_runtime_apply(GoalRuntimeEvent::TurnStarted { | ||
| turn_context: tc.as_ref(), | ||
| token_usage: TokenUsage::default(), | ||
| }) | ||
| .await?; | ||
| set_total_token_usage( | ||
| &sess, | ||
| TokenUsage { | ||
| input_tokens: 20, | ||
| cached_input_tokens: 0, | ||
| output_tokens: 5, | ||
| reasoning_output_tokens: 0, | ||
| total_tokens: 25, | ||
| }, | ||
| ) | ||
| .await; | ||
| sess.goal_runtime_apply(GoalRuntimeEvent::ToolCompleted { | ||
| turn_context: tc.as_ref(), | ||
| tool_name: "shell", | ||
| }) | ||
| .await?; | ||
|
|
||
| set_total_token_usage( | ||
| &sess, | ||
| TokenUsage { | ||
| input_tokens: 30, | ||
| cached_input_tokens: 0, | ||
| output_tokens: 10, | ||
| reasoning_output_tokens: 0, | ||
| total_tokens: 40, | ||
| }, | ||
| ) | ||
| .await; | ||
| sess.goal_runtime_apply(GoalRuntimeEvent::ToolCompletedGoal { | ||
| turn_context: tc.as_ref(), | ||
| }) | ||
| .await?; | ||
|
|
||
| let snapshot = session_telemetry | ||
| .snapshot_metrics() | ||
| .expect("runtime metrics snapshot"); | ||
| assert_eq!(1, counter_sum(&snapshot, GOAL_CREATED_METRIC)); | ||
| assert_eq!(1, counter_sum(&snapshot, GOAL_BUDGET_LIMITED_METRIC)); | ||
| assert_eq!(25, histogram_sum(&snapshot, GOAL_TOKEN_COUNT_METRIC)); | ||
| assert_eq!(0, histogram_sum(&snapshot, GOAL_DURATION_SECONDS_METRIC)); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn budget_limited_accounting_steers_active_turn_without_aborting() -> anyhow::Result<()> { | ||
| let (sess, tc, rx) = make_goal_session_and_context_with_rx().await; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only fires on the Session-owned create/replace path. thread_goal_set in app-server still creates/replaces goals by writing state_db directly and then only applies runtime effects, so creating a goal from the UI on a live thread won’t increment codex.goal.created.