Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions codex-rs/app-server-protocol/src/protocol/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3631,8 +3631,9 @@ pub struct ThreadStartParams {
#[experimental("thread/start.experimentalRawEvents")]
#[serde(default)]
pub experimental_raw_events: bool,
/// If true, persist additional rollout EventMsg variants required to
/// reconstruct a richer thread history on resume/fork/read.
/// If true, persist additional EventMsg variants to the rollout file.
/// However, `thread/read`, `thread/resume`, and `thread/fork` still only
/// return the limited form of thread history for scalability reasons.
#[experimental("thread/start.persistFullHistory")]
#[serde(default)]
pub persist_extended_history: bool,
Expand Down Expand Up @@ -3762,8 +3763,9 @@ pub struct ThreadResumeParams {
#[experimental("thread/resume.excludeTurns")]
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub exclude_turns: bool,
/// If true, persist additional rollout EventMsg variants required to
/// reconstruct a richer thread history on subsequent resume/fork/read.
/// If true, persist additional EventMsg variants to the rollout file.
/// However, `thread/read`, `thread/resume`, and `thread/fork` still only
/// return the limited form of thread history for scalability reasons.
#[experimental("thread/resume.persistFullHistory")]
#[serde(default)]
pub persist_extended_history: bool,
Expand Down Expand Up @@ -3867,8 +3869,9 @@ pub struct ThreadForkParams {
#[experimental("thread/fork.excludeTurns")]
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub exclude_turns: bool,
/// If true, persist additional rollout EventMsg variants required to
/// reconstruct a richer thread history on subsequent resume/fork/read.
/// If true, persist additional EventMsg variants to the rollout file.
/// However, `thread/read`, `thread/resume`, and `thread/fork` still only
/// return the limited form of thread history for scalability reasons.
#[experimental("thread/fork.persistFullHistory")]
#[serde(default)]
pub persist_extended_history: bool,
Expand Down
4 changes: 1 addition & 3 deletions codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,6 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c

Like `thread/resume`, experimental clients can pass `excludeTurns: true` to `thread/fork` to return only thread metadata in `thread.turns` and page history with `thread/turns/list`. In that mode the server skips replaying restored `thread/tokenUsage/updated`, which keeps the fork path from rebuilding turns just to attribute historical usage.

Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously.

### Example: List threads (with pagination & filters)

`thread/list` lets you render a history UI. Results default to `createdAt` (newest first) descending. Pass any combination of:
Expand Down Expand Up @@ -403,7 +401,7 @@ Later, after the idle unload timeout:

### Example: Read a thread

Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want the full rollout history loaded into `thread.turns`. The returned thread includes `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available.
Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want thread history loaded into `thread.turns`. The returned thread includes `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available.

```json
{ "method": "thread/read", "id": 22, "params": { "threadId": "thr_123" } }
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/app-server/src/bespoke_event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::error_code::internal_error;
use crate::error_code::invalid_request;
use crate::outgoing_message::ClientRequestResult;
use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
use crate::request_processors::build_api_turns_from_rollout_items;
use crate::request_processors::read_rollout_items_from_rollout;
use crate::request_processors::read_summary_from_rollout;
use crate::request_processors::summary_to_thread;
Expand Down Expand Up @@ -81,7 +82,6 @@ use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::WarningNotification;
use codex_app_server_protocol::build_item_from_guardian_event;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_app_server_protocol::guardian_auto_approval_review_notification;
use codex_app_server_protocol::item_event_to_server_notification;
use codex_core::CodexThread;
Expand Down Expand Up @@ -1179,7 +1179,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let mut thread = summary_to_thread(summary, &fallback_cwd);
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
thread.turns = build_api_turns_from_rollout_items(&items);
thread.status = thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await;
Expand Down
14 changes: 13 additions & 1 deletion codex-rs/app-server/src/request_processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ use codex_app_server_protocol::ThreadGoalSetParams;
use codex_app_server_protocol::ThreadGoalSetResponse;
use codex_app_server_protocol::ThreadGoalStatus;
use codex_app_server_protocol::ThreadGoalUpdatedNotification;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::ThreadIncrementElicitationParams;
use codex_app_server_protocol::ThreadIncrementElicitationResponse;
use codex_app_server_protocol::ThreadInjectItemsParams;
Expand Down Expand Up @@ -233,7 +234,6 @@ use codex_app_server_protocol::WindowsSandboxSetupCompletedNotification;
use codex_app_server_protocol::WindowsSandboxSetupMode;
use codex_app_server_protocol::WindowsSandboxSetupStartParams;
use codex_app_server_protocol::WindowsSandboxSetupStartResponse;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_arg0::Arg0DispatchPaths;
use codex_backend_client::AddCreditsNudgeCreditType as BackendAddCreditsNudgeCreditType;
use codex_backend_client::Client as BackendClient;
Expand Down Expand Up @@ -366,6 +366,8 @@ use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_rmcp_client::perform_oauth_login_return_url;
use codex_rollout::EventPersistenceMode;
use codex_rollout::is_persisted_rollout_item;
use codex_rollout::state_db::StateDbHandle;
use codex_rollout::state_db::get_state_db;
use codex_rollout::state_db::reconcile_rollout;
Expand Down Expand Up @@ -479,3 +481,13 @@ use self::thread_summary::*;
pub(crate) use self::thread_summary::read_rollout_items_from_rollout;
pub(crate) use self::thread_summary::read_summary_from_rollout;
pub(crate) use self::thread_summary::summary_to_thread;

pub(crate) fn build_api_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
let mut builder = ThreadHistoryBuilder::new();
for item in items {
if is_persisted_rollout_item(item, EventPersistenceMode::Limited) {
builder.handle_rollout_item(item);
}
}
builder.finish()
}
16 changes: 5 additions & 11 deletions codex-rs/app-server/src/request_processors/thread_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,17 +542,12 @@ pub(super) async fn handle_pending_thread_resume_request(
let request_id = pending.request_id;
let connection_id = request_id.connection_id;
let mut thread = pending.thread_summary;
if pending.include_turns
&& let Err(message) = populate_thread_turns_from_history(
if pending.include_turns {
populate_thread_turns_from_history(
&mut thread,
&pending.history_items,
active_turn.as_ref(),
)
{
outgoing
.send_error(request_id, internal_error(message))
.await;
return;
);
}

let thread_status = thread_watch_manager
Expand Down Expand Up @@ -711,13 +706,12 @@ pub(super) fn populate_thread_turns_from_history(
thread: &mut Thread,
items: &[RolloutItem],
active_turn: Option<&Turn>,
) -> std::result::Result<(), String> {
let mut turns = build_turns_from_rollout_items(items);
) {
let mut turns = build_api_turns_from_rollout_items(items);
if let Some(active_turn) = active_turn {
merge_turn_history_with_active_turn(&mut turns, active_turn.clone());
}
thread.turns = turns;
Ok(())
}

pub(super) async fn resolve_pending_server_request(
Expand Down
59 changes: 16 additions & 43 deletions codex-rs/app-server/src/request_processors/thread_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2048,7 +2048,7 @@ impl ThreadRequestProcessor {
let (mut thread, history) =
thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd);
if include_turns && let Some(history) = history {
thread.turns = build_turns_from_rollout_items(&history.items);
thread.turns = build_api_turns_from_rollout_items(&history.items);
}
Ok(Some(thread))
}
Expand Down Expand Up @@ -2113,7 +2113,7 @@ impl ThreadRequestProcessor {
.load_history(/*include_archived*/ true)
.await
.map_err(|err| thread_read_history_load_error(thread_id, err))?;
thread.turns = build_turns_from_rollout_items(&history.items);
thread.turns = build_api_turns_from_rollout_items(&history.items);
}

Ok(())
Expand Down Expand Up @@ -2662,17 +2662,11 @@ impl ThreadRequestProcessor {
}
let mut summary_source_thread = source_thread;
summary_source_thread.history = None;
let thread_summary = match self
.stored_thread_to_api_thread(
summary_source_thread,
config_snapshot.model_provider_id.as_str(),
/*include_turns*/ false,
)
.await
{
Ok(thread) => thread,
Err(message) => return Err(internal_error(message)),
};
let thread_summary = self.stored_thread_to_api_thread(
summary_source_thread,
config_snapshot.model_provider_id.as_str(),
/*include_turns*/ false,
);
let mut config_for_instruction_sources = self.config.as_ref().clone();
config_for_instruction_sources.cwd = config_snapshot.cwd.clone();
let instruction_sources =
Expand Down Expand Up @@ -2798,22 +2792,22 @@ impl ThreadRequestProcessor {
}))
}

async fn stored_thread_to_api_thread(
fn stored_thread_to_api_thread(
&self,
stored_thread: StoredThread,
fallback_provider: &str,
include_turns: bool,
) -> std::result::Result<Thread, String> {
) -> Thread {
let (mut thread, history) =
thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd);
if include_turns && let Some(history) = history {
populate_thread_turns_from_history(
&mut thread,
&history.items,
/*active_turn*/ None,
)?;
);
}
Ok(thread)
thread
}

async fn read_stored_thread_for_new_fork(
Expand Down Expand Up @@ -2921,7 +2915,7 @@ impl ThreadRequestProcessor {
&mut thread,
&history_items,
/*active_turn*/ None,
)?;
);
}
self.attach_thread_name(thread_id, &mut thread).await;
Ok(thread)
Expand Down Expand Up @@ -3066,7 +3060,7 @@ impl ThreadRequestProcessor {

// Persistent forks materialize their own rollout immediately. Ephemeral forks stay
// pathless, so they rebuild their visible history from the copied source history instead.
let mut thread = if let Some(fork_rollout_path) = session_configured.rollout_path.as_ref() {
let mut thread = if session_configured.rollout_path.is_some() {
let stored_thread = self
.read_stored_thread_for_new_fork(thread_id, include_turns)
.await?;
Expand All @@ -3075,13 +3069,6 @@ impl ThreadRequestProcessor {
fallback_model_provider.as_str(),
include_turns,
)
.await
.map_err(|message| {
internal_error(format!(
"failed to load rollout `{}` for thread {thread_id}: {message}",
fork_rollout_path.display()
))
})?
} else {
let config_snapshot = forked_thread.config_snapshot().await;
// forked thread names do not inherit the source thread name
Expand All @@ -3094,8 +3081,7 @@ impl ThreadRequestProcessor {
&mut thread,
&history_items,
/*active_turn*/ None,
)
.map_err(internal_error)?;
);
}
thread
};
Expand Down Expand Up @@ -3466,16 +3452,6 @@ fn parse_thread_turns_cursor(cursor: &str) -> Result<ThreadTurnsCursor, JSONRPCE
})
}

fn reconstruct_thread_turns_from_rollout_items(
items: &[RolloutItem],
loaded_status: ThreadStatus,
has_live_in_progress_turn: bool,
) -> Vec<Turn> {
let mut turns = build_turns_from_rollout_items(items);
normalize_thread_turns_status(&mut turns, loaded_status, has_live_in_progress_turn);
turns
}

fn reconstruct_thread_turns_for_turns_list(
items: &[RolloutItem],
loaded_status: ThreadStatus,
Expand All @@ -3486,11 +3462,8 @@ fn reconstruct_thread_turns_for_turns_list(
|| active_turn
.as_ref()
.is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress));
let mut turns = reconstruct_thread_turns_from_rollout_items(
items,
loaded_status,
has_live_in_progress_turn,
);
let mut turns = build_api_turns_from_rollout_items(items);
normalize_thread_turns_status(&mut turns, loaded_status, has_live_in_progress_turn);
if let Some(active_turn) = active_turn {
merge_turn_history_with_active_turn(&mut turns, active_turn);
}
Expand Down
Loading
Loading