From 75e943e792a4ede21341157db71cfe5817d27896 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Fri, 1 May 2026 13:44:06 -0700 Subject: [PATCH] feat(app-server): always return limited thread history --- .../app-server-protocol/src/protocol/v2.rs | 15 ++-- codex-rs/app-server/README.md | 4 +- .../app-server/src/bespoke_event_handling.rs | 4 +- codex-rs/app-server/src/request_processors.rs | 14 +++- .../request_processors/thread_lifecycle.rs | 16 ++-- .../request_processors/thread_processor.rs | 59 ++++--------- .../tests/suite/v2/thread_shell_command.rs | 83 ++++++++++++------- codex-rs/rollout/src/lib.rs | 1 + codex-rs/rollout/src/policy.rs | 2 +- codex-rs/rollout/src/recorder.rs | 4 +- 10 files changed, 102 insertions(+), 100 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 4eb33fa85006..aea80029a87c 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3631,8 +3631,9 @@ pub struct ThreadStartParams { #[experimental("thread/start.experimentalRawEvents")] #[serde(default)] pub experimental_raw_events: bool, - /// If true, persist additional rollout EventMsg variants required to - /// reconstruct a richer thread history on resume/fork/read. + /// If true, persist additional EventMsg variants to the rollout file. + /// However, `thread/read`, `thread/resume`, and `thread/fork` still only + /// return the limited form of thread history for scalability reasons. #[experimental("thread/start.persistFullHistory")] #[serde(default)] pub persist_extended_history: bool, @@ -3762,8 +3763,9 @@ pub struct ThreadResumeParams { #[experimental("thread/resume.excludeTurns")] #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub exclude_turns: bool, - /// If true, persist additional rollout EventMsg variants required to - /// reconstruct a richer thread history on subsequent resume/fork/read. + /// If true, persist additional EventMsg variants to the rollout file. + /// However, `thread/read`, `thread/resume`, and `thread/fork` still only + /// return the limited form of thread history for scalability reasons. #[experimental("thread/resume.persistFullHistory")] #[serde(default)] pub persist_extended_history: bool, @@ -3867,8 +3869,9 @@ pub struct ThreadForkParams { #[experimental("thread/fork.excludeTurns")] #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub exclude_turns: bool, - /// If true, persist additional rollout EventMsg variants required to - /// reconstruct a richer thread history on subsequent resume/fork/read. + /// If true, persist additional EventMsg variants to the rollout file. + /// However, `thread/read`, `thread/resume`, and `thread/fork` still only + /// return the limited form of thread history for scalability reasons. #[experimental("thread/fork.persistFullHistory")] #[serde(default)] pub persist_extended_history: bool, diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index dab47ec3a293..df41d859cde5 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -307,8 +307,6 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c Like `thread/resume`, experimental clients can pass `excludeTurns: true` to `thread/fork` to return only thread metadata in `thread.turns` and page history with `thread/turns/list`. In that mode the server skips replaying restored `thread/tokenUsage/updated`, which keeps the fork path from rebuilding turns just to attribute historical usage. -Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously. - ### Example: List threads (with pagination & filters) `thread/list` lets you render a history UI. Results default to `createdAt` (newest first) descending. Pass any combination of: @@ -403,7 +401,7 @@ Later, after the idle unload timeout: ### Example: Read a thread -Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want the full rollout history loaded into `thread.turns`. The returned thread includes `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available. +Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want thread history loaded into `thread.turns`. The returned thread includes `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available. ```json { "method": "thread/read", "id": 22, "params": { "threadId": "thr_123" } } diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 29fec1b32dd2..cc1a11c01faa 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -2,6 +2,7 @@ use crate::error_code::internal_error; use crate::error_code::invalid_request; use crate::outgoing_message::ClientRequestResult; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; +use crate::request_processors::build_api_turns_from_rollout_items; use crate::request_processors::read_rollout_items_from_rollout; use crate::request_processors::read_summary_from_rollout; use crate::request_processors::summary_to_thread; @@ -81,7 +82,6 @@ use codex_app_server_protocol::TurnStartedNotification; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::WarningNotification; use codex_app_server_protocol::build_item_from_guardian_event; -use codex_app_server_protocol::build_turns_from_rollout_items; use codex_app_server_protocol::guardian_auto_approval_review_notification; use codex_app_server_protocol::item_event_to_server_notification; use codex_core::CodexThread; @@ -1179,7 +1179,7 @@ pub(crate) async fn apply_bespoke_event_handling( let mut thread = summary_to_thread(summary, &fallback_cwd); match read_rollout_items_from_rollout(rollout_path.as_path()).await { Ok(items) => { - thread.turns = build_turns_from_rollout_items(&items); + thread.turns = build_api_turns_from_rollout_items(&items); thread.status = thread_watch_manager .loaded_status_for_thread(&thread.id) .await; diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 45bc9c2629a9..b1c6ab98152c 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -169,6 +169,7 @@ use codex_app_server_protocol::ThreadGoalSetParams; use codex_app_server_protocol::ThreadGoalSetResponse; use codex_app_server_protocol::ThreadGoalStatus; use codex_app_server_protocol::ThreadGoalUpdatedNotification; +use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::ThreadIncrementElicitationParams; use codex_app_server_protocol::ThreadIncrementElicitationResponse; use codex_app_server_protocol::ThreadInjectItemsParams; @@ -233,7 +234,6 @@ use codex_app_server_protocol::WindowsSandboxSetupCompletedNotification; use codex_app_server_protocol::WindowsSandboxSetupMode; use codex_app_server_protocol::WindowsSandboxSetupStartParams; use codex_app_server_protocol::WindowsSandboxSetupStartResponse; -use codex_app_server_protocol::build_turns_from_rollout_items; use codex_arg0::Arg0DispatchPaths; use codex_backend_client::AddCreditsNudgeCreditType as BackendAddCreditsNudgeCreditType; use codex_backend_client::Client as BackendClient; @@ -366,6 +366,8 @@ use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS; use codex_protocol::user_input::UserInput as CoreInputItem; use codex_rmcp_client::perform_oauth_login_return_url; +use codex_rollout::EventPersistenceMode; +use codex_rollout::is_persisted_rollout_item; use codex_rollout::state_db::StateDbHandle; use codex_rollout::state_db::get_state_db; use codex_rollout::state_db::reconcile_rollout; @@ -479,3 +481,13 @@ use self::thread_summary::*; pub(crate) use self::thread_summary::read_rollout_items_from_rollout; pub(crate) use self::thread_summary::read_summary_from_rollout; pub(crate) use self::thread_summary::summary_to_thread; + +pub(crate) fn build_api_turns_from_rollout_items(items: &[RolloutItem]) -> Vec { + let mut builder = ThreadHistoryBuilder::new(); + for item in items { + if is_persisted_rollout_item(item, EventPersistenceMode::Limited) { + builder.handle_rollout_item(item); + } + } + builder.finish() +} diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 9b7b9fd57655..9dabdea44f32 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -542,17 +542,12 @@ pub(super) async fn handle_pending_thread_resume_request( let request_id = pending.request_id; let connection_id = request_id.connection_id; let mut thread = pending.thread_summary; - if pending.include_turns - && let Err(message) = populate_thread_turns_from_history( + if pending.include_turns { + populate_thread_turns_from_history( &mut thread, &pending.history_items, active_turn.as_ref(), - ) - { - outgoing - .send_error(request_id, internal_error(message)) - .await; - return; + ); } let thread_status = thread_watch_manager @@ -711,13 +706,12 @@ pub(super) fn populate_thread_turns_from_history( thread: &mut Thread, items: &[RolloutItem], active_turn: Option<&Turn>, -) -> std::result::Result<(), String> { - let mut turns = build_turns_from_rollout_items(items); +) { + let mut turns = build_api_turns_from_rollout_items(items); if let Some(active_turn) = active_turn { merge_turn_history_with_active_turn(&mut turns, active_turn.clone()); } thread.turns = turns; - Ok(()) } pub(super) async fn resolve_pending_server_request( diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 495ec08cea4a..f10652a029e3 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -2048,7 +2048,7 @@ impl ThreadRequestProcessor { let (mut thread, history) = thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd); if include_turns && let Some(history) = history { - thread.turns = build_turns_from_rollout_items(&history.items); + thread.turns = build_api_turns_from_rollout_items(&history.items); } Ok(Some(thread)) } @@ -2113,7 +2113,7 @@ impl ThreadRequestProcessor { .load_history(/*include_archived*/ true) .await .map_err(|err| thread_read_history_load_error(thread_id, err))?; - thread.turns = build_turns_from_rollout_items(&history.items); + thread.turns = build_api_turns_from_rollout_items(&history.items); } Ok(()) @@ -2662,17 +2662,11 @@ impl ThreadRequestProcessor { } let mut summary_source_thread = source_thread; summary_source_thread.history = None; - let thread_summary = match self - .stored_thread_to_api_thread( - summary_source_thread, - config_snapshot.model_provider_id.as_str(), - /*include_turns*/ false, - ) - .await - { - Ok(thread) => thread, - Err(message) => return Err(internal_error(message)), - }; + let thread_summary = self.stored_thread_to_api_thread( + summary_source_thread, + config_snapshot.model_provider_id.as_str(), + /*include_turns*/ false, + ); let mut config_for_instruction_sources = self.config.as_ref().clone(); config_for_instruction_sources.cwd = config_snapshot.cwd.clone(); let instruction_sources = @@ -2798,12 +2792,12 @@ impl ThreadRequestProcessor { })) } - async fn stored_thread_to_api_thread( + fn stored_thread_to_api_thread( &self, stored_thread: StoredThread, fallback_provider: &str, include_turns: bool, - ) -> std::result::Result { + ) -> Thread { let (mut thread, history) = thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd); if include_turns && let Some(history) = history { @@ -2811,9 +2805,9 @@ impl ThreadRequestProcessor { &mut thread, &history.items, /*active_turn*/ None, - )?; + ); } - Ok(thread) + thread } async fn read_stored_thread_for_new_fork( @@ -2921,7 +2915,7 @@ impl ThreadRequestProcessor { &mut thread, &history_items, /*active_turn*/ None, - )?; + ); } self.attach_thread_name(thread_id, &mut thread).await; Ok(thread) @@ -3066,7 +3060,7 @@ impl ThreadRequestProcessor { // Persistent forks materialize their own rollout immediately. Ephemeral forks stay // pathless, so they rebuild their visible history from the copied source history instead. - let mut thread = if let Some(fork_rollout_path) = session_configured.rollout_path.as_ref() { + let mut thread = if session_configured.rollout_path.is_some() { let stored_thread = self .read_stored_thread_for_new_fork(thread_id, include_turns) .await?; @@ -3075,13 +3069,6 @@ impl ThreadRequestProcessor { fallback_model_provider.as_str(), include_turns, ) - .await - .map_err(|message| { - internal_error(format!( - "failed to load rollout `{}` for thread {thread_id}: {message}", - fork_rollout_path.display() - )) - })? } else { let config_snapshot = forked_thread.config_snapshot().await; // forked thread names do not inherit the source thread name @@ -3094,8 +3081,7 @@ impl ThreadRequestProcessor { &mut thread, &history_items, /*active_turn*/ None, - ) - .map_err(internal_error)?; + ); } thread }; @@ -3466,16 +3452,6 @@ fn parse_thread_turns_cursor(cursor: &str) -> Result Vec { - let mut turns = build_turns_from_rollout_items(items); - normalize_thread_turns_status(&mut turns, loaded_status, has_live_in_progress_turn); - turns -} - fn reconstruct_thread_turns_for_turns_list( items: &[RolloutItem], loaded_status: ThreadStatus, @@ -3486,11 +3462,8 @@ fn reconstruct_thread_turns_for_turns_list( || active_turn .as_ref() .is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress)); - let mut turns = reconstruct_thread_turns_from_rollout_items( - items, - loaded_status, - has_live_in_progress_turn, - ); + let mut turns = build_api_turns_from_rollout_items(items); + normalize_thread_turns_status(&mut turns, loaded_status, has_live_in_progress_turn); if let Some(active_turn) = active_turn { merge_turn_history_with_active_turn(&mut turns, active_turn); } diff --git a/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs b/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs index 4580c1879d1e..eebc4077dff1 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_shell_command.rs @@ -15,6 +15,9 @@ use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::SortDirection; +use codex_app_server_protocol::ThreadForkParams; +use codex_app_server_protocol::ThreadForkResponse; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadReadParams; use codex_app_server_protocol::ThreadReadResponse; @@ -22,6 +25,8 @@ use codex_app_server_protocol::ThreadShellCommandParams; use codex_app_server_protocol::ThreadShellCommandResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ThreadTurnsListParams; +use codex_app_server_protocol::ThreadTurnsListResponse; use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; @@ -38,7 +43,8 @@ use tokio::time::timeout; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); #[tokio::test] -async fn thread_shell_command_runs_as_standalone_turn_and_persists_history() -> Result<()> { +async fn thread_shell_command_history_responses_exclude_persisted_command_executions() -> Result<()> +{ let tmp = TempDir::new()?; let codex_home = tmp.path().join("codex_home"); std::fs::create_dir(&codex_home)?; @@ -126,7 +132,7 @@ async fn thread_shell_command_runs_as_standalone_turn_and_persists_history() -> let read_id = mcp .send_thread_read_request(ThreadReadParams { - thread_id: thread.id, + thread_id: thread.id.clone(), include_turns: true, }) .await?; @@ -137,22 +143,40 @@ async fn thread_shell_command_runs_as_standalone_turn_and_persists_history() -> .await??; let ThreadReadResponse { thread, .. } = to_response::(read_resp)?; assert_eq!(thread.turns.len(), 1); - let ThreadItem::CommandExecution { - source, - status, - aggregated_output, - .. - } = thread.turns[0] - .items - .iter() - .find(|item| matches!(item, ThreadItem::CommandExecution { .. })) - .expect("expected persisted command execution item") - else { - unreachable!("matched command execution item"); - }; - assert_eq!(source, &CommandExecutionSource::UserShell); - assert_eq!(status, &CommandExecutionStatus::Completed); - assert_eq!(aggregated_output.as_deref(), Some(expected_output.as_str())); + assert_no_command_executions(&thread.turns[0].items, "thread/read"); + + let turns_list_id = mcp + .send_thread_turns_list_request(ThreadTurnsListParams { + thread_id: thread.id.clone(), + cursor: None, + limit: None, + sort_direction: Some(SortDirection::Asc), + }) + .await?; + let turns_list_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turns_list_id)), + ) + .await??; + let ThreadTurnsListResponse { data, .. } = + to_response::(turns_list_resp)?; + assert_eq!(data.len(), 1); + assert_no_command_executions(&data[0].items, "thread/turns/list"); + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: thread.id, + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + assert_eq!(thread.turns.len(), 1); + assert_no_command_executions(&thread.turns[0].items, "thread/fork"); Ok(()) } @@ -307,23 +331,20 @@ async fn thread_shell_command_uses_existing_active_turn() -> Result<()> { .await??; let ThreadReadResponse { thread, .. } = to_response::(read_resp)?; assert_eq!(thread.turns.len(), 1); - assert!( - thread.turns[0].items.iter().any(|item| { - matches!( - item, - ThreadItem::CommandExecution { - source: CommandExecutionSource::UserShell, - aggregated_output, - .. - } if aggregated_output.as_deref() == Some(expected_output.as_str()) - ) - }), - "expected active-turn shell command to be persisted on the existing turn" - ); + assert_no_command_executions(&thread.turns[0].items, "thread/read"); Ok(()) } +fn assert_no_command_executions(items: &[ThreadItem], context: &str) { + assert!( + items + .iter() + .all(|item| !matches!(item, ThreadItem::CommandExecution { .. })), + "{context} should always exclude command executions from returned turns" + ); +} + fn current_shell_output_command(text: &str) -> Result<(String, String)> { let command_and_output = match default_user_shell().name() { "powershell" => { diff --git a/codex-rs/rollout/src/lib.rs b/codex-rs/rollout/src/lib.rs index 4046beb635cc..d65ddd3d5b7b 100644 --- a/codex-rs/rollout/src/lib.rs +++ b/codex-rs/rollout/src/lib.rs @@ -53,6 +53,7 @@ pub use list::read_thread_item_from_rollout; pub use list::rollout_date_parts; pub use metadata::builder_from_items; pub use policy::EventPersistenceMode; +pub use policy::is_persisted_rollout_item; pub use policy::should_persist_response_item_for_memories; pub use recorder::RolloutRecorder; pub use recorder::RolloutRecorderParams; diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 4115aa4ce760..71f6d128a84e 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -11,7 +11,7 @@ pub enum EventPersistenceMode { /// Whether a rollout `item` should be persisted in rollout files for the /// provided persistence `mode`. -pub fn is_persisted_response_item(item: &RolloutItem, mode: EventPersistenceMode) -> bool { +pub fn is_persisted_rollout_item(item: &RolloutItem, mode: EventPersistenceMode) -> bool { match item { RolloutItem::ResponseItem(item) => should_persist_response_item(item), RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode), diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index dc2f08b7abb4..ea0c7e3e36f2 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -45,7 +45,7 @@ use super::list::parse_cursor; use super::list::parse_timestamp_uuid_from_filename; use super::metadata; use super::policy::EventPersistenceMode; -use super::policy::is_persisted_response_item; +use super::policy::is_persisted_rollout_item; use super::session_index::find_thread_names_by_ids; use crate::config::RolloutConfigView; use crate::default_client::originator; @@ -789,7 +789,7 @@ impl RolloutRecorder { // Note that function calls may look a bit strange if they are // "fully qualified MCP tool calls," so we could consider // reformatting them in that case. - if is_persisted_response_item(item, self.event_persistence_mode) { + if is_persisted_rollout_item(item, self.event_persistence_mode) { filtered.push(sanitize_rollout_item_for_persistence( item.clone(), self.event_persistence_mode,