Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 57 additions & 272 deletions codex-rs/app-server/src/codex_message_processor.rs

Large diffs are not rendered by default.

36 changes: 9 additions & 27 deletions codex-rs/core/src/agent/control_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::config::Config;
use crate::config::ConfigBuilder;
use crate::contextual_user_message::SUBAGENT_NOTIFICATION_OPEN_TAG;
use assert_matches::assert_matches;
use chrono::Utc;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_protocol::AgentPath;
Expand All @@ -24,6 +23,9 @@ use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_thread_store::ArchiveThreadParams;
use codex_thread_store::LocalThreadStore;
use codex_thread_store::ThreadStore;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::Duration;
Expand Down Expand Up @@ -1658,38 +1660,18 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
.await
.expect("child thread should exist");
persist_thread_for_tree_resume(&child_thread, "persist before archiving").await;
let rollout_path = child_thread
.rollout_path()
.expect("thread should have rollout path");
let state_db = child_thread
.state_db()
.expect("thread should have state db handle");

let _ = harness
.control
.shutdown_live_agent(child_thread_id)
.await
.expect("child shutdown should succeed");

let archived_root = harness
.config
.codex_home
.join(crate::ARCHIVED_SESSIONS_SUBDIR);
tokio::fs::create_dir_all(&archived_root)
.await
.expect("archived root should exist");
let archived_rollout_path = archived_root.join(
rollout_path
.file_name()
.expect("rollout file name should be present"),
);
tokio::fs::rename(&rollout_path, &archived_rollout_path)
.await
.expect("rollout should move to archived path");
state_db
.mark_archived(child_thread_id, archived_rollout_path.as_path(), Utc::now())
let store = LocalThreadStore::new(codex_rollout::RolloutConfig::from_view(&harness.config));
store
.archive_thread(ArchiveThreadParams {
thread_id: child_thread_id,
})
.await
.expect("state db archive update should succeed");
.expect("child thread should archive");

let resumed_thread_id = harness
.control
Expand Down
1 change: 1 addition & 0 deletions codex-rs/rollout/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub use list::get_threads_in_root;
pub use list::parse_cursor;
pub use list::read_head_for_summary;
pub use list::read_session_meta_line;
pub use list::read_thread_item_from_rollout;
pub use list::rollout_date_parts;
pub use metadata::builder_from_items;
pub use policy::EventPersistenceMode;
Expand Down
15 changes: 15 additions & 0 deletions codex-rs/rollout/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,21 @@ async fn build_thread_item(
None
}

/// Read a single rollout file into the same summary item shape used by thread listing.
///
/// This is for callers that already resolved a rollout path and need the same
/// metadata/preview extraction as list operations without scanning the whole
/// sessions tree.
pub async fn read_thread_item_from_rollout(path: PathBuf) -> Option<ThreadItem> {
build_thread_item(
path,
&[],
/*provider_matcher*/ None,
/*updated_at*/ None,
)
.await
}

/// Collects immediate subdirectories of `parent`, parses their (string) names with `parse`,
/// and returns them sorted descending by the parsed key.
async fn collect_dirs_desc<T, F>(parent: &Path, parse: F) -> io::Result<Vec<(T, PathBuf)>>
Expand Down
170 changes: 170 additions & 0 deletions codex-rs/thread-store/src/local/archive_thread.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use chrono::Utc;
use codex_rollout::find_thread_path_by_id_str;

use super::LocalThreadStore;
use super::helpers::matching_rollout_file_name;
use super::helpers::scoped_rollout_path;
use crate::ArchiveThreadParams;
use crate::ThreadStoreError;
use crate::ThreadStoreResult;

pub(super) async fn archive_thread(
store: &LocalThreadStore,
params: ArchiveThreadParams,
) -> ThreadStoreResult<()> {
let thread_id = params.thread_id;
let rollout_path =
find_thread_path_by_id_str(store.config.codex_home.as_path(), &thread_id.to_string())
.await
.map_err(|err| ThreadStoreError::InvalidRequest {
message: format!("failed to locate thread id {thread_id}: {err}"),
})?
.ok_or_else(|| ThreadStoreError::InvalidRequest {
message: format!("no rollout found for thread id {thread_id}"),
})?;

let canonical_rollout_path = scoped_rollout_path(
store.config.codex_home.join(codex_rollout::SESSIONS_SUBDIR),
rollout_path.as_path(),
"sessions",
)?;
let file_name = matching_rollout_file_name(
canonical_rollout_path.as_path(),
thread_id,
rollout_path.as_path(),
)?;

let archive_folder = store
.config
.codex_home
.join(codex_rollout::ARCHIVED_SESSIONS_SUBDIR);
std::fs::create_dir_all(&archive_folder).map_err(|err| ThreadStoreError::Internal {
message: format!("failed to archive thread: {err}"),
})?;
let archived_path = archive_folder.join(&file_name);
std::fs::rename(&canonical_rollout_path, &archived_path).map_err(|err| {
ThreadStoreError::Internal {
message: format!("failed to archive thread: {err}"),
}
})?;

if let Some(ctx) = codex_rollout::state_db::get_state_db(&store.config).await {
let _ = ctx
.mark_archived(thread_id, archived_path.as_path(), Utc::now())
.await;
}
Ok(())
}

#[cfg(test)]
mod tests {
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionSource;
use codex_rollout::ARCHIVED_SESSIONS_SUBDIR;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use uuid::Uuid;

use super::*;
use crate::ListThreadsParams;
use crate::ThreadSortKey;
use crate::ThreadStore;
use crate::local::LocalThreadStore;
use crate::local::test_support::test_config;
use crate::local::test_support::write_session_file;

#[tokio::test]
async fn archive_thread_moves_rollout_to_archived_collection() {
let home = TempDir::new().expect("temp dir");
let store = LocalThreadStore::new(test_config(home.path()));
let uuid = Uuid::from_u128(201);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let active_path =
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");

store
.archive_thread(ArchiveThreadParams { thread_id })
.await
.expect("archive thread");

assert!(!active_path.exists());
let archived_path = home
.path()
.join(ARCHIVED_SESSIONS_SUBDIR)
.join(active_path.file_name().expect("file name"));
assert!(archived_path.exists());

let archived = store
.list_threads(ListThreadsParams {
page_size: 10,
cursor: None,
sort_key: ThreadSortKey::CreatedAt,
allowed_sources: Vec::new(),
model_providers: None,
archived: true,
search_term: None,
})
.await
.expect("archived listing");
assert_eq!(archived.items.len(), 1);
assert_eq!(archived.items[0].thread_id, thread_id);
assert_eq!(archived.items[0].rollout_path, Some(archived_path));
assert_eq!(
archived.items[0].archived_at,
Some(archived.items[0].updated_at)
);
}

#[tokio::test]
async fn archive_thread_updates_sqlite_metadata_when_present() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let store = LocalThreadStore::new(config.clone());
let uuid = Uuid::from_u128(202);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let active_path =
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize");
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
active_path.clone(),
Utc::now(),
SessionSource::Cli,
);
builder.model_provider = Some(config.model_provider_id.clone());
builder.cwd = home.path().to_path_buf();
builder.cli_version = Some("test_version".to_string());
let metadata = builder.build(config.model_provider_id.as_str());
runtime
.upsert_thread(&metadata)
.await
.expect("state db upsert should succeed");

store
.archive_thread(ArchiveThreadParams { thread_id })
.await
.expect("archive thread");

let archived_path = home
.path()
.join(ARCHIVED_SESSIONS_SUBDIR)
.join(active_path.file_name().expect("file name"));
let updated = runtime
.get_thread(thread_id)
.await
.expect("state db read should succeed")
.expect("thread metadata should exist");
assert_eq!(updated.rollout_path, archived_path);
assert!(updated.archived_at.is_some());
}
}
Loading
Loading