diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 7876cccf89c..3df940889f6 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -185,6 +185,9 @@ pub(crate) struct TurnSummary { pub(crate) type TurnSummaryStore = Arc>>; +const THREAD_LIST_DEFAULT_LIMIT: usize = 25; +const THREAD_LIST_MAX_LIMIT: usize = 100; + // Duration before a ChatGPT login attempt is abandoned. const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60); struct ActiveLogin { @@ -1508,10 +1511,12 @@ impl CodexMessageProcessor { model_providers, } = params; - let page_size = limit.unwrap_or(25).max(1) as usize; - + let requested_page_size = limit + .map(|value| value as usize) + .unwrap_or(THREAD_LIST_DEFAULT_LIMIT) + .clamp(1, THREAD_LIST_MAX_LIMIT); let (summaries, next_cursor) = match self - .list_conversations_common(page_size, cursor, model_providers) + .list_conversations_common(requested_page_size, cursor, model_providers) .await { Ok(r) => r, @@ -1522,7 +1527,6 @@ impl CodexMessageProcessor { }; let data = summaries.into_iter().map(summary_to_thread).collect(); - let response = ThreadListResponse { data, next_cursor }; self.outgoing.send_response(request_id, response).await; } @@ -1800,10 +1804,12 @@ impl CodexMessageProcessor { cursor, model_providers, } = params; - let page_size = page_size.unwrap_or(25).max(1); + let requested_page_size = page_size + .unwrap_or(THREAD_LIST_DEFAULT_LIMIT) + .clamp(1, THREAD_LIST_MAX_LIMIT); match self - .list_conversations_common(page_size, cursor, model_providers) + .list_conversations_common(requested_page_size, cursor, model_providers) .await { Ok((items, next_cursor)) => { @@ -1818,12 +1824,15 @@ impl CodexMessageProcessor { async fn list_conversations_common( &self, - page_size: usize, + requested_page_size: usize, cursor: Option, model_providers: Option>, ) -> Result<(Vec, Option), JSONRPCErrorError> { - let cursor_obj: Option = cursor.as_ref().and_then(|s| parse_cursor(s)); - let cursor_ref = cursor_obj.as_ref(); + let mut cursor_obj: Option = cursor.as_ref().and_then(|s| parse_cursor(s)); + let mut last_cursor = cursor_obj.clone(); + let mut remaining = requested_page_size; + let mut items = Vec::with_capacity(requested_page_size); + let mut next_cursor: Option = None; let model_provider_filter = match model_providers { Some(providers) => { @@ -1837,48 +1846,69 @@ impl CodexMessageProcessor { }; let fallback_provider = self.config.model_provider_id.clone(); - let page = match RolloutRecorder::list_conversations( - &self.config.codex_home, - page_size, - cursor_ref, - INTERACTIVE_SESSION_SOURCES, - model_provider_filter.as_deref(), - fallback_provider.as_str(), - ) - .await - { - Ok(p) => p, - Err(err) => { - return Err(JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to list conversations: {err}"), - data: None, - }); - } - }; - - let items = page - .items - .into_iter() - .filter_map(|it| { - let session_meta_line = it.head.first().and_then(|first| { - serde_json::from_value::(first.clone()).ok() - })?; - extract_conversation_summary( - it.path, - &it.head, - &session_meta_line.meta, - session_meta_line.git.as_ref(), - fallback_provider.as_str(), - ) - }) - .collect::>(); + while remaining > 0 { + let page_size = remaining.min(THREAD_LIST_MAX_LIMIT); + let page = RolloutRecorder::list_conversations( + &self.config.codex_home, + page_size, + cursor_obj.as_ref(), + INTERACTIVE_SESSION_SOURCES, + model_provider_filter.as_deref(), + fallback_provider.as_str(), + ) + .await + .map_err(|err| JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to list conversations: {err}"), + data: None, + })?; - // Encode next_cursor as a plain string - let next_cursor = page - .next_cursor - .and_then(|cursor| serde_json::to_value(&cursor).ok()) - .and_then(|value| value.as_str().map(str::to_owned)); + let mut filtered = page + .items + .into_iter() + .filter_map(|it| { + let session_meta_line = it.head.first().and_then(|first| { + serde_json::from_value::(first.clone()).ok() + })?; + extract_conversation_summary( + it.path, + &it.head, + &session_meta_line.meta, + session_meta_line.git.as_ref(), + fallback_provider.as_str(), + ) + }) + .collect::>(); + if filtered.len() > remaining { + filtered.truncate(remaining); + } + items.extend(filtered); + remaining = requested_page_size.saturating_sub(items.len()); + + // Encode RolloutCursor into the JSON-RPC string form returned to clients. + let next_cursor_value = page.next_cursor.clone(); + next_cursor = next_cursor_value + .as_ref() + .and_then(|cursor| serde_json::to_value(cursor).ok()) + .and_then(|value| value.as_str().map(str::to_owned)); + if remaining == 0 { + break; + } + + match next_cursor_value { + Some(cursor_val) if remaining > 0 => { + // Break if our pagination would reuse the same cursor again; this avoids + // an infinite loop when filtering drops everything on the page. + if last_cursor.as_ref() == Some(&cursor_val) { + next_cursor = None; + break; + } + last_cursor = Some(cursor_val.clone()); + cursor_obj = Some(cursor_val); + } + _ => break, + } + } Ok((items, next_cursor)) } diff --git a/codex-rs/app-server/tests/suite/list_resume.rs b/codex-rs/app-server/tests/suite/list_resume.rs index 1e89c068484..34e737437ca 100644 --- a/codex-rs/app-server/tests/suite/list_resume.rs +++ b/codex-rs/app-server/tests/suite/list_resume.rs @@ -358,3 +358,81 @@ async fn test_list_and_resume_conversations() -> Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn list_conversations_fetches_through_filtered_pages() -> Result<()> { + let codex_home = TempDir::new()?; + + // Only the last 3 conversations match the provider filter; request 3 and + // ensure pagination keeps fetching past non-matching pages. + let cases = [ + ( + "2025-03-04T12-00-00", + "2025-03-04T12:00:00Z", + "skip_provider", + ), + ( + "2025-03-03T12-00-00", + "2025-03-03T12:00:00Z", + "skip_provider", + ), + ( + "2025-03-02T12-00-00", + "2025-03-02T12:00:00Z", + "target_provider", + ), + ( + "2025-03-01T12-00-00", + "2025-03-01T12:00:00Z", + "target_provider", + ), + ( + "2025-02-28T12-00-00", + "2025-02-28T12:00:00Z", + "target_provider", + ), + ]; + + for (ts_file, ts_rfc, provider) in cases { + create_fake_rollout( + codex_home.path(), + ts_file, + ts_rfc, + "Hello", + Some(provider), + None, + )?; + } + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let req_id = mcp + .send_list_conversations_request(ListConversationsParams { + page_size: Some(3), + cursor: None, + model_providers: Some(vec!["target_provider".to_string()]), + }) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id)), + ) + .await??; + let ListConversationsResponse { items, next_cursor } = + to_response::(resp)?; + + assert_eq!( + items.len(), + 3, + "should fetch across pages to satisfy the limit" + ); + assert!( + items + .iter() + .all(|item| item.model_provider == "target_provider") + ); + assert_eq!(next_cursor, None); + + Ok(()) +} diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 57299ef97e2..0132651df82 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -6,37 +6,96 @@ use codex_app_server_protocol::GitInfo as ApiGitInfo; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SessionSource; -use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; use codex_protocol::protocol::GitInfo as CoreGitInfo; +use std::path::Path; use std::path::PathBuf; use tempfile::TempDir; use tokio::time::timeout; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); -#[tokio::test] -async fn thread_list_basic_empty() -> Result<()> { - let codex_home = TempDir::new()?; - create_minimal_config(codex_home.path())?; - - let mut mcp = McpProcess::new(codex_home.path()).await?; +async fn init_mcp(codex_home: &Path) -> Result { + let mut mcp = McpProcess::new(codex_home).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + Ok(mcp) +} - // List threads in an empty CODEX_HOME; should return an empty page with nextCursor: null. - let list_id = mcp - .send_thread_list_request(ThreadListParams { - cursor: None, - limit: Some(10), - model_providers: Some(vec!["mock_provider".to_string()]), +async fn list_threads( + mcp: &mut McpProcess, + cursor: Option, + limit: Option, + providers: Option>, +) -> Result { + let request_id = mcp + .send_thread_list_request(codex_app_server_protocol::ThreadListParams { + cursor, + limit, + model_providers: providers, }) .await?; - let list_resp: JSONRPCResponse = timeout( + let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(list_id)), + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), ) .await??; - let ThreadListResponse { data, next_cursor } = to_response::(list_resp)?; + to_response::(resp) +} + +fn create_fake_rollouts( + codex_home: &Path, + count: usize, + provider_for_index: F, + timestamp_for_index: G, + preview: &str, +) -> Result> +where + F: Fn(usize) -> &'static str, + G: Fn(usize) -> (String, String), +{ + let mut ids = Vec::with_capacity(count); + for i in 0..count { + let (ts_file, ts_rfc) = timestamp_for_index(i); + ids.push(create_fake_rollout( + codex_home, + &ts_file, + &ts_rfc, + preview, + Some(provider_for_index(i)), + None, + )?); + } + Ok(ids) +} + +fn timestamp_at( + year: i32, + month: u32, + day: u32, + hour: u32, + minute: u32, + second: u32, +) -> (String, String) { + ( + format!("{year:04}-{month:02}-{day:02}T{hour:02}-{minute:02}-{second:02}"), + format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z"), + ) +} + +#[tokio::test] +async fn thread_list_basic_empty() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + let mut mcp = init_mcp(codex_home.path()).await?; + + let ThreadListResponse { data, next_cursor } = list_threads( + &mut mcp, + None, + Some(10), + Some(vec!["mock_provider".to_string()]), + ) + .await?; assert!(data.is_empty()); assert_eq!(next_cursor, None); @@ -86,26 +145,19 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> { None, )?; - let mut mcp = McpProcess::new(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + let mut mcp = init_mcp(codex_home.path()).await?; // Page 1: limit 2 → expect next_cursor Some. - let page1_id = mcp - .send_thread_list_request(ThreadListParams { - cursor: None, - limit: Some(2), - model_providers: Some(vec!["mock_provider".to_string()]), - }) - .await?; - let page1_resp: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(page1_id)), - ) - .await??; let ThreadListResponse { data: data1, next_cursor: cursor1, - } = to_response::(page1_resp)?; + } = list_threads( + &mut mcp, + None, + Some(2), + Some(vec!["mock_provider".to_string()]), + ) + .await?; assert_eq!(data1.len(), 2); for thread in &data1 { assert_eq!(thread.preview, "Hello"); @@ -119,22 +171,16 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> { let cursor1 = cursor1.expect("expected nextCursor on first page"); // Page 2: with cursor → expect next_cursor None when no more results. - let page2_id = mcp - .send_thread_list_request(ThreadListParams { - cursor: Some(cursor1), - limit: Some(2), - model_providers: Some(vec!["mock_provider".to_string()]), - }) - .await?; - let page2_resp: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(page2_id)), - ) - .await??; let ThreadListResponse { data: data2, next_cursor: cursor2, - } = to_response::(page2_resp)?; + } = list_threads( + &mut mcp, + Some(cursor1), + Some(2), + Some(vec!["mock_provider".to_string()]), + ) + .await?; assert!(data2.len() <= 2); for thread in &data2 { assert_eq!(thread.preview, "Hello"); @@ -173,23 +219,16 @@ async fn thread_list_respects_provider_filter() -> Result<()> { None, )?; - let mut mcp = McpProcess::new(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + let mut mcp = init_mcp(codex_home.path()).await?; // Filter to only other_provider; expect 1 item, nextCursor None. - let list_id = mcp - .send_thread_list_request(ThreadListParams { - cursor: None, - limit: Some(10), - model_providers: Some(vec!["other_provider".to_string()]), - }) - .await?; - let resp: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(list_id)), + let ThreadListResponse { data, next_cursor } = list_threads( + &mut mcp, + None, + Some(10), + Some(vec!["other_provider".to_string()]), ) - .await??; - let ThreadListResponse { data, next_cursor } = to_response::(resp)?; + .await?; assert_eq!(data.len(), 1); assert_eq!(next_cursor, None); let thread = &data[0]; @@ -205,6 +244,146 @@ async fn thread_list_respects_provider_filter() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_list_fetches_until_limit_or_exhausted() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + // Newest 16 conversations belong to a different provider; the older 8 are the + // only ones that match the filter. We request 8 so the server must keep + // paging past the first two pages to reach the desired count. + create_fake_rollouts( + codex_home.path(), + 24, + |i| { + if i < 16 { + "skip_provider" + } else { + "target_provider" + } + }, + |i| timestamp_at(2025, 3, 30 - i as u32, 12, 0, 0), + "Hello", + )?; + + let mut mcp = init_mcp(codex_home.path()).await?; + + // Request 8 threads for the target provider; the matches only start on the + // third page so we rely on pagination to reach the limit. + let ThreadListResponse { data, next_cursor } = list_threads( + &mut mcp, + None, + Some(8), + Some(vec!["target_provider".to_string()]), + ) + .await?; + assert_eq!( + data.len(), + 8, + "should keep paging until the requested count is filled" + ); + assert!( + data.iter() + .all(|thread| thread.model_provider == "target_provider"), + "all returned threads must match the requested provider" + ); + assert_eq!( + next_cursor, None, + "once the requested count is satisfied on the final page, nextCursor should be None" + ); + + Ok(()) +} + +#[tokio::test] +async fn thread_list_enforces_max_limit() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + create_fake_rollouts( + codex_home.path(), + 105, + |_| "mock_provider", + |i| { + let month = 5 + (i / 28); + let day = (i % 28) + 1; + timestamp_at(2025, month as u32, day as u32, 0, 0, 0) + }, + "Hello", + )?; + + let mut mcp = init_mcp(codex_home.path()).await?; + + let ThreadListResponse { data, next_cursor } = list_threads( + &mut mcp, + None, + Some(200), + Some(vec!["mock_provider".to_string()]), + ) + .await?; + assert_eq!( + data.len(), + 100, + "limit should be clamped to the maximum page size" + ); + assert!( + next_cursor.is_some(), + "when more than the maximum exist, nextCursor should continue pagination" + ); + + Ok(()) +} + +#[tokio::test] +async fn thread_list_stops_when_not_enough_filtered_results_exist() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + // Only the last 7 conversations match the provider filter; we ask for 10 to + // ensure the server exhausts pagination without looping forever. + create_fake_rollouts( + codex_home.path(), + 22, + |i| { + if i < 15 { + "skip_provider" + } else { + "target_provider" + } + }, + |i| timestamp_at(2025, 4, 28 - i as u32, 8, 0, 0), + "Hello", + )?; + + let mut mcp = init_mcp(codex_home.path()).await?; + + // Request more threads than exist after filtering; expect all matches to be + // returned with nextCursor None. + let ThreadListResponse { data, next_cursor } = list_threads( + &mut mcp, + None, + Some(10), + Some(vec!["target_provider".to_string()]), + ) + .await?; + assert_eq!( + data.len(), + 7, + "all available filtered threads should be returned" + ); + assert!( + data.iter() + .all(|thread| thread.model_provider == "target_provider"), + "results should still respect the provider filter" + ); + assert_eq!( + next_cursor, None, + "when results are exhausted before reaching the limit, nextCursor should be None" + ); + + Ok(()) +} + #[tokio::test] async fn thread_list_includes_git_info() -> Result<()> { let codex_home = TempDir::new()?; @@ -224,22 +403,15 @@ async fn thread_list_includes_git_info() -> Result<()> { Some(git_info), )?; - let mut mcp = McpProcess::new(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + let mut mcp = init_mcp(codex_home.path()).await?; - let list_id = mcp - .send_thread_list_request(ThreadListParams { - cursor: None, - limit: Some(10), - model_providers: Some(vec!["mock_provider".to_string()]), - }) - .await?; - let resp: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(list_id)), + let ThreadListResponse { data, .. } = list_threads( + &mut mcp, + None, + Some(10), + Some(vec!["mock_provider".to_string()]), ) - .await??; - let ThreadListResponse { data, .. } = to_response::(resp)?; + .await?; let thread = data .iter() .find(|t| t.id == conversation_id)