From c4d1e02e3c474e301574e45980efadabefba06a3 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 9 Feb 2026 16:04:42 +0000 Subject: [PATCH 1/2] tools: remove get_memory tool and tests --- .../core/src/tools/handlers/get_memory.rs | 72 ------------- codex-rs/core/src/tools/handlers/mod.rs | 2 - codex-rs/core/src/tools/spec.rs | 59 ---------- codex-rs/core/tests/suite/memory_tool.rs | 101 ------------------ codex-rs/core/tests/suite/mod.rs | 1 - 5 files changed, 235 deletions(-) delete mode 100644 codex-rs/core/src/tools/handlers/get_memory.rs delete mode 100644 codex-rs/core/tests/suite/memory_tool.rs diff --git a/codex-rs/core/src/tools/handlers/get_memory.rs b/codex-rs/core/src/tools/handlers/get_memory.rs deleted file mode 100644 index df2929b88a7..00000000000 --- a/codex-rs/core/src/tools/handlers/get_memory.rs +++ /dev/null @@ -1,72 +0,0 @@ -use crate::function_tool::FunctionCallError; -use crate::state_db; -use crate::tools::context::ToolInvocation; -use crate::tools::context::ToolOutput; -use crate::tools::context::ToolPayload; -use crate::tools::handlers::parse_arguments; -use crate::tools::registry::ToolHandler; -use crate::tools::registry::ToolKind; -use async_trait::async_trait; -use codex_protocol::ThreadId; -use codex_protocol::models::FunctionCallOutputBody; -use serde::Deserialize; -use serde_json::json; - -pub struct GetMemoryHandler; - -#[derive(Deserialize)] -struct GetMemoryArgs { - memory_id: String, -} - -#[async_trait] -impl ToolHandler for GetMemoryHandler { - fn kind(&self) -> ToolKind { - ToolKind::Function - } - - async fn handle(&self, invocation: ToolInvocation) -> Result { - let ToolInvocation { - session, payload, .. - } = invocation; - - let arguments = match payload { - ToolPayload::Function { arguments } => arguments, - _ => { - return Err(FunctionCallError::RespondToModel( - "get_memory handler received unsupported payload".to_string(), - )); - } - }; - - let args: GetMemoryArgs = parse_arguments(&arguments)?; - let thread_id = ThreadId::from_string(args.memory_id.as_str()).map_err(|err| { - FunctionCallError::RespondToModel(format!("memory_id must be a valid thread id: {err}")) - })?; - - let state_db_ctx = session.state_db(); - let memory = - state_db::get_thread_memory(state_db_ctx.as_deref(), thread_id, "get_memory_tool") - .await - .ok_or_else(|| { - FunctionCallError::RespondToModel(format!( - "memory not found for memory_id={}", - args.memory_id - )) - })?; - - let content = serde_json::to_string_pretty(&json!({ - "memory_id": args.memory_id, - "trace_summary": memory.trace_summary, - "memory_summary": memory.memory_summary, - })) - .map_err(|err| { - FunctionCallError::Fatal(format!("failed to serialize memory payload: {err}")) - })?; - - Ok(ToolOutput::Function { - body: FunctionCallOutputBody::Text(content), - success: Some(true), - }) - } -} diff --git a/codex-rs/core/src/tools/handlers/mod.rs b/codex-rs/core/src/tools/handlers/mod.rs index d8ec8871626..dda4760bd79 100644 --- a/codex-rs/core/src/tools/handlers/mod.rs +++ b/codex-rs/core/src/tools/handlers/mod.rs @@ -1,7 +1,6 @@ pub mod apply_patch; pub(crate) mod collab; mod dynamic; -mod get_memory; mod grep_files; mod list_dir; mod mcp; @@ -21,7 +20,6 @@ use crate::function_tool::FunctionCallError; pub use apply_patch::ApplyPatchHandler; pub use collab::CollabHandler; pub use dynamic::DynamicToolHandler; -pub use get_memory::GetMemoryHandler; pub use grep_files::GrepFilesHandler; pub use list_dir::ListDirHandler; pub use mcp::McpHandler; diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index 949a5f050db..4844828886f 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -33,7 +33,6 @@ pub(crate) struct ToolsConfig { pub supports_image_input: bool, pub collab_tools: bool, pub collaboration_modes_tools: bool, - pub memory_tools: bool, pub request_rule_enabled: bool, pub experimental_supported_tools: Vec, } @@ -54,7 +53,6 @@ impl ToolsConfig { let include_apply_patch_tool = features.enabled(Feature::ApplyPatchFreeform); let include_collab_tools = features.enabled(Feature::Collab); let include_collaboration_modes_tools = features.enabled(Feature::CollaborationModes); - let include_memory_tools = features.enabled(Feature::MemoryTool); let request_rule_enabled = features.enabled(Feature::RequestRule); let shell_type = if !features.enabled(Feature::ShellTool) { @@ -89,7 +87,6 @@ impl ToolsConfig { supports_image_input: model_info.input_modalities.contains(&InputModality::Image), collab_tools: include_collab_tools, collaboration_modes_tools: include_collaboration_modes_tools, - memory_tools: include_memory_tools, request_rule_enabled, experimental_supported_tools: model_info.experimental_supported_tools.clone(), } @@ -663,28 +660,6 @@ fn create_request_user_input_tool() -> ToolSpec { }) } -fn create_get_memory_tool() -> ToolSpec { - let properties = BTreeMap::from([( - "memory_id".to_string(), - JsonSchema::String { - description: Some( - "Memory ID to fetch. Uses the thread ID as the memory identifier.".to_string(), - ), - }, - )]); - - ToolSpec::Function(ResponsesApiTool { - name: "get_memory".to_string(), - description: "Loads the full stored memory payload for a memory_id.".to_string(), - strict: false, - parameters: JsonSchema::Object { - properties, - required: Some(vec!["memory_id".to_string()]), - additional_properties: Some(false.into()), - }, - }) -} - fn create_close_agent_tool() -> ToolSpec { let mut properties = BTreeMap::new(); properties.insert( @@ -1279,7 +1254,6 @@ pub(crate) fn build_specs( use crate::tools::handlers::ApplyPatchHandler; use crate::tools::handlers::CollabHandler; use crate::tools::handlers::DynamicToolHandler; - use crate::tools::handlers::GetMemoryHandler; use crate::tools::handlers::GrepFilesHandler; use crate::tools::handlers::ListDirHandler; use crate::tools::handlers::McpHandler; @@ -1301,7 +1275,6 @@ pub(crate) fn build_specs( let plan_handler = Arc::new(PlanHandler); let apply_patch_handler = Arc::new(ApplyPatchHandler); let dynamic_tool_handler = Arc::new(DynamicToolHandler); - let get_memory_handler = Arc::new(GetMemoryHandler); let view_image_handler = Arc::new(ViewImageHandler); let mcp_handler = Arc::new(McpHandler); let mcp_resource_handler = Arc::new(McpResourceHandler); @@ -1361,11 +1334,6 @@ pub(crate) fn build_specs( builder.register_handler("request_user_input", request_user_input_handler); } - if config.memory_tools { - builder.push_spec(create_get_memory_tool()); - builder.register_handler("get_memory", get_memory_handler); - } - if let Some(apply_patch_tool_type) = &config.apply_patch_tool_type { match apply_patch_tool_type { ApplyPatchToolType::Freeform => { @@ -1742,33 +1710,6 @@ mod tests { assert_contains_tool_names(&tools, &["request_user_input"]); } - #[test] - fn get_memory_requires_memory_tool_feature() { - let config = test_config(); - let model_info = ModelsManager::construct_model_info_offline("gpt-5-codex", &config); - let mut features = Features::with_defaults(); - features.disable(Feature::MemoryTool); - let tools_config = ToolsConfig::new(&ToolsConfigParams { - model_info: &model_info, - features: &features, - web_search_mode: Some(WebSearchMode::Cached), - }); - let (tools, _) = build_specs(&tools_config, None, &[]).build(); - assert!( - !tools.iter().any(|t| t.spec.name() == "get_memory"), - "get_memory should be disabled when memory_tool feature is off" - ); - - features.enable(Feature::MemoryTool); - let tools_config = ToolsConfig::new(&ToolsConfigParams { - model_info: &model_info, - features: &features, - web_search_mode: Some(WebSearchMode::Cached), - }); - let (tools, _) = build_specs(&tools_config, None, &[]).build(); - assert_contains_tool_names(&tools, &["get_memory"]); - } - fn assert_model_tools( model_slug: &str, features: &Features, diff --git a/codex-rs/core/tests/suite/memory_tool.rs b/codex-rs/core/tests/suite/memory_tool.rs deleted file mode 100644 index 7fbd15db461..00000000000 --- a/codex-rs/core/tests/suite/memory_tool.rs +++ /dev/null @@ -1,101 +0,0 @@ -#![allow(clippy::expect_used, clippy::unwrap_used)] - -use anyhow::Result; -use codex_core::features::Feature; -use core_test_support::responses::ev_assistant_message; -use core_test_support::responses::ev_completed; -use core_test_support::responses::ev_response_created; -use core_test_support::responses::mount_function_call_agent_response; -use core_test_support::responses::mount_sse_once; -use core_test_support::responses::sse; -use core_test_support::responses::start_mock_server; -use core_test_support::skip_if_no_network; -use core_test_support::test_codex::test_codex; -use pretty_assertions::assert_eq; -use serde_json::Value; -use serde_json::json; -use tokio::time::Duration; - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn get_memory_tool_returns_persisted_thread_memory() -> Result<()> { - skip_if_no_network!(Ok(())); - - let server = start_mock_server().await; - let mut builder = test_codex().with_config(|config| { - config.features.enable(Feature::Sqlite); - config.features.enable(Feature::MemoryTool); - }); - let test = builder.build(&server).await?; - - let db = test.codex.state_db().expect("state db enabled"); - let thread_id = test.session_configured.session_id; - let thread_id_string = thread_id.to_string(); - - mount_sse_once( - &server, - sse(vec![ - ev_response_created("resp-init"), - ev_assistant_message("msg-init", "Materialized"), - ev_completed("resp-init"), - ]), - ) - .await; - test.submit_turn("materialize thread before memory write") - .await?; - - let mut thread_exists = false; - // Wait for DB creation. - for _ in 0..100 { - if db.get_thread(thread_id).await?.is_some() { - thread_exists = true; - break; - } - tokio::time::sleep(Duration::from_millis(25)).await; - } - assert!(thread_exists, "thread should exist in state db"); - - let trace_summary = "trace summary from sqlite"; - let memory_summary = "memory summary from sqlite"; - db.upsert_thread_memory(thread_id, trace_summary, memory_summary) - .await?; - - let call_id = "memory-call-1"; - let arguments = json!({ - "memory_id": thread_id_string, - }) - .to_string(); - let mocks = - mount_function_call_agent_response(&server, call_id, &arguments, "get_memory").await; - - test.submit_turn("load the saved memory").await?; - - let initial_request = mocks.function_call.single_request().body_json(); - assert!( - initial_request["tools"] - .as_array() - .expect("tools array") - .iter() - .filter_map(|tool| tool.get("name").and_then(Value::as_str)) - .any(|name| name == "get_memory"), - "get_memory tool should be exposed when memory_tool feature is enabled" - ); - - let completion_request = mocks.completion.single_request(); - let (content_opt, success_opt) = completion_request - .function_call_output_content_and_success(call_id) - .expect("function_call_output should be present"); - let success = success_opt.unwrap_or(true); - assert!(success, "expected successful get_memory tool call output"); - let content = content_opt.expect("function_call_output content should be present"); - let payload: Value = serde_json::from_str(&content)?; - assert_eq!( - payload, - json!({ - "memory_id": thread_id_string, - "trace_summary": trace_summary, - "memory_summary": memory_summary, - }) - ); - - Ok(()) -} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 379f521682b..b903f4e6cab 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -82,7 +82,6 @@ mod list_dir; mod list_models; mod live_cli; mod live_reload; -mod memory_tool; mod model_info_overrides; mod model_overrides; mod model_switching; From 4974348a8c4bc73802216258f4c70cf4c321d397 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 9 Feb 2026 16:12:24 +0000 Subject: [PATCH 2/2] state: add memory consolidation lock primitives --- codex-rs/core/src/state_db.rs | 57 ++++++- .../0009_memory_consolidation_locks.sql | 8 + codex-rs/state/src/runtime.rs | 143 ++++++++++++++++++ 3 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 codex-rs/state/migrations/0009_memory_consolidation_locks.sql diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs index b1b53bcbc17..3707be77c72 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/core/src/state_db.rs @@ -1,5 +1,6 @@ use crate::config::Config; use crate::features::Feature; +use crate::path_utils::normalize_for_path_comparison; use crate::rollout::list::Cursor; use crate::rollout::list::ThreadSortKey; use crate::rollout::metadata; @@ -156,6 +157,10 @@ fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option { Some(codex_state::Anchor { ts, id }) } +fn normalize_cwd_for_state_db(cwd: &Path) -> PathBuf { + normalize_for_path_comparison(cwd).unwrap_or_else(|_| cwd.to_path_buf()) +} + /// List thread ids from SQLite for parity checks without rollout scanning. #[allow(clippy::too_many_arguments)] pub async fn list_thread_ids_db( @@ -355,7 +360,11 @@ pub async fn get_last_n_thread_memories_for_cwd( stage: &str, ) -> Option> { let ctx = context?; - match ctx.get_last_n_thread_memories_for_cwd(cwd, n).await { + let normalized_cwd = normalize_cwd_for_state_db(cwd); + match ctx + .get_last_n_thread_memories_for_cwd(&normalized_cwd, n) + .await + { Ok(memories) => Some(memories), Err(err) => { warn!("state db get_last_n_thread_memories_for_cwd failed during {stage}: {err}"); @@ -364,6 +373,49 @@ pub async fn get_last_n_thread_memories_for_cwd( } } +/// Try to acquire or renew a per-cwd memory consolidation lock. +pub async fn try_acquire_memory_consolidation_lock( + context: Option<&codex_state::StateRuntime>, + cwd: &Path, + working_thread_id: ThreadId, + lease_seconds: i64, + stage: &str, +) -> Option { + let ctx = context?; + let normalized_cwd = normalize_cwd_for_state_db(cwd); + match ctx + .try_acquire_memory_consolidation_lock(&normalized_cwd, working_thread_id, lease_seconds) + .await + { + Ok(acquired) => Some(acquired), + Err(err) => { + warn!("state db try_acquire_memory_consolidation_lock failed during {stage}: {err}"); + None + } + } +} + +/// Release a per-cwd memory consolidation lock if held by `working_thread_id`. +pub async fn release_memory_consolidation_lock( + context: Option<&codex_state::StateRuntime>, + cwd: &Path, + working_thread_id: ThreadId, + stage: &str, +) -> Option { + let ctx = context?; + let normalized_cwd = normalize_cwd_for_state_db(cwd); + match ctx + .release_memory_consolidation_lock(&normalized_cwd, working_thread_id) + .await + { + Ok(released) => Some(released), + Err(err) => { + warn!("state db release_memory_consolidation_lock failed during {stage}: {err}"); + None + } + } +} + /// Reconcile rollout items into SQLite, falling back to scanning the rollout file. pub async fn reconcile_rollout( context: Option<&codex_state::StateRuntime>, @@ -400,6 +452,7 @@ pub async fn reconcile_rollout( } }; let mut metadata = outcome.metadata; + metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd); match archived_only { Some(true) if metadata.archived_at.is_none() => { metadata.archived_at = Some(metadata.updated_at); @@ -447,6 +500,7 @@ pub async fn read_repair_rollout_path( && let Ok(Some(mut metadata)) = ctx.get_thread(thread_id).await { metadata.rollout_path = rollout_path.to_path_buf(); + metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd); match archived_only { Some(true) if metadata.archived_at.is_none() => { metadata.archived_at = Some(metadata.updated_at); @@ -509,6 +563,7 @@ pub async fn apply_rollout_items( }, }; builder.rollout_path = rollout_path.to_path_buf(); + builder.cwd = normalize_cwd_for_state_db(&builder.cwd); if let Err(err) = ctx.apply_rollout_items(&builder, items, None).await { warn!( "state db apply_rollout_items failed during {stage} for {}: {err}", diff --git a/codex-rs/state/migrations/0009_memory_consolidation_locks.sql b/codex-rs/state/migrations/0009_memory_consolidation_locks.sql new file mode 100644 index 00000000000..793ab4dc3b6 --- /dev/null +++ b/codex-rs/state/migrations/0009_memory_consolidation_locks.sql @@ -0,0 +1,8 @@ +CREATE TABLE memory_consolidation_locks ( + cwd TEXT PRIMARY KEY, + working_thread_id TEXT NOT NULL, + updated_at INTEGER NOT NULL +); + +CREATE INDEX idx_memory_consolidation_locks_updated_at + ON memory_consolidation_locks(updated_at DESC); diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 6e6fb90017d..980443d5b16 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -583,6 +583,64 @@ LIMIT ? .collect() } + /// Try to acquire or renew the per-cwd memory consolidation lock. + /// + /// Returns `true` when the lock is acquired/renewed for `working_thread_id`. + /// Returns `false` when another owner holds a non-expired lease. + pub async fn try_acquire_memory_consolidation_lock( + &self, + cwd: &Path, + working_thread_id: ThreadId, + lease_seconds: i64, + ) -> anyhow::Result { + let now = Utc::now().timestamp(); + let stale_cutoff = now.saturating_sub(lease_seconds.max(0)); + let result = sqlx::query( + r#" +INSERT INTO memory_consolidation_locks ( + cwd, + working_thread_id, + updated_at +) VALUES (?, ?, ?) +ON CONFLICT(cwd) DO UPDATE SET + working_thread_id = excluded.working_thread_id, + updated_at = excluded.updated_at +WHERE memory_consolidation_locks.working_thread_id = excluded.working_thread_id + OR memory_consolidation_locks.updated_at <= ? + "#, + ) + .bind(cwd.display().to_string()) + .bind(working_thread_id.to_string()) + .bind(now) + .bind(stale_cutoff) + .execute(self.pool.as_ref()) + .await?; + + Ok(result.rows_affected() > 0) + } + + /// Release the per-cwd memory consolidation lock if held by `working_thread_id`. + /// + /// Returns `true` when a lock row was removed. + pub async fn release_memory_consolidation_lock( + &self, + cwd: &Path, + working_thread_id: ThreadId, + ) -> anyhow::Result { + let result = sqlx::query( + r#" +DELETE FROM memory_consolidation_locks +WHERE cwd = ? AND working_thread_id = ? + "#, + ) + .bind(cwd.display().to_string()) + .bind(working_thread_id.to_string()) + .execute(self.pool.as_ref()) + .await?; + + Ok(result.rows_affected() > 0) + } + /// Persist dynamic tools for a thread if none have been stored yet. /// /// Dynamic tools are defined at thread start and should not change afterward. @@ -1328,6 +1386,91 @@ mod tests { let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn memory_consolidation_lock_enforces_owner_and_release() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let cwd = codex_home.join("workspace"); + let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); + let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); + + assert!( + runtime + .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_a, 600) + .await + .expect("acquire for owner_a"), + "owner_a should acquire lock" + ); + assert!( + !runtime + .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_b, 600) + .await + .expect("acquire for owner_b should fail"), + "owner_b should not steal active lock" + ); + assert!( + runtime + .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_a, 600) + .await + .expect("owner_a should renew lock"), + "owner_a should renew lock" + ); + assert!( + !runtime + .release_memory_consolidation_lock(cwd.as_path(), owner_b) + .await + .expect("owner_b release should be no-op"), + "non-owner release should not remove lock" + ); + assert!( + runtime + .release_memory_consolidation_lock(cwd.as_path(), owner_a) + .await + .expect("owner_a release"), + "owner_a should release lock" + ); + assert!( + runtime + .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_b, 600) + .await + .expect("owner_b acquire after release"), + "owner_b should acquire released lock" + ); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn memory_consolidation_lock_can_be_stolen_when_lease_expired() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let cwd = codex_home.join("workspace"); + let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); + let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); + + assert!( + runtime + .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_a, 600) + .await + .expect("owner_a acquire") + ); + assert!( + runtime + .try_acquire_memory_consolidation_lock(cwd.as_path(), owner_b, 0) + .await + .expect("owner_b steal with expired lease"), + "owner_b should steal lock when lease cutoff marks previous lock stale" + ); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn deleting_thread_cascades_thread_memory() { let codex_home = unique_temp_dir();