From 8adc9385c9738be19d23d55c011ab754b761f13a Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Thu, 30 Apr 2026 15:30:18 -0700 Subject: [PATCH 1/3] Track goal continuation activity separately --- codex-rs/core/src/goals.rs | 183 +++++++++++++++++++++-- codex-rs/core/src/session/mod.rs | 13 ++ codex-rs/core/src/state/turn.rs | 10 ++ codex-rs/core/src/stream_events_utils.rs | 14 +- codex-rs/core/src/tasks/mod.rs | 4 +- codex-rs/core/src/tools/registry.rs | 2 +- 6 files changed, 212 insertions(+), 14 deletions(-) diff --git a/codex-rs/core/src/goals.rs b/codex-rs/core/src/goals.rs index f570ebfda30f..950f9f72ee5d 100644 --- a/codex-rs/core/src/goals.rs +++ b/codex-rs/core/src/goals.rs @@ -13,8 +13,10 @@ use crate::tasks::RegularTask; use anyhow::Context; use codex_features::Feature; use codex_protocol::config_types::ModeKind; +use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; +use codex_protocol::models::ResponseItem; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ThreadGoal; @@ -90,7 +92,7 @@ pub(crate) enum GoalRuntimeEvent<'a> { TurnFinished { turn_context: &'a TurnContext, turn_completed: bool, - tool_calls: u64, + continuation_activity_count: u64, }, MaybeContinueIfIdle, TaskAborted { @@ -120,6 +122,30 @@ struct GoalContinuationCandidate { items: Vec, } +pub(crate) fn turn_item_counts_as_goal_continuation_activity(item: &TurnItem) -> bool { + // Keep this match exhaustive so new built-in turn item kinds force a + // review of whether they should keep goal auto-continuation alive. + match item { + TurnItem::UserMessage(_) => false, + TurnItem::HookPrompt(_) => false, + TurnItem::AgentMessage(_) => false, + TurnItem::Plan(_) => false, + TurnItem::Reasoning(_) => false, + TurnItem::WebSearch(_) => true, + TurnItem::ImageGeneration(_) => true, + TurnItem::ContextCompaction(_) => false, + } +} + +pub(crate) fn response_item_counts_as_goal_continuation_activity_without_turn_item( + item: &ResponseItem, +) -> bool { + matches!( + item, + ResponseItem::ToolSearchCall { execution, .. } if execution != "client" + ) +} + impl GoalRuntimeState { pub(crate) fn new() -> Self { Self { @@ -277,8 +303,8 @@ impl Session { /// suppresses that steering, external mutations account best-effort before /// changing state, interrupts pause active goals, resumes reactivate paused /// goals, explicit maybe-continue events start idle goal continuation turns, - /// and no-tool continuation turns suppress the next automatic continuation - /// until user/tool/external activity resets it. + /// and continuation turns with no counted autonomous activity suppress the + /// next automatic continuation until user/tool/external activity resets it. pub(crate) fn goal_runtime_apply<'a>( self: &'a Arc, event: GoalRuntimeEvent<'a>, @@ -312,10 +338,14 @@ impl Session { GoalRuntimeEvent::TurnFinished { turn_context, turn_completed, - tool_calls, + continuation_activity_count, } => Box::pin(async move { - self.finish_thread_goal_turn(turn_context, turn_completed, tool_calls) - .await; + self.finish_thread_goal_turn( + turn_context, + turn_completed, + continuation_activity_count, + ) + .await; Ok(()) }), GoalRuntimeEvent::MaybeContinueIfIdle => Box::pin(async move { @@ -757,7 +787,7 @@ impl Session { self: &Arc, turn_context: &TurnContext, turn_completed: bool, - turn_tool_calls: u64, + turn_continuation_activity_count: u64, ) { if turn_completed && let Err(err) = self @@ -770,7 +800,7 @@ impl Session { if self .take_thread_goal_continuation_turn(&turn_context.sub_id) .await - && turn_tool_calls == 0 + && turn_continuation_activity_count == 0 { self.goal_runtime .continuation_suppressed @@ -1261,7 +1291,7 @@ impl Session { .load(Ordering::SeqCst) { tracing::debug!( - "skipping active goal continuation because the last continuation made no tool calls" + "skipping active goal continuation because the last continuation made no counted autonomous activity" ); return None; } @@ -1513,12 +1543,28 @@ mod tests { use super::continuation_prompt; use super::escape_xml_text; use super::goal_token_delta_for_usage; + use super::response_item_counts_as_goal_continuation_activity_without_turn_item; use super::should_ignore_goal_for_mode; + use super::turn_item_counts_as_goal_continuation_activity; use codex_protocol::ThreadId; use codex_protocol::config_types::ModeKind; + use codex_protocol::items::AgentMessageContent; + use codex_protocol::items::AgentMessageItem; + use codex_protocol::items::ContextCompactionItem; + use codex_protocol::items::HookPromptFragment; + use codex_protocol::items::HookPromptItem; + use codex_protocol::items::ImageGenerationItem; + use codex_protocol::items::PlanItem; + use codex_protocol::items::ReasoningItem; + use codex_protocol::items::TurnItem; + use codex_protocol::items::UserMessageItem; + use codex_protocol::items::WebSearchItem; + use codex_protocol::models::ResponseItem; + use codex_protocol::models::WebSearchAction; use codex_protocol::protocol::ThreadGoal; use codex_protocol::protocol::ThreadGoalStatus; use codex_protocol::protocol::TokenUsage; + use codex_protocol::user_input::UserInput; use std::time::Duration; use std::time::Instant; @@ -1543,6 +1589,125 @@ mod tests { assert_eq!(580, goal_token_delta_for_usage(&usage)); } + #[test] + fn turn_item_goal_continuation_activity_matches_expected_variants() { + let cases = [ + ( + false, + TurnItem::UserMessage(UserMessageItem::new(&[UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }])), + ), + ( + false, + TurnItem::HookPrompt(HookPromptItem::from_fragments( + None, + vec![HookPromptFragment::from_single_hook("hook text", "hook-1")], + )), + ), + ( + false, + TurnItem::AgentMessage(AgentMessageItem::new(&[AgentMessageContent::Text { + text: "hello".to_string(), + }])), + ), + ( + false, + TurnItem::Plan(PlanItem { + id: "plan-1".to_string(), + text: "plan".to_string(), + }), + ), + ( + false, + TurnItem::Reasoning(ReasoningItem { + id: "reason-1".to_string(), + summary_text: vec!["thinking".to_string()], + raw_content: Vec::new(), + }), + ), + ( + true, + TurnItem::WebSearch(WebSearchItem { + id: "ws-1".to_string(), + query: "benchmark note structure".to_string(), + action: WebSearchAction::Search { + query: Some("benchmark note structure".to_string()), + queries: None, + }, + }), + ), + ( + true, + TurnItem::ImageGeneration(ImageGenerationItem { + id: "ig-1".to_string(), + status: "completed".to_string(), + revised_prompt: None, + result: String::new(), + saved_path: None, + }), + ), + ( + false, + TurnItem::ContextCompaction(ContextCompactionItem::new()), + ), + ]; + + for (expected, item) in cases { + assert_eq!( + expected, + turn_item_counts_as_goal_continuation_activity(&item), + "unexpected continuation-activity classification for {item:?}" + ); + } + } + + #[test] + fn raw_response_goal_continuation_activity_only_counts_server_tool_search() { + let cases = [ + ( + false, + ResponseItem::ToolSearchCall { + id: None, + call_id: Some("search-client".to_string()), + status: Some("completed".to_string()), + execution: "client".to_string(), + arguments: serde_json::json!({ "query": "calendar" }), + }, + ), + ( + true, + ResponseItem::ToolSearchCall { + id: None, + call_id: Some("search-server".to_string()), + status: Some("completed".to_string()), + execution: "server".to_string(), + arguments: serde_json::json!({ "query": "calendar" }), + }, + ), + ( + false, + ResponseItem::WebSearchCall { + id: Some("ws-1".to_string()), + status: Some("completed".to_string()), + action: Some(WebSearchAction::Search { + query: Some("calendar".to_string()), + queries: None, + }), + }, + ), + ]; + + for (expected, item) in cases { + assert_eq!( + expected, + response_item_counts_as_goal_continuation_activity_without_turn_item(&item), + "unexpected raw continuation-activity classification for {item:?}" + ); + } + } + #[test] fn wall_clock_accounting_advances_by_persisted_seconds() { let mut snapshot = super::GoalWallClockAccountingSnapshot::new(); diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 8333fd2dd311..bf4725325ad9 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -33,6 +33,7 @@ use crate::default_skill_metadata_budget; use crate::environment_selection::selected_primary_environment; use crate::environment_selection::validate_environment_selections; use crate::exec_policy::ExecPolicyManager; +use crate::goals::turn_item_counts_as_goal_continuation_activity; use crate::installation_id::resolve_installation_id; use crate::parse_turn_item; use crate::path_utils::normalize_for_native_workdir; @@ -1653,6 +1654,10 @@ impl Session { turn_context: &TurnContext, item: TurnItem, ) { + if turn_item_counts_as_goal_continuation_activity(&item) { + self.record_continuation_activity_for_turn(&turn_context.sub_id) + .await; + } record_turn_ttfm_metric(turn_context, &item).await; self.send_event( turn_context, @@ -3085,6 +3090,14 @@ impl Session { turn_state.lock().await.has_memory_citation = true; } + pub(crate) async fn record_continuation_activity_for_turn(&self, sub_id: &str) { + let turn_state = self.turn_state_for_sub_id(sub_id).await; + let Some(turn_state) = turn_state else { + return; + }; + turn_state.lock().await.record_continuation_activity(); + } + async fn turn_state_for_sub_id( &self, sub_id: &str, diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 0dd0c0937959..5585d30c2855 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -118,6 +118,7 @@ pub(crate) struct TurnState { granted_permissions: Option, strict_auto_review_enabled: bool, pub(crate) tool_calls: u64, + pub(crate) continuation_activity_count: u64, pub(crate) has_memory_citation: bool, pub(crate) token_usage_at_turn_start: TokenUsage, } @@ -129,6 +130,15 @@ pub(crate) struct PendingRequestPermissions { } impl TurnState { + pub(crate) fn record_tool_call(&mut self) { + self.tool_calls = self.tool_calls.saturating_add(1); + self.record_continuation_activity(); + } + + pub(crate) fn record_continuation_activity(&mut self) { + self.continuation_activity_count = self.continuation_activity_count.saturating_add(1); + } + pub(crate) fn insert_pending_approval( &mut self, key: String, diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index 5a31d180201a..c63e39305285 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -11,6 +11,7 @@ use tokio_util::sync::CancellationToken; use crate::context::ContextualUserFragment; use crate::context::ImageGenerationInstructions; use crate::function_tool::FunctionCallError; +use crate::goals::response_item_counts_as_goal_continuation_activity_without_turn_item; use crate::parse_turn_item; use crate::session::session::Session; use crate::session::turn_context::TurnContext; @@ -255,14 +256,21 @@ pub(crate) async fn handle_output_item_done( } // No tool call: convert messages/reasoning into turn items and mark them as complete. Ok(None) => { - if let Some(turn_item) = handle_non_tool_response_item( + let turn_item = handle_non_tool_response_item( ctx.sess.as_ref(), ctx.turn_context.as_ref(), &item, plan_mode, ) - .await - { + .await; + let counts_as_raw_continuation_activity = turn_item.is_none() + && response_item_counts_as_goal_continuation_activity_without_turn_item(&item); + if counts_as_raw_continuation_activity { + ctx.sess + .record_continuation_activity_for_turn(&ctx.turn_context.sub_id) + .await; + } + if let Some(turn_item) = turn_item { if previously_active_item.is_none() { let mut started_item = turn_item.clone(); if let TurnItem::ImageGeneration(item) = &mut started_item { diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 91078c50ceb0..f50ab7959228 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -567,6 +567,7 @@ impl Session { let mut token_usage_at_turn_start = None; let mut turn_had_memory_citation = false; let mut turn_tool_calls = 0_u64; + let mut turn_continuation_activity_count = 0_u64; let mut records_turn_token_usage_on_span = false; let turn_state = { let mut active = self.active_turn.lock().await; @@ -590,6 +591,7 @@ impl Session { pending_input = ts.take_pending_input(); turn_had_memory_citation = ts.has_memory_citation; turn_tool_calls = ts.tool_calls; + turn_continuation_activity_count = ts.continuation_activity_count; token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone()); } if !pending_input.is_empty() { @@ -737,7 +739,7 @@ impl Session { .goal_runtime_apply(GoalRuntimeEvent::TurnFinished { turn_context: turn_context.as_ref(), turn_completed: should_clear_active_turn, - tool_calls: turn_tool_calls, + continuation_activity_count: turn_continuation_activity_count, }) .await { diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index e1027c9fa907..c1bb65f3a7b5 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -309,7 +309,7 @@ impl ToolRegistry { let mut active = invocation.session.active_turn.lock().await; if let Some(active_turn) = active.as_mut() { let mut turn_state = active_turn.turn_state.lock().await; - turn_state.tool_calls = turn_state.tool_calls.saturating_add(1); + turn_state.record_tool_call(); } } From b1b716d9417a91f426bafc4c3a6936e1d3ff8eff Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Thu, 30 Apr 2026 15:30:55 -0700 Subject: [PATCH 2/3] Clarify raw goal continuation activity fallback --- codex-rs/core/src/stream_events_utils.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index c63e39305285..2e1839f48761 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -263,6 +263,9 @@ pub(crate) async fn handle_output_item_done( plan_mode, ) .await; + // Some built-in activity (currently server-executed `tool_search`) + // bypasses turn-item normalization, so preserve goal continuation + // progress for those response items here. let counts_as_raw_continuation_activity = turn_item.is_none() && response_item_counts_as_goal_continuation_activity_without_turn_item(&item); if counts_as_raw_continuation_activity { From f5a1455714169f17be3ec0b014d2e488da807bfe Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Thu, 30 Apr 2026 16:01:39 -0700 Subject: [PATCH 3/3] Simplify goal continuation scheduling --- codex-rs/core/src/goals.rs | 223 +----------------- codex-rs/core/src/session/mod.rs | 13 - codex-rs/core/src/session/tests.rs | 136 ++++++++++- codex-rs/core/src/state/turn.rs | 10 - codex-rs/core/src/stream_events_utils.rs | 11 - codex-rs/core/src/tasks/mod.rs | 3 - codex-rs/core/src/tools/registry.rs | 2 +- codex-rs/core/templates/goals/continuation.md | 2 +- 8 files changed, 138 insertions(+), 262 deletions(-) diff --git a/codex-rs/core/src/goals.rs b/codex-rs/core/src/goals.rs index 950f9f72ee5d..f1805bb750fa 100644 --- a/codex-rs/core/src/goals.rs +++ b/codex-rs/core/src/goals.rs @@ -13,10 +13,8 @@ use crate::tasks::RegularTask; use anyhow::Context; use codex_features::Feature; use codex_protocol::config_types::ModeKind; -use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; -use codex_protocol::models::ResponseItem; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ThreadGoal; @@ -31,8 +29,6 @@ use codex_utils_template::Template; use futures::future::BoxFuture; use std::sync::Arc; use std::sync::LazyLock; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; use std::time::Duration; use std::time::Instant; use tokio::sync::Mutex; @@ -92,7 +88,6 @@ pub(crate) enum GoalRuntimeEvent<'a> { TurnFinished { turn_context: &'a TurnContext, turn_completed: bool, - continuation_activity_count: u64, }, MaybeContinueIfIdle, TaskAborted { @@ -114,7 +109,6 @@ pub(crate) struct GoalRuntimeState { accounting: Mutex, continuation_turn_id: Mutex>, pub(crate) continuation_lock: Semaphore, - pub(crate) continuation_suppressed: AtomicBool, } struct GoalContinuationCandidate { @@ -122,30 +116,6 @@ struct GoalContinuationCandidate { items: Vec, } -pub(crate) fn turn_item_counts_as_goal_continuation_activity(item: &TurnItem) -> bool { - // Keep this match exhaustive so new built-in turn item kinds force a - // review of whether they should keep goal auto-continuation alive. - match item { - TurnItem::UserMessage(_) => false, - TurnItem::HookPrompt(_) => false, - TurnItem::AgentMessage(_) => false, - TurnItem::Plan(_) => false, - TurnItem::Reasoning(_) => false, - TurnItem::WebSearch(_) => true, - TurnItem::ImageGeneration(_) => true, - TurnItem::ContextCompaction(_) => false, - } -} - -pub(crate) fn response_item_counts_as_goal_continuation_activity_without_turn_item( - item: &ResponseItem, -) -> bool { - matches!( - item, - ResponseItem::ToolSearchCall { execution, .. } if execution != "client" - ) -} - impl GoalRuntimeState { pub(crate) fn new() -> Self { Self { @@ -155,7 +125,6 @@ impl GoalRuntimeState { accounting: Mutex::new(GoalAccountingSnapshot::new()), continuation_turn_id: Mutex::new(None), continuation_lock: Semaphore::new(/*permits*/ 1), - continuation_suppressed: AtomicBool::new(false), } } } @@ -322,7 +291,6 @@ impl Session { turn_context, tool_name, } => Box::pin(async move { - self.reset_thread_goal_continuation_suppression(); if tool_name != codex_tools::UPDATE_GOAL_TOOL_NAME { self.account_thread_goal_progress(turn_context, BudgetLimitSteering::Allowed) .await?; @@ -330,7 +298,6 @@ impl Session { Ok(()) }), GoalRuntimeEvent::ToolCompletedGoal { turn_context } => Box::pin(async move { - self.reset_thread_goal_continuation_suppression(); self.account_thread_goal_progress(turn_context, BudgetLimitSteering::Suppressed) .await?; Ok(()) @@ -338,14 +305,9 @@ impl Session { GoalRuntimeEvent::TurnFinished { turn_context, turn_completed, - continuation_activity_count, } => Box::pin(async move { - self.finish_thread_goal_turn( - turn_context, - turn_completed, - continuation_activity_count, - ) - .await; + self.finish_thread_goal_turn(turn_context, turn_completed) + .await; Ok(()) }), GoalRuntimeEvent::MaybeContinueIfIdle => Box::pin(async move { @@ -361,7 +323,6 @@ impl Session { Ok(()) }), GoalRuntimeEvent::ExternalMutationStarting => Box::pin(async move { - self.reset_thread_goal_continuation_suppression(); if let Err(err) = self.account_thread_goal_before_external_mutation().await { tracing::warn!( "failed to account thread goal progress before external mutation: {err}" @@ -493,7 +454,6 @@ impl Session { let goal_status = goal.status; let goal_id = goal.goal_id.clone(); let goal = protocol_goal_from_state(goal); - self.reset_thread_goal_continuation_suppression(); *self.goal_runtime.budget_limit_reported_goal_id.lock().await = None; let newly_active_goal = goal_status == codex_state::ThreadGoalStatus::Active && (replacing_goal @@ -562,7 +522,6 @@ impl Session { let goal_id = goal.goal_id.clone(); let goal = protocol_goal_from_state(goal); - self.reset_thread_goal_continuation_suppression(); *self.goal_runtime.budget_limit_reported_goal_id.lock().await = None; let current_token_usage = self.total_token_usage().await.unwrap_or_default(); @@ -591,7 +550,6 @@ impl Session { ) { match status { codex_state::ThreadGoalStatus::Active => { - self.reset_thread_goal_continuation_suppression(); match self.state_db_for_thread_goals().await { Ok(Some(state_db)) => { match state_db.get_thread_goal(self.conversation_id).await { @@ -638,7 +596,6 @@ impl Session { } async fn clear_stopped_thread_goal_runtime_state(&self) { - self.reset_thread_goal_continuation_suppression(); *self.goal_runtime.budget_limit_reported_goal_id.lock().await = None; let mut accounting = self.goal_runtime.accounting.lock().await; if let Some(turn) = accounting.turn.as_mut() { @@ -693,16 +650,6 @@ impl Session { turn_context: &TurnContext, token_usage: TokenUsage, ) { - if self - .goal_runtime - .continuation_turn_id - .lock() - .await - .as_ref() - .is_none_or(|turn_id| turn_id != &turn_context.sub_id) - { - self.reset_thread_goal_continuation_suppression(); - } self.goal_runtime.accounting.lock().await.turn = Some(GoalTurnAccountingSnapshot::new( turn_context.sub_id.clone(), token_usage, @@ -753,12 +700,6 @@ impl Session { } } - fn reset_thread_goal_continuation_suppression(&self) { - self.goal_runtime - .continuation_suppressed - .store(false, Ordering::SeqCst); - } - async fn mark_thread_goal_continuation_turn_started(&self, turn_id: String) { *self.goal_runtime.continuation_turn_id.lock().await = Some(turn_id); } @@ -787,7 +728,6 @@ impl Session { self: &Arc, turn_context: &TurnContext, turn_completed: bool, - turn_continuation_activity_count: u64, ) { if turn_completed && let Err(err) = self @@ -797,15 +737,8 @@ impl Session { tracing::warn!("failed to account thread goal progress at turn end: {err}"); } - if self - .take_thread_goal_continuation_turn(&turn_context.sub_id) - .await - && turn_continuation_activity_count == 0 - { - self.goal_runtime - .continuation_suppressed - .store(true, Ordering::SeqCst); - } + self.take_thread_goal_continuation_turn(&turn_context.sub_id) + .await; if turn_completed { let mut accounting = self.goal_runtime.accounting.lock().await; if accounting @@ -1156,7 +1089,6 @@ impl Session { }; let goal_id = goal.goal_id.clone(); let goal = protocol_goal_from_state(goal); - self.reset_thread_goal_continuation_suppression(); *self.goal_runtime.budget_limit_reported_goal_id.lock().await = None; let active_turn_id = self .active_turn_context() @@ -1285,16 +1217,6 @@ impl Session { ); return None; } - if self - .goal_runtime - .continuation_suppressed - .load(Ordering::SeqCst) - { - tracing::debug!( - "skipping active goal continuation because the last continuation made no counted autonomous activity" - ); - return None; - } let state_db = match self.state_db_for_thread_goals().await { Ok(Some(state_db)) => state_db, Ok(None) => { @@ -1543,28 +1465,12 @@ mod tests { use super::continuation_prompt; use super::escape_xml_text; use super::goal_token_delta_for_usage; - use super::response_item_counts_as_goal_continuation_activity_without_turn_item; use super::should_ignore_goal_for_mode; - use super::turn_item_counts_as_goal_continuation_activity; use codex_protocol::ThreadId; use codex_protocol::config_types::ModeKind; - use codex_protocol::items::AgentMessageContent; - use codex_protocol::items::AgentMessageItem; - use codex_protocol::items::ContextCompactionItem; - use codex_protocol::items::HookPromptFragment; - use codex_protocol::items::HookPromptItem; - use codex_protocol::items::ImageGenerationItem; - use codex_protocol::items::PlanItem; - use codex_protocol::items::ReasoningItem; - use codex_protocol::items::TurnItem; - use codex_protocol::items::UserMessageItem; - use codex_protocol::items::WebSearchItem; - use codex_protocol::models::ResponseItem; - use codex_protocol::models::WebSearchAction; use codex_protocol::protocol::ThreadGoal; use codex_protocol::protocol::ThreadGoalStatus; use codex_protocol::protocol::TokenUsage; - use codex_protocol::user_input::UserInput; use std::time::Duration; use std::time::Instant; @@ -1589,125 +1495,6 @@ mod tests { assert_eq!(580, goal_token_delta_for_usage(&usage)); } - #[test] - fn turn_item_goal_continuation_activity_matches_expected_variants() { - let cases = [ - ( - false, - TurnItem::UserMessage(UserMessageItem::new(&[UserInput::Text { - text: "hello".to_string(), - text_elements: Vec::new(), - }])), - ), - ( - false, - TurnItem::HookPrompt(HookPromptItem::from_fragments( - None, - vec![HookPromptFragment::from_single_hook("hook text", "hook-1")], - )), - ), - ( - false, - TurnItem::AgentMessage(AgentMessageItem::new(&[AgentMessageContent::Text { - text: "hello".to_string(), - }])), - ), - ( - false, - TurnItem::Plan(PlanItem { - id: "plan-1".to_string(), - text: "plan".to_string(), - }), - ), - ( - false, - TurnItem::Reasoning(ReasoningItem { - id: "reason-1".to_string(), - summary_text: vec!["thinking".to_string()], - raw_content: Vec::new(), - }), - ), - ( - true, - TurnItem::WebSearch(WebSearchItem { - id: "ws-1".to_string(), - query: "benchmark note structure".to_string(), - action: WebSearchAction::Search { - query: Some("benchmark note structure".to_string()), - queries: None, - }, - }), - ), - ( - true, - TurnItem::ImageGeneration(ImageGenerationItem { - id: "ig-1".to_string(), - status: "completed".to_string(), - revised_prompt: None, - result: String::new(), - saved_path: None, - }), - ), - ( - false, - TurnItem::ContextCompaction(ContextCompactionItem::new()), - ), - ]; - - for (expected, item) in cases { - assert_eq!( - expected, - turn_item_counts_as_goal_continuation_activity(&item), - "unexpected continuation-activity classification for {item:?}" - ); - } - } - - #[test] - fn raw_response_goal_continuation_activity_only_counts_server_tool_search() { - let cases = [ - ( - false, - ResponseItem::ToolSearchCall { - id: None, - call_id: Some("search-client".to_string()), - status: Some("completed".to_string()), - execution: "client".to_string(), - arguments: serde_json::json!({ "query": "calendar" }), - }, - ), - ( - true, - ResponseItem::ToolSearchCall { - id: None, - call_id: Some("search-server".to_string()), - status: Some("completed".to_string()), - execution: "server".to_string(), - arguments: serde_json::json!({ "query": "calendar" }), - }, - ), - ( - false, - ResponseItem::WebSearchCall { - id: Some("ws-1".to_string()), - status: Some("completed".to_string()), - action: Some(WebSearchAction::Search { - query: Some("calendar".to_string()), - queries: None, - }), - }, - ), - ]; - - for (expected, item) in cases { - assert_eq!( - expected, - response_item_counts_as_goal_continuation_activity_without_turn_item(&item), - "unexpected raw continuation-activity classification for {item:?}" - ); - } - } - #[test] fn wall_clock_accounting_advances_by_persisted_seconds() { let mut snapshot = super::GoalWallClockAccountingSnapshot::new(); @@ -1743,7 +1530,7 @@ mod tests { assert!(prompt.contains("\nfinish the stack\n")); assert!(prompt.contains("Token budget: 10000")); assert!(prompt.contains("call update_goal with status \"complete\"")); - assert!(prompt.contains( + assert!(!prompt.contains( "explain the blocker or next required input to the user and wait for new input" )); assert!(!prompt.contains("budgetLimited")); diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index bf4725325ad9..8333fd2dd311 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -33,7 +33,6 @@ use crate::default_skill_metadata_budget; use crate::environment_selection::selected_primary_environment; use crate::environment_selection::validate_environment_selections; use crate::exec_policy::ExecPolicyManager; -use crate::goals::turn_item_counts_as_goal_continuation_activity; use crate::installation_id::resolve_installation_id; use crate::parse_turn_item; use crate::path_utils::normalize_for_native_workdir; @@ -1654,10 +1653,6 @@ impl Session { turn_context: &TurnContext, item: TurnItem, ) { - if turn_item_counts_as_goal_continuation_activity(&item) { - self.record_continuation_activity_for_turn(&turn_context.sub_id) - .await; - } record_turn_ttfm_metric(turn_context, &item).await; self.send_event( turn_context, @@ -3090,14 +3085,6 @@ impl Session { turn_state.lock().await.has_memory_citation = true; } - pub(crate) async fn record_continuation_activity_for_turn(&self, sub_id: &str) { - let turn_state = self.turn_state_for_sub_id(sub_id).await; - let Some(turn_state) = turn_state else { - return; - }; - turn_state.lock().await.record_continuation_activity(); - } - async fn turn_state_for_sub_id( &self, sub_id: &str, diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 068e47bd7105..85403ffca54b 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -118,6 +118,8 @@ use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use codex_protocol::protocol::UserMessageEvent; use codex_protocol::protocol::W3cTraceContext; +use codex_protocol::request_user_input::RequestUserInputAnswer; +use codex_protocol::request_user_input::RequestUserInputResponse; use core_test_support::PathBufExt; use core_test_support::PathExt; use core_test_support::context_snapshot; @@ -136,6 +138,7 @@ use core_test_support::test_codex::test_codex; use core_test_support::test_path_buf; use core_test_support::tracing::install_test_tracing; use core_test_support::wait_for_event; +use core_test_support::wait_for_event_match; use opentelemetry::trace::TraceContextExt; use opentelemetry::trace::TraceId; use opentelemetry_sdk::metrics::InMemoryMetricExporter; @@ -6941,7 +6944,7 @@ async fn interrupt_accounts_active_goal_before_pausing() -> anyhow::Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn active_goal_continuation_runs_to_completion_after_turn() -> anyhow::Result<()> { +async fn active_goal_continuation_runs_again_after_no_tool_turn() -> anyhow::Result<()> { let server = start_mock_server().await; let mut builder = test_codex().with_config(|config| { config @@ -6967,18 +6970,107 @@ async fn active_goal_continuation_runs_to_completion_after_turn() -> anyhow::Res ev_completed("resp-2"), ]), sse(vec![ - ev_response_created("resp-3"), + ev_assistant_message("msg-2", "I am still working on the benchmark note."), + ev_completed("resp-3"), + ]), + sse(vec![ + ev_response_created("resp-4"), ev_function_call( "call-complete-goal", "update_goal", r#"{"status":"complete"}"#, ), + ev_completed("resp-4"), + ]), + sse(vec![ + ev_assistant_message("msg-3", "Goal complete."), + ev_completed("resp-5"), + ]), + ], + ) + .await; + + test.codex + .submit(Op::UserInput { + environments: None, + items: vec![UserInput::Text { + text: "write a benchmark note".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + }) + .await?; + + let mut completed_turns = 0; + tokio::time::timeout(std::time::Duration::from_secs(8), async { + loop { + let event = test.codex.next_event().await?; + if matches!(event.msg, EventMsg::TurnComplete(_)) { + completed_turns += 1; + if completed_turns == 3 { + return anyhow::Ok(()); + } + } + } + }) + .await??; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pending_request_user_input_does_not_spawn_extra_goal_continuation() -> anyhow::Result<()> { + let server = start_mock_server().await; + let mut builder = test_codex().with_config(|config| { + config + .features + .enable(Feature::Goals) + .expect("goal mode should be enableable in tests"); + config + .features + .enable(Feature::DefaultModeRequestUserInput) + .expect("default-mode request_user_input should be enableable in tests"); + }); + let test = builder.build(&server).await?; + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + "call-create-goal", + "create_goal", + r#"{"objective":"write a benchmark note"}"#, + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_assistant_message("msg-1", "Draft ready."), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_function_call( + "call-ask-user", + "request_user_input", + r#"{"questions":[{"header":"Choice","id":"next_step","question":"Pick one","options":[{"label":"Outline","description":"Start with an outline."},{"label":"Draft","description":"Write a full draft."}]}]}"#, + ), ev_completed("resp-3"), ]), sse(vec![ - ev_assistant_message("msg-2", "Goal complete."), + ev_response_created("resp-4"), + ev_function_call( + "call-complete-goal", + "update_goal", + r#"{"status":"complete"}"#, + ), ev_completed("resp-4"), ]), + sse(vec![ + ev_assistant_message("msg-2", "Goal complete."), + ev_completed("resp-5"), + ]), ], ) .await; @@ -6995,13 +7087,45 @@ async fn active_goal_continuation_runs_to_completion_after_turn() -> anyhow::Res }) .await?; + let request_user_input_event = wait_for_event_match(&test.codex, |event| match event { + EventMsg::RequestUserInput(event) => Some(event.clone()), + _ => None, + }) + .await; + assert_eq!(3, responses.requests().len()); + assert!( + timeout(Duration::from_millis(200), test.codex.next_event()) + .await + .is_err(), + "waiting for request_user_input should keep the turn open without emitting more events" + ); + assert_eq!( + 3, + responses.requests().len(), + "waiting for request_user_input should not start another continuation request" + ); + + test.codex + .submit(Op::UserInputAnswer { + id: request_user_input_event.turn_id, + response: RequestUserInputResponse { + answers: std::collections::HashMap::from([( + "next_step".to_string(), + RequestUserInputAnswer { + answers: vec!["Outline".to_string()], + }, + )]), + }, + }) + .await?; + let mut completed_turns = 0; - tokio::time::timeout(std::time::Duration::from_secs(8), async { + timeout(Duration::from_secs(8), async { loop { let event = test.codex.next_event().await?; if matches!(event.msg, EventMsg::TurnComplete(_)) { completed_turns += 1; - if completed_turns == 2 { + if completed_turns == 1 { return anyhow::Ok(()); } } @@ -7009,6 +7133,8 @@ async fn active_goal_continuation_runs_to_completion_after_turn() -> anyhow::Res }) .await??; + assert_eq!(5, responses.requests().len()); + Ok(()) } diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 5585d30c2855..0dd0c0937959 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -118,7 +118,6 @@ pub(crate) struct TurnState { granted_permissions: Option, strict_auto_review_enabled: bool, pub(crate) tool_calls: u64, - pub(crate) continuation_activity_count: u64, pub(crate) has_memory_citation: bool, pub(crate) token_usage_at_turn_start: TokenUsage, } @@ -130,15 +129,6 @@ pub(crate) struct PendingRequestPermissions { } impl TurnState { - pub(crate) fn record_tool_call(&mut self) { - self.tool_calls = self.tool_calls.saturating_add(1); - self.record_continuation_activity(); - } - - pub(crate) fn record_continuation_activity(&mut self) { - self.continuation_activity_count = self.continuation_activity_count.saturating_add(1); - } - pub(crate) fn insert_pending_approval( &mut self, key: String, diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index 2e1839f48761..8ae4374e7b89 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -11,7 +11,6 @@ use tokio_util::sync::CancellationToken; use crate::context::ContextualUserFragment; use crate::context::ImageGenerationInstructions; use crate::function_tool::FunctionCallError; -use crate::goals::response_item_counts_as_goal_continuation_activity_without_turn_item; use crate::parse_turn_item; use crate::session::session::Session; use crate::session::turn_context::TurnContext; @@ -263,16 +262,6 @@ pub(crate) async fn handle_output_item_done( plan_mode, ) .await; - // Some built-in activity (currently server-executed `tool_search`) - // bypasses turn-item normalization, so preserve goal continuation - // progress for those response items here. - let counts_as_raw_continuation_activity = turn_item.is_none() - && response_item_counts_as_goal_continuation_activity_without_turn_item(&item); - if counts_as_raw_continuation_activity { - ctx.sess - .record_continuation_activity_for_turn(&ctx.turn_context.sub_id) - .await; - } if let Some(turn_item) = turn_item { if previously_active_item.is_none() { let mut started_item = turn_item.clone(); diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index f50ab7959228..b2f9a045f05f 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -567,7 +567,6 @@ impl Session { let mut token_usage_at_turn_start = None; let mut turn_had_memory_citation = false; let mut turn_tool_calls = 0_u64; - let mut turn_continuation_activity_count = 0_u64; let mut records_turn_token_usage_on_span = false; let turn_state = { let mut active = self.active_turn.lock().await; @@ -591,7 +590,6 @@ impl Session { pending_input = ts.take_pending_input(); turn_had_memory_citation = ts.has_memory_citation; turn_tool_calls = ts.tool_calls; - turn_continuation_activity_count = ts.continuation_activity_count; token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone()); } if !pending_input.is_empty() { @@ -739,7 +737,6 @@ impl Session { .goal_runtime_apply(GoalRuntimeEvent::TurnFinished { turn_context: turn_context.as_ref(), turn_completed: should_clear_active_turn, - continuation_activity_count: turn_continuation_activity_count, }) .await { diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index c1bb65f3a7b5..e1027c9fa907 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -309,7 +309,7 @@ impl ToolRegistry { let mut active = invocation.session.active_turn.lock().await; if let Some(active_turn) = active.as_mut() { let mut turn_state = active_turn.turn_state.lock().await; - turn_state.record_tool_call(); + turn_state.tool_calls = turn_state.tool_calls.saturating_add(1); } } diff --git a/codex-rs/core/templates/goals/continuation.md b/codex-rs/core/templates/goals/continuation.md index 634596c3d8b9..6b1cab1c3be8 100644 --- a/codex-rs/core/templates/goals/continuation.md +++ b/codex-rs/core/templates/goals/continuation.md @@ -25,4 +25,4 @@ Before deciding that the goal is achieved, perform a completion audit against th Do not rely on intent, partial progress, elapsed effort, memory of earlier work, or a plausible final answer as proof of completion. Only mark the goal achieved when the audit shows that the objective has actually been achieved and no required work remains. If any requirement is missing, incomplete, or unverified, keep working instead of marking the goal complete. If the objective is achieved, call update_goal with status "complete" so usage accounting is preserved. Report the final elapsed time, and if the achieved goal has a token budget, report the final consumed token budget to the user after update_goal succeeds. -If the goal has not been achieved and cannot continue productively, explain the blocker or next required input to the user and wait for new input. Do not call update_goal unless the goal is complete. Do not mark a goal complete merely because the budget is nearly exhausted or because you are stopping work. +Do not call update_goal unless the goal is complete. Do not mark a goal complete merely because the budget is nearly exhausted or because you are stopping work.