From c10a72d968a0e0936adfb1a68a2c602fca7e656a Mon Sep 17 00:00:00 2001
From: Ahmed Ibrahim
Date: Mon, 29 Sep 2025 13:13:33 -0700
Subject: [PATCH 1/2] add tail in the data

---
 codex-rs/core/src/rollout/list.rs  | 101 +++++++++--
 codex-rs/core/src/rollout/tests.rs | 268 +++++++++++++++++++++++++++++
 codex-rs/tui/src/resume_picker.rs  |   3 +
 3 files changed, 362 insertions(+), 10 deletions(-)

diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs
index 6f06709396..feafb366f9 100644
--- a/codex-rs/core/src/rollout/list.rs
+++ b/codex-rs/core/src/rollout/list.rs
@@ -36,13 +36,16 @@ pub struct ConversationsPage {
 pub struct ConversationItem {
     /// Absolute path to the rollout file.
     pub path: PathBuf,
-    /// First up to 5 JSONL records parsed as JSON (includes meta line).
+    /// First up to `HEAD_RECORD_LIMIT` JSONL records parsed as JSON (includes meta line).
     pub head: Vec<serde_json::Value>,
+    /// Last up to `TAIL_RECORD_LIMIT` JSONL response records parsed as JSON.
+    pub tail: Vec<serde_json::Value>,
 }
 
 /// Hard cap to bound worst‑case work per request.
 const MAX_SCAN_FILES: usize = 100;
 const HEAD_RECORD_LIMIT: usize = 10;
+const TAIL_RECORD_LIMIT: usize = 10;
 
 /// Pagination cursor identifying a file by timestamp and UUID.
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -176,13 +179,13 @@ async fn traverse_directories_for_paths(
             }
             // Read head and simultaneously detect message events within the same
             // first N JSONL records to avoid a second file read.
-            let (head, saw_session_meta, saw_user_event) =
-                read_head_and_flags(&path, HEAD_RECORD_LIMIT)
+            let (head, tail, saw_session_meta, saw_user_event) =
+                read_head_and_tail(&path, HEAD_RECORD_LIMIT, TAIL_RECORD_LIMIT)
                     .await
-                    .unwrap_or((Vec::new(), false, false));
+                    .unwrap_or((Vec::new(), Vec::new(), false, false));
             // Apply filters: must have session meta and at least one user message event
             if saw_session_meta && saw_user_event {
-                items.push(ConversationItem { path, head });
+                items.push(ConversationItem { path, head, tail });
             }
         }
     }
@@ -286,10 +289,11 @@ fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uui
     Some((ts, uuid))
 }
 
-async fn read_head_and_flags(
+async fn read_head_and_tail(
     path: &Path,
-    max_records: usize,
-) -> io::Result<(Vec<serde_json::Value>, bool, bool)> {
+    head_limit: usize,
+    tail_limit: usize,
+) -> io::Result<(Vec<serde_json::Value>, Vec<serde_json::Value>, bool, bool)> {
     use tokio::io::AsyncBufReadExt;
 
     let file = tokio::fs::File::open(path).await?;
@@ -299,7 +303,7 @@
     let mut saw_session_meta = false;
     let mut saw_user_event = false;
 
-    while head.len() < max_records {
+    while head.len() < head_limit {
         let line_opt = lines.next_line().await?;
         let Some(line) = line_opt else { break };
         let trimmed = line.trim();
@@ -336,7 +340,84 @@
         }
     }
 
-    Ok((head, saw_session_meta, saw_user_event))
+    let tail = if tail_limit == 0 {
+        Vec::new()
+    } else {
+        read_tail_records(path, tail_limit).await?
+    };
+
+    Ok((head, tail, saw_session_meta, saw_user_event))
+}
+
+async fn read_tail_records(path: &Path, max_records: usize) -> io::Result<Vec<serde_json::Value>> {
+    use std::io::SeekFrom;
+    use tokio::io::AsyncReadExt;
+    use tokio::io::AsyncSeekExt;
+
+    if max_records == 0 {
+        return Ok(Vec::new());
+    }
+
+    const CHUNK_SIZE: usize = 8192;
+
+    let mut file = tokio::fs::File::open(path).await?;
+    let mut pos = file.seek(SeekFrom::End(0)).await?;
+    if pos == 0 {
+        return Ok(Vec::new());
+    }
+
+    let mut buffer: Vec<u8> = Vec::new();
+
+    loop {
+        let slice_start = match (pos > 0, buffer.iter().position(|&b| b == b'\n')) {
+            (true, Some(idx)) => idx + 1,
+            _ => 0,
+        };
+        let tail = collect_last_response_values(&buffer[slice_start..], max_records);
+        if tail.len() >= max_records || pos == 0 {
+            return Ok(tail);
+        }
+
+        let read_size = CHUNK_SIZE.min(pos as usize);
+        if read_size == 0 {
+            return Ok(tail);
+        }
+        pos -= read_size as u64;
+        file.seek(SeekFrom::Start(pos)).await?;
+        let mut chunk = vec![0; read_size];
+        file.read_exact(&mut chunk).await?;
+        chunk.extend_from_slice(&buffer);
+        buffer = chunk;
+    }
+}
+
+fn collect_last_response_values(buffer: &[u8], max_records: usize) -> Vec<serde_json::Value> {
+    use std::borrow::Cow;
+
+    if buffer.is_empty() || max_records == 0 {
+        return Vec::new();
+    }
+
+    let text: Cow<'_, str> = String::from_utf8_lossy(buffer);
+    let mut collected_rev: Vec<serde_json::Value> = Vec::new();
+    for line in text.lines().rev() {
+        let trimmed = line.trim();
+        if trimmed.is_empty() {
+            continue;
+        }
+        let parsed: serde_json::Result<RolloutLine> = serde_json::from_str(trimmed);
+        let Ok(rollout_line) = parsed else { continue };
+        if let RolloutItem::ResponseItem(item) = rollout_line.item {
+            if let Ok(val) = serde_json::to_value(item) {
+                collected_rev.push(val);
+                if collected_rev.len() == max_records {
+                    break;
+                }
+            }
+        }
+    }
+    collected_rev.reverse();
+    collected_rev
 }
 
 /// Locate a recorded conversation rollout file by its UUID string using the existing
diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs
index 86f47645f9..af5234d31b 100644
--- a/codex-rs/core/src/rollout/tests.rs
+++ b/codex-rs/core/src/rollout/tests.rs
@@ -17,6 +17,18 @@ use crate::rollout::list::ConversationsPage;
 use crate::rollout::list::Cursor;
 use crate::rollout::list::get_conversation;
 use crate::rollout::list::get_conversations;
+use anyhow::Result;
+use codex_protocol::mcp_protocol::ConversationId;
+use codex_protocol::models::ContentItem;
+use codex_protocol::models::ResponseItem;
+use codex_protocol::protocol::CompactedItem;
+use codex_protocol::protocol::EventMsg;
+use codex_protocol::protocol::InputMessageKind;
+use codex_protocol::protocol::RolloutItem;
+use codex_protocol::protocol::RolloutLine;
+use codex_protocol::protocol::SessionMeta;
+use codex_protocol::protocol::SessionMetaLine;
+use codex_protocol::protocol::UserMessageEvent;
 
 fn write_session_file(
     root: &Path,
@@ -146,14 +158,17 @@ async fn test_list_conversations_latest_first() {
             ConversationItem {
                 path: p1,
                 head: head_3,
+                tail: Vec::new(),
             },
             ConversationItem {
                 path: p2,
                 head: head_2,
+                tail: Vec::new(),
             },
             ConversationItem {
                 path: p3,
                 head: head_1,
+                tail: Vec::new(),
             },
         ],
         next_cursor: Some(expected_cursor),
@@ -219,10 +234,12 @@ async fn test_pagination_cursor() {
             ConversationItem {
                 path: p5,
                 head: head_5,
+                tail: Vec::new(),
             },
             ConversationItem {
                 path: p4,
                 head: head_4,
+                tail: Vec::new(),
             },
         ],
         next_cursor: Some(expected_cursor1.clone()),
@@ -269,10 +286,12 @@ async fn test_pagination_cursor() {
             ConversationItem {
                 path: p3,
                 head: head_3,
+                tail: Vec::new(),
             },
             ConversationItem {
                 path: p2,
                 head: head_2,
+                tail: Vec::new(),
             },
         ],
         next_cursor: Some(expected_cursor2.clone()),
@@ -304,6 +323,7 @@ async fn test_pagination_cursor() {
         items: vec![ConversationItem {
             path: p1,
             head: head_1,
+            tail: Vec::new(),
         }],
         next_cursor: Some(expected_cursor3),
         num_scanned_files: 5, // scanned 05, 04 (anchor), 03, 02 (anchor), 01
@@ -346,6 +366,7 @@ async fn test_get_conversation_contents() {
         items: vec![ConversationItem {
             path: expected_path,
             head: expected_head,
+            tail: Vec::new(),
         }],
         next_cursor: Some(expected_cursor),
         num_scanned_files: 1,
@@ -366,6 +387,250 @@ async fn test_get_conversation_contents() {
     assert_eq!(content, expected_content);
 }
 
+#[tokio::test]
+async fn test_tail_includes_last_response_items() -> Result<()> {
+    let temp = TempDir::new().unwrap();
+    let home = temp.path();
+
+    let ts = "2025-06-01T08-00-00";
+    let uuid = Uuid::from_u128(42);
+    let day_dir = home.join("sessions").join("2025").join("06").join("01");
+    fs::create_dir_all(&day_dir)?;
+    let file_path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
+    let mut file = File::create(&file_path)?;
+
+    let conversation_id = ConversationId::from_string(&uuid.to_string())?;
+    let meta_line = RolloutLine {
+        timestamp: ts.to_string(),
+        item: RolloutItem::SessionMeta(SessionMetaLine {
+            meta: SessionMeta {
+                id: conversation_id,
+                timestamp: ts.to_string(),
+                instructions: None,
+                cwd: ".".into(),
+                originator: "test_originator".into(),
+                cli_version: "test_version".into(),
+            },
+            git: None,
+        }),
+    };
+    writeln!(file, "{}", serde_json::to_string(&meta_line)?)?;
+
+    let user_event_line = RolloutLine {
+        timestamp: ts.to_string(),
+        item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
+            message: "hello".into(),
+            kind: Some(InputMessageKind::Plain),
+            images: None,
+        })),
+    };
+    writeln!(file, "{}", serde_json::to_string(&user_event_line)?)?;
+
+    let total_messages = 12usize;
+    for idx in 0..total_messages {
+        let response_line = RolloutLine {
+            timestamp: format!("{ts}-{idx:02}"),
+            item: RolloutItem::ResponseItem(ResponseItem::Message {
+                id: None,
+                role: "assistant".into(),
+                content: vec![ContentItem::OutputText {
+                    text: format!("reply-{idx}"),
+                }],
+            }),
+        };
+        writeln!(file, "{}", serde_json::to_string(&response_line)?)?;
+    }
+    drop(file);
+
+    let page = get_conversations(home, 1, None).await?;
+    let item = page.items.first().expect("conversation item");
+    let tail_len = item.tail.len();
+    assert_eq!(tail_len, 10usize.min(total_messages));
+
+    let expected: Vec<serde_json::Value> = (total_messages - tail_len..total_messages)
+        .map(|idx| {
+            serde_json::to_value(ResponseItem::Message {
+                id: None,
+                role: "assistant".into(),
+                content: vec![ContentItem::OutputText {
+                    text: format!("reply-{idx}"),
+                }],
+            })
+            .expect("serialize response item")
+        })
+        .collect();
+
+    assert_eq!(item.tail, expected);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_tail_handles_short_sessions() -> Result<()> {
+    let temp = TempDir::new().unwrap();
+    let home = temp.path();
+
+    let ts = "2025-06-02T08-30-00";
+    let uuid = Uuid::from_u128(7);
+    let day_dir = home.join("sessions").join("2025").join("06").join("02");
+    fs::create_dir_all(&day_dir)?;
+    let file_path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
+    let mut file = File::create(&file_path)?;
+
+    let conversation_id = ConversationId::from_string(&uuid.to_string())?;
+    let meta_line = RolloutLine {
+        timestamp: ts.to_string(),
+        item: RolloutItem::SessionMeta(SessionMetaLine {
+            meta: SessionMeta {
+                id: conversation_id,
+                timestamp: ts.to_string(),
+                instructions: None,
+                cwd: ".".into(),
+                originator: "test_originator".into(),
+                cli_version: "test_version".into(),
+            },
+            git: None,
+        }),
+    };
+    writeln!(file, "{}", serde_json::to_string(&meta_line)?)?;
+
+    let user_event_line = RolloutLine {
+        timestamp: ts.to_string(),
+        item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
+            message: "hi".into(),
+            kind: Some(InputMessageKind::Plain),
+            images: None,
+        })),
+    };
+    writeln!(file, "{}", serde_json::to_string(&user_event_line)?)?;
+
+    for idx in 0..3 {
+        let response_line = RolloutLine {
+            timestamp: format!("{ts}-{idx:02}"),
+            item: RolloutItem::ResponseItem(ResponseItem::Message {
+                id: None,
+                role: "assistant".into(),
+                content: vec![ContentItem::OutputText {
+                    text: format!("short-{idx}"),
+                }],
+            }),
+        };
+        writeln!(file, "{}", serde_json::to_string(&response_line)?)?;
+    }
+    drop(file);
+
+    let page = get_conversations(home, 1, None).await?;
+    let tail = &page.items.first().expect("conversation item").tail;
+
+    assert_eq!(tail.len(), 3);
+
+    let expected: Vec<serde_json::Value> = (0..3)
+        .map(|idx| {
+            serde_json::to_value(ResponseItem::Message {
+                id: None,
+                role: "assistant".into(),
+                content: vec![ContentItem::OutputText {
+                    text: format!("short-{idx}"),
+                }],
+            })
+            .expect("serialize response item")
+        })
+        .collect();
+
+    assert_eq!(tail, &expected);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_tail_skips_trailing_non_responses() -> Result<()> {
+    let temp = TempDir::new().unwrap();
+    let home = temp.path();
+
+    let ts = "2025-06-03T10-00-00";
+    let uuid = Uuid::from_u128(11);
+    let day_dir = home.join("sessions").join("2025").join("06").join("03");
+    fs::create_dir_all(&day_dir)?;
+    let file_path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
+    let mut file = File::create(&file_path)?;
+
+    let conversation_id = ConversationId::from_string(&uuid.to_string())?;
+    let meta_line = RolloutLine {
+        timestamp: ts.to_string(),
+        item: RolloutItem::SessionMeta(SessionMetaLine {
+            meta: SessionMeta {
+                id: conversation_id,
+                timestamp: ts.to_string(),
+                instructions: None,
+                cwd: ".".into(),
+                originator: "test_originator".into(),
+                cli_version: "test_version".into(),
+            },
+            git: None,
+        }),
+    };
+    writeln!(file, "{}", serde_json::to_string(&meta_line)?)?;
+
+    let user_event_line = RolloutLine {
+        timestamp: ts.to_string(),
+        item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
+            message: "hello".into(),
+            kind: Some(InputMessageKind::Plain),
+            images: None,
+        })),
+    };
+    writeln!(file, "{}", serde_json::to_string(&user_event_line)?)?;
+
+    for idx in 0..4 {
+        let response_line = RolloutLine {
+            timestamp: format!("{ts}-{idx:02}"),
+            item: RolloutItem::ResponseItem(ResponseItem::Message {
+                id: None,
+                role: "assistant".into(),
+                content: vec![ContentItem::OutputText {
+                    text: format!("response-{idx}"),
+                }],
+            }),
+        };
+        writeln!(file, "{}", serde_json::to_string(&response_line)?)?;
+    }
+
+    let compacted_line = RolloutLine {
+        timestamp: format!("{ts}-compacted"),
+        item: RolloutItem::Compacted(CompactedItem {
+            message: "compacted".into(),
+        }),
+    };
+    writeln!(file, "{}", serde_json::to_string(&compacted_line)?)?;
+
+    let shutdown_event = RolloutLine {
+        timestamp: format!("{ts}-shutdown"),
+        item: RolloutItem::EventMsg(EventMsg::ShutdownComplete),
+    };
+    writeln!(file, "{}", serde_json::to_string(&shutdown_event)?)?;
+    drop(file);
+
+    let page = get_conversations(home, 1, None).await?;
+    let tail = &page.items.first().expect("conversation item").tail;
+
+    let expected: Vec<serde_json::Value> = (0..4)
+        .map(|idx| {
+            serde_json::to_value(ResponseItem::Message {
+                id: None,
+                role: "assistant".into(),
+                content: vec![ContentItem::OutputText {
+                    text: format!("response-{idx}"),
+                }],
+            })
+            .expect("serialize response item")
+        })
+        .collect();
+
+    assert_eq!(tail, &expected);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_stable_ordering_same_second_pagination() {
     let temp = TempDir::new().unwrap();
@@ -410,10 +675,12 @@ async fn test_stable_ordering_same_second_pagination() {
             ConversationItem {
                 path: p3,
                 head: head(u3),
+                tail: Vec::new(),
             },
             ConversationItem {
                 path: p2,
                 head: head(u2),
+                tail: Vec::new(),
             },
         ],
         next_cursor: Some(expected_cursor1.clone()),
@@ -436,6 +703,7 @@ async fn test_stable_ordering_same_second_pagination() {
         items: vec![ConversationItem {
             path: p1,
             head: head(u1),
+            tail: Vec::new(),
         }],
         next_cursor: Some(expected_cursor2),
         num_scanned_files: 3, // scanned u3, u2 (anchor), u1
diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs
index ead04eecd5..d6297528ce 100644
--- a/codex-rs/tui/src/resume_picker.rs
+++ b/codex-rs/tui/src/resume_picker.rs
@@ -804,6 +804,7 @@ mod tests {
         ConversationItem {
             path: PathBuf::from(path),
             head: head_with_ts_and_user_text(ts, &[preview]),
+            tail: Vec::new(),
         }
     }
 
@@ -863,10 +864,12 @@ mod tests {
         let a = ConversationItem {
             path: PathBuf::from("/tmp/a.jsonl"),
             head: head_with_ts_and_user_text("2025-01-01T00:00:00Z", &["A"]),
+            tail: Vec::new(),
         };
         let b = ConversationItem {
             path: PathBuf::from("/tmp/b.jsonl"),
             head: head_with_ts_and_user_text("2025-01-02T00:00:00Z", &["B"]),
+            tail: Vec::new(),
         };
         let rows = rows_from_items(vec![a, b]);
         assert_eq!(rows.len(), 2);

From 81b181cc2d46858363b5b43dc5e04a1edddb6d88 Mon Sep 17 00:00:00 2001
From: Ahmed Ibrahim
Date: Mon, 29 Sep 2025 13:24:03 -0700
Subject: [PATCH 2/2] add tail in the data

---
 codex-rs/core/src/rollout/list.rs | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs
index feafb366f9..4ad7e73b89 100644
--- a/codex-rs/core/src/rollout/list.rs
+++ b/codex-rs/core/src/rollout/list.rs
@@ -407,12 +407,12 @@ fn collect_last_response_values(buffer: &[u8], max_records: usize) -> Vec<serde_
         }
         let parsed: serde_json::Result<RolloutLine> = serde_json::from_str(trimmed);
         let Ok(rollout_line) = parsed else { continue };
-        if let RolloutItem::ResponseItem(item) = rollout_line.item {
-            if let Ok(val) = serde_json::to_value(item) {
-                collected_rev.push(val);
-                if collected_rev.len() == max_records {
-                    break;
-                }
+        if let RolloutItem::ResponseItem(item) = rollout_line.item
+            && let Ok(val) = serde_json::to_value(item)
+        {
+            collected_rev.push(val);
+            if collected_rev.len() == max_records {
+                break;
             }
         }
     }