From 68e1c654ad92210482389a7eec8a6c206116d378 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Fri, 22 May 2026 10:24:36 -0700 Subject: [PATCH 1/3] core: make active turn task singular --- codex-rs/core/src/goals.rs | 6 ++-- codex-rs/core/src/session/input_queue.rs | 5 ++-- codex-rs/core/src/session/mod.rs | 21 +++++++------- codex-rs/core/src/state/turn.rs | 24 +++++++++------- codex-rs/core/src/tasks/mod.rs | 32 +++++++++------------ codex-rs/core/src/tools/network_approval.rs | 4 +-- 6 files changed, 46 insertions(+), 46 deletions(-) diff --git a/codex-rs/core/src/goals.rs b/codex-rs/core/src/goals.rs index d40176bcb37..75b84093e70 100644 --- a/codex-rs/core/src/goals.rs +++ b/codex-rs/core/src/goals.rs @@ -836,7 +836,7 @@ impl Session { let active = self.active_turn.lock().await; active .as_ref() - .and_then(|active_turn| active_turn.tasks.values().next()) + .and_then(|active_turn| active_turn.task.as_ref()) .map(|task| Arc::clone(&task.turn_context)) } @@ -916,7 +916,7 @@ impl Session { async fn clear_reserved_goal_continuation_turn(&self, turn_state: &Arc>) { let mut active_turn_guard = self.active_turn.lock().await; if let Some(active_turn) = active_turn_guard.as_ref() - && active_turn.tasks.is_empty() + && active_turn.task.is_none() && Arc::ptr_eq(&active_turn.turn_state, turn_state) { *active_turn_guard = None; @@ -1364,7 +1364,7 @@ impl Session { let still_reserved = { let active_turn = self.active_turn.lock().await; active_turn.as_ref().is_some_and(|active_turn| { - active_turn.tasks.is_empty() && Arc::ptr_eq(&active_turn.turn_state, &turn_state) + active_turn.task.is_none() && Arc::ptr_eq(&active_turn.turn_state, &turn_state) }) }; if !still_reserved { diff --git a/codex-rs/core/src/session/input_queue.rs b/codex-rs/core/src/session/input_queue.rs index 5f92322c8d8..98e912c7e57 100644 --- a/codex-rs/core/src/session/input_queue.rs +++ b/codex-rs/core/src/session/input_queue.rs @@ -103,8 +103,9 @@ impl InputQueue { let active = active_turn.lock().await; active.as_ref().and_then(|active_turn| { active_turn - .tasks - .contains_key(sub_id) + .task + .as_ref() + .is_some_and(|task| task.turn_context.sub_id == sub_id) .then(|| Arc::clone(&active_turn.turn_state)) }) } diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 675e794aee5..67cc5484fc3 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -1832,7 +1832,8 @@ impl Session { let active = self.active_turn.lock().await; active .as_ref() - .and_then(|turn| turn.tasks.get(sub_id)) + .and_then(|turn| turn.task.as_ref()) + .filter(|task| task.turn_context.sub_id == sub_id) .map(|task| Arc::clone(&task.turn_context)) } @@ -1840,7 +1841,7 @@ impl Session { &self, ) -> Option<(Arc, CancellationToken)> { let active = self.active_turn.lock().await; - let (_, task) = active.as_ref()?.tasks.first()?; + let task = active.as_ref()?.task.as_ref()?; Some(( Arc::clone(&task.turn_context), task.cancellation_token.child_token(), @@ -3159,9 +3160,10 @@ impl Session { return Err(SteerInputError::NoActiveTurn(input)); }; - let Some((active_turn_id, _)) = active_turn.tasks.first() else { + let Some(active_task) = active_turn.task.as_ref() else { return Err(SteerInputError::NoActiveTurn(input)); }; + let active_turn_id = &active_task.turn_context.sub_id; if let Some(expected_turn_id) = expected_turn_id && expected_turn_id != active_turn_id @@ -3172,28 +3174,25 @@ impl Session { }); } - match active_turn.tasks.first().map(|(_, task)| task.kind) { - Some(crate::state::TaskKind::Regular) => {} - Some(crate::state::TaskKind::Review) => { + match active_task.kind { + crate::state::TaskKind::Regular => {} + crate::state::TaskKind::Review => { return Err(SteerInputError::ActiveTurnNotSteerable { turn_kind: NonSteerableTurnKind::Review, }); } - Some(crate::state::TaskKind::Compact) => { + crate::state::TaskKind::Compact => { return Err(SteerInputError::ActiveTurnNotSteerable { turn_kind: NonSteerableTurnKind::Compact, }); } - None => return Err(SteerInputError::NoActiveTurn(input)), } if input.is_empty() { return Err(SteerInputError::EmptyInput); } - if let Some(responsesapi_client_metadata) = responsesapi_client_metadata - && let Some((_, active_task)) = active_turn.tasks.first() - { + if let Some(responsesapi_client_metadata) = responsesapi_client_metadata { active_task .turn_context .turn_metadata_state diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 86438ad56a9..4665cbe4437 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -1,7 +1,6 @@ //! Turn-scoped state and active turn metadata scaffolding. use codex_sandboxing::policy_transforms::merge_permission_profiles; -use indexmap::IndexMap; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; @@ -28,7 +27,7 @@ use codex_protocol::protocol::TokenUsage; /// Metadata about the currently running turn. pub(crate) struct ActiveTurn { - pub(crate) tasks: IndexMap, + pub(crate) task: Option, pub(crate) turn_state: Arc>, } @@ -56,7 +55,7 @@ pub(crate) enum MailboxDeliveryPhase { impl Default for ActiveTurn { fn default() -> Self { Self { - tasks: IndexMap::new(), + task: None, turn_state: Arc::new(Mutex::new(TurnState::default())), } } @@ -83,27 +82,32 @@ pub(crate) struct RunningTask { pub(crate) struct RemovedTask { pub(crate) records_turn_token_usage_on_span: bool, - pub(crate) active_turn_is_empty: bool, } impl ActiveTurn { pub(crate) fn add_task(&mut self, task: RunningTask) { - let sub_id = task.turn_context.sub_id.clone(); - self.tasks.insert(sub_id, task); + assert!( + self.task.is_none(), + "active turn already has a running task" + ); + self.task = Some(task); } pub(crate) fn remove_task(&mut self, sub_id: &str) -> Option { - let task = self.tasks.swap_remove(sub_id)?; + let task = self.task.as_ref()?; + if task.turn_context.sub_id != sub_id { + return None; + } + let task = self.task.take()?; let records_turn_token_usage_on_span = task.task.records_turn_token_usage_on_span(); task.handle.detach(); Some(RemovedTask { records_turn_token_usage_on_span, - active_turn_is_empty: self.tasks.is_empty(), }) } - pub(crate) fn drain_tasks(&mut self) -> Vec { - self.tasks.drain(..).map(|(_, task)| task).collect() + pub(crate) fn take_task(&mut self) -> Option { + self.task.take() } } diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 9cb6b5f0f20..f57a0279f28 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -354,7 +354,7 @@ impl Session { let turn_state = { let mut active = self.active_turn.lock().await; let turn = active.get_or_insert_with(ActiveTurn::default); - debug_assert!(turn.tasks.is_empty()); + debug_assert!(turn.task.is_none()); Arc::clone(&turn.turn_state) }; turn_state.lock().await.token_usage_at_turn_start = token_usage_at_turn_start.clone(); @@ -372,7 +372,7 @@ impl Session { let turn_extension_data = Arc::clone(&turn_context.extension_data); let mut active = self.active_turn.lock().await; let turn = active.get_or_insert_with(ActiveTurn::default); - debug_assert!(turn.tasks.is_empty()); + debug_assert!(turn.task.is_none()); let done_clone = Arc::clone(&done); let session_ctx = Arc::new(SessionTaskContext::new( Arc::clone(self), @@ -498,10 +498,10 @@ impl Session { let mut active_turn_to_clear = None; let mut turn_context = None; if let Some(mut active_turn) = self.take_active_turn().await { - let tasks = active_turn.drain_tasks(); - aborted_turn = !tasks.is_empty(); - turn_context = tasks.first().map(|task| Arc::clone(&task.turn_context)); - for task in tasks { + let task = active_turn.take_task(); + aborted_turn = task.is_some(); + turn_context = task.as_ref().map(|task| Arc::clone(&task.turn_context)); + if let Some(task) = task { self.handle_task_abort(task, reason.clone()).await; } if aborted_turn { @@ -541,7 +541,8 @@ impl Session { let mut active = self.active_turn.lock().await; if active .as_ref() - .is_some_and(|active_turn| active_turn.tasks.contains_key(turn_id)) + .and_then(|active_turn| active_turn.task.as_ref()) + .is_some_and(|task| task.turn_context.sub_id == turn_id) { active.take() } else { @@ -552,9 +553,9 @@ impl Session { return false; }; - let tasks = active_turn.drain_tasks(); - let turn_context = tasks.first().map(|task| Arc::clone(&task.turn_context)); - for task in tasks { + let task = active_turn.take_task(); + let turn_context = task.as_ref().map(|task| Arc::clone(&task.turn_context)); + if let Some(task) = task { self.handle_task_abort(task, reason.clone()).await; } if let Some(turn_context) = turn_context.as_deref() { @@ -601,13 +602,8 @@ impl Session { && let Some(removed_task) = at.remove_task(&turn_context.sub_id) { records_turn_token_usage_on_span = removed_task.records_turn_token_usage_on_span; - if removed_task.active_turn_is_empty { - should_clear_active_turn = true; - let turn_state = Arc::clone(&at.turn_state); - Some(turn_state) - } else { - None - } + should_clear_active_turn = true; + Some(Arc::clone(&at.turn_state)) } else { None } @@ -803,7 +799,7 @@ impl Session { let cleared_active_turn = { let mut active = self.active_turn.lock().await; if let Some(active_turn) = active.as_ref() - && active_turn.tasks.is_empty() + && active_turn.task.is_none() && turn_state .as_ref() .is_some_and(|turn_state| Arc::ptr_eq(&active_turn.turn_state, turn_state)) diff --git a/codex-rs/core/src/tools/network_approval.rs b/codex-rs/core/src/tools/network_approval.rs index 39441d96f5c..9cc808a5b69 100644 --- a/codex-rs/core/src/tools/network_approval.rs +++ b/codex-rs/core/src/tools/network_approval.rs @@ -375,8 +375,8 @@ impl NetworkApprovalService { let active_turn = session.active_turn.lock().await; active_turn .as_ref() - .and_then(|turn| turn.tasks.first()) - .map(|(_, task)| Arc::clone(&task.turn_context)) + .and_then(|turn| turn.task.as_ref()) + .map(|task| Arc::clone(&task.turn_context)) } fn format_network_target(protocol: &str, host: &str, port: u16) -> String { From 7532d69f88e26648ef0297d196e9c3b0e849e4f8 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Fri, 22 May 2026 10:59:09 -0700 Subject: [PATCH 2/3] core: simplify active turn task slot --- codex-rs/core/src/state/turn.rs | 31 ----------- codex-rs/core/src/tasks/mod.rs | 86 ++++++++++++------------------ codex-rs/core/src/tasks/regular.rs | 4 -- 3 files changed, 34 insertions(+), 87 deletions(-) diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 4665cbe4437..fe092351977 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -80,37 +80,6 @@ pub(crate) struct RunningTask { pub(crate) _timer: Option, } -pub(crate) struct RemovedTask { - pub(crate) records_turn_token_usage_on_span: bool, -} - -impl ActiveTurn { - pub(crate) fn add_task(&mut self, task: RunningTask) { - assert!( - self.task.is_none(), - "active turn already has a running task" - ); - self.task = Some(task); - } - - pub(crate) fn remove_task(&mut self, sub_id: &str) -> Option { - let task = self.task.as_ref()?; - if task.turn_context.sub_id != sub_id { - return None; - } - let task = self.task.take()?; - let records_turn_token_usage_on_span = task.task.records_turn_token_usage_on_span(); - task.handle.detach(); - Some(RemovedTask { - records_turn_token_usage_on_span, - }) - } - - pub(crate) fn take_task(&mut self) -> Option { - self.task.take() - } -} - /// Mutable state for a single turn. #[derive(Default)] pub(crate) struct TurnState { diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index f57a0279f28..ea44268bdf9 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -197,11 +197,6 @@ pub(crate) trait SessionTask: Send + Sync + 'static { /// Returns the tracing name for a spawned task span. fn span_name(&self) -> &'static str; - /// Returns whether turn token usage should be recorded on this task's turn span. - fn records_turn_token_usage_on_span(&self) -> bool { - false - } - /// Executes the task until completion or cancellation. /// /// Implementations typically stream protocol events using `session` and @@ -239,8 +234,6 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static { fn span_name(&self) -> &'static str; - fn records_turn_token_usage_on_span(&self) -> bool; - fn run( self: Arc, session: Arc, @@ -268,10 +261,6 @@ where SessionTask::span_name(self) } - fn records_turn_token_usage_on_span(&self) -> bool { - SessionTask::records_turn_token_usage_on_span(self) - } - fn run( self: Arc, session: Arc, @@ -445,7 +434,7 @@ impl Session { turn_extension_data, _timer: timer, }; - turn.add_task(running_task); + turn.task = Some(running_task); } /// Starts a regular turn when the session is idle and pending work is waiting. @@ -498,7 +487,7 @@ impl Session { let mut active_turn_to_clear = None; let mut turn_context = None; if let Some(mut active_turn) = self.take_active_turn().await { - let task = active_turn.take_task(); + let task = active_turn.task.take(); aborted_turn = task.is_some(); turn_context = task.as_ref().map(|task| Arc::clone(&task.turn_context)); if let Some(task) = task { @@ -553,7 +542,7 @@ impl Session { return false; }; - let task = active_turn.take_task(); + let task = active_turn.task.take(); let turn_context = task.as_ref().map(|task| Arc::clone(&task.turn_context)); if let Some(task) = task { self.handle_task_abort(task, reason.clone()).await; @@ -591,23 +580,18 @@ impl Session { .cancel_git_enrichment_task(); let mut pending_input = Vec::::new(); - let mut should_clear_active_turn = false; let mut token_usage_at_turn_start = None; let mut turn_had_memory_citation = false; let mut turn_tool_calls = 0_u64; - let mut records_turn_token_usage_on_span = false; let turn_state = { let mut active = self.active_turn.lock().await; - if let Some(at) = active.as_mut() - && let Some(removed_task) = at.remove_task(&turn_context.sub_id) - { - records_turn_token_usage_on_span = removed_task.records_turn_token_usage_on_span; - should_clear_active_turn = true; - Some(Arc::clone(&at.turn_state)) - } else { - None - } + active.as_mut().and_then(|active_turn| { + let task = active_turn.task.take()?; + task.handle.detach(); + Some(Arc::clone(&active_turn.turn_state)) + }) }; + let should_clear_active_turn = turn_state.is_some(); if let Some(turn_state) = turn_state.as_ref() { pending_input = self .input_queue @@ -694,33 +678,31 @@ impl Session { - token_usage_at_turn_start.total_tokens) .max(0), }; - if records_turn_token_usage_on_span { - let current_span = Span::current(); - current_span.record( - "codex.turn.token_usage.input_tokens", - turn_token_usage.input_tokens, - ); - current_span.record( - "codex.turn.token_usage.cached_input_tokens", - turn_token_usage.cached_input(), - ); - current_span.record( - "codex.turn.token_usage.non_cached_input_tokens", - turn_token_usage.non_cached_input(), - ); - current_span.record( - "codex.turn.token_usage.output_tokens", - turn_token_usage.output_tokens, - ); - current_span.record( - "codex.turn.token_usage.reasoning_output_tokens", - turn_token_usage.reasoning_output_tokens, - ); - current_span.record( - "codex.turn.token_usage.total_tokens", - turn_token_usage.total_tokens, - ); - } + let current_span = Span::current(); + current_span.record( + "codex.turn.token_usage.input_tokens", + turn_token_usage.input_tokens, + ); + current_span.record( + "codex.turn.token_usage.cached_input_tokens", + turn_token_usage.cached_input(), + ); + current_span.record( + "codex.turn.token_usage.non_cached_input_tokens", + turn_token_usage.non_cached_input(), + ); + current_span.record( + "codex.turn.token_usage.output_tokens", + turn_token_usage.output_tokens, + ); + current_span.record( + "codex.turn.token_usage.reasoning_output_tokens", + turn_token_usage.reasoning_output_tokens, + ); + current_span.record( + "codex.turn.token_usage.total_tokens", + turn_token_usage.total_tokens, + ); self.services .analytics_events_client .track_turn_token_usage(TurnTokenUsageFact { diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 50414df2787..8e8e1b99533 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -33,10 +33,6 @@ impl SessionTask for RegularTask { "session_task.turn" } - fn records_turn_token_usage_on_span(&self) -> bool { - true - } - async fn run( self: Arc, session: Arc, From 88e728326f9cf4557336f324e008fe358df1a3d2 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Fri, 22 May 2026 11:18:49 -0700 Subject: [PATCH 3/3] core: return when finished task is no longer active --- codex-rs/core/src/tasks/mod.rs | 78 ++++++++++++++++------------------ 1 file changed, 36 insertions(+), 42 deletions(-) diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index ea44268bdf9..adad56b3cab 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -579,10 +579,6 @@ impl Session { .turn_metadata_state .cancel_git_enrichment_task(); - let mut pending_input = Vec::::new(); - let mut token_usage_at_turn_start = None; - let mut turn_had_memory_citation = false; - let mut turn_tool_calls = 0_u64; let turn_state = { let mut active = self.active_turn.lock().await; active.as_mut().and_then(|active_turn| { @@ -591,17 +587,21 @@ impl Session { Some(Arc::clone(&active_turn.turn_state)) }) }; - let should_clear_active_turn = turn_state.is_some(); - if let Some(turn_state) = turn_state.as_ref() { - pending_input = self - .input_queue - .take_pending_input_for_turn_state(turn_state.as_ref()) - .await; + let Some(turn_state) = turn_state else { + return; + }; + let pending_input = self + .input_queue + .take_pending_input_for_turn_state(turn_state.as_ref()) + .await; + let (turn_had_memory_citation, turn_tool_calls, token_usage_at_turn_start) = { let ts = turn_state.lock().await; - turn_had_memory_citation = ts.has_memory_citation; - turn_tool_calls = ts.tool_calls; - token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone()); - } + ( + ts.has_memory_citation, + ts.tool_calls, + ts.token_usage_at_turn_start.clone(), + ) + }; if !pending_input.is_empty() { for pending_input_item in pending_input { let hook_outcome = @@ -625,7 +625,7 @@ impl Session { } } // Emit token usage metrics. - if let Some(token_usage_at_turn_start) = token_usage_at_turn_start { + { // TODO(jif): drop this let tmp_mem = ( "tmp_mem_enabled", @@ -750,14 +750,12 @@ impl Session { .turn_timing_state .time_to_first_token_ms() .await; - if should_clear_active_turn { - self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref()) - .await; - } + self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref()) + .await; if let Err(err) = self .goal_runtime_apply(GoalRuntimeEvent::TurnFinished { turn_context: turn_context.as_ref(), - turn_completed: should_clear_active_turn, + turn_completed: true, }) .await { @@ -777,30 +775,26 @@ impl Session { .await .clear_turn(&turn_context.sub_id); - if should_clear_active_turn { - let cleared_active_turn = { - let mut active = self.active_turn.lock().await; - if let Some(active_turn) = active.as_ref() - && active_turn.task.is_none() - && turn_state - .as_ref() - .is_some_and(|turn_state| Arc::ptr_eq(&active_turn.turn_state, turn_state)) - { - *active = None; - true - } else { - false - } - }; - if !cleared_active_turn { - return; - } - if let Err(err) = self - .goal_runtime_apply(GoalRuntimeEvent::MaybeContinueIfIdle) - .await + let cleared_active_turn = { + let mut active = self.active_turn.lock().await; + if let Some(active_turn) = active.as_ref() + && active_turn.task.is_none() + && Arc::ptr_eq(&active_turn.turn_state, &turn_state) { - warn!("failed to apply goal runtime maybe-continue event: {err}"); + *active = None; + true + } else { + false } + }; + if !cleared_active_turn { + return; + } + if let Err(err) = self + .goal_runtime_apply(GoalRuntimeEvent::MaybeContinueIfIdle) + .await + { + warn!("failed to apply goal runtime maybe-continue event: {err}"); } }