diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index bf6b4bdf933..c6f8e380873 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -247,7 +247,6 @@ use codex_core::exec::ExecParams; use codex_core::exec_env::create_env; use codex_core::find_archived_thread_path_by_id_str; use codex_core::find_thread_name_by_id; -use codex_core::find_thread_names_by_ids; use codex_core::find_thread_path_by_id_str; use codex_core::path_utils; use codex_core::plugins::PluginInstallError as CorePluginInstallError; @@ -354,7 +353,6 @@ use codex_state::ThreadMetadata; use codex_state::ThreadMetadataBuilder; use codex_state::log_db::LogDbLayer; use codex_thread_store::ArchiveThreadParams as StoreArchiveThreadParams; -#[cfg(debug_assertions)] use codex_thread_store::InMemoryThreadStore; use codex_thread_store::ListThreadsParams as StoreListThreadsParams; use codex_thread_store::LocalThreadStore; @@ -667,7 +665,6 @@ fn configured_thread_store(config: &Config) -> Arc { match &config.experimental_thread_store { ThreadStoreConfig::Local => Arc::new(configured_local_thread_store(config)), ThreadStoreConfig::Remote { endpoint } => Arc::new(RemoteThreadStore::new(endpoint)), - #[cfg(debug_assertions)] ThreadStoreConfig::InMemory { id } => InMemoryThreadStore::for_id(id), } } @@ -3961,31 +3958,30 @@ impl CodexMessageProcessor { }, ) .await; - let (summaries, next_cursor) = match list_result { + let (stored_threads, next_cursor) = match list_result { Ok(r) => r, Err(error) => { self.outgoing.send_error(request_id, error).await; return; } }; - let backwards_cursor = summaries.first().and_then(|summary| { - thread_backwards_cursor_for_sort_key(summary, store_sort_key, sort_direction) + let backwards_cursor = stored_threads.first().and_then(|thread| { + thread_backwards_cursor_for_sort_key(thread, store_sort_key, sort_direction) }); - let mut threads = Vec::with_capacity(summaries.len()); - let mut thread_ids = HashSet::with_capacity(summaries.len()); - let mut status_ids = Vec::with_capacity(summaries.len()); - - for summary in summaries { - let conversation_id = summary.conversation_id; - thread_ids.insert(conversation_id); + let mut threads = Vec::with_capacity(stored_threads.len()); + let mut status_ids = Vec::with_capacity(stored_threads.len()); + let fallback_provider = self.config.model_provider_id.clone(); - let thread = summary_to_thread(summary, &self.config.cwd); + for stored_thread in stored_threads { + let (thread, _) = thread_from_stored_thread( + stored_thread, + fallback_provider.as_str(), + &self.config.cwd, + ); status_ids.push(thread.id.clone()); - threads.push((conversation_id, thread)); + threads.push(thread); } - let names = thread_titles_by_ids(&self.config, &thread_ids).await; - let statuses = self .thread_watch_manager .loaded_statuses_for_threads(status_ids) @@ -3993,10 +3989,7 @@ impl CodexMessageProcessor { let data: Vec<_> = threads .into_iter() - .map(|(conversation_id, mut thread)| { - if let Some(title) = names.get(&conversation_id).cloned() { - set_thread_name_from_title(&mut thread, title); - } + .map(|mut thread| { if let Some(status) = statuses.get(&thread.id) { thread.status = status.clone(); } @@ -4106,7 +4099,7 @@ impl CodexMessageProcessor { thread_id: ThreadId, include_turns: bool, ) -> Result { - let loaded_thread = self.load_live_thread_for_read(thread_id).await; + let loaded_thread = self.thread_manager.get_thread(thread_id).await.ok(); let mut thread = if let Some(thread) = self .load_persisted_thread_for_read(thread_id, include_turns) .await? @@ -4142,10 +4135,6 @@ impl CodexMessageProcessor { Ok(thread) } - async fn load_live_thread_for_read(&self, thread_id: ThreadId) -> Option> { - self.thread_manager.get_thread(thread_id).await.ok() - } - async fn load_persisted_thread_for_read( &self, thread_id: ThreadId, @@ -4271,95 +4260,38 @@ impl CodexMessageProcessor { } }; - let state_db_ctx = get_state_db(&self.config).await; - let mut rollout_path = self - .resolve_rollout_path(thread_uuid, state_db_ctx.as_ref()) - .await; - if rollout_path.is_none() { - rollout_path = - match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string()) - .await - { - Ok(Some(path)) => Some(path), - Ok(None) => match find_archived_thread_path_by_id_str( - &self.config.codex_home, - &thread_uuid.to_string(), - ) - .await - { - Ok(path) => path, - Err(err) => { - self.send_invalid_request_error( - request_id, - format!("failed to locate archived thread id {thread_uuid}: {err}"), - ) - .await; - return; - } - }, - Err(err) => { - self.send_invalid_request_error( - request_id, - format!("failed to locate thread id {thread_uuid}: {err}"), - ) - .await; - return; - } - }; - } - - if rollout_path.is_none() { - match self.thread_manager.get_thread(thread_uuid).await { - Ok(thread) => { - rollout_path = thread.rollout_path(); - if rollout_path.is_none() { - self.send_invalid_request_error( - request_id, - "ephemeral threads do not support thread/turns/list".to_string(), - ) - .await; - return; - } - } - Err(_) => { - self.send_invalid_request_error( - request_id, - format!("thread not loaded: {thread_uuid}"), - ) - .await; - return; - } - } - } - - let Some(rollout_path) = rollout_path.as_ref() else { - self.send_internal_error( - request_id, - format!("failed to locate rollout for thread {thread_uuid}"), - ) - .await; - return; - }; - - match read_rollout_items_from_rollout(rollout_path).await { + match self.load_thread_turns_list_history(thread_uuid).await { Ok(items) => { // This API optimizes network transfer by letting clients page through a // thread's turns incrementally, but it still replays the entire rollout on // every request. Rollback and compaction events can change earlier turns, so // the server has to rebuild the full turn list until turn metadata is indexed // separately. - let has_live_in_progress_turn = - match self.thread_manager.get_thread(thread_uuid).await { - Ok(thread) => matches!(thread.agent_status().await, AgentStatus::Running), - Err(_) => false, - }; - let turns = reconstruct_thread_turns_from_rollout_items( + let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok(); + let has_live_in_progress_turn = match loaded_thread.as_ref() { + Some(thread) => matches!(thread.agent_status().await, AgentStatus::Running), + None => false, + }; + let mut turns = reconstruct_thread_turns_from_rollout_items( &items, self.thread_watch_manager .loaded_status_for_thread(&thread_uuid.to_string()) .await, has_live_in_progress_turn, ); + if has_live_in_progress_turn { + // Persisted history may not yet include the currently running turn. The + // app-server listener has already projected live turn events into ThreadState, + // so merge that in-memory snapshot before paginating. + let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; + let active_turn = { + let state = thread_state.lock().await; + state.active_turn_snapshot() + }; + if let Some(active_turn) = active_turn { + merge_turn_history_with_active_turn(&mut turns, active_turn); + } + } let page = match paginate_thread_turns( turns, cursor.as_deref(), @@ -4379,26 +4311,70 @@ impl CodexMessageProcessor { }; self.outgoing.send_response(request_id, response).await; } - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - self.send_invalid_request_error( - request_id, - format!( - "thread {thread_uuid} is not materialized yet; thread/turns/list is unavailable before first user message" - ), - ) - .await; + Err(ThreadReadViewError::InvalidRequest(message)) => { + self.send_invalid_request_error(request_id, message).await; + } + Err(ThreadReadViewError::Internal(message)) => { + self.send_internal_error(request_id, message).await; + } + } + } + + async fn load_thread_turns_list_history( + &self, + thread_id: ThreadId, + ) -> Result, ThreadReadViewError> { + match self + .thread_store + .read_thread(StoreReadThreadParams { + thread_id, + include_archived: true, + include_history: true, + }) + .await + { + Ok(stored_thread) => { + let history = stored_thread.history.ok_or_else(|| { + ThreadReadViewError::Internal(format!( + "thread store did not return history for thread {thread_id}" + )) + })?; + return Ok(history.items); + } + Err(ThreadStoreError::InvalidRequest { message }) + if message == format!("no rollout found for thread id {thread_id}") => {} + Err(ThreadStoreError::ThreadNotFound { + thread_id: missing_thread_id, + }) if missing_thread_id == thread_id => {} + Err(ThreadStoreError::InvalidRequest { message }) => { + return Err(ThreadReadViewError::InvalidRequest(message)); } Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_uuid}: {err}", - rollout_path.display() - ), - ) - .await; + return Err(ThreadReadViewError::Internal(format!( + "failed to read thread: {err}" + ))); } } + + let thread = self + .thread_manager + .get_thread(thread_id) + .await + .map_err(|_| { + ThreadReadViewError::InvalidRequest(format!("thread not loaded: {thread_id}")) + })?; + let config_snapshot = thread.config_snapshot().await; + if config_snapshot.ephemeral { + return Err(ThreadReadViewError::InvalidRequest( + "ephemeral threads do not support thread/turns/list".to_string(), + )); + } + + thread + .load_history(/*include_archived*/ true) + .await + .map(|history| history.items) + .map_err(|err| thread_turns_list_history_load_error(thread_id, err)) } pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver { @@ -5006,12 +4982,11 @@ impl CodexMessageProcessor { let (mut thread, history) = thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd); if include_turns && let Some(history) = history { - populate_thread_turns( + populate_thread_turns_from_history( &mut thread, - ThreadTurnSource::HistoryItems(&history.items), + &history.items, /*active_turn*/ None, - ) - .await?; + )?; } Ok(thread) } @@ -5102,12 +5077,11 @@ impl CodexMessageProcessor { thread.path = Some(rollout_path.to_path_buf()); if include_turns { let history_items = thread_history.get_rollout_items(); - populate_thread_turns( + populate_thread_turns_from_history( &mut thread, - ThreadTurnSource::HistoryItems(&history_items), + &history_items, /*active_turn*/ None, - ) - .await?; + )?; } self.attach_thread_name(thread_id, &mut thread).await; Ok(thread) @@ -5331,12 +5305,11 @@ impl CodexMessageProcessor { thread.preview = preview_from_rollout_items(&history_items); thread.forked_from_id = Some(source_thread_id.to_string()); if include_turns - && let Err(message) = populate_thread_turns( + && let Err(message) = populate_thread_turns_from_history( &mut thread, - ThreadTurnSource::HistoryItems(&history_items), + &history_items, /*active_turn*/ None, ) - .await { self.send_internal_error(request_id, message).await; return; @@ -5488,7 +5461,7 @@ impl CodexMessageProcessor { sort_key: StoreThreadSortKey, sort_direction: SortDirection, filters: ThreadListFilters, - ) -> Result<(Vec, Option), JSONRPCErrorError> { + ) -> Result<(Vec, Option), JSONRPCErrorError> { let ThreadListFilters { model_providers, source_kinds, @@ -5513,7 +5486,6 @@ impl CodexMessageProcessor { } None => Some(vec![self.config.model_provider_id.clone()]), }; - let fallback_provider = self.config.model_provider_id.clone(); let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds); let allowed_sources = allowed_sources_vec.as_slice(); let store_sort_direction = match sort_direction { @@ -5542,20 +5514,21 @@ impl CodexMessageProcessor { let mut filtered = Vec::with_capacity(page.items.len()); for it in page.items { - let Some(summary) = summary_from_stored_thread(it, fallback_provider.as_str()) - else { - continue; - }; + let source = with_thread_spawn_agent_metadata( + it.source.clone(), + it.agent_nickname.clone(), + it.agent_role.clone(), + ); if source_kind_filter .as_ref() - .is_none_or(|filter| source_kind_matches(&summary.source, filter)) + .is_none_or(|filter| source_kind_matches(&source, filter)) && cwd_filters.as_ref().is_none_or(|expected_cwds| { expected_cwds.iter().any(|expected_cwd| { - path_utils::paths_match_after_normalization(&summary.cwd, expected_cwd) + path_utils::paths_match_after_normalization(&it.cwd, expected_cwd) }) }) { - filtered.push(summary); + filtered.push(it); if filtered.len() >= remaining { break; } @@ -8854,12 +8827,11 @@ async fn handle_pending_thread_resume_request( let connection_id = request_id.connection_id; let mut thread = pending.thread_summary; if pending.include_turns - && let Err(message) = populate_thread_turns( + && let Err(message) = populate_thread_turns_from_history( &mut thread, - ThreadTurnSource::HistoryItems(&pending.history_items), + &pending.history_items, active_turn.as_ref(), ) - .await { outgoing .send_error( @@ -8969,18 +8941,12 @@ async fn handle_pending_thread_resume_request( .await; } -enum ThreadTurnSource<'a> { - HistoryItems(&'a [RolloutItem]), -} - -async fn populate_thread_turns( +fn populate_thread_turns_from_history( thread: &mut Thread, - turn_source: ThreadTurnSource<'_>, + items: &[RolloutItem], active_turn: Option<&Turn>, ) -> std::result::Result<(), String> { - let mut turns = match turn_source { - ThreadTurnSource::HistoryItems(items) => build_turns_from_rollout_items(items), - }; + let mut turns = build_turns_from_rollout_items(items); if let Some(active_turn) = active_turn { merge_turn_history_with_active_turn(&mut turns, active_turn.clone()); } @@ -9428,31 +9394,6 @@ async fn title_from_state_db(config: &Config, thread_id: ThreadId) -> Option, -) -> HashMap { - let mut names = HashMap::with_capacity(thread_ids.len()); - if let Some(state_db_ctx) = open_state_db_for_direct_thread_lookup(config).await { - for &thread_id in thread_ids { - let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else { - continue; - }; - if let Some(title) = distinct_title(&metadata) { - names.insert(thread_id, title); - } - } - } - if names.len() < thread_ids.len() - && let Ok(legacy_names) = find_thread_names_by_ids(&config.codex_home, thread_ids).await - { - for (thread_id, title) in legacy_names { - names.entry(thread_id).or_insert(title); - } - } - names -} - async fn open_state_db_for_direct_thread_lookup(config: &Config) -> Option { StateRuntime::init(config.sqlite_home.clone(), config.model_provider_id.clone()) .await @@ -9515,6 +9456,27 @@ fn thread_store_resume_read_error(err: ThreadStoreError) -> JSONRPCErrorError { } } +fn thread_turns_list_history_load_error( + thread_id: ThreadId, + err: ThreadStoreError, +) -> ThreadReadViewError { + match err { + ThreadStoreError::InvalidRequest { message } + if message.starts_with("failed to resolve rollout path `") => + { + ThreadReadViewError::InvalidRequest(format!( + "thread {thread_id} is not materialized yet; thread/turns/list is unavailable before first user message" + )) + } + ThreadStoreError::InvalidRequest { message } => { + ThreadReadViewError::InvalidRequest(message) + } + err => ThreadReadViewError::Internal(format!( + "failed to load thread history for thread {thread_id}: {err}" + )), + } +} + fn conversation_summary_thread_id_read_error( conversation_id: ThreadId, err: ThreadStoreError, @@ -10132,18 +10094,14 @@ pub(crate) fn summary_to_thread( } fn thread_backwards_cursor_for_sort_key( - summary: &ConversationSummary, + thread: &StoredThread, sort_key: StoreThreadSortKey, sort_direction: SortDirection, ) -> Option { let timestamp = match sort_key { - StoreThreadSortKey::CreatedAt => summary.timestamp.as_deref(), - StoreThreadSortKey::UpdatedAt => summary - .updated_at - .as_deref() - .or(summary.timestamp.as_deref()), + StoreThreadSortKey::CreatedAt => thread.created_at, + StoreThreadSortKey::UpdatedAt => thread.updated_at, }; - let timestamp = parse_datetime(timestamp)?; // The state DB stores unique millisecond timestamps. Offset the reverse cursor by one // millisecond so the opposite-direction query includes the page anchor. let timestamp = match sort_direction { diff --git a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs index ebee1fd7c15..3a7798c6e20 100644 --- a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs +++ b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs @@ -28,6 +28,8 @@ use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ThreadListParams; +use codex_app_server_protocol::ThreadListResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::TurnStartParams; @@ -134,10 +136,35 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste }) .await??; + let response = client + .request(ClientRequest::ThreadList { + request_id: RequestId::Integer(3), + params: ThreadListParams { + cursor: None, + limit: Some(10), + sort_key: None, + sort_direction: None, + model_providers: Some(Vec::new()), + source_kinds: None, + archived: None, + cwd: None, + use_state_db_only: false, + search_term: None, + }, + }) + .await? + .expect("thread/list should succeed"); + let ThreadListResponse { data, .. } = + serde_json::from_value(response).expect("thread/list response should parse"); + assert_eq!(data.len(), 1); + assert_eq!(data[0].id, thread.id); + assert_eq!(data[0].path, None); + client.shutdown().await?; let calls = thread_store.calls().await; assert_eq!(calls.create_thread, 1); + assert_eq!(calls.list_threads, 1); assert!( calls.append_items > 0, "turn/start should append rollout items through the injected store" diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 8e0e253ac0c..b6dc29dcc6b 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -5,6 +5,12 @@ use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::rollout_path; use app_test_support::test_absolute_path; use app_test_support::to_response; +use codex_app_server::in_process; +use codex_app_server::in_process::InProcessStartArgs; +use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::InitializeCapabilities; +use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; @@ -31,17 +37,37 @@ use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput; +use codex_arg0::Arg0DispatchPaths; use codex_core::ARCHIVED_SESSIONS_SUBDIR; +use codex_core::config::ConfigBuilder; +use codex_core::config_loader::CloudRequirementsLoader; +use codex_core::config_loader::LoaderOverrides; +use codex_exec_server::EnvironmentManager; +use codex_feedback::CodexFeedback; +use codex_protocol::models::BaseInstructions; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::SessionSource as ProtocolSessionSource; +use codex_protocol::protocol::UserMessageEvent; use codex_protocol::user_input::ByteRange; use codex_protocol::user_input::TextElement; +use codex_thread_store::AppendThreadItemsParams; +use codex_thread_store::CreateThreadParams; +use codex_thread_store::InMemoryThreadStore; +use codex_thread_store::ThreadEventPersistenceMode; +use codex_thread_store::ThreadMetadataPatch; +use codex_thread_store::ThreadStore; +use codex_thread_store::UpdateThreadMetadataParams; use core_test_support::responses; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; use std::io::Write; use std::path::Path; +use std::sync::Arc; use tempfile::TempDir; use tokio::time::timeout; +use uuid::Uuid; #[cfg(windows)] const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25); @@ -246,6 +272,147 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_turns_list_reads_store_history_without_rollout_path() -> Result<()> { + let codex_home = TempDir::new()?; + let thread_id = codex_protocol::ThreadId::from_string("00000000-0000-4000-8000-000000000123")?; + let store_id = Uuid::new_v4().to_string(); + create_config_toml_with_thread_store(codex_home.path(), &store_id)?; + let store = InMemoryThreadStore::for_id(store_id.clone()); + let _in_memory_store = InMemoryThreadStoreId { store_id }; + seed_pathless_store_thread(&store, thread_id).await?; + + let loader_overrides = LoaderOverrides::without_managed_config_for_tests(); + let config = ConfigBuilder::default() + .codex_home(codex_home.path().to_path_buf()) + .fallback_cwd(Some(codex_home.path().to_path_buf())) + .loader_overrides(loader_overrides.clone()) + .build() + .await?; + let client = in_process::start(InProcessStartArgs { + arg0_paths: Arg0DispatchPaths::default(), + config: Arc::new(config), + cli_overrides: Vec::new(), + loader_overrides, + cloud_requirements: CloudRequirementsLoader::default(), + thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader), + feedback: CodexFeedback::new(), + log_db: None, + environment_manager: Arc::new(EnvironmentManager::default_for_tests()), + config_warnings: Vec::new(), + session_source: SessionSource::Cli.into(), + enable_codex_api_key_env: false, + initialize: InitializeParams { + client_info: ClientInfo { + name: "codex-app-server-tests".to_string(), + title: None, + version: "0.1.0".to_string(), + }, + capabilities: Some(InitializeCapabilities { + experimental_api: true, + ..Default::default() + }), + }, + channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }) + .await?; + + let result = client + .request(ClientRequest::ThreadTurnsList { + request_id: RequestId::Integer(1), + params: ThreadTurnsListParams { + thread_id: thread_id.to_string(), + cursor: None, + limit: Some(10), + sort_direction: Some(SortDirection::Asc), + }, + }) + .await? + .expect("thread/turns/list should succeed"); + let ThreadTurnsListResponse { data, .. } = serde_json::from_value(result)?; + + assert_eq!(turn_user_texts(&data), vec!["history from store"]); + + client.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn thread_list_includes_store_thread_without_rollout_path() -> Result<()> { + let codex_home = TempDir::new()?; + let thread_id = codex_protocol::ThreadId::from_string("00000000-0000-4000-8000-000000000124")?; + let store_id = Uuid::new_v4().to_string(); + create_config_toml_with_thread_store(codex_home.path(), &store_id)?; + let store = InMemoryThreadStore::for_id(store_id.clone()); + let _in_memory_store = InMemoryThreadStoreId { store_id }; + seed_pathless_store_thread(&store, thread_id).await?; + + let loader_overrides = LoaderOverrides::without_managed_config_for_tests(); + let config = ConfigBuilder::default() + .codex_home(codex_home.path().to_path_buf()) + .fallback_cwd(Some(codex_home.path().to_path_buf())) + .loader_overrides(loader_overrides.clone()) + .build() + .await?; + let client = in_process::start(InProcessStartArgs { + arg0_paths: Arg0DispatchPaths::default(), + config: Arc::new(config), + cli_overrides: Vec::new(), + loader_overrides, + cloud_requirements: CloudRequirementsLoader::default(), + thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader), + feedback: CodexFeedback::new(), + log_db: None, + environment_manager: Arc::new(EnvironmentManager::default_for_tests()), + config_warnings: Vec::new(), + session_source: SessionSource::Cli.into(), + enable_codex_api_key_env: false, + initialize: InitializeParams { + client_info: ClientInfo { + name: "codex-app-server-tests".to_string(), + title: None, + version: "0.1.0".to_string(), + }, + capabilities: Some(InitializeCapabilities { + experimental_api: true, + ..Default::default() + }), + }, + channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }) + .await?; + + let result = client + .request(ClientRequest::ThreadList { + request_id: RequestId::Integer(1), + params: ThreadListParams { + cursor: None, + limit: Some(10), + sort_key: None, + sort_direction: None, + model_providers: Some(Vec::new()), + source_kinds: None, + archived: None, + cwd: None, + use_state_db_only: false, + search_term: None, + }, + }) + .await? + .expect("thread/list should succeed"); + let ThreadListResponse { data, .. } = serde_json::from_value(result)?; + + assert_eq!(data.len(), 1); + let thread = &data[0]; + assert_eq!(thread.id, thread_id.to_string()); + assert_eq!(thread.path, None); + assert_eq!(thread.preview, ""); + assert_eq!(thread.name.as_deref(), Some("named pathless thread")); + + client.shutdown().await?; + Ok(()) +} + #[tokio::test] async fn thread_read_can_return_archived_threads_by_id() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -670,6 +837,59 @@ async fn thread_read_include_turns_rejects_unmaterialized_loaded_thread() -> Res Ok(()) } +#[tokio::test] +async fn thread_turns_list_rejects_unmaterialized_loaded_thread() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + let thread_path = thread.path.clone().expect("thread path"); + assert!( + !thread_path.exists(), + "fresh thread rollout should not be materialized yet" + ); + + let read_id = mcp + .send_thread_turns_list_request(ThreadTurnsListParams { + thread_id: thread.id, + cursor: None, + limit: None, + sort_direction: None, + }) + .await?; + let read_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(read_id)), + ) + .await??; + + assert!( + read_err + .error + .message + .contains("thread/turns/list is unavailable before first user message"), + "unexpected error: {}", + read_err.error.message + ); + + Ok(()) +} + #[tokio::test] async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Result<()> { let server = responses::start_mock_server().await; @@ -787,6 +1007,84 @@ fn turn_user_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> { .collect() } +struct InMemoryThreadStoreId { + store_id: String, +} + +impl Drop for InMemoryThreadStoreId { + fn drop(&mut self) { + InMemoryThreadStore::remove_id(&self.store_id); + } +} + +async fn seed_pathless_store_thread( + store: &InMemoryThreadStore, + thread_id: codex_protocol::ThreadId, +) -> Result<()> { + store + .create_thread(CreateThreadParams { + thread_id, + forked_from_id: None, + source: ProtocolSessionSource::Cli, + base_instructions: BaseInstructions::default(), + dynamic_tools: Vec::new(), + event_persistence_mode: ThreadEventPersistenceMode::default(), + }) + .await?; + store + .append_items(AppendThreadItemsParams { + thread_id, + items: store_history_items(), + }) + .await?; + store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + name: Some("named pathless thread".to_string()), + ..Default::default() + }, + include_archived: true, + }) + .await?; + Ok(()) +} + +fn store_history_items() -> Vec { + vec![RolloutItem::EventMsg(EventMsg::UserMessage( + UserMessageEvent { + message: "history from store".to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + }, + ))] +} + +fn create_config_toml_with_thread_store(codex_home: &Path, store_id: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" +experimental_thread_store = {{ type = "in_memory", id = "{store_id}" }} + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "http://127.0.0.1:1/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} + // Helper to create a config.toml pointing at the mock model server. fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); diff --git a/codex-rs/config/src/config_toml.rs b/codex-rs/config/src/config_toml.rs index f0de00192f5..b5c247acc0d 100644 --- a/codex-rs/config/src/config_toml.rs +++ b/codex-rs/config/src/config_toml.rs @@ -423,7 +423,6 @@ pub enum ThreadStoreToml { Remote { endpoint: String, }, - #[cfg(debug_assertions)] #[schemars(skip)] InMemory { id: String, diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index cda2d22fb06..6dff2b52f1e 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -30,6 +30,9 @@ use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsageInfo; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::UserInput; +use codex_thread_store::StoredThreadHistory; +use codex_thread_store::ThreadStoreError; +use codex_thread_store::ThreadStoreResult; use codex_utils_absolute_path::AbsolutePathBuf; use rmcp::model::ReadResourceRequestParams; use std::collections::HashMap; @@ -307,6 +310,20 @@ impl CodexThread { self.rollout_path.clone() } + pub async fn load_history( + &self, + include_archived: bool, + ) -> ThreadStoreResult { + let live_thread = self + .codex + .session + .live_thread_for_persistence("load history") + .map_err(|err| ThreadStoreError::Internal { + message: err.to_string(), + })?; + live_thread.load_history(include_archived).await + } + pub fn state_db(&self) -> Option { self.codex.state_db() } diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index 33fe18d1f48..550b44fbe9f 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -239,8 +239,7 @@ pub enum ThreadStoreConfig { Local, /// Persist threads through the remote thread-store service. Remote { endpoint: String }, - /// Test-only in-memory thread store. - #[cfg(debug_assertions)] + /// In-memory thread store for test and debug configurations. InMemory { id: String }, } @@ -1317,7 +1316,6 @@ fn thread_store_config( match thread_store { Some(ThreadStoreToml::Local {}) => ThreadStoreConfig::Local, Some(ThreadStoreToml::Remote { endpoint }) => ThreadStoreConfig::Remote { endpoint }, - #[cfg(debug_assertions)] Some(ThreadStoreToml::InMemory { id }) => ThreadStoreConfig::InMemory { id }, None => legacy_remote_endpoint.map_or(ThreadStoreConfig::Local, |endpoint| { ThreadStoreConfig::Remote { endpoint } diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 30d220694df..331d2548c8c 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -53,7 +53,6 @@ use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_rollout::RolloutConfig; use codex_state::DirectionalThreadSpawnEdgeStatus; -#[cfg(debug_assertions)] use codex_thread_store::InMemoryThreadStore; use codex_thread_store::LocalThreadStore; use codex_thread_store::RemoteThreadStore; @@ -259,7 +258,6 @@ fn configured_thread_store(config: &Config) -> Arc { Arc::new(LocalThreadStore::new(RolloutConfig::from_view(config))) } ThreadStoreConfig::Remote { endpoint } => Arc::new(RemoteThreadStore::new(endpoint)), - #[cfg(debug_assertions)] ThreadStoreConfig::InMemory { id } => InMemoryThreadStore::for_id(id), } } diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index 084975abd27..c54ecb4af26 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -61,9 +61,9 @@ pub struct InMemoryThreadStoreCalls { pub unarchive_thread: usize, } -/// Test-only in-memory [`ThreadStore`] implementation. +/// In-memory [`ThreadStore`] implementation for tests and debug configs. /// -/// Debug/test configs can select this store by id, letting tests exercise +/// Test and debug configs can select this store by id, letting tests exercise /// config-driven non-local persistence without requiring the real remote gRPC /// service. #[derive(Default)] diff --git a/codex-rs/thread-store/src/lib.rs b/codex-rs/thread-store/src/lib.rs index 42b9297bcae..b1adf5e743e 100644 --- a/codex-rs/thread-store/src/lib.rs +++ b/codex-rs/thread-store/src/lib.rs @@ -5,7 +5,6 @@ //! any other backing store. mod error; -#[cfg(debug_assertions)] mod in_memory; mod live_thread; mod local; @@ -15,9 +14,7 @@ mod types; pub use error::ThreadStoreError; pub use error::ThreadStoreResult; -#[cfg(debug_assertions)] pub use in_memory::InMemoryThreadStore; -#[cfg(debug_assertions)] pub use in_memory::InMemoryThreadStoreCalls; pub use live_thread::LiveThread; pub use live_thread::LiveThreadInitGuard; diff --git a/codex-rs/thread-store/src/local/helpers.rs b/codex-rs/thread-store/src/local/helpers.rs index 108218e9e94..e7ca821773d 100644 --- a/codex-rs/thread-store/src/local/helpers.rs +++ b/codex-rs/thread-store/src/local/helpers.rs @@ -14,6 +14,7 @@ use codex_protocol::protocol::GitInfo; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_rollout::ThreadItem; +use codex_state::ThreadMetadata; use crate::StoredThread; use crate::ThreadStoreError; @@ -133,6 +134,22 @@ pub(super) fn stored_thread_from_rollout_item( }) } +pub(super) fn distinct_thread_metadata_title(metadata: &ThreadMetadata) -> Option { + let title = metadata.title.trim(); + if title.is_empty() || metadata.first_user_message.as_deref().map(str::trim) == Some(title) { + None + } else { + Some(title.to_string()) + } +} + +pub(super) fn set_thread_name_from_title(thread: &mut StoredThread, title: String) { + if title.trim().is_empty() || thread.preview.trim() == title.trim() { + return; + } + thread.name = Some(title); +} + fn parse_rfc3339(value: Option<&str>) -> Option> { DateTime::parse_from_rfc3339(value?) .ok() diff --git a/codex-rs/thread-store/src/local/list_threads.rs b/codex-rs/thread-store/src/local/list_threads.rs index d68ebd47f64..42c0e8144b7 100644 --- a/codex-rs/thread-store/src/local/list_threads.rs +++ b/codex-rs/thread-store/src/local/list_threads.rs @@ -1,8 +1,15 @@ +use std::collections::HashMap; +use std::collections::HashSet; + +use codex_protocol::ThreadId; use codex_rollout::RolloutConfig; use codex_rollout::RolloutRecorder; +use codex_rollout::find_thread_names_by_ids; use codex_rollout::parse_cursor; use super::LocalThreadStore; +use super::helpers::distinct_thread_metadata_title; +use super::helpers::set_thread_name_from_title; use super::helpers::stored_thread_from_rollout_item; use crate::ListThreadsParams; use crate::SortDirection; @@ -46,7 +53,7 @@ pub(super) async fn list_threads( .as_ref() .and_then(|cursor| serde_json::to_value(cursor).ok()) .and_then(|value| value.as_str().map(str::to_owned)); - let items = page + let mut items = page .items .into_iter() .filter_map(|item| { @@ -58,6 +65,35 @@ pub(super) async fn list_threads( }) .collect::>(); + let thread_ids = items + .iter() + .map(|thread| thread.thread_id) + .collect::>(); + let mut names = HashMap::::with_capacity(thread_ids.len()); + if let Some(state_db_ctx) = store.state_db().await { + for &thread_id in &thread_ids { + let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else { + continue; + }; + if let Some(title) = distinct_thread_metadata_title(&metadata) { + names.insert(thread_id, title); + } + } + } + if names.len() < thread_ids.len() + && let Ok(legacy_names) = + find_thread_names_by_ids(store.config.codex_home.as_path(), &thread_ids).await + { + for (thread_id, title) in legacy_names { + names.entry(thread_id).or_insert(title); + } + } + for thread in &mut items { + if let Some(title) = names.get(&thread.thread_id).cloned() { + set_thread_name_from_title(thread, title); + } + } + Ok(ThreadPage { items, next_cursor }) } diff --git a/codex-rs/thread-store/src/local/read_thread.rs b/codex-rs/thread-store/src/local/read_thread.rs index 7bbc7bb3dde..f782e433826 100644 --- a/codex-rs/thread-store/src/local/read_thread.rs +++ b/codex-rs/thread-store/src/local/read_thread.rs @@ -14,7 +14,9 @@ use codex_state::StateRuntime; use codex_state::ThreadMetadata; use super::LocalThreadStore; +use super::helpers::distinct_thread_metadata_title; use super::helpers::git_info_from_parts; +use super::helpers::set_thread_name_from_title; use super::helpers::stored_thread_from_rollout_item; use crate::ReadThreadParams; use crate::StoredThread; @@ -206,7 +208,7 @@ async fn stored_thread_from_sqlite_metadata( store: &LocalThreadStore, metadata: ThreadMetadata, ) -> StoredThread { - let name = match distinct_title(&metadata) { + let name = match distinct_thread_metadata_title(&metadata) { Some(title) => Some(title), None => find_thread_name_by_id(store.config.codex_home.as_path(), &metadata.id) .await @@ -318,22 +320,6 @@ fn stored_thread_from_meta_line( } } -fn distinct_title(metadata: &ThreadMetadata) -> Option { - let title = metadata.title.trim(); - if title.is_empty() || metadata.first_user_message.as_deref().map(str::trim) == Some(title) { - None - } else { - Some(title.to_string()) - } -} - -fn set_thread_name_from_title(thread: &mut StoredThread, title: String) { - if title.trim().is_empty() || thread.preview.trim() == title.trim() { - return; - } - thread.name = Some(title); -} - fn parse_session_source(source: &str) -> SessionSource { serde_json::from_str(source) .or_else(|_| serde_json::from_value(serde_json::Value::String(source.to_string())))