diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 0f3b8bf52268..71e7a9ffa378 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -438,6 +438,11 @@ enum ThreadShutdownResult { TimedOut, } +enum ThreadReadViewError { + InvalidRequest(String), + Internal(String), +} + impl Drop for ActiveLogin { fn drop(&mut self) { self.cancel(); @@ -3786,17 +3791,83 @@ impl CodexMessageProcessor { } }; - let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok(); - let loaded_thread_state_db = loaded_thread.as_ref().and_then(|thread| thread.state_db()); + let thread = match self.read_thread_view(thread_uuid, include_turns).await { + Ok(thread) => thread, + Err(ThreadReadViewError::InvalidRequest(message)) => { + self.send_invalid_request_error(request_id, message).await; + return; + } + Err(ThreadReadViewError::Internal(message)) => { + self.send_internal_error(request_id, message).await; + return; + } + }; + let response = ThreadReadResponse { thread }; + self.outgoing.send_response(request_id, response).await; + } + + /// Builds the API view for `thread/read` from persisted metadata plus optional live state. + async fn read_thread_view( + &self, + thread_id: ThreadId, + include_turns: bool, + ) -> Result { + let loaded_thread = self.load_live_thread_for_read(thread_id).await; + let mut thread = if let Some(thread) = self + .load_persisted_thread_for_read(thread_id, include_turns, loaded_thread.as_ref()) + .await? + { + thread + } else if let Some(thread) = self + .load_live_thread_view(thread_id, include_turns, loaded_thread.as_ref()) + .await? + { + thread + } else { + return Err(ThreadReadViewError::InvalidRequest(format!( + "thread not loaded: {thread_id}" + ))); + }; + + let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread { + matches!(loaded_thread.agent_status().await, AgentStatus::Running) + } else { + false + }; + + let thread_status = self + .thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await; + + set_thread_status_and_interrupt_stale_turns( + &mut thread, + thread_status, + has_live_in_progress_turn, + ); + Ok(thread) + } + + async fn load_live_thread_for_read(&self, thread_id: ThreadId) -> Option> { + self.thread_manager.get_thread(thread_id).await.ok() + } + + async fn load_persisted_thread_for_read( + &self, + thread_id: ThreadId, + include_turns: bool, + loaded_thread: Option<&Arc>, + ) -> Result, ThreadReadViewError> { + let loaded_thread_state_db = loaded_thread.and_then(|thread| thread.state_db()); let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() { - read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_uuid).await + read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_id).await } else { - read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await + read_summary_from_state_db_by_thread_id(&self.config, thread_id).await }; let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone()); if rollout_path.is_none() || include_turns { rollout_path = - match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string()) + match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()) .await { Ok(Some(path)) => Some(path), @@ -3808,121 +3879,117 @@ impl CodexMessageProcessor { } } Err(err) => { - self.send_invalid_request_error( - request_id, - format!("failed to locate thread id {thread_uuid}: {err}"), - ) - .await; - return; + return Err(ThreadReadViewError::InvalidRequest(format!( + "failed to locate thread id {thread_id}: {err}" + ))); } }; } if include_turns && rollout_path.is_none() && db_summary.is_some() { - self.send_internal_error( - request_id, - format!("failed to locate rollout for thread {thread_uuid}"), + return Err(ThreadReadViewError::Internal(format!( + "failed to locate rollout for thread {thread_id}" + ))); + } + + if let Some(summary) = db_summary { + let mut thread = summary_to_thread(summary, &self.config.cwd); + self.apply_thread_read_rollout_fields( + thread_id, + &mut thread, + rollout_path.as_deref(), + include_turns, ) - .await; - return; + .await?; + return Ok(Some(thread)); } - let mut thread = if let Some(summary) = db_summary { - summary_to_thread(summary, &self.config.cwd) - } else if let Some(rollout_path) = rollout_path.as_ref() { - let fallback_provider = self.config.model_provider_id.as_str(); - match read_summary_from_rollout(rollout_path, fallback_provider).await { - Ok(summary) => summary_to_thread(summary, &self.config.cwd), - Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_uuid}: {err}", - rollout_path.display() - ), - ) - .await; - return; - } - } - } else { - let Some(thread) = loaded_thread.as_ref() else { - self.send_invalid_request_error( - request_id, - format!("thread not loaded: {thread_uuid}"), - ) - .await; - return; - }; - let config_snapshot = thread.config_snapshot().await; - let loaded_rollout_path = thread.rollout_path(); - if include_turns && loaded_rollout_path.is_none() { - self.send_invalid_request_error( - request_id, - "ephemeral threads do not support includeTurns".to_string(), + let Some(rollout_path) = rollout_path else { + return Ok(None); + }; + let fallback_provider = self.config.model_provider_id.as_str(); + match read_summary_from_rollout(&rollout_path, fallback_provider).await { + Ok(summary) => { + let mut thread = summary_to_thread(summary, &self.config.cwd); + self.apply_thread_read_rollout_fields( + thread_id, + &mut thread, + Some(rollout_path.as_path()), + include_turns, ) - .await; - return; + .await?; + Ok(Some(thread)) } - if include_turns { - rollout_path = loaded_rollout_path.clone(); - } - build_thread_from_snapshot(thread_uuid, &config_snapshot, loaded_rollout_path) - }; + Err(err) => Err(ThreadReadViewError::Internal(format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ))), + } + } + + async fn load_live_thread_view( + &self, + thread_id: ThreadId, + include_turns: bool, + loaded_thread: Option<&Arc>, + ) -> Result, ThreadReadViewError> { + let Some(thread) = loaded_thread else { + return Ok(None); + }; + let config_snapshot = thread.config_snapshot().await; + let loaded_rollout_path = thread.rollout_path(); + if include_turns && loaded_rollout_path.is_none() { + return Err(ThreadReadViewError::InvalidRequest( + "ephemeral threads do not support includeTurns".to_string(), + )); + } + + let mut thread = + build_thread_from_snapshot(thread_id, &config_snapshot, loaded_rollout_path.clone()); + self.apply_thread_read_rollout_fields( + thread_id, + &mut thread, + loaded_rollout_path.as_deref(), + include_turns, + ) + .await?; + Ok(Some(thread)) + } + + async fn apply_thread_read_rollout_fields( + &self, + thread_id: ThreadId, + thread: &mut Thread, + rollout_path: Option<&Path>, + include_turns: bool, + ) -> Result<(), ThreadReadViewError> { if thread.forked_from_id.is_none() - && let Some(rollout_path) = rollout_path.as_ref() + && let Some(rollout_path) = rollout_path { thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await; } - self.attach_thread_name(thread_uuid, &mut thread).await; + self.attach_thread_name(thread_id, thread).await; - if include_turns && let Some(rollout_path) = rollout_path.as_ref() { + if include_turns && let Some(rollout_path) = rollout_path { match read_rollout_items_from_rollout(rollout_path).await { Ok(items) => { thread.turns = build_turns_from_rollout_items(&items); } Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - self.send_invalid_request_error( - request_id, - format!( - "thread {thread_uuid} is not materialized yet; includeTurns is unavailable before first user message" - ), - ) - .await; - return; + return Err(ThreadReadViewError::InvalidRequest(format!( + "thread {thread_id} is not materialized yet; includeTurns is unavailable before first user message" + ))); } Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_uuid}: {err}", - rollout_path.display() - ), - ) - .await; - return; + return Err(ThreadReadViewError::Internal(format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ))); } } } - let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread.as_ref() { - matches!(loaded_thread.agent_status().await, AgentStatus::Running) - } else { - false - }; - - let thread_status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; - - set_thread_status_and_interrupt_stale_turns( - &mut thread, - thread_status, - has_live_in_progress_turn, - ); - let response = ThreadReadResponse { thread }; - self.outgoing.send_response(request_id, response).await; + Ok(()) } pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver {