From db943a6356bdb35ebed717dac900024d18701aa9 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 18 May 2026 14:09:32 +0200 Subject: [PATCH] chore: prep for goal DB --- .../thread_goal_processor.rs | 7 + .../request_processors/thread_lifecycle.rs | 2 +- .../tests/suite/v2/thread_resume.rs | 3 + codex-rs/core/src/goals.rs | 47 ++++- codex-rs/core/src/session/tests.rs | 10 ++ codex-rs/core/src/thread_manager_tests.rs | 2 + codex-rs/state/src/lib.rs | 1 + codex-rs/state/src/runtime.rs | 8 +- codex-rs/state/src/runtime/goals.rs | 160 ++++++++++++++---- codex-rs/state/src/runtime/threads.rs | 6 +- 10 files changed, 203 insertions(+), 43 deletions(-) diff --git a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs index 57d257559020..a7c9a2c20c57 100644 --- a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs @@ -150,12 +150,14 @@ impl ThreadGoalRequestProcessor { let (goal, previous_status) = (if let Some(objective) = objective { let existing_goal = state_db + .thread_goals() .get_thread_goal(thread_id) .await .map_err(|err| invalid_request(err.to_string()))?; if let Some(goal) = existing_goal.as_ref() { let previous_status = ExternalGoalPreviousStatus::from(goal); state_db + .thread_goals() .update_thread_goal( thread_id, codex_state::ThreadGoalUpdate { @@ -177,6 +179,7 @@ impl ThreadGoalRequestProcessor { } else { let previous_status = ExternalGoalPreviousStatus::NewGoal; state_db + .thread_goals() .replace_thread_goal( thread_id, objective, @@ -188,6 +191,7 @@ impl ThreadGoalRequestProcessor { } } else { let existing_goal = state_db + .thread_goals() .get_thread_goal(thread_id) .await .map_err(|err| invalid_request(err.to_string()))?; @@ -198,6 +202,7 @@ impl ThreadGoalRequestProcessor { }; let previous_status = ExternalGoalPreviousStatus::from(&existing_goal); state_db + .thread_goals() .update_thread_goal( thread_id, codex_state::ThreadGoalUpdate { @@ -246,6 +251,7 @@ impl ThreadGoalRequestProcessor { let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?; let state_db = self.state_db_for_materialized_thread(thread_id).await?; let goal = state_db + .thread_goals() .get_thread_goal(thread_id) .await .map_err(|err| internal_error(format!("failed to read thread goal: {err}")))? @@ -303,6 +309,7 @@ impl ThreadGoalRequestProcessor { thread_state.listener_command_tx() }; let cleared = state_db + .thread_goals() .delete_thread_goal(thread_id) .await .map_err(|err| internal_error(format!("failed to clear thread goal: {err}")))?; diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 4781df83502c..c4d5fa6c96a4 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -676,7 +676,7 @@ pub(super) async fn send_thread_goal_snapshot_notification( thread_id: ThreadId, state_db: &StateDbHandle, ) { - match state_db.get_thread_goal(thread_id).await { + match state_db.thread_goals().get_thread_goal(thread_id).await { Ok(Some(goal)) => { outgoing .send_server_notification(ServerNotification::ThreadGoalUpdated( diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index ff2ccec49cca..852b86e5621d 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -943,10 +943,12 @@ async fn thread_goal_set_edits_objective_without_resetting_usage() -> Result<()> StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?; let thread_id = ThreadId::from_string(&thread_id)?; let persisted_goal = state_db + .thread_goals() .get_thread_goal(thread_id) .await? .expect("goal should exist"); state_db + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 12, @@ -974,6 +976,7 @@ async fn thread_goal_set_edits_objective_without_resetting_usage() -> Result<()> .await??; let edit: ThreadGoalSetResponse = to_response(edit_resp)?; let updated_goal = state_db + .thread_goals() .get_thread_goal(thread_id) .await? .expect("goal should still exist"); diff --git a/codex-rs/core/src/goals.rs b/codex-rs/core/src/goals.rs index 4a7a9a3d0b83..ce4c0a809d90 100644 --- a/codex-rs/core/src/goals.rs +++ b/codex-rs/core/src/goals.rs @@ -422,6 +422,7 @@ impl Session { let state_db = self.require_state_db_for_thread_goals().await?; state_db + .thread_goals() .get_thread_goal(self.conversation_id) .await .map(|goal| goal.map(protocol_goal_from_state)) @@ -459,10 +460,14 @@ impl Session { let mut replacing_goal = false; let previous_status; let goal = if let Some(objective) = objective.as_deref() { - let existing_goal = state_db.get_thread_goal(self.conversation_id).await?; + let existing_goal = state_db + .thread_goals() + .get_thread_goal(self.conversation_id) + .await?; previous_status = existing_goal.as_ref().map(|goal| goal.status); if let Some(existing_goal) = existing_goal.as_ref() { state_db + .thread_goals() .update_thread_goal( self.conversation_id, codex_state::ThreadGoalUpdate { @@ -482,6 +487,7 @@ impl Session { } else { replacing_goal = true; state_db + .thread_goals() .replace_thread_goal( self.conversation_id, objective, @@ -493,11 +499,15 @@ impl Session { .await? } } else { - let existing_goal = state_db.get_thread_goal(self.conversation_id).await?; + let existing_goal = state_db + .thread_goals() + .get_thread_goal(self.conversation_id) + .await?; previous_status = existing_goal.as_ref().map(|goal| goal.status); let expected_goal_id = existing_goal.map(|goal| goal.goal_id); let status = status.map(state_goal_status_from_protocol); state_db + .thread_goals() .update_thread_goal( self.conversation_id, codex_state::ThreadGoalUpdate { @@ -581,6 +591,7 @@ impl Session { ) .await?; let goal = state_db + .thread_goals() .insert_thread_goal( self.conversation_id, objective, @@ -760,7 +771,10 @@ impl Session { state_db: &StateDbHandle, expected_goal_id: Option<&str>, ) -> anyhow::Result> { - let goal = state_db.get_thread_goal(self.conversation_id).await?; + let goal = state_db + .thread_goals() + .get_thread_goal(self.conversation_id) + .await?; Ok(goal.and_then(|goal| { expected_goal_id .is_none_or(|expected_goal_id| goal.goal_id == expected_goal_id) @@ -801,7 +815,11 @@ impl Session { return; } }; - match state_db.get_thread_goal(self.conversation_id).await { + match state_db + .thread_goals() + .get_thread_goal(self.conversation_id) + .await + { Ok(Some(goal)) if matches!( goal.status, @@ -963,6 +981,7 @@ impl Session { .current_goal_status_for_metrics(&state_db, expected_goal_id.as_deref()) .await?; let outcome = state_db + .thread_goals() .account_thread_goal_usage( self.conversation_id, time_delta_seconds, @@ -1085,6 +1104,7 @@ impl Session { .await?; match state_db + .thread_goals() .account_thread_goal_usage( self.conversation_id, time_delta_seconds, @@ -1147,6 +1167,7 @@ impl Session { ) .await?; let Some(goal) = state_db + .thread_goals() .pause_active_thread_goal(self.conversation_id) .await? else { @@ -1192,7 +1213,11 @@ impl Session { let Some(state_db) = self.state_db_for_thread_goals().await? else { return Ok(()); }; - let Some(goal) = state_db.get_thread_goal(self.conversation_id).await? else { + let Some(goal) = state_db + .thread_goals() + .get_thread_goal(self.conversation_id) + .await? + else { self.clear_stopped_thread_goal_runtime_state().await; return Ok(()); }; @@ -1237,7 +1262,11 @@ impl Session { Arc::clone(&active_turn.turn_state) }; let goal_is_current = match self.state_db_for_thread_goals().await { - Ok(Some(state_db)) => match state_db.get_thread_goal(self.conversation_id).await { + Ok(Some(state_db)) => match state_db + .thread_goals() + .get_thread_goal(self.conversation_id) + .await + { Ok(Some(goal)) if goal.goal_id == candidate.goal_id && goal.status == codex_state::ThreadGoalStatus::Active => @@ -1333,7 +1362,11 @@ impl Session { return None; } }; - let goal = match state_db.get_thread_goal(self.conversation_id).await { + let goal = match state_db + .thread_goals() + .get_thread_goal(self.conversation_id) + .await + { Ok(Some(goal)) => goal, Ok(None) => { tracing::debug!("skipping active goal continuation because no goal is set"); diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 511dbfeabbe1..b0bc6330d4e7 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -8592,6 +8592,7 @@ async fn budget_limited_accounting_steers_active_turn_without_aborting() -> anyh let state_db = goal_test_state_db(sess.as_ref()).await?; let goal = state_db + .thread_goals() .get_thread_goal(sess.conversation_id) .await? .expect("goal should remain persisted after accounting"); @@ -8615,6 +8616,7 @@ async fn budget_limited_accounting_steers_active_turn_without_aborting() -> anyh .await?; let goal = state_db + .thread_goals() .get_thread_goal(sess.conversation_id) .await? .expect("goal should remain persisted after follow-up accounting"); @@ -8654,6 +8656,7 @@ async fn external_goal_mutation_accounts_active_turn_before_status_change() -> a let state_db = goal_test_state_db(sess.as_ref()).await?; let goal = state_db + .thread_goals() .get_thread_goal(sess.conversation_id) .await? .expect("goal should remain persisted"); @@ -8662,6 +8665,7 @@ async fn external_goal_mutation_accounts_active_turn_before_status_change() -> a let previous_goal = goal.clone(); let goal_id = goal.goal_id.clone(); let updated_goal = state_db + .thread_goals() .update_thread_goal( sess.conversation_id, codex_state::ThreadGoalUpdate { @@ -8683,6 +8687,7 @@ async fn external_goal_mutation_accounts_active_turn_before_status_change() -> a assert!(sess.active_turn.lock().await.is_some()); let goal = state_db + .thread_goals() .get_thread_goal(sess.conversation_id) .await? .expect("goal should remain persisted"); @@ -8709,6 +8714,7 @@ async fn external_objective_change_steers_active_turn() -> anyhow::Result<()> { let state_db = goal_test_state_db(sess.as_ref()).await?; let old_goal = state_db + .thread_goals() .replace_thread_goal( sess.conversation_id, "Keep improving the benchmark", @@ -8717,6 +8723,7 @@ async fn external_objective_change_steers_active_turn() -> anyhow::Result<()> { ) .await?; let new_goal = state_db + .thread_goals() .replace_thread_goal( sess.conversation_id, "Write a concise benchmark summary", @@ -8774,6 +8781,7 @@ async fn external_active_goal_set_marks_current_turn_for_accounting() -> anyhow: let state_db = goal_test_state_db(sess.as_ref()).await?; let goal = state_db + .thread_goals() .replace_thread_goal( sess.conversation_id, "Keep improving the benchmark", @@ -8807,6 +8815,7 @@ async fn external_active_goal_set_marks_current_turn_for_accounting() -> anyhow: .await?; let goal = state_db + .thread_goals() .get_thread_goal(sess.conversation_id) .await? .expect("goal should remain persisted"); @@ -8905,6 +8914,7 @@ async fn completed_goal_accounts_current_turn_tokens_before_tool_response() -> a ) .await?; let persisted_goal = state_db + .thread_goals() .get_thread_goal(test.session_configured.thread_id) .await? .expect("goal should be persisted"); diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 956486f18204..43ec706f1a6a 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -1561,6 +1561,7 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> { .state_db() .expect("source thread should have a state db"); state_db + .thread_goals() .replace_thread_goal( source.thread_id, "Keep working until the task is done", @@ -1581,6 +1582,7 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> { .await .expect("resume source thread"); let goal = state_db + .thread_goals() .get_thread_goal(resumed.thread_id) .await? .expect("goal should still exist after resume"); diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index 43c02faae281..aeb8b2d13b1c 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -48,6 +48,7 @@ pub use model::ThreadGoalStatus; pub use model::ThreadMetadata; pub use model::ThreadMetadataBuilder; pub use model::ThreadsPage; +pub use runtime::GoalStore; pub use runtime::RemoteControlEnrollmentRecord; pub use runtime::ThreadFilterOptions; pub use runtime::ThreadGoalAccountingMode; diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index d6ed7bfcc0ba..e95cc7cc8ac0 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -18,7 +18,6 @@ use crate::apply_rollout_item; use crate::migrations::runtime_logs_migrator; use crate::migrations::runtime_state_migrator; use crate::model::AgentJobRow; -use crate::model::ThreadGoalRow; use crate::model::ThreadRow; use crate::model::anchor_from_item; use crate::model::datetime_to_epoch_millis; @@ -65,6 +64,7 @@ mod remote_control; mod test_support; mod threads; +pub use goals::GoalStore; pub use goals::ThreadGoalAccountingMode; pub use goals::ThreadGoalAccountingOutcome; pub use goals::ThreadGoalUpdate; @@ -86,6 +86,7 @@ pub struct StateRuntime { default_provider: String, pool: Arc, logs_pool: Arc, + thread_goals: GoalStore, thread_updated_at_millis: Arc, } @@ -164,6 +165,7 @@ impl StateRuntime { let thread_updated_at_millis = thread_updated_at_millis_result?; let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0); let runtime = Arc::new(Self { + thread_goals: GoalStore::new(Arc::clone(&pool)), pool, logs_pool, codex_home, @@ -183,6 +185,10 @@ impl StateRuntime { pub fn codex_home(&self) -> &Path { self.codex_home.as_path() } + + pub fn thread_goals(&self) -> &GoalStore { + &self.thread_goals + } } fn base_sqlite_options(path: &Path) -> SqliteConnectOptions { diff --git a/codex-rs/state/src/runtime/goals.rs b/codex-rs/state/src/runtime/goals.rs index 63dc63557cfb..b43e8f69d502 100644 --- a/codex-rs/state/src/runtime/goals.rs +++ b/codex-rs/state/src/runtime/goals.rs @@ -1,6 +1,18 @@ use super::*; +use crate::model::ThreadGoalRow; use uuid::Uuid; +#[derive(Clone)] +pub struct GoalStore { + pool: Arc, +} + +impl GoalStore { + pub(crate) fn new(pool: Arc) -> Self { + Self { pool } + } +} + pub struct ThreadGoalUpdate { pub objective: Option, pub status: Option, @@ -21,7 +33,7 @@ pub enum ThreadGoalAccountingMode { ActiveOrStopped, } -impl StateRuntime { +impl GoalStore { pub async fn get_thread_goal( &self, thread_id: ThreadId, @@ -308,29 +320,6 @@ WHERE thread_id = ? self.get_thread_goal(thread_id).await } - async fn set_thread_preview_if_empty( - &self, - thread_id: ThreadId, - preview: &str, - ) -> anyhow::Result<()> { - let preview = preview.trim(); - if preview.is_empty() { - return Ok(()); - } - sqlx::query( - r#" -UPDATE threads -SET preview = ? -WHERE id = ? AND preview = '' - "#, - ) - .bind(preview) - .bind(thread_id.to_string()) - .execute(self.pool.as_ref()) - .await?; - Ok(()) - } - pub async fn pause_active_thread_goal( &self, thread_id: ThreadId, @@ -463,6 +452,29 @@ RETURNING let updated = thread_goal_from_row(&row)?; Ok(ThreadGoalAccountingOutcome::Updated(updated)) } + + async fn set_thread_preview_if_empty( + &self, + thread_id: ThreadId, + preview: &str, + ) -> anyhow::Result<()> { + let preview = preview.trim(); + if preview.is_empty() { + return Ok(()); + } + sqlx::query( + r#" +UPDATE threads +SET preview = ? +WHERE id = ? AND preview = '' + "#, + ) + .bind(preview) + .bind(thread_id.to_string()) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } } fn thread_goal_from_row(row: &sqlx::sqlite::SqliteRow) -> anyhow::Result { @@ -519,6 +531,7 @@ mod tests { upsert_test_thread(&runtime, thread_id).await; let goal = runtime + .thread_goals() .replace_thread_goal( thread_id, "optimize the benchmark", @@ -529,7 +542,11 @@ mod tests { .expect("goal replacement should succeed"); assert_eq!( Some(goal.clone()), - runtime.get_thread_goal(thread_id).await.unwrap() + runtime + .thread_goals() + .get_thread_goal(thread_id) + .await + .unwrap() ); let metadata = runtime .get_thread(thread_id) @@ -539,6 +556,7 @@ mod tests { assert_eq!(metadata.preview.as_deref(), Some("hello")); let updated = runtime + .thread_goals() .update_thread_goal( thread_id, ThreadGoalUpdate { @@ -560,6 +578,7 @@ mod tests { assert_eq!(expected, updated); let replaced = runtime + .thread_goals() .replace_thread_goal( thread_id, "ship the new result", @@ -574,9 +593,28 @@ mod tests { assert_eq!(0, replaced.tokens_used); assert_eq!(0, replaced.time_used_seconds); - assert!(runtime.delete_thread_goal(thread_id).await.unwrap()); - assert_eq!(None, runtime.get_thread_goal(thread_id).await.unwrap()); - assert!(!runtime.delete_thread_goal(thread_id).await.unwrap()); + assert!( + runtime + .thread_goals() + .delete_thread_goal(thread_id) + .await + .unwrap() + ); + assert_eq!( + None, + runtime + .thread_goals() + .get_thread_goal(thread_id) + .await + .unwrap() + ); + assert!( + !runtime + .thread_goals() + .delete_thread_goal(thread_id) + .await + .unwrap() + ); } #[tokio::test] @@ -596,6 +634,7 @@ mod tests { .expect("test thread should be upserted"); runtime + .thread_goals() .replace_thread_goal( thread_id, "optimize the benchmark", @@ -621,6 +660,7 @@ mod tests { upsert_test_thread(&runtime, thread_id).await; let replaced = runtime + .thread_goals() .replace_thread_goal( thread_id, "stay within budget", @@ -643,6 +683,7 @@ mod tests { upsert_test_thread(&runtime, thread_id).await; let inserted = runtime + .thread_goals() .insert_thread_goal( thread_id, "optimize the benchmark", @@ -654,6 +695,7 @@ mod tests { .expect("goal should be inserted"); let duplicate = runtime + .thread_goals() .insert_thread_goal( thread_id, "replace the benchmark", @@ -666,7 +708,11 @@ mod tests { assert_eq!(None, duplicate); assert_eq!( Some(inserted), - runtime.get_thread_goal(thread_id).await.unwrap() + runtime + .thread_goals() + .get_thread_goal(thread_id) + .await + .unwrap() ); } @@ -677,6 +723,7 @@ mod tests { upsert_test_thread(&runtime, thread_id).await; let inserted = runtime + .thread_goals() .insert_thread_goal( thread_id, "stay within budget", @@ -700,6 +747,7 @@ mod tests { upsert_test_thread(&runtime, thread_id).await; let original = runtime + .thread_goals() .replace_thread_goal( thread_id, "old objective", @@ -709,6 +757,7 @@ mod tests { .await .expect("goal replacement should succeed"); let replacement = runtime + .thread_goals() .replace_thread_goal( thread_id, "new objective", @@ -719,6 +768,7 @@ mod tests { .expect("goal replacement should succeed"); let stale_update = runtime + .thread_goals() .update_thread_goal( thread_id, ThreadGoalUpdate { @@ -735,12 +785,14 @@ mod tests { assert_eq!( Some(replacement.clone()), runtime + .thread_goals() .get_thread_goal(thread_id) .await .expect("goal read should succeed") ); let fresh_update = runtime + .thread_goals() .update_thread_goal( thread_id, ThreadGoalUpdate { @@ -763,6 +815,7 @@ mod tests { upsert_test_thread(&runtime, thread_id).await; let original = runtime + .thread_goals() .replace_thread_goal( thread_id, "old objective", @@ -772,6 +825,7 @@ mod tests { .await .expect("goal replacement should succeed"); let replacement = runtime + .thread_goals() .replace_thread_goal( thread_id, "new objective", @@ -782,6 +836,7 @@ mod tests { .expect("goal replacement should succeed"); let outcome = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 5, @@ -809,6 +864,7 @@ mod tests { upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "draft the report", @@ -818,6 +874,7 @@ mod tests { .await .expect("goal replacement should succeed"); let outcome = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 12, @@ -832,6 +889,7 @@ mod tests { }; let updated = runtime + .thread_goals() .update_thread_goal( thread_id, ThreadGoalUpdate { @@ -860,6 +918,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "optimize the benchmark", @@ -869,7 +928,7 @@ mod tests { .await .expect("goal replacement should succeed"); - let status_update = runtime.update_thread_goal( + let status_update = runtime.thread_goals().update_thread_goal( thread_id, ThreadGoalUpdate { objective: None, @@ -878,7 +937,7 @@ mod tests { expected_goal_id: None, }, ); - let budget_update = runtime.update_thread_goal( + let budget_update = runtime.thread_goals().update_thread_goal( thread_id, ThreadGoalUpdate { objective: None, @@ -892,6 +951,7 @@ mod tests { budget_update.expect("budget update should succeed"); let goal = runtime + .thread_goals() .get_thread_goal(thread_id) .await .expect("goal read should succeed") @@ -906,6 +966,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; let goal = runtime + .thread_goals() .replace_thread_goal( thread_id, "optimize the benchmark", @@ -916,6 +977,7 @@ mod tests { .expect("goal replacement should succeed"); let paused = runtime + .thread_goals() .pause_active_thread_goal(thread_id) .await .expect("active pause should succeed") @@ -928,6 +990,7 @@ mod tests { assert_eq!(expected, paused); let complete = runtime + .thread_goals() .update_thread_goal( thread_id, ThreadGoalUpdate { @@ -941,6 +1004,7 @@ mod tests { .expect("goal update should succeed") .expect("goal should exist"); let pause_result = runtime + .thread_goals() .pause_active_thread_goal(thread_id) .await .expect("terminal pause attempt should succeed"); @@ -948,6 +1012,7 @@ mod tests { assert_eq!( Some(complete), runtime + .thread_goals() .get_thread_goal(thread_id) .await .expect("goal read should succeed") @@ -960,6 +1025,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "stay within budget", @@ -970,6 +1036,7 @@ mod tests { .expect("goal replacement should succeed"); let outcome = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 7, @@ -987,6 +1054,7 @@ mod tests { assert_eq!(7, goal.time_used_seconds); let outcome = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 3, @@ -1004,6 +1072,7 @@ mod tests { assert_eq!(10, goal.time_used_seconds); let outcome = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 5, @@ -1027,6 +1096,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "stay stopped", @@ -1037,6 +1107,7 @@ mod tests { .expect("goal replacement should succeed"); let outcome = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 5, @@ -1060,6 +1131,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "stop before overrun", @@ -1069,6 +1141,7 @@ mod tests { .await .expect("goal replacement should succeed"); runtime + .thread_goals() .update_thread_goal( thread_id, crate::ThreadGoalUpdate { @@ -1082,6 +1155,7 @@ mod tests { .expect("goal update should succeed"); let outcome = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 3, @@ -1105,6 +1179,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "stay within budget", @@ -1114,6 +1189,7 @@ mod tests { .await .expect("goal replacement should succeed"); runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 1, @@ -1125,6 +1201,7 @@ mod tests { .expect("usage accounting should succeed"); let lowered = runtime + .thread_goals() .update_thread_goal( thread_id, ThreadGoalUpdate { @@ -1149,6 +1226,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "stay within budget", @@ -1158,6 +1236,7 @@ mod tests { .await .expect("goal replacement should succeed"); runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 1, @@ -1169,6 +1248,7 @@ mod tests { .expect("usage accounting should succeed"); let reactivated = runtime + .thread_goals() .update_thread_goal( thread_id, ThreadGoalUpdate { @@ -1197,6 +1277,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "stay within budget", @@ -1206,6 +1287,7 @@ mod tests { .await .expect("goal replacement should succeed"); runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 1, @@ -1217,6 +1299,7 @@ mod tests { .expect("usage accounting should succeed"); let paused = runtime + .thread_goals() .update_thread_goal( thread_id, ThreadGoalUpdate { @@ -1241,6 +1324,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "finish the report", @@ -1251,6 +1335,7 @@ mod tests { .expect("goal replacement should succeed"); let active_only = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 30, @@ -1268,6 +1353,7 @@ mod tests { assert_eq!(0, goal.time_used_seconds); let completing_turn = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 30, @@ -1291,6 +1377,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "finish the report", @@ -1300,6 +1387,7 @@ mod tests { .await .expect("goal replacement should succeed"); runtime + .thread_goals() .update_thread_goal( thread_id, ThreadGoalUpdate { @@ -1314,6 +1402,7 @@ mod tests { .expect("goal should exist"); let active_only = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 30, @@ -1331,6 +1420,7 @@ mod tests { assert_eq!(0, goal.time_used_seconds); let in_flight_turn = runtime + .thread_goals() .account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 30, @@ -1354,6 +1444,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "count every token", @@ -1363,14 +1454,14 @@ mod tests { .await .expect("goal replacement should succeed"); - let first = runtime.account_thread_goal_usage( + let first = runtime.thread_goals().account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 4, /*token_delta*/ 40, ThreadGoalAccountingMode::ActiveOnly, /*expected_goal_id*/ None, ); - let second = runtime.account_thread_goal_usage( + let second = runtime.thread_goals().account_thread_goal_usage( thread_id, /*time_delta_seconds*/ 6, /*token_delta*/ 60, @@ -1382,6 +1473,7 @@ mod tests { second.expect("second usage accounting should succeed"); let goal = runtime + .thread_goals() .get_thread_goal(thread_id) .await .expect("goal read should succeed") @@ -1396,6 +1488,7 @@ mod tests { let thread_id = test_thread_id(); upsert_test_thread(&runtime, thread_id).await; runtime + .thread_goals() .replace_thread_goal( thread_id, "clean up with the thread", @@ -1413,6 +1506,7 @@ mod tests { assert_eq!( None, runtime + .thread_goals() .get_thread_goal(thread_id) .await .expect("goal read should succeed") diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 925fb4528891..c8c48db72196 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -947,7 +947,11 @@ ON CONFLICT(thread_id, position) DO NOTHING .bind(thread_id.to_string()) .execute(self.pool.as_ref()) .await?; - Ok(result.rows_affected()) + let rows_affected = result.rows_affected(); + if rows_affected > 0 { + self.thread_goals.delete_thread_goal(thread_id).await?; + } + Ok(rows_affected) } }