diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index f9d3620c0b..651edafb1b 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -861,6 +861,7 @@ dependencies = [ "codex-arg0", "codex-common", "codex-core", + "codex-file-search", "codex-login", "codex-protocol", "core_test_support", diff --git a/codex-rs/file-search/src/lib.rs b/codex-rs/file-search/src/lib.rs index 3238c99f24..82b8fcb695 100644 --- a/codex-rs/file-search/src/lib.rs +++ b/codex-rs/file-search/src/lib.rs @@ -287,10 +287,27 @@ pub fn run( /// Sort matches in-place by descending score, then ascending path. fn sort_matches(matches: &mut [(u32, String)]) { - matches.sort_by(|a, b| match b.0.cmp(&a.0) { - std::cmp::Ordering::Equal => a.1.cmp(&b.1), + matches.sort_by(cmp_by_score_desc_then_path_asc::<(u32, String), _, _>( + |t| t.0, + |t| t.1.as_str(), + )); +} + +/// Returns a comparator closure suitable for `slice.sort_by(...)` that orders +/// items by descending score and then ascending path using the provided accessors. +pub fn cmp_by_score_desc_then_path_asc( + score_of: FScore, + path_of: FPath, +) -> impl FnMut(&T, &T) -> std::cmp::Ordering +where + FScore: Fn(&T) -> u32, + FPath: Fn(&T) -> &str, +{ + use std::cmp::Ordering; + move |a, b| match score_of(b).cmp(&score_of(a)) { + Ordering::Equal => path_of(a).cmp(path_of(b)), other => other, - }); + } } /// Maintains the `max_count` best matches for a given pattern. diff --git a/codex-rs/mcp-server/Cargo.toml b/codex-rs/mcp-server/Cargo.toml index e40dd15df7..9a5fd947d8 100644 --- a/codex-rs/mcp-server/Cargo.toml +++ b/codex-rs/mcp-server/Cargo.toml @@ -19,6 +19,7 @@ anyhow = { workspace = true } codex-arg0 = { workspace = true } codex-common = { workspace = true, features = ["cli"] } codex-core = { workspace = true } +codex-file-search = { workspace = true } codex-login = { workspace = true } codex-protocol = { workspace = true } mcp-types = { workspace = true } diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs index eec3a62a2c..53830431fd 100644 --- a/codex-rs/mcp-server/src/codex_message_processor.rs +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -1,5 +1,6 @@ use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; +use crate::fuzzy_file_search::run_fuzzy_file_search; use crate::json_to_toml::json_to_toml; use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::OutgoingNotification; @@ -52,6 +53,8 @@ use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse; use codex_protocol::mcp_protocol::ExecCommandApprovalParams; use codex_protocol::mcp_protocol::ExecCommandApprovalResponse; use codex_protocol::mcp_protocol::ExecOneOffCommandParams; +use codex_protocol::mcp_protocol::FuzzyFileSearchParams; +use codex_protocol::mcp_protocol::FuzzyFileSearchResponse; use codex_protocol::mcp_protocol::GetUserAgentResponse; use codex_protocol::mcp_protocol::GetUserSavedConfigResponse; use codex_protocol::mcp_protocol::GitDiffToRemoteResponse; @@ -88,6 +91,8 @@ use std::collections::HashMap; use std::ffi::OsStr; use std::path::PathBuf; use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::time::Duration; use tokio::select; use tokio::sync::Mutex; @@ -122,6 +127,7 @@ pub(crate) struct CodexMessageProcessor { active_login: Arc>>, // Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives. pending_interrupts: Arc>>>, + pending_fuzzy_searches: Arc>>>, } impl CodexMessageProcessor { @@ -141,6 +147,7 @@ impl CodexMessageProcessor { conversation_listeners: HashMap::new(), active_login: Arc::new(Mutex::new(None)), pending_interrupts: Arc::new(Mutex::new(HashMap::new())), + pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())), } } @@ -206,6 +213,9 @@ impl CodexMessageProcessor { ClientRequest::UserInfo { request_id } => { self.get_user_info(request_id).await; } + ClientRequest::FuzzyFileSearch { request_id, params } => { + self.fuzzy_file_search(request_id, params).await; + } ClientRequest::ExecOneOffCommand { request_id, params } => { self.exec_one_off_command(request_id, params).await; } @@ -1167,6 +1177,46 @@ impl CodexMessageProcessor { } } } + + async fn fuzzy_file_search(&mut self, request_id: RequestId, params: FuzzyFileSearchParams) { + let FuzzyFileSearchParams { + query, + roots, + cancellation_token, + } = params; + + let cancel_flag = match cancellation_token.clone() { + Some(token) => { + let mut pending_fuzzy_searches = self.pending_fuzzy_searches.lock().await; + // if a cancellation_token is provided and a pending_request exists for + // that token, cancel it + if let Some(existing) = pending_fuzzy_searches.get(&token) { + existing.store(true, Ordering::Relaxed); + } + let flag = Arc::new(AtomicBool::new(false)); + pending_fuzzy_searches.insert(token.clone(), flag.clone()); + flag + } + None => Arc::new(AtomicBool::new(false)), + }; + + let results = match query.as_str() { + "" => vec![], + _ => run_fuzzy_file_search(query, roots, cancel_flag.clone()).await, + }; + + if let Some(token) = cancellation_token { + let mut pending_fuzzy_searches = self.pending_fuzzy_searches.lock().await; + if let Some(current_flag) = pending_fuzzy_searches.get(&token) + && Arc::ptr_eq(current_flag, &cancel_flag) + { + pending_fuzzy_searches.remove(&token); + } + } + + let response = FuzzyFileSearchResponse { files: results }; + self.outgoing.send_response(request_id, response).await; + } } async fn apply_bespoke_event_handling( diff --git a/codex-rs/mcp-server/src/fuzzy_file_search.rs b/codex-rs/mcp-server/src/fuzzy_file_search.rs new file mode 100644 index 0000000000..eef760df26 --- /dev/null +++ b/codex-rs/mcp-server/src/fuzzy_file_search.rs @@ -0,0 +1,84 @@ +use std::num::NonZero; +use std::num::NonZeroUsize; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; + +use codex_file_search as file_search; +use codex_protocol::mcp_protocol::FuzzyFileSearchResult; +use tokio::task::JoinSet; +use tracing::warn; + +const LIMIT_PER_ROOT: usize = 50; +const MAX_THREADS: usize = 12; +const COMPUTE_INDICES: bool = true; + +pub(crate) async fn run_fuzzy_file_search( + query: String, + roots: Vec, + cancellation_flag: Arc, +) -> Vec { + #[expect(clippy::expect_used)] + let limit_per_root = + NonZero::new(LIMIT_PER_ROOT).expect("LIMIT_PER_ROOT should be a valid non-zero usize"); + + let cores = std::thread::available_parallelism() + .map(std::num::NonZero::get) + .unwrap_or(1); + let threads = cores.min(MAX_THREADS); + let threads_per_root = (threads / roots.len()).max(1); + let threads = NonZero::new(threads_per_root).unwrap_or(NonZeroUsize::MIN); + + let mut files: Vec = Vec::new(); + let mut join_set = JoinSet::new(); + + for root in roots { + let search_dir = PathBuf::from(&root); + let query = query.clone(); + let cancel_flag = cancellation_flag.clone(); + join_set.spawn_blocking(move || { + match file_search::run( + query.as_str(), + limit_per_root, + &search_dir, + Vec::new(), + threads, + cancel_flag, + COMPUTE_INDICES, + ) { + Ok(res) => Ok((root, res)), + Err(err) => Err((root, err)), + } + }); + } + + while let Some(res) = join_set.join_next().await { + match res { + Ok(Ok((root, res))) => { + for m in res.matches { + let result = FuzzyFileSearchResult { + root: root.clone(), + path: m.path, + score: m.score, + indices: m.indices, + }; + files.push(result); + } + } + Ok(Err((root, err))) => { + warn!("fuzzy-file-search in dir '{root}' failed: {err}"); + } + Err(err) => { + warn!("fuzzy-file-search join_next failed: {err}"); + } + } + } + + files.sort_by(file_search::cmp_by_score_desc_then_path_asc::< + FuzzyFileSearchResult, + _, + _, + >(|f| f.score, |f| f.path.as_str())); + + files +} diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs index b1397846ca..830bf49247 100644 --- a/codex-rs/mcp-server/src/lib.rs +++ b/codex-rs/mcp-server/src/lib.rs @@ -25,6 +25,7 @@ mod codex_tool_config; mod codex_tool_runner; mod error_code; mod exec_approval; +mod fuzzy_file_search; mod json_to_toml; pub(crate) mod message_processor; mod outgoing_message; diff --git a/codex-rs/mcp-server/tests/common/mcp_process.rs b/codex-rs/mcp-server/tests/common/mcp_process.rs index a30e9817f2..f89f72f5a4 100644 --- a/codex-rs/mcp-server/tests/common/mcp_process.rs +++ b/codex-rs/mcp-server/tests/common/mcp_process.rs @@ -357,6 +357,23 @@ impl McpProcess { self.send_request("logoutChatGpt", None).await } + /// Send a `fuzzyFileSearch` JSON-RPC request. + pub async fn send_fuzzy_file_search_request( + &mut self, + query: &str, + roots: Vec, + cancellation_token: Option, + ) -> anyhow::Result { + let mut params = serde_json::json!({ + "query": query, + "roots": roots, + }); + if let Some(token) = cancellation_token { + params["cancellationToken"] = serde_json::json!(token); + } + self.send_request("fuzzyFileSearch", Some(params)).await + } + async fn send_request( &mut self, method: &str, diff --git a/codex-rs/mcp-server/tests/suite/fuzzy_file_search.rs b/codex-rs/mcp-server/tests/suite/fuzzy_file_search.rs new file mode 100644 index 0000000000..e4aa0add9c --- /dev/null +++ b/codex-rs/mcp-server/tests/suite/fuzzy_file_search.rs @@ -0,0 +1,104 @@ +use mcp_test_support::McpProcess; +use mcp_types::JSONRPCResponse; +use mcp_types::RequestId; +use pretty_assertions::assert_eq; +use serde_json::json; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_fuzzy_file_search_sorts_and_includes_indices() { + // Prepare a temporary Codex home and a separate root with test files. + let codex_home = TempDir::new().expect("create temp codex home"); + let root = TempDir::new().expect("create temp search root"); + + // Create files designed to have deterministic ordering for query "abc". + std::fs::write(root.path().join("abc"), "x").expect("write file abc"); + std::fs::write(root.path().join("abcde"), "x").expect("write file abcx"); + std::fs::write(root.path().join("abexy"), "x").expect("write file abcx"); + std::fs::write(root.path().join("zzz.txt"), "x").expect("write file zzz"); + + // Start MCP server and initialize. + let mut mcp = McpProcess::new(codex_home.path()).await.expect("spawn mcp"); + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) + .await + .expect("init timeout") + .expect("init failed"); + + let root_path = root.path().to_string_lossy().to_string(); + // Send fuzzyFileSearch request. + let request_id = mcp + .send_fuzzy_file_search_request("abe", vec![root_path.clone()], None) + .await + .expect("send fuzzyFileSearch"); + + // Read response and verify shape and ordering. + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await + .expect("fuzzyFileSearch timeout") + .expect("fuzzyFileSearch resp"); + + let value = resp.result; + assert_eq!( + value, + json!({ + "files": [ + { "root": root_path.clone(), "path": "abexy", "score": 88, "indices": [0, 1, 2] }, + { "root": root_path.clone(), "path": "abcde", "score": 74, "indices": [0, 1, 4] }, + ] + }) + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_fuzzy_file_search_accepts_cancellation_token() { + let codex_home = TempDir::new().expect("create temp codex home"); + let root = TempDir::new().expect("create temp search root"); + + std::fs::write(root.path().join("alpha.txt"), "contents").expect("write alpha"); + + let mut mcp = McpProcess::new(codex_home.path()).await.expect("spawn mcp"); + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) + .await + .expect("init timeout") + .expect("init failed"); + + let root_path = root.path().to_string_lossy().to_string(); + let request_id = mcp + .send_fuzzy_file_search_request("alp", vec![root_path.clone()], None) + .await + .expect("send fuzzyFileSearch"); + + let request_id_2 = mcp + .send_fuzzy_file_search_request( + "alp", + vec![root_path.clone()], + Some(request_id.to_string()), + ) + .await + .expect("send fuzzyFileSearch"); + + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id_2)), + ) + .await + .expect("fuzzyFileSearch timeout") + .expect("fuzzyFileSearch resp"); + + let files = resp + .result + .get("files") + .and_then(|value| value.as_array()) + .cloned() + .expect("files array"); + + assert_eq!(files.len(), 1); + assert_eq!(files[0]["root"], root_path); + assert_eq!(files[0]["path"], "alpha.txt"); +} diff --git a/codex-rs/mcp-server/tests/suite/mod.rs b/codex-rs/mcp-server/tests/suite/mod.rs index 97e5370908..4a3a91206f 100644 --- a/codex-rs/mcp-server/tests/suite/mod.rs +++ b/codex-rs/mcp-server/tests/suite/mod.rs @@ -5,6 +5,7 @@ mod codex_message_processor_flow; mod codex_tool; mod config; mod create_conversation; +mod fuzzy_file_search; mod interrupt; mod list_resume; mod login; diff --git a/codex-rs/protocol-ts/src/lib.rs b/codex-rs/protocol-ts/src/lib.rs index 3aec3d892f..aaf820965d 100644 --- a/codex-rs/protocol-ts/src/lib.rs +++ b/codex-rs/protocol-ts/src/lib.rs @@ -39,6 +39,9 @@ pub fn generate_ts(out_dir: &Path, prettier: Option<&Path>) -> Result<()> { codex_protocol::mcp_protocol::GetAuthStatusResponse::export_all_to(out_dir)?; codex_protocol::mcp_protocol::ApplyPatchApprovalResponse::export_all_to(out_dir)?; codex_protocol::mcp_protocol::ExecCommandApprovalResponse::export_all_to(out_dir)?; + codex_protocol::mcp_protocol::FuzzyFileSearchParams::export_all_to(out_dir)?; + codex_protocol::mcp_protocol::FuzzyFileSearchResult::export_all_to(out_dir)?; + codex_protocol::mcp_protocol::FuzzyFileSearchResponse::export_all_to(out_dir)?; codex_protocol::mcp_protocol::GetUserSavedConfigResponse::export_all_to(out_dir)?; codex_protocol::mcp_protocol::SetDefaultModelResponse::export_all_to(out_dir)?; codex_protocol::mcp_protocol::GetUserAgentResponse::export_all_to(out_dir)?; diff --git a/codex-rs/protocol/src/mcp_protocol.rs b/codex-rs/protocol/src/mcp_protocol.rs index c63bf2aecc..3024556a81 100644 --- a/codex-rs/protocol/src/mcp_protocol.rs +++ b/codex-rs/protocol/src/mcp_protocol.rs @@ -184,6 +184,11 @@ pub enum ClientRequest { #[serde(rename = "id")] request_id: RequestId, }, + FuzzyFileSearch { + #[serde(rename = "id")] + request_id: RequestId, + params: FuzzyFileSearchParams, + }, /// Execute a command (argv vector) under the server's sandbox. ExecOneOffCommand { #[serde(rename = "id")] @@ -662,6 +667,32 @@ pub struct ApplyPatchApprovalResponse { pub decision: ReviewDecision, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] +#[serde(rename_all = "camelCase")] +#[ts(rename_all = "camelCase")] +pub struct FuzzyFileSearchParams { + pub query: String, + pub roots: Vec, + // if provided, will cancel any previous request that used the same value + #[serde(skip_serializing_if = "Option::is_none")] + pub cancellation_token: Option, +} + +/// Superset of [`codex_file_search::FileMatch`] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] +pub struct FuzzyFileSearchResult { + pub root: String, + pub path: String, + pub score: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub indices: Option>, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] +pub struct FuzzyFileSearchResponse { + pub files: Vec, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] #[serde(rename_all = "camelCase")] pub struct LoginChatGptCompleteNotification {