Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions codex-rs/core/src/goals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ impl Session {
let active = self.active_turn.lock().await;
active
.as_ref()
.and_then(|active_turn| active_turn.tasks.values().next())
.and_then(|active_turn| active_turn.task.as_ref())
.map(|task| Arc::clone(&task.turn_context))
}

Expand Down Expand Up @@ -916,7 +916,7 @@ impl Session {
async fn clear_reserved_goal_continuation_turn(&self, turn_state: &Arc<Mutex<TurnState>>) {
let mut active_turn_guard = self.active_turn.lock().await;
if let Some(active_turn) = active_turn_guard.as_ref()
&& active_turn.tasks.is_empty()
&& active_turn.task.is_none()
&& Arc::ptr_eq(&active_turn.turn_state, turn_state)
{
*active_turn_guard = None;
Expand Down Expand Up @@ -1364,7 +1364,7 @@ impl Session {
let still_reserved = {
let active_turn = self.active_turn.lock().await;
active_turn.as_ref().is_some_and(|active_turn| {
active_turn.tasks.is_empty() && Arc::ptr_eq(&active_turn.turn_state, &turn_state)
active_turn.task.is_none() && Arc::ptr_eq(&active_turn.turn_state, &turn_state)
})
};
if !still_reserved {
Expand Down
5 changes: 3 additions & 2 deletions codex-rs/core/src/session/input_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ impl InputQueue {
let active = active_turn.lock().await;
active.as_ref().and_then(|active_turn| {
active_turn
.tasks
.contains_key(sub_id)
.task
.as_ref()
.is_some_and(|task| task.turn_context.sub_id == sub_id)
.then(|| Arc::clone(&active_turn.turn_state))
})
}
Expand Down
21 changes: 10 additions & 11 deletions codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1832,15 +1832,16 @@ impl Session {
let active = self.active_turn.lock().await;
active
.as_ref()
.and_then(|turn| turn.tasks.get(sub_id))
.and_then(|turn| turn.task.as_ref())
.filter(|task| task.turn_context.sub_id == sub_id)
.map(|task| Arc::clone(&task.turn_context))
}

async fn active_turn_context_and_cancellation_token(
&self,
) -> Option<(Arc<TurnContext>, CancellationToken)> {
let active = self.active_turn.lock().await;
let (_, task) = active.as_ref()?.tasks.first()?;
let task = active.as_ref()?.task.as_ref()?;
Some((
Arc::clone(&task.turn_context),
task.cancellation_token.child_token(),
Expand Down Expand Up @@ -3159,9 +3160,10 @@ impl Session {
return Err(SteerInputError::NoActiveTurn(input));
};

let Some((active_turn_id, _)) = active_turn.tasks.first() else {
let Some(active_task) = active_turn.task.as_ref() else {
return Err(SteerInputError::NoActiveTurn(input));
};
let active_turn_id = &active_task.turn_context.sub_id;

if let Some(expected_turn_id) = expected_turn_id
&& expected_turn_id != active_turn_id
Expand All @@ -3172,28 +3174,25 @@ impl Session {
});
}

match active_turn.tasks.first().map(|(_, task)| task.kind) {
Some(crate::state::TaskKind::Regular) => {}
Some(crate::state::TaskKind::Review) => {
match active_task.kind {
crate::state::TaskKind::Regular => {}
crate::state::TaskKind::Review => {
return Err(SteerInputError::ActiveTurnNotSteerable {
turn_kind: NonSteerableTurnKind::Review,
});
}
Some(crate::state::TaskKind::Compact) => {
crate::state::TaskKind::Compact => {
return Err(SteerInputError::ActiveTurnNotSteerable {
turn_kind: NonSteerableTurnKind::Compact,
});
}
None => return Err(SteerInputError::NoActiveTurn(input)),
}

if input.is_empty() {
return Err(SteerInputError::EmptyInput);
}

if let Some(responsesapi_client_metadata) = responsesapi_client_metadata
&& let Some((_, active_task)) = active_turn.tasks.first()
{
if let Some(responsesapi_client_metadata) = responsesapi_client_metadata {
active_task
.turn_context
.turn_metadata_state
Expand Down
31 changes: 2 additions & 29 deletions codex-rs/core/src/state/turn.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Turn-scoped state and active turn metadata scaffolding.

use codex_sandboxing::policy_transforms::merge_permission_profiles;
use indexmap::IndexMap;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
Expand All @@ -28,7 +27,7 @@ use codex_protocol::protocol::TokenUsage;

/// Metadata about the currently running turn.
pub(crate) struct ActiveTurn {
pub(crate) tasks: IndexMap<String, RunningTask>,
pub(crate) task: Option<RunningTask>,
pub(crate) turn_state: Arc<Mutex<TurnState>>,
}

Expand Down Expand Up @@ -56,7 +55,7 @@ pub(crate) enum MailboxDeliveryPhase {
impl Default for ActiveTurn {
fn default() -> Self {
Self {
tasks: IndexMap::new(),
task: None,
turn_state: Arc::new(Mutex::new(TurnState::default())),
}
}
Expand All @@ -81,32 +80,6 @@ pub(crate) struct RunningTask {
pub(crate) _timer: Option<codex_otel::Timer>,
}

pub(crate) struct RemovedTask {
pub(crate) records_turn_token_usage_on_span: bool,
pub(crate) active_turn_is_empty: bool,
}

impl ActiveTurn {
pub(crate) fn add_task(&mut self, task: RunningTask) {
let sub_id = task.turn_context.sub_id.clone();
self.tasks.insert(sub_id, task);
}

pub(crate) fn remove_task(&mut self, sub_id: &str) -> Option<RemovedTask> {
let task = self.tasks.swap_remove(sub_id)?;
let records_turn_token_usage_on_span = task.task.records_turn_token_usage_on_span();
task.handle.detach();
Some(RemovedTask {
records_turn_token_usage_on_span,
active_turn_is_empty: self.tasks.is_empty(),
})
}

pub(crate) fn drain_tasks(&mut self) -> Vec<RunningTask> {
self.tasks.drain(..).map(|(_, task)| task).collect()
}
}

/// Mutable state for a single turn.
#[derive(Default)]
pub(crate) struct TurnState {
Expand Down
Loading
Loading