Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 80 additions & 50 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ pub(crate) struct TurnSummary {

pub(crate) type TurnSummaryStore = Arc<Mutex<HashMap<ConversationId, TurnSummary>>>;

const THREAD_LIST_DEFAULT_LIMIT: usize = 25;
const THREAD_LIST_MAX_LIMIT: usize = 100;

// Duration before a ChatGPT login attempt is abandoned.
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
struct ActiveLogin {
Expand Down Expand Up @@ -1508,10 +1511,12 @@ impl CodexMessageProcessor {
model_providers,
} = params;

let page_size = limit.unwrap_or(25).max(1) as usize;

let requested_page_size = limit
.map(|value| value as usize)
.unwrap_or(THREAD_LIST_DEFAULT_LIMIT)
.clamp(1, THREAD_LIST_MAX_LIMIT);
let (summaries, next_cursor) = match self
.list_conversations_common(page_size, cursor, model_providers)
.list_conversations_common(requested_page_size, cursor, model_providers)
.await
{
Ok(r) => r,
Expand All @@ -1522,7 +1527,6 @@ impl CodexMessageProcessor {
};

let data = summaries.into_iter().map(summary_to_thread).collect();

let response = ThreadListResponse { data, next_cursor };
self.outgoing.send_response(request_id, response).await;
}
Expand Down Expand Up @@ -1800,10 +1804,12 @@ impl CodexMessageProcessor {
cursor,
model_providers,
} = params;
let page_size = page_size.unwrap_or(25).max(1);
let requested_page_size = page_size
.unwrap_or(THREAD_LIST_DEFAULT_LIMIT)
.clamp(1, THREAD_LIST_MAX_LIMIT);

match self
.list_conversations_common(page_size, cursor, model_providers)
.list_conversations_common(requested_page_size, cursor, model_providers)
.await
{
Ok((items, next_cursor)) => {
Expand All @@ -1818,12 +1824,15 @@ impl CodexMessageProcessor {

async fn list_conversations_common(
&self,
page_size: usize,
requested_page_size: usize,
cursor: Option<String>,
model_providers: Option<Vec<String>>,
) -> Result<(Vec<ConversationSummary>, Option<String>), JSONRPCErrorError> {
let cursor_obj: Option<RolloutCursor> = cursor.as_ref().and_then(|s| parse_cursor(s));
let cursor_ref = cursor_obj.as_ref();
let mut cursor_obj: Option<RolloutCursor> = cursor.as_ref().and_then(|s| parse_cursor(s));
let mut last_cursor = cursor_obj.clone();
let mut remaining = requested_page_size;
let mut items = Vec::with_capacity(requested_page_size);
let mut next_cursor: Option<String> = None;

let model_provider_filter = match model_providers {
Some(providers) => {
Expand All @@ -1837,48 +1846,69 @@ impl CodexMessageProcessor {
};
let fallback_provider = self.config.model_provider_id.clone();

let page = match RolloutRecorder::list_conversations(
&self.config.codex_home,
page_size,
cursor_ref,
INTERACTIVE_SESSION_SOURCES,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
)
.await
{
Ok(p) => p,
Err(err) => {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list conversations: {err}"),
data: None,
});
}
};

let items = page
.items
.into_iter()
.filter_map(|it| {
let session_meta_line = it.head.first().and_then(|first| {
serde_json::from_value::<SessionMetaLine>(first.clone()).ok()
})?;
extract_conversation_summary(
it.path,
&it.head,
&session_meta_line.meta,
session_meta_line.git.as_ref(),
fallback_provider.as_str(),
)
})
.collect::<Vec<_>>();
while remaining > 0 {
let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
let page = RolloutRecorder::list_conversations(
&self.config.codex_home,
page_size,
cursor_obj.as_ref(),
INTERACTIVE_SESSION_SOURCES,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list conversations: {err}"),
data: None,
})?;

// Encode next_cursor as a plain string
let next_cursor = page
.next_cursor
.and_then(|cursor| serde_json::to_value(&cursor).ok())
.and_then(|value| value.as_str().map(str::to_owned));
let mut filtered = page
.items
.into_iter()
.filter_map(|it| {
let session_meta_line = it.head.first().and_then(|first| {
serde_json::from_value::<SessionMetaLine>(first.clone()).ok()
})?;
extract_conversation_summary(
it.path,
&it.head,
&session_meta_line.meta,
session_meta_line.git.as_ref(),
fallback_provider.as_str(),
)
})
.collect::<Vec<_>>();
if filtered.len() > remaining {
filtered.truncate(remaining);
}
items.extend(filtered);
remaining = requested_page_size.saturating_sub(items.len());

// Encode RolloutCursor into the JSON-RPC string form returned to clients.
let next_cursor_value = page.next_cursor.clone();
next_cursor = next_cursor_value
.as_ref()
.and_then(|cursor| serde_json::to_value(cursor).ok())
.and_then(|value| value.as_str().map(str::to_owned));
Comment on lines +1889 to +1893
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Clear next_cursor when no further threads exist

list_conversations_common always forwards the cursor from the last page even after the requested page size has been satisfied. RolloutRecorder::list_conversations marks next_cursor as soon as a page fills (see core/src/rollout/list.rs where more_matches_available is set when items.len() == page_size), so a request that matches exactly limit conversations—e.g., three threads for a single provider when only three exist—returns a non-null next_cursor despite there being no additional results. Clients will keep paginating and receive an empty page, which contradicts the API contract that nextCursor should be None once results are exhausted.

Useful? React with 👍 / 👎.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looked into this, and if that happens, you just request more with that cursor and then get an empty array, which is fine, and it seems that changing that behavior will just add more complexity so I think this is fine.

if remaining == 0 {
break;
}

match next_cursor_value {
Some(cursor_val) if remaining > 0 => {
// Break if our pagination would reuse the same cursor again; this avoids
// an infinite loop when filtering drops everything on the page.
if last_cursor.as_ref() == Some(&cursor_val) {
next_cursor = None;
break;
}
last_cursor = Some(cursor_val.clone());
cursor_obj = Some(cursor_val);
}
_ => break,
}
}

Ok((items, next_cursor))
}
Expand Down
78 changes: 78 additions & 0 deletions codex-rs/app-server/tests/suite/list_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,81 @@ async fn test_list_and_resume_conversations() -> Result<()> {

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn list_conversations_fetches_through_filtered_pages() -> Result<()> {
let codex_home = TempDir::new()?;

// Only the last 3 conversations match the provider filter; request 3 and
// ensure pagination keeps fetching past non-matching pages.
let cases = [
(
"2025-03-04T12-00-00",
"2025-03-04T12:00:00Z",
"skip_provider",
),
(
"2025-03-03T12-00-00",
"2025-03-03T12:00:00Z",
"skip_provider",
),
(
"2025-03-02T12-00-00",
"2025-03-02T12:00:00Z",
"target_provider",
),
(
"2025-03-01T12-00-00",
"2025-03-01T12:00:00Z",
"target_provider",
),
(
"2025-02-28T12-00-00",
"2025-02-28T12:00:00Z",
"target_provider",
),
];

for (ts_file, ts_rfc, provider) in cases {
create_fake_rollout(
codex_home.path(),
ts_file,
ts_rfc,
"Hello",
Some(provider),
None,
)?;
}

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let req_id = mcp
.send_list_conversations_request(ListConversationsParams {
page_size: Some(3),
cursor: None,
model_providers: Some(vec!["target_provider".to_string()]),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
let ListConversationsResponse { items, next_cursor } =
to_response::<ListConversationsResponse>(resp)?;

assert_eq!(
items.len(),
3,
"should fetch across pages to satisfy the limit"
);
assert!(
items
.iter()
.all(|item| item.model_provider == "target_provider")
);
assert_eq!(next_cursor, None);

Ok(())
}
Loading
Loading