Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 20 additions & 3 deletions codex-rs/file-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,27 @@ pub fn run(

/// Sort matches in-place by descending score, then ascending path.
fn sort_matches(matches: &mut [(u32, String)]) {
matches.sort_by(|a, b| match b.0.cmp(&a.0) {
std::cmp::Ordering::Equal => a.1.cmp(&b.1),
matches.sort_by(cmp_by_score_desc_then_path_asc::<(u32, String), _, _>(
|t| t.0,
|t| t.1.as_str(),
));
}

/// Returns a comparator closure suitable for `slice.sort_by(...)` that orders
/// items by descending score and then ascending path using the provided accessors.
pub fn cmp_by_score_desc_then_path_asc<T, FScore, FPath>(
score_of: FScore,
path_of: FPath,
) -> impl FnMut(&T, &T) -> std::cmp::Ordering
where
FScore: Fn(&T) -> u32,
FPath: Fn(&T) -> &str,
{
use std::cmp::Ordering;
move |a, b| match score_of(b).cmp(&score_of(a)) {
Ordering::Equal => path_of(a).cmp(path_of(b)),
other => other,
});
}
}

/// Maintains the `max_count` best matches for a given pattern.
Expand Down
1 change: 1 addition & 0 deletions codex-rs/mcp-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ anyhow = { workspace = true }
codex-arg0 = { workspace = true }
codex-common = { workspace = true, features = ["cli"] }
codex-core = { workspace = true }
codex-file-search = { workspace = true }
codex-login = { workspace = true }
codex-protocol = { workspace = true }
mcp-types = { workspace = true }
Expand Down
50 changes: 50 additions & 0 deletions codex-rs/mcp-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::fuzzy_file_search::run_fuzzy_file_search;
use crate::json_to_toml::json_to_toml;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotification;
Expand Down Expand Up @@ -52,6 +53,8 @@ use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse;
use codex_protocol::mcp_protocol::ExecCommandApprovalParams;
use codex_protocol::mcp_protocol::ExecCommandApprovalResponse;
use codex_protocol::mcp_protocol::ExecOneOffCommandParams;
use codex_protocol::mcp_protocol::FuzzyFileSearchParams;
use codex_protocol::mcp_protocol::FuzzyFileSearchResponse;
use codex_protocol::mcp_protocol::GetUserAgentResponse;
use codex_protocol::mcp_protocol::GetUserSavedConfigResponse;
use codex_protocol::mcp_protocol::GitDiffToRemoteResponse;
Expand Down Expand Up @@ -88,6 +91,8 @@ use std::collections::HashMap;
use std::ffi::OsStr;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::select;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -122,6 +127,7 @@ pub(crate) struct CodexMessageProcessor {
active_login: Arc<Mutex<Option<ActiveLogin>>>,
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
}

impl CodexMessageProcessor {
Expand All @@ -141,6 +147,7 @@ impl CodexMessageProcessor {
conversation_listeners: HashMap::new(),
active_login: Arc::new(Mutex::new(None)),
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -206,6 +213,9 @@ impl CodexMessageProcessor {
ClientRequest::UserInfo { request_id } => {
self.get_user_info(request_id).await;
}
ClientRequest::FuzzyFileSearch { request_id, params } => {
self.fuzzy_file_search(request_id, params).await;
}
ClientRequest::ExecOneOffCommand { request_id, params } => {
self.exec_one_off_command(request_id, params).await;
}
Expand Down Expand Up @@ -1167,6 +1177,46 @@ impl CodexMessageProcessor {
}
}
}

async fn fuzzy_file_search(&mut self, request_id: RequestId, params: FuzzyFileSearchParams) {
let FuzzyFileSearchParams {
query,
roots,
cancellation_token,
} = params;

let cancel_flag = match cancellation_token.clone() {
Some(token) => {
let mut pending_fuzzy_searches = self.pending_fuzzy_searches.lock().await;
// if a cancellation_token is provided and a pending_request exists for
// that token, cancel it
if let Some(existing) = pending_fuzzy_searches.get(&token) {
existing.store(true, Ordering::Relaxed);
}
let flag = Arc::new(AtomicBool::new(false));
pending_fuzzy_searches.insert(token.clone(), flag.clone());
flag
}
None => Arc::new(AtomicBool::new(false)),
};

let results = match query.as_str() {
"" => vec![],
_ => run_fuzzy_file_search(query, roots, cancel_flag.clone()).await,
};

if let Some(token) = cancellation_token {
let mut pending_fuzzy_searches = self.pending_fuzzy_searches.lock().await;
if let Some(current_flag) = pending_fuzzy_searches.get(&token)
&& Arc::ptr_eq(current_flag, &cancel_flag)
{
pending_fuzzy_searches.remove(&token);
}
}

let response = FuzzyFileSearchResponse { files: results };
self.outgoing.send_response(request_id, response).await;
}
}

async fn apply_bespoke_event_handling(
Expand Down
84 changes: 84 additions & 0 deletions codex-rs/mcp-server/src/fuzzy_file_search.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::num::NonZero;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use codex_file_search as file_search;
use codex_protocol::mcp_protocol::FuzzyFileSearchResult;
use tokio::task::JoinSet;
use tracing::warn;

const LIMIT_PER_ROOT: usize = 50;
const MAX_THREADS: usize = 12;
const COMPUTE_INDICES: bool = true;

pub(crate) async fn run_fuzzy_file_search(
query: String,
roots: Vec<String>,
cancellation_flag: Arc<AtomicBool>,
) -> Vec<FuzzyFileSearchResult> {
#[expect(clippy::expect_used)]
let limit_per_root =
NonZero::new(LIMIT_PER_ROOT).expect("LIMIT_PER_ROOT should be a valid non-zero usize");

let cores = std::thread::available_parallelism()
.map(std::num::NonZero::get)
.unwrap_or(1);
let threads = cores.min(MAX_THREADS);
let threads_per_root = (threads / roots.len()).max(1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let threads_per_root = (threads / roots.len()).max(1);
let threads_per_root = (threads / roots.len()).min(1);

right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want max here! It's primarily to catch the case where roots.len() > threads, and make sure threads_per_root != 0

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You want 5.min(1) -> 5 and 0.min(1) -> 1 right?

Isn't 5.max(1) -> 1 and 0.max(1) -> 0?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other way around! Here's a repl to make sure we're on the same page:

Screenshot 2025-09-26 at 5 31 43 PM

let threads = NonZero::new(threads_per_root).unwrap_or(NonZeroUsize::MIN);

let mut files: Vec<FuzzyFileSearchResult> = Vec::new();
let mut join_set = JoinSet::new();

for root in roots {
let search_dir = PathBuf::from(&root);
let query = query.clone();
let cancel_flag = cancellation_flag.clone();
join_set.spawn_blocking(move || {
match file_search::run(
query.as_str(),
limit_per_root,
&search_dir,
Vec::new(),
threads,
cancel_flag,
COMPUTE_INDICES,
) {
Ok(res) => Ok((root, res)),
Err(err) => Err((root, err)),
}
});
}

while let Some(res) = join_set.join_next().await {
match res {
Ok(Ok((root, res))) => {
for m in res.matches {
let result = FuzzyFileSearchResult {
root: root.clone(),
path: m.path,
score: m.score,
indices: m.indices,
};
files.push(result);
}
}
Ok(Err((root, err))) => {
warn!("fuzzy-file-search in dir '{root}' failed: {err}");
}
Err(err) => {
warn!("fuzzy-file-search join_next failed: {err}");
}
}
}

files.sort_by(file_search::cmp_by_score_desc_then_path_asc::<
FuzzyFileSearchResult,
_,
_,
>(|f| f.score, |f| f.path.as_str()));

files
}
1 change: 1 addition & 0 deletions codex-rs/mcp-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod codex_tool_config;
mod codex_tool_runner;
mod error_code;
mod exec_approval;
mod fuzzy_file_search;
mod json_to_toml;
pub(crate) mod message_processor;
mod outgoing_message;
Expand Down
17 changes: 17 additions & 0 deletions codex-rs/mcp-server/tests/common/mcp_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,23 @@ impl McpProcess {
self.send_request("logoutChatGpt", None).await
}

/// Send a `fuzzyFileSearch` JSON-RPC request.
pub async fn send_fuzzy_file_search_request(
&mut self,
query: &str,
roots: Vec<String>,
cancellation_token: Option<String>,
) -> anyhow::Result<i64> {
let mut params = serde_json::json!({
"query": query,
"roots": roots,
});
if let Some(token) = cancellation_token {
params["cancellationToken"] = serde_json::json!(token);
}
self.send_request("fuzzyFileSearch", Some(params)).await
}

async fn send_request(
&mut self,
method: &str,
Expand Down
104 changes: 104 additions & 0 deletions codex-rs/mcp-server/tests/suite/fuzzy_file_search.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use mcp_test_support::McpProcess;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;

const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_sorts_and_includes_indices() {
// Prepare a temporary Codex home and a separate root with test files.
let codex_home = TempDir::new().expect("create temp codex home");
let root = TempDir::new().expect("create temp search root");

// Create files designed to have deterministic ordering for query "abc".
std::fs::write(root.path().join("abc"), "x").expect("write file abc");
std::fs::write(root.path().join("abcde"), "x").expect("write file abcx");
std::fs::write(root.path().join("abexy"), "x").expect("write file abcx");
std::fs::write(root.path().join("zzz.txt"), "x").expect("write file zzz");

// Start MCP server and initialize.
let mut mcp = McpProcess::new(codex_home.path()).await.expect("spawn mcp");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");

let root_path = root.path().to_string_lossy().to_string();
// Send fuzzyFileSearch request.
let request_id = mcp
.send_fuzzy_file_search_request("abe", vec![root_path.clone()], None)
.await
.expect("send fuzzyFileSearch");

// Read response and verify shape and ordering.
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await
.expect("fuzzyFileSearch timeout")
.expect("fuzzyFileSearch resp");

let value = resp.result;
assert_eq!(
value,
json!({
"files": [
{ "root": root_path.clone(), "path": "abexy", "score": 88, "indices": [0, 1, 2] },
{ "root": root_path.clone(), "path": "abcde", "score": 74, "indices": [0, 1, 4] },
]
})
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_accepts_cancellation_token() {
let codex_home = TempDir::new().expect("create temp codex home");
let root = TempDir::new().expect("create temp search root");

std::fs::write(root.path().join("alpha.txt"), "contents").expect("write alpha");

let mut mcp = McpProcess::new(codex_home.path()).await.expect("spawn mcp");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");

let root_path = root.path().to_string_lossy().to_string();
let request_id = mcp
.send_fuzzy_file_search_request("alp", vec![root_path.clone()], None)
.await
.expect("send fuzzyFileSearch");

let request_id_2 = mcp
.send_fuzzy_file_search_request(
"alp",
vec![root_path.clone()],
Some(request_id.to_string()),
)
.await
.expect("send fuzzyFileSearch");

let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id_2)),
)
.await
.expect("fuzzyFileSearch timeout")
.expect("fuzzyFileSearch resp");

let files = resp
.result
.get("files")
.and_then(|value| value.as_array())
.cloned()
.expect("files array");

assert_eq!(files.len(), 1);
assert_eq!(files[0]["root"], root_path);
assert_eq!(files[0]["path"], "alpha.txt");
}
1 change: 1 addition & 0 deletions codex-rs/mcp-server/tests/suite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod codex_message_processor_flow;
mod codex_tool;
mod config;
mod create_conversation;
mod fuzzy_file_search;
mod interrupt;
mod list_resume;
mod login;
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/protocol-ts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub fn generate_ts(out_dir: &Path, prettier: Option<&Path>) -> Result<()> {
codex_protocol::mcp_protocol::GetAuthStatusResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ApplyPatchApprovalResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::ExecCommandApprovalResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::FuzzyFileSearchParams::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::FuzzyFileSearchResult::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::FuzzyFileSearchResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GetUserSavedConfigResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::SetDefaultModelResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GetUserAgentResponse::export_all_to(out_dir)?;
Expand Down
Loading
Loading