From 618a620e847089651c15f320a16a4859472e8b1a Mon Sep 17 00:00:00 2001 From: Tom Wiltzius Date: Thu, 16 Apr 2026 16:11:18 -0700 Subject: [PATCH 1/2] codex: split thread read view loading --- .../app-server/src/codex_message_processor.rs | 224 ++++++++++++------ 1 file changed, 146 insertions(+), 78 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 0f3b8bf52268..ae60c83a685f 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -438,6 +438,16 @@ enum ThreadShutdownResult { TimedOut, } +struct ThreadReadViewSource { + thread: Thread, + rollout_path: Option, +} + +enum ThreadReadViewError { + InvalidRequest(String), + Internal(String), +} + impl Drop for ActiveLogin { fn drop(&mut self) { self.cancel(); @@ -3786,17 +3796,67 @@ impl CodexMessageProcessor { } }; - let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok(); - let loaded_thread_state_db = loaded_thread.as_ref().and_then(|thread| thread.state_db()); + let thread = match self.read_thread_view(thread_uuid, include_turns).await { + Ok(thread) => thread, + Err(ThreadReadViewError::InvalidRequest(message)) => { + self.send_invalid_request_error(request_id, message).await; + return; + } + Err(ThreadReadViewError::Internal(message)) => { + self.send_internal_error(request_id, message).await; + return; + } + }; + let response = ThreadReadResponse { thread }; + self.outgoing.send_response(request_id, response).await; + } + + /// Builds the API view for `thread/read` from persisted metadata plus optional live state. + async fn read_thread_view( + &self, + thread_id: ThreadId, + include_turns: bool, + ) -> Result { + let loaded_thread = self.load_live_thread_for_read(thread_id).await; + let persisted = self + .load_persisted_thread_for_read(thread_id, include_turns, loaded_thread.as_ref()) + .await?; + let live = if persisted.is_none() { + self.load_live_thread_view(thread_id, include_turns, loaded_thread.as_ref()) + .await? + } else { + None + }; + self.merge_thread_read_views( + thread_id, + include_turns, + loaded_thread.as_ref(), + persisted, + live, + ) + .await + } + + async fn load_live_thread_for_read(&self, thread_id: ThreadId) -> Option> { + self.thread_manager.get_thread(thread_id).await.ok() + } + + async fn load_persisted_thread_for_read( + &self, + thread_id: ThreadId, + include_turns: bool, + loaded_thread: Option<&Arc>, + ) -> Result, ThreadReadViewError> { + let loaded_thread_state_db = loaded_thread.and_then(|thread| thread.state_db()); let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() { - read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_uuid).await + read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_id).await } else { - read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await + read_summary_from_state_db_by_thread_id(&self.config, thread_id).await }; let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone()); if rollout_path.is_none() || include_turns { rollout_path = - match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string()) + match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()) .await { Ok(Some(path)) => Some(path), @@ -3808,73 +3868,92 @@ impl CodexMessageProcessor { } } Err(err) => { - self.send_invalid_request_error( - request_id, - format!("failed to locate thread id {thread_uuid}: {err}"), - ) - .await; - return; + return Err(ThreadReadViewError::InvalidRequest(format!( + "failed to locate thread id {thread_id}: {err}" + ))); } }; } if include_turns && rollout_path.is_none() && db_summary.is_some() { - self.send_internal_error( - request_id, - format!("failed to locate rollout for thread {thread_uuid}"), - ) - .await; - return; + return Err(ThreadReadViewError::Internal(format!( + "failed to locate rollout for thread {thread_id}" + ))); } - let mut thread = if let Some(summary) = db_summary { - summary_to_thread(summary, &self.config.cwd) - } else if let Some(rollout_path) = rollout_path.as_ref() { - let fallback_provider = self.config.model_provider_id.as_str(); - match read_summary_from_rollout(rollout_path, fallback_provider).await { - Ok(summary) => summary_to_thread(summary, &self.config.cwd), - Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_uuid}: {err}", - rollout_path.display() - ), - ) - .await; - return; - } - } + if let Some(summary) = db_summary { + return Ok(Some(ThreadReadViewSource { + thread: summary_to_thread(summary, &self.config.cwd), + rollout_path, + })); + } + + let Some(rollout_path) = rollout_path else { + return Ok(None); + }; + let fallback_provider = self.config.model_provider_id.as_str(); + match read_summary_from_rollout(&rollout_path, fallback_provider).await { + Ok(summary) => Ok(Some(ThreadReadViewSource { + thread: summary_to_thread(summary, &self.config.cwd), + rollout_path: Some(rollout_path), + })), + Err(err) => Err(ThreadReadViewError::Internal(format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ))), + } + } + + async fn load_live_thread_view( + &self, + thread_id: ThreadId, + include_turns: bool, + loaded_thread: Option<&Arc>, + ) -> Result, ThreadReadViewError> { + let Some(thread) = loaded_thread else { + return Ok(None); + }; + let config_snapshot = thread.config_snapshot().await; + let loaded_rollout_path = thread.rollout_path(); + if include_turns && loaded_rollout_path.is_none() { + return Err(ThreadReadViewError::InvalidRequest( + "ephemeral threads do not support includeTurns".to_string(), + )); + } + let rollout_path = if include_turns { + loaded_rollout_path.clone() } else { - let Some(thread) = loaded_thread.as_ref() else { - self.send_invalid_request_error( - request_id, - format!("thread not loaded: {thread_uuid}"), - ) - .await; - return; - }; - let config_snapshot = thread.config_snapshot().await; - let loaded_rollout_path = thread.rollout_path(); - if include_turns && loaded_rollout_path.is_none() { - self.send_invalid_request_error( - request_id, - "ephemeral threads do not support includeTurns".to_string(), - ) - .await; - return; - } - if include_turns { - rollout_path = loaded_rollout_path.clone(); - } - build_thread_from_snapshot(thread_uuid, &config_snapshot, loaded_rollout_path) + None + }; + Ok(Some(ThreadReadViewSource { + thread: build_thread_from_snapshot(thread_id, &config_snapshot, loaded_rollout_path), + rollout_path, + })) + } + + async fn merge_thread_read_views( + &self, + thread_id: ThreadId, + include_turns: bool, + loaded_thread: Option<&Arc>, + persisted: Option, + live: Option, + ) -> Result { + let Some(ThreadReadViewSource { + mut thread, + rollout_path, + }) = persisted.or(live) + else { + return Err(ThreadReadViewError::InvalidRequest(format!( + "thread not loaded: {thread_id}" + ))); }; if thread.forked_from_id.is_none() && let Some(rollout_path) = rollout_path.as_ref() { thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await; } - self.attach_thread_name(thread_uuid, &mut thread).await; + self.attach_thread_name(thread_id, &mut thread).await; if include_turns && let Some(rollout_path) = rollout_path.as_ref() { match read_rollout_items_from_rollout(rollout_path).await { @@ -3882,30 +3961,20 @@ impl CodexMessageProcessor { thread.turns = build_turns_from_rollout_items(&items); } Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - self.send_invalid_request_error( - request_id, - format!( - "thread {thread_uuid} is not materialized yet; includeTurns is unavailable before first user message" - ), - ) - .await; - return; + return Err(ThreadReadViewError::InvalidRequest(format!( + "thread {thread_id} is not materialized yet; includeTurns is unavailable before first user message" + ))); } Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_uuid}: {err}", - rollout_path.display() - ), - ) - .await; - return; + return Err(ThreadReadViewError::Internal(format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ))); } } } - let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread.as_ref() { + let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread { matches!(loaded_thread.agent_status().await, AgentStatus::Running) } else { false @@ -3921,8 +3990,7 @@ impl CodexMessageProcessor { thread_status, has_live_in_progress_turn, ); - let response = ThreadReadResponse { thread }; - self.outgoing.send_response(request_id, response).await; + Ok(thread) } pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver { From fa10413f6810f246ec1a78c60ad8d8ab3bd1203a Mon Sep 17 00:00:00 2001 From: Tom Wiltzius Date: Thu, 16 Apr 2026 16:46:13 -0700 Subject: [PATCH 2/2] codex: simplify thread read view composition --- .../app-server/src/codex_message_processor.rs | 143 +++++++++--------- 1 file changed, 71 insertions(+), 72 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index ae60c83a685f..71e7a9ffa378 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -438,11 +438,6 @@ enum ThreadShutdownResult { TimedOut, } -struct ThreadReadViewSource { - thread: Thread, - rollout_path: Option, -} - enum ThreadReadViewError { InvalidRequest(String), Internal(String), @@ -3818,23 +3813,39 @@ impl CodexMessageProcessor { include_turns: bool, ) -> Result { let loaded_thread = self.load_live_thread_for_read(thread_id).await; - let persisted = self + let mut thread = if let Some(thread) = self .load_persisted_thread_for_read(thread_id, include_turns, loaded_thread.as_ref()) - .await?; - let live = if persisted.is_none() { - self.load_live_thread_view(thread_id, include_turns, loaded_thread.as_ref()) - .await? + .await? + { + thread + } else if let Some(thread) = self + .load_live_thread_view(thread_id, include_turns, loaded_thread.as_ref()) + .await? + { + thread } else { - None + return Err(ThreadReadViewError::InvalidRequest(format!( + "thread not loaded: {thread_id}" + ))); }; - self.merge_thread_read_views( - thread_id, - include_turns, - loaded_thread.as_ref(), - persisted, - live, - ) - .await + + let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread { + matches!(loaded_thread.agent_status().await, AgentStatus::Running) + } else { + false + }; + + let thread_status = self + .thread_watch_manager + .loaded_status_for_thread(&thread.id) + .await; + + set_thread_status_and_interrupt_stale_turns( + &mut thread, + thread_status, + has_live_in_progress_turn, + ); + Ok(thread) } async fn load_live_thread_for_read(&self, thread_id: ThreadId) -> Option> { @@ -3846,7 +3857,7 @@ impl CodexMessageProcessor { thread_id: ThreadId, include_turns: bool, loaded_thread: Option<&Arc>, - ) -> Result, ThreadReadViewError> { + ) -> Result, ThreadReadViewError> { let loaded_thread_state_db = loaded_thread.and_then(|thread| thread.state_db()); let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() { read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_id).await @@ -3882,10 +3893,15 @@ impl CodexMessageProcessor { } if let Some(summary) = db_summary { - return Ok(Some(ThreadReadViewSource { - thread: summary_to_thread(summary, &self.config.cwd), - rollout_path, - })); + let mut thread = summary_to_thread(summary, &self.config.cwd); + self.apply_thread_read_rollout_fields( + thread_id, + &mut thread, + rollout_path.as_deref(), + include_turns, + ) + .await?; + return Ok(Some(thread)); } let Some(rollout_path) = rollout_path else { @@ -3893,10 +3909,17 @@ impl CodexMessageProcessor { }; let fallback_provider = self.config.model_provider_id.as_str(); match read_summary_from_rollout(&rollout_path, fallback_provider).await { - Ok(summary) => Ok(Some(ThreadReadViewSource { - thread: summary_to_thread(summary, &self.config.cwd), - rollout_path: Some(rollout_path), - })), + Ok(summary) => { + let mut thread = summary_to_thread(summary, &self.config.cwd); + self.apply_thread_read_rollout_fields( + thread_id, + &mut thread, + Some(rollout_path.as_path()), + include_turns, + ) + .await?; + Ok(Some(thread)) + } Err(err) => Err(ThreadReadViewError::Internal(format!( "failed to load rollout `{}` for thread {thread_id}: {err}", rollout_path.display() @@ -3909,7 +3932,7 @@ impl CodexMessageProcessor { thread_id: ThreadId, include_turns: bool, loaded_thread: Option<&Arc>, - ) -> Result, ThreadReadViewError> { + ) -> Result, ThreadReadViewError> { let Some(thread) = loaded_thread else { return Ok(None); }; @@ -3920,42 +3943,34 @@ impl CodexMessageProcessor { "ephemeral threads do not support includeTurns".to_string(), )); } - let rollout_path = if include_turns { - loaded_rollout_path.clone() - } else { - None - }; - Ok(Some(ThreadReadViewSource { - thread: build_thread_from_snapshot(thread_id, &config_snapshot, loaded_rollout_path), - rollout_path, - })) + + let mut thread = + build_thread_from_snapshot(thread_id, &config_snapshot, loaded_rollout_path.clone()); + self.apply_thread_read_rollout_fields( + thread_id, + &mut thread, + loaded_rollout_path.as_deref(), + include_turns, + ) + .await?; + Ok(Some(thread)) } - async fn merge_thread_read_views( + async fn apply_thread_read_rollout_fields( &self, thread_id: ThreadId, + thread: &mut Thread, + rollout_path: Option<&Path>, include_turns: bool, - loaded_thread: Option<&Arc>, - persisted: Option, - live: Option, - ) -> Result { - let Some(ThreadReadViewSource { - mut thread, - rollout_path, - }) = persisted.or(live) - else { - return Err(ThreadReadViewError::InvalidRequest(format!( - "thread not loaded: {thread_id}" - ))); - }; + ) -> Result<(), ThreadReadViewError> { if thread.forked_from_id.is_none() - && let Some(rollout_path) = rollout_path.as_ref() + && let Some(rollout_path) = rollout_path { thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await; } - self.attach_thread_name(thread_id, &mut thread).await; + self.attach_thread_name(thread_id, thread).await; - if include_turns && let Some(rollout_path) = rollout_path.as_ref() { + if include_turns && let Some(rollout_path) = rollout_path { match read_rollout_items_from_rollout(rollout_path).await { Ok(items) => { thread.turns = build_turns_from_rollout_items(&items); @@ -3974,23 +3989,7 @@ impl CodexMessageProcessor { } } - let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread { - matches!(loaded_thread.agent_status().await, AgentStatus::Running) - } else { - false - }; - - let thread_status = self - .thread_watch_manager - .loaded_status_for_thread(&thread.id) - .await; - - set_thread_status_and_interrupt_stale_turns( - &mut thread, - thread_status, - has_live_in_progress_turn, - ); - Ok(thread) + Ok(()) } pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver {