Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 120 additions & 32 deletions codex-rs/rollout/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,22 +528,22 @@ impl RolloutRecorder {
)
.await;
}
return Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
page_size,
sort_key,
));
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
page,
)
.await);
}
return Ok(db_page.into());
}
if listing_has_metadata_filters {
return Ok(page_from_filesystem_scan(
fs_page,
sort_direction,
page_size,
sort_key,
));
let page = page_from_filesystem_scan(fs_page, sort_direction, page_size, sort_key);
return Ok(fill_missing_thread_item_metadata_from_state_db(
state_db_ctx.as_deref(),
page,
)
.await);
}
// If SQLite listing still fails, return the filesystem page rather than failing the list.
tracing::error!("Falling back on rollout system");
Expand Down Expand Up @@ -978,6 +978,90 @@ fn page_from_filesystem_scan(
}
}

async fn fill_missing_thread_item_metadata_from_state_db(
state_db_ctx: Option<&StateRuntime>,
mut page: ThreadsPage,
) -> ThreadsPage {
let Some(state_db_ctx) = state_db_ctx else {
return page;
};

for item in &mut page.items {
let Some(thread_id) = item.thread_id else {
continue;
};
let metadata = match state_db_ctx.get_thread(thread_id).await {
Ok(Some(metadata)) => metadata,
Ok(None) => continue,
Err(err) => {
warn!(
"state db get_thread failed while overlaying filesystem scan thread metadata: {err}"
);
continue;
}
};
fill_missing_thread_item_metadata(item, thread_item_from_state_metadata(metadata));
}

page
}

fn fill_missing_thread_item_metadata(item: &mut ThreadItem, state_item: ThreadItem) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks bad and probably hide a bad design

let ThreadItem {
path: _state_path,
thread_id: _state_thread_id,
first_user_message,
cwd,
git_branch,
git_sha,
git_origin_url,
source,
agent_nickname,
agent_role,
model_provider,
cli_version,
created_at,
updated_at,
} = state_item;

if item.first_user_message.is_none() {
item.first_user_message = first_user_message;
}
if item.cwd.is_none() {
item.cwd = cwd;
}
if item.git_branch.is_none() {
item.git_branch = git_branch;
}
if item.git_sha.is_none() {
item.git_sha = git_sha;
}
if item.git_origin_url.is_none() {
item.git_origin_url = git_origin_url;
}
if item.source.is_none() {
item.source = source;
}
if item.agent_nickname.is_none() {
item.agent_nickname = agent_nickname;
}
if item.agent_role.is_none() {
item.agent_role = agent_role;
}
if item.model_provider.is_none() {
item.model_provider = model_provider;
}
if item.cli_version.is_none() {
item.cli_version = cli_version;
}
if item.created_at.is_none() {
item.created_at = created_at;
}
if item.updated_at.is_none() {
item.updated_at = updated_at;
}
}

#[allow(clippy::too_many_arguments)]
async fn list_threads_from_files_desc(
codex_home: &Path,
Expand Down Expand Up @@ -1684,26 +1768,7 @@ impl From<codex_state::ThreadsPage> for ThreadsPage {
let items = db_page
.items
.into_iter()
.map(|item| ThreadItem {
path: item.rollout_path,
thread_id: Some(item.id),
first_user_message: item.first_user_message,
cwd: Some(item.cwd),
git_branch: item.git_branch,
git_sha: item.git_sha,
git_origin_url: item.git_origin_url,
source: Some(
serde_json::from_str(item.source.as_str())
.or_else(|_| serde_json::from_value(Value::String(item.source)))
.unwrap_or(SessionSource::Unknown),
),
agent_nickname: item.agent_nickname,
agent_role: item.agent_role,
model_provider: Some(item.model_provider),
cli_version: Some(item.cli_version),
created_at: Some(item.created_at.to_rfc3339_opts(SecondsFormat::Secs, true)),
updated_at: Some(item.updated_at.to_rfc3339_opts(SecondsFormat::Millis, true)),
})
.map(thread_item_from_state_metadata)
.collect();
Self {
items,
Expand All @@ -1714,6 +1779,29 @@ impl From<codex_state::ThreadsPage> for ThreadsPage {
}
}

fn thread_item_from_state_metadata(item: codex_state::ThreadMetadata) -> ThreadItem {
ThreadItem {
path: item.rollout_path,
thread_id: Some(item.id),
first_user_message: item.first_user_message,
cwd: Some(item.cwd),
git_branch: item.git_branch,
git_sha: item.git_sha,
git_origin_url: item.git_origin_url,
source: Some(
serde_json::from_str(item.source.as_str())
.or_else(|_| serde_json::from_value(Value::String(item.source)))
.unwrap_or(SessionSource::Unknown),
),
agent_nickname: item.agent_nickname,
agent_role: item.agent_role,
model_provider: Some(item.model_provider),
cli_version: Some(item.cli_version),
created_at: Some(item.created_at.to_rfc3339_opts(SecondsFormat::Secs, true)),
updated_at: Some(item.updated_at.to_rfc3339_opts(SecondsFormat::Millis, true)),
}
}

async fn select_resume_path(
page: &ThreadsPage,
filter_cwd: Option<&Path>,
Expand Down
128 changes: 128 additions & 0 deletions codex-rs/rollout/src/recorder_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,134 @@ async fn list_threads_default_filter_returns_filesystem_scan_results() -> std::i
Ok(())
}

#[tokio::test]
async fn list_threads_metadata_filter_overlays_state_db_list_metadata() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());

let uuid = Uuid::from_u128(9015);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path = write_session_file(home.path(), "2025-01-03T16-00-00", uuid)?;

let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize");
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
let created_at = chrono::Utc
.with_ymd_and_hms(2025, 1, 3, 16, 0, 0)
.single()
.expect("valid datetime");
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
rollout_path,
created_at,
SessionSource::Cli,
);
builder.model_provider = Some(config.model_provider_id.clone());
builder.cwd = home.path().to_path_buf();
builder.git_branch = Some("sqlite-branch".to_string());
builder.git_sha = Some("sqlite-sha".to_string());
builder.git_origin_url = Some("https://example.com/repo.git".to_string());
let mut metadata = builder.build(config.model_provider_id.as_str());
metadata.first_user_message = Some("Hello from user".to_string());
runtime
.upsert_thread(&metadata)
.await
.expect("state db upsert should succeed");

let page = RolloutRecorder::list_threads(
&config,
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[SessionSource::Cli],
/*model_providers*/ None,
/*cwd_filters*/ None,
config.model_provider_id.as_str(),
/*search_term*/ None,
)
.await?;

assert_eq!(page.items.len(), 1);
assert_eq!(page.items[0].git_branch.as_deref(), Some("sqlite-branch"));
assert_eq!(page.items[0].git_sha.as_deref(), Some("sqlite-sha"));
assert_eq!(
page.items[0].git_origin_url.as_deref(),
Some("https://example.com/repo.git")
);
Ok(())
}

#[test]
fn fill_missing_thread_item_metadata_preserves_filesystem_identity() {
let filesystem_thread_id = ThreadId::new();
let state_thread_id = ThreadId::new();
let filesystem_path = PathBuf::from("/tmp/filesystem-rollout.jsonl");
let state_path = PathBuf::from("/tmp/state-rollout.jsonl");
let mut item = ThreadItem {
path: filesystem_path.clone(),
thread_id: Some(filesystem_thread_id),
first_user_message: Some("filesystem message".to_string()),
cwd: None,
git_branch: None,
git_sha: None,
git_origin_url: None,
source: None,
agent_nickname: None,
agent_role: None,
model_provider: None,
cli_version: None,
created_at: None,
updated_at: None,
};
let state_item = ThreadItem {
path: state_path,
thread_id: Some(state_thread_id),
first_user_message: Some("state message".to_string()),
cwd: Some(PathBuf::from("/tmp/state-cwd")),
git_branch: Some("state-branch".to_string()),
git_sha: Some("state-sha".to_string()),
git_origin_url: Some("https://example.com/state.git".to_string()),
source: Some(SessionSource::Exec),
agent_nickname: Some("state-agent".to_string()),
agent_role: Some("state-role".to_string()),
model_provider: Some("state-provider".to_string()),
cli_version: Some("state-version".to_string()),
created_at: Some("2025-01-03T16:00:00Z".to_string()),
updated_at: Some("2025-01-03T16:01:02.003Z".to_string()),
};

fill_missing_thread_item_metadata(&mut item, state_item);

assert_eq!(item.path, filesystem_path);
assert_eq!(item.thread_id, Some(filesystem_thread_id));
assert_eq!(
item.first_user_message.as_deref(),
Some("filesystem message")
);
assert_eq!(item.cwd.as_deref(), Some(Path::new("/tmp/state-cwd")));
assert_eq!(item.git_branch.as_deref(), Some("state-branch"));
assert_eq!(item.git_sha.as_deref(), Some("state-sha"));
assert_eq!(
item.git_origin_url.as_deref(),
Some("https://example.com/state.git")
);
assert_eq!(item.source, Some(SessionSource::Exec));
assert_eq!(item.agent_nickname.as_deref(), Some("state-agent"));
assert_eq!(item.agent_role.as_deref(), Some("state-role"));
assert_eq!(item.model_provider.as_deref(), Some("state-provider"));
assert_eq!(item.cli_version.as_deref(), Some("state-version"));
assert_eq!(item.created_at.as_deref(), Some("2025-01-03T16:00:00Z"));
assert_eq!(item.updated_at.as_deref(), Some("2025-01-03T16:01:02.003Z"));
}

#[tokio::test]
async fn list_threads_search_repairs_stale_state_db_hits_before_returning() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
Expand Down
Loading