Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 161 additions & 94 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,11 @@ enum ThreadShutdownResult {
TimedOut,
}

enum ThreadReadViewError {
InvalidRequest(String),
Internal(String),
}

impl Drop for ActiveLogin {
fn drop(&mut self) {
self.cancel();
Expand Down Expand Up @@ -3786,17 +3791,83 @@ impl CodexMessageProcessor {
}
};

let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
let loaded_thread_state_db = loaded_thread.as_ref().and_then(|thread| thread.state_db());
let thread = match self.read_thread_view(thread_uuid, include_turns).await {
Ok(thread) => thread,
Err(ThreadReadViewError::InvalidRequest(message)) => {
self.send_invalid_request_error(request_id, message).await;
return;
}
Err(ThreadReadViewError::Internal(message)) => {
self.send_internal_error(request_id, message).await;
return;
}
};
let response = ThreadReadResponse { thread };
self.outgoing.send_response(request_id, response).await;
}

/// Builds the API view for `thread/read` from persisted metadata plus optional live state.
async fn read_thread_view(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we rename to make it clear that this is for the live thread? and maybe move it to its own file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't just for the live thread, it is for the combined state derived from the live thread + the persisted thread. The persisted thread state takes precedence if available, falling back to live thread state, but even in the persisted case it adds in some state from the in-memory thread manager (like "status").

The two individual underlying methods are named load_live_thread_view and load_persisted_thread_for_read which are pretty literal already but if you have suggestions for other names lmk.

I can definitely move this to another file if you think that's better though?

&self,
thread_id: ThreadId,
include_turns: bool,
) -> Result<Thread, ThreadReadViewError> {
let loaded_thread = self.load_live_thread_for_read(thread_id).await;
let mut thread = if let Some(thread) = self
.load_persisted_thread_for_read(thread_id, include_turns, loaded_thread.as_ref())
.await?
{
thread
} else if let Some(thread) = self
.load_live_thread_view(thread_id, include_turns, loaded_thread.as_ref())
.await?
{
thread
} else {
return Err(ThreadReadViewError::InvalidRequest(format!(
"thread not loaded: {thread_id}"
)));
};

let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread {
matches!(loaded_thread.agent_status().await, AgentStatus::Running)
} else {
false
};

let thread_status = self
.thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await;

set_thread_status_and_interrupt_stale_turns(
&mut thread,
thread_status,
has_live_in_progress_turn,
);
Ok(thread)
}

async fn load_live_thread_for_read(&self, thread_id: ThreadId) -> Option<Arc<CodexThread>> {
self.thread_manager.get_thread(thread_id).await.ok()
}

async fn load_persisted_thread_for_read(
&self,
thread_id: ThreadId,
include_turns: bool,
loaded_thread: Option<&Arc<CodexThread>>,
) -> Result<Option<Thread>, ThreadReadViewError> {
let loaded_thread_state_db = loaded_thread.and_then(|thread| thread.state_db());
let db_summary = if let Some(state_db_ctx) = loaded_thread_state_db.as_ref() {
read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_uuid).await
read_summary_from_state_db_context_by_thread_id(Some(state_db_ctx), thread_id).await
} else {
read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await
read_summary_from_state_db_by_thread_id(&self.config, thread_id).await
};
let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone());
if rollout_path.is_none() || include_turns {
rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string())
.await
{
Ok(Some(path)) => Some(path),
Expand All @@ -3808,121 +3879,117 @@ impl CodexMessageProcessor {
}
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
return Err(ThreadReadViewError::InvalidRequest(format!(
"failed to locate thread id {thread_id}: {err}"
)));
}
};
}

if include_turns && rollout_path.is_none() && db_summary.is_some() {
self.send_internal_error(
request_id,
format!("failed to locate rollout for thread {thread_uuid}"),
return Err(ThreadReadViewError::Internal(format!(
"failed to locate rollout for thread {thread_id}"
)));
}

if let Some(summary) = db_summary {
let mut thread = summary_to_thread(summary, &self.config.cwd);
self.apply_thread_read_rollout_fields(
thread_id,
&mut thread,
rollout_path.as_deref(),
include_turns,
)
.await;
return;
.await?;
return Ok(Some(thread));
}

let mut thread = if let Some(summary) = db_summary {
summary_to_thread(summary, &self.config.cwd)
} else if let Some(rollout_path) = rollout_path.as_ref() {
let fallback_provider = self.config.model_provider_id.as_str();
match read_summary_from_rollout(rollout_path, fallback_provider).await {
Ok(summary) => summary_to_thread(summary, &self.config.cwd),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
return;
}
}
} else {
let Some(thread) = loaded_thread.as_ref() else {
self.send_invalid_request_error(
request_id,
format!("thread not loaded: {thread_uuid}"),
)
.await;
return;
};
let config_snapshot = thread.config_snapshot().await;
let loaded_rollout_path = thread.rollout_path();
if include_turns && loaded_rollout_path.is_none() {
self.send_invalid_request_error(
request_id,
"ephemeral threads do not support includeTurns".to_string(),
let Some(rollout_path) = rollout_path else {
return Ok(None);
};
let fallback_provider = self.config.model_provider_id.as_str();
match read_summary_from_rollout(&rollout_path, fallback_provider).await {
Ok(summary) => {
let mut thread = summary_to_thread(summary, &self.config.cwd);
self.apply_thread_read_rollout_fields(
thread_id,
&mut thread,
Some(rollout_path.as_path()),
include_turns,
)
.await;
return;
.await?;
Ok(Some(thread))
}
if include_turns {
rollout_path = loaded_rollout_path.clone();
}
build_thread_from_snapshot(thread_uuid, &config_snapshot, loaded_rollout_path)
};
Err(err) => Err(ThreadReadViewError::Internal(format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
))),
}
}

async fn load_live_thread_view(
&self,
thread_id: ThreadId,
include_turns: bool,
loaded_thread: Option<&Arc<CodexThread>>,
) -> Result<Option<Thread>, ThreadReadViewError> {
let Some(thread) = loaded_thread else {
return Ok(None);
};
let config_snapshot = thread.config_snapshot().await;
let loaded_rollout_path = thread.rollout_path();
if include_turns && loaded_rollout_path.is_none() {
return Err(ThreadReadViewError::InvalidRequest(
"ephemeral threads do not support includeTurns".to_string(),
));
}

let mut thread =
build_thread_from_snapshot(thread_id, &config_snapshot, loaded_rollout_path.clone());
self.apply_thread_read_rollout_fields(
thread_id,
&mut thread,
loaded_rollout_path.as_deref(),
include_turns,
)
.await?;
Ok(Some(thread))
}

async fn apply_thread_read_rollout_fields(
&self,
thread_id: ThreadId,
thread: &mut Thread,
rollout_path: Option<&Path>,
include_turns: bool,
) -> Result<(), ThreadReadViewError> {
if thread.forked_from_id.is_none()
&& let Some(rollout_path) = rollout_path.as_ref()
&& let Some(rollout_path) = rollout_path
{
thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await;
}
self.attach_thread_name(thread_uuid, &mut thread).await;
self.attach_thread_name(thread_id, thread).await;

if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
if include_turns && let Some(rollout_path) = rollout_path {
match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
self.send_invalid_request_error(
request_id,
format!(
"thread {thread_uuid} is not materialized yet; includeTurns is unavailable before first user message"
),
)
.await;
return;
return Err(ThreadReadViewError::InvalidRequest(format!(
"thread {thread_id} is not materialized yet; includeTurns is unavailable before first user message"
)));
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
return;
return Err(ThreadReadViewError::Internal(format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
)));
}
}
}

let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread.as_ref() {
matches!(loaded_thread.agent_status().await, AgentStatus::Running)
} else {
false
};

let thread_status = self
.thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await;

set_thread_status_and_interrupt_stale_turns(
&mut thread,
thread_status,
has_live_in_progress_turn,
);
let response = ThreadReadResponse { thread };
self.outgoing.send_response(request_id, response).await;
Ok(())
}

pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
Expand Down
Loading