From 990dc51c94d798206b0944cdcb9b588a2b64854b Mon Sep 17 00:00:00 2001 From: Joey Trasatti Date: Wed, 22 Apr 2026 14:10:26 -0700 Subject: [PATCH 1/2] Overlay state DB git metadata for filtered thread lists --- codex-rs/rollout/src/recorder.rs | 57 +++++++++++++++++----- codex-rs/rollout/src/recorder_tests.rs | 65 ++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 12 deletions(-) diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 184cfc1919fd..6ce79922303c 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -528,22 +528,16 @@ impl RolloutRecorder { ) .await; } - return Ok(page_from_filesystem_scan( - fs_page, - sort_direction, - page_size, - sort_key, - )); + let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); + return Ok( + fill_missing_git_info_from_state_db(state_db_ctx.as_deref(), page).await, + ); } return Ok(db_page.into()); } if listing_has_metadata_filters { - return Ok(page_from_filesystem_scan( - fs_page, - sort_direction, - page_size, - sort_key, - )); + let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); + return Ok(fill_missing_git_info_from_state_db(state_db_ctx.as_deref(), page).await); } // If SQLite listing still fails, return the filesystem page rather than failing the list. tracing::error!("Falling back on rollout system"); @@ -978,6 +972,45 @@ fn page_from_filesystem_scan( } } +async fn fill_missing_git_info_from_state_db( + state_db_ctx: Option<&StateRuntime>, + mut page: ThreadsPage, +) -> ThreadsPage { + let Some(state_db_ctx) = state_db_ctx else { + return page; + }; + + for item in &mut page.items { + if item.git_branch.is_some() && item.git_sha.is_some() && item.git_origin_url.is_some() { + continue; + } + let Some(thread_id) = item.thread_id else { + continue; + }; + let metadata = match state_db_ctx.get_thread(thread_id).await { + Ok(Some(metadata)) => metadata, + Ok(None) => continue, + Err(err) => { + warn!( + "state db get_thread failed while overlaying filesystem scan git info: {err}" + ); + continue; + } + }; + if item.git_branch.is_none() { + item.git_branch = metadata.git_branch; + } + if item.git_sha.is_none() { + item.git_sha = metadata.git_sha; + } + if item.git_origin_url.is_none() { + item.git_origin_url = metadata.git_origin_url; + } + } + + page +} + #[allow(clippy::too_many_arguments)] async fn list_threads_from_files_desc( codex_home: &Path, diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 852bf6c6df66..9e5fc8c348af 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -713,6 +713,71 @@ async fn list_threads_default_filter_returns_filesystem_scan_results() -> std::i Ok(()) } +#[tokio::test] +async fn list_threads_metadata_filter_overlays_state_db_git_info() -> std::io::Result<()> { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + + let uuid = Uuid::from_u128(9015); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let rollout_path = write_session_file(home.path(), "2025-01-03T16-00-00", uuid)?; + + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); + let created_at = chrono::Utc + .with_ymd_and_hms(2025, 1, 3, 16, 0, 0) + .single() + .expect("valid datetime"); + let mut builder = codex_state::ThreadMetadataBuilder::new( + thread_id, + rollout_path, + created_at, + SessionSource::Cli, + ); + builder.model_provider = Some(config.model_provider_id.clone()); + builder.cwd = home.path().to_path_buf(); + builder.git_branch = Some("sqlite-branch".to_string()); + builder.git_sha = Some("sqlite-sha".to_string()); + builder.git_origin_url = Some("https://example.com/repo.git".to_string()); + let mut metadata = builder.build(config.model_provider_id.as_str()); + metadata.first_user_message = Some("Hello from user".to_string()); + runtime + .upsert_thread(&metadata) + .await + .expect("state db upsert should succeed"); + + let page = RolloutRecorder::list_threads( + &config, + /*page_size*/ 10, + /*cursor*/ None, + ThreadSortKey::CreatedAt, + SortDirection::Desc, + &[SessionSource::Cli], + /*model_providers*/ None, + /*cwd_filters*/ None, + config.model_provider_id.as_str(), + /*search_term*/ None, + ) + .await?; + + assert_eq!(page.items.len(), 1); + assert_eq!(page.items[0].git_branch.as_deref(), Some("sqlite-branch")); + assert_eq!(page.items[0].git_sha.as_deref(), Some("sqlite-sha")); + assert_eq!( + page.items[0].git_origin_url.as_deref(), + Some("https://example.com/repo.git") + ); + Ok(()) +} + #[tokio::test] async fn list_threads_search_repairs_stale_state_db_hits_before_returning() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); From 2380b10688ee428b0252f619c7c53fd4b0d6abeb Mon Sep 17 00:00:00 2001 From: Joey Trasatti Date: Wed, 22 Apr 2026 14:29:18 -0700 Subject: [PATCH 2/2] Generalize filtered thread metadata overlay --- codex-rs/rollout/src/recorder.rs | 131 ++++++++++++++++++------- codex-rs/rollout/src/recorder_tests.rs | 65 +++++++++++- 2 files changed, 157 insertions(+), 39 deletions(-) diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 6ce79922303c..cd93b6d55814 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -529,15 +529,21 @@ impl RolloutRecorder { .await; } let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); - return Ok( - fill_missing_git_info_from_state_db(state_db_ctx.as_deref(), page).await, - ); + return Ok(fill_missing_thread_item_metadata_from_state_db( + state_db_ctx.as_deref(), + page, + ) + .await); } return Ok(db_page.into()); } if listing_has_metadata_filters { let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key); - return Ok(fill_missing_git_info_from_state_db(state_db_ctx.as_deref(), page).await); + return Ok(fill_missing_thread_item_metadata_from_state_db( + state_db_ctx.as_deref(), + page, + ) + .await); } // If SQLite listing still fails, return the filesystem page rather than failing the list. tracing::error!("Falling back on rollout system"); @@ -972,7 +978,7 @@ fn page_from_filesystem_scan( } } -async fn fill_missing_git_info_from_state_db( +async fn fill_missing_thread_item_metadata_from_state_db( state_db_ctx: Option<&StateRuntime>, mut page: ThreadsPage, ) -> ThreadsPage { @@ -981,9 +987,6 @@ async fn fill_missing_git_info_from_state_db( }; for item in &mut page.items { - if item.git_branch.is_some() && item.git_sha.is_some() && item.git_origin_url.is_some() { - continue; - } let Some(thread_id) = item.thread_id else { continue; }; @@ -992,25 +995,73 @@ async fn fill_missing_git_info_from_state_db( Ok(None) => continue, Err(err) => { warn!( - "state db get_thread failed while overlaying filesystem scan git info: {err}" + "state db get_thread failed while overlaying filesystem scan thread metadata: {err}" ); continue; } }; - if item.git_branch.is_none() { - item.git_branch = metadata.git_branch; - } - if item.git_sha.is_none() { - item.git_sha = metadata.git_sha; - } - if item.git_origin_url.is_none() { - item.git_origin_url = metadata.git_origin_url; - } + fill_missing_thread_item_metadata(item, thread_item_from_state_metadata(metadata)); } page } +fn fill_missing_thread_item_metadata(item: &mut ThreadItem, state_item: ThreadItem) { + let ThreadItem { + path: _state_path, + thread_id: _state_thread_id, + first_user_message, + cwd, + git_branch, + git_sha, + git_origin_url, + source, + agent_nickname, + agent_role, + model_provider, + cli_version, + created_at, + updated_at, + } = state_item; + + if item.first_user_message.is_none() { + item.first_user_message = first_user_message; + } + if item.cwd.is_none() { + item.cwd = cwd; + } + if item.git_branch.is_none() { + item.git_branch = git_branch; + } + if item.git_sha.is_none() { + item.git_sha = git_sha; + } + if item.git_origin_url.is_none() { + item.git_origin_url = git_origin_url; + } + if item.source.is_none() { + item.source = source; + } + if item.agent_nickname.is_none() { + item.agent_nickname = agent_nickname; + } + if item.agent_role.is_none() { + item.agent_role = agent_role; + } + if item.model_provider.is_none() { + item.model_provider = model_provider; + } + if item.cli_version.is_none() { + item.cli_version = cli_version; + } + if item.created_at.is_none() { + item.created_at = created_at; + } + if item.updated_at.is_none() { + item.updated_at = updated_at; + } +} + #[allow(clippy::too_many_arguments)] async fn list_threads_from_files_desc( codex_home: &Path, @@ -1717,26 +1768,7 @@ impl From for ThreadsPage { let items = db_page .items .into_iter() - .map(|item| ThreadItem { - path: item.rollout_path, - thread_id: Some(item.id), - first_user_message: item.first_user_message, - cwd: Some(item.cwd), - git_branch: item.git_branch, - git_sha: item.git_sha, - git_origin_url: item.git_origin_url, - source: Some( - serde_json::from_str(item.source.as_str()) - .or_else(|_| serde_json::from_value(Value::String(item.source))) - .unwrap_or(SessionSource::Unknown), - ), - agent_nickname: item.agent_nickname, - agent_role: item.agent_role, - model_provider: Some(item.model_provider), - cli_version: Some(item.cli_version), - created_at: Some(item.created_at.to_rfc3339_opts(SecondsFormat::Secs, true)), - updated_at: Some(item.updated_at.to_rfc3339_opts(SecondsFormat::Millis, true)), - }) + .map(thread_item_from_state_metadata) .collect(); Self { items, @@ -1747,6 +1779,29 @@ impl From for ThreadsPage { } } +fn thread_item_from_state_metadata(item: codex_state::ThreadMetadata) -> ThreadItem { + ThreadItem { + path: item.rollout_path, + thread_id: Some(item.id), + first_user_message: item.first_user_message, + cwd: Some(item.cwd), + git_branch: item.git_branch, + git_sha: item.git_sha, + git_origin_url: item.git_origin_url, + source: Some( + serde_json::from_str(item.source.as_str()) + .or_else(|_| serde_json::from_value(Value::String(item.source))) + .unwrap_or(SessionSource::Unknown), + ), + agent_nickname: item.agent_nickname, + agent_role: item.agent_role, + model_provider: Some(item.model_provider), + cli_version: Some(item.cli_version), + created_at: Some(item.created_at.to_rfc3339_opts(SecondsFormat::Secs, true)), + updated_at: Some(item.updated_at.to_rfc3339_opts(SecondsFormat::Millis, true)), + } +} + async fn select_resume_path( page: &ThreadsPage, filter_cwd: Option<&Path>, diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 9e5fc8c348af..e5e898857e46 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -714,7 +714,7 @@ async fn list_threads_default_filter_returns_filesystem_scan_results() -> std::i } #[tokio::test] -async fn list_threads_metadata_filter_overlays_state_db_git_info() -> std::io::Result<()> { +async fn list_threads_metadata_filter_overlays_state_db_list_metadata() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); @@ -778,6 +778,69 @@ async fn list_threads_metadata_filter_overlays_state_db_git_info() -> std::io::R Ok(()) } +#[test] +fn fill_missing_thread_item_metadata_preserves_filesystem_identity() { + let filesystem_thread_id = ThreadId::new(); + let state_thread_id = ThreadId::new(); + let filesystem_path = PathBuf::from("/tmp/filesystem-rollout.jsonl"); + let state_path = PathBuf::from("/tmp/state-rollout.jsonl"); + let mut item = ThreadItem { + path: filesystem_path.clone(), + thread_id: Some(filesystem_thread_id), + first_user_message: Some("filesystem message".to_string()), + cwd: None, + git_branch: None, + git_sha: None, + git_origin_url: None, + source: None, + agent_nickname: None, + agent_role: None, + model_provider: None, + cli_version: None, + created_at: None, + updated_at: None, + }; + let state_item = ThreadItem { + path: state_path, + thread_id: Some(state_thread_id), + first_user_message: Some("state message".to_string()), + cwd: Some(PathBuf::from("/tmp/state-cwd")), + git_branch: Some("state-branch".to_string()), + git_sha: Some("state-sha".to_string()), + git_origin_url: Some("https://example.com/state.git".to_string()), + source: Some(SessionSource::Exec), + agent_nickname: Some("state-agent".to_string()), + agent_role: Some("state-role".to_string()), + model_provider: Some("state-provider".to_string()), + cli_version: Some("state-version".to_string()), + created_at: Some("2025-01-03T16:00:00Z".to_string()), + updated_at: Some("2025-01-03T16:01:02.003Z".to_string()), + }; + + fill_missing_thread_item_metadata(&mut item, state_item); + + assert_eq!(item.path, filesystem_path); + assert_eq!(item.thread_id, Some(filesystem_thread_id)); + assert_eq!( + item.first_user_message.as_deref(), + Some("filesystem message") + ); + assert_eq!(item.cwd.as_deref(), Some(Path::new("/tmp/state-cwd"))); + assert_eq!(item.git_branch.as_deref(), Some("state-branch")); + assert_eq!(item.git_sha.as_deref(), Some("state-sha")); + assert_eq!( + item.git_origin_url.as_deref(), + Some("https://example.com/state.git") + ); + assert_eq!(item.source, Some(SessionSource::Exec)); + assert_eq!(item.agent_nickname.as_deref(), Some("state-agent")); + assert_eq!(item.agent_role.as_deref(), Some("state-role")); + assert_eq!(item.model_provider.as_deref(), Some("state-provider")); + assert_eq!(item.cli_version.as_deref(), Some("state-version")); + assert_eq!(item.created_at.as_deref(), Some("2025-01-03T16:00:00Z")); + assert_eq!(item.updated_at.as_deref(), Some("2025-01-03T16:01:02.003Z")); +} + #[tokio::test] async fn list_threads_search_repairs_stale_state_db_hits_before_returning() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir");