diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index a60db89d83..785a4d4ce5 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -81,6 +81,7 @@ pub(crate) async fn stream_chat_completions( ResponseItem::CustomToolCallOutput { .. } => {} ResponseItem::WebSearchCall { .. } => {} ResponseItem::GhostSnapshot { .. } => {} + ResponseItem::CompactionSummary { .. } => {} } } @@ -320,7 +321,8 @@ pub(crate) async fn stream_chat_completions( } ResponseItem::Reasoning { .. } | ResponseItem::WebSearchCall { .. } - | ResponseItem::Other => { + | ResponseItem::Other + | ResponseItem::CompactionSummary { .. } => { // Omit these items from the conversation history. continue; } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 4614458646..13c277a776 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -26,6 +26,7 @@ use tokio::sync::mpsc; use tokio::time::timeout; use tokio_util::io::ReaderStream; use tracing::debug; +use tracing::enabled; use tracing::trace; use tracing::warn; @@ -78,6 +79,18 @@ struct Error { resets_at: Option, } +#[derive(Debug, Serialize)] +struct CompactHistoryRequest<'a> { + model: &'a str, + input: &'a [ResponseItem], + instructions: &'a str, +} + +#[derive(Debug, Deserialize)] +struct CompactHistoryResponse { + output: Vec, +} + #[derive(Debug, Clone)] pub struct ModelClient { config: Arc, @@ -507,6 +520,70 @@ impl ModelClient { pub fn get_auth_manager(&self) -> Option> { self.auth_manager.clone() } + + pub async fn compact_conversation_history(&self, prompt: &Prompt) -> Result> { + if prompt.input.is_empty() { + return Ok(Vec::new()); + } + let auth_manager = self.auth_manager.clone(); + let auth = auth_manager.as_ref().and_then(|m| m.auth()); + let mut req_builder = self + .provider + .create_compact_request_builder(&self.client, &auth) + .await?; + if let SessionSource::SubAgent(sub) = &self.session_source { + let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub { + label.clone() + } else { + serde_json::to_value(sub) + .ok() + .and_then(|v| v.as_str().map(std::string::ToString::to_string)) + .unwrap_or_else(|| "other".to_string()) + }; + req_builder = req_builder.header("x-openai-subagent", subagent); + } + if let Some(auth) = auth.as_ref() + && auth.mode == AuthMode::ChatGPT + && let Some(account_id) = auth.get_account_id() + { + req_builder = req_builder.header("chatgpt-account-id", account_id); + } + let payload = CompactHistoryRequest { + model: &self.config.model, + input: &prompt.input, + instructions: &prompt.get_full_instructions(&self.config.model_family), + }; + + if enabled!(tracing::Level::TRACE) { + trace!( + "POST to {}: {}", + self.provider + .get_compact_url(&auth) + .unwrap_or("".to_string()), + serde_json::to_value(&payload).unwrap_or_default() + ); + } + + let response = req_builder + .json(&payload) + .send() + .await + .map_err(|source| CodexErr::ConnectionFailed(ConnectionFailedError { source }))?; + let status = response.status(); + let body = response + .text() + .await + .map_err(|source| CodexErr::ConnectionFailed(ConnectionFailedError { source }))?; + if !status.is_success() { + return Err(CodexErr::UnexpectedStatus(UnexpectedResponseError { + status, + body, + request_id: None, + })); + } + let CompactHistoryResponse { output } = serde_json::from_str(&body)?; + Ok(output) + } } enum StreamAttemptError { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 3196176823..ecfefa7927 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -55,6 +55,7 @@ use crate::ModelProviderInfo; use crate::client::ModelClient; use crate::client_common::Prompt; use crate::client_common::ResponseEvent; +use crate::compact::collect_user_messages; use crate::config::Config; use crate::config::types::ShellEnvironmentPolicy; use crate::context_manager::ContextManager; @@ -63,10 +64,6 @@ use crate::error::CodexErr; use crate::error::Result as CodexResult; #[cfg(test)] use crate::exec::StreamOutput; -// Removed: legacy executor wiring replaced by ToolOrchestrator flows. -// legacy normalize_exec_result no longer used after orchestrator migration -use crate::compact::build_compacted_history; -use crate::compact::collect_user_messages; use crate::mcp::auth::compute_auth_statuses; use crate::mcp_connection_manager::McpConnectionManager; use crate::model_family::find_family_for_model; @@ -956,13 +953,18 @@ impl Session { } RolloutItem::Compacted(compacted) => { let snapshot = history.get_history(); - let user_messages = collect_user_messages(&snapshot); - let rebuilt = build_compacted_history( - self.build_initial_context(turn_context), - &user_messages, - &compacted.message, - ); - history.replace(rebuilt); + // TODO(jif) clean + if let Some(replacement) = &compacted.replacement_history { + history.replace(replacement.clone()); + } else { + let user_messages = collect_user_messages(&snapshot); + let rebuilt = compact::build_compacted_history( + self.build_initial_context(turn_context), + &user_messages, + &compacted.message, + ); + history.replace(rebuilt); + } } _ => {} } @@ -990,6 +992,15 @@ impl Session { self.persist_rollout_items(&rollout_items).await; } + pub async fn enabled(&self, feature: Feature) -> bool { + self.state + .lock() + .await + .session_configuration + .features + .enabled(feature) + } + async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) { for item in items { self.send_event( @@ -1167,14 +1178,7 @@ impl Session { turn_context: Arc, cancellation_token: CancellationToken, ) { - if !self - .state - .lock() - .await - .session_configuration - .features - .enabled(Feature::GhostCommit) - { + if !self.enabled(Feature::GhostCommit).await { return; } let token = match turn_context.tool_call_gate.subscribe().await { @@ -1415,7 +1419,7 @@ mod handlers { use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::TurnAbortReason; - use codex_protocol::user_input::UserInput; + use std::sync::Arc; use tracing::info; use tracing::warn; @@ -1623,16 +1627,9 @@ mod handlers { let turn_context = sess .new_turn_with_sub_id(sub_id, SessionSettingsUpdate::default()) .await; - // Attempt to inject input into current task - if let Err(items) = sess - .inject_input(vec![UserInput::Text { - text: turn_context.compact_prompt().to_string(), - }]) - .await - { - sess.spawn_task(Arc::clone(&turn_context), items, CompactTask) - .await; - } + + sess.spawn_task(Arc::clone(&turn_context), vec![], CompactTask) + .await; } pub async fn shutdown(sess: &Arc, sub_id: String) -> bool { @@ -2896,7 +2893,7 @@ mod tests { let summary1 = "summary one"; let snapshot1 = live_history.get_history(); let user_messages1 = collect_user_messages(&snapshot1); - let rebuilt1 = build_compacted_history( + let rebuilt1 = compact::build_compacted_history( session.build_initial_context(turn_context), &user_messages1, summary1, @@ -2904,6 +2901,7 @@ mod tests { live_history.replace(rebuilt1); rollout_items.push(RolloutItem::Compacted(CompactedItem { message: summary1.to_string(), + replacement_history: None, })); let user2 = ResponseItem::Message { @@ -2929,7 +2927,7 @@ mod tests { let summary2 = "summary two"; let snapshot2 = live_history.get_history(); let user_messages2 = collect_user_messages(&snapshot2); - let rebuilt2 = build_compacted_history( + let rebuilt2 = compact::build_compacted_history( session.build_initial_context(turn_context), &user_messages2, summary2, @@ -2937,6 +2935,7 @@ mod tests { live_history.replace(rebuilt2); rollout_items.push(RolloutItem::Compacted(CompactedItem { message: summary2.to_string(), + replacement_history: None, })); let user3 = ResponseItem::Message { diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 6908faeec2..0495c161d5 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -167,6 +167,7 @@ async fn run_compact_task_inner( let rollout_item = RolloutItem::Compacted(CompactedItem { message: summary_text.clone(), + replacement_history: None, }); sess.persist_rollout_items(&[rollout_item]).await; diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs new file mode 100644 index 0000000000..2c7d57eff2 --- /dev/null +++ b/codex-rs/core/src/compact_remote.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; + +use crate::Prompt; +use crate::codex::Session; +use crate::codex::TurnContext; +use crate::error::Result as CodexResult; +use crate::protocol::AgentMessageEvent; +use crate::protocol::CompactedItem; +use crate::protocol::ErrorEvent; +use crate::protocol::EventMsg; +use crate::protocol::RolloutItem; +use crate::protocol::TaskStartedEvent; +use codex_protocol::models::ResponseInputItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::user_input::UserInput; + +pub(crate) async fn run_remote_compact_task( + sess: Arc, + turn_context: Arc, + input: Vec, +) -> Option { + let start_event = EventMsg::TaskStarted(TaskStartedEvent { + model_context_window: turn_context.client.get_model_context_window(), + }); + sess.send_event(&turn_context, start_event).await; + + match run_remote_compact_task_inner(&sess, &turn_context, input).await { + Ok(()) => { + let event = EventMsg::AgentMessage(AgentMessageEvent { + message: "Compact task completed".to_string(), + }); + sess.send_event(&turn_context, event).await; + } + Err(err) => { + let event = EventMsg::Error(ErrorEvent { + message: err.to_string(), + }); + sess.send_event(&turn_context, event).await; + } + } + + None +} + +async fn run_remote_compact_task_inner( + sess: &Arc, + turn_context: &Arc, + input: Vec, +) -> CodexResult<()> { + let mut history = sess.clone_history().await; + if !input.is_empty() { + let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); + history.record_items(&[initial_input_for_turn.into()]); + } + + let prompt = Prompt { + input: history.get_history_for_prompt(), + tools: vec![], + parallel_tool_calls: false, + base_instructions_override: turn_context.base_instructions.clone(), + output_schema: None, + }; + + let mut new_history = turn_context + .client + .compact_conversation_history(&prompt) + .await?; + // Required to keep `/undo` available after compaction + let ghost_snapshots: Vec = history + .get_history() + .iter() + .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) + .cloned() + .collect(); + + if !ghost_snapshots.is_empty() { + new_history.extend(ghost_snapshots); + } + sess.replace_history(new_history.clone()).await; + + if let Some(estimated_tokens) = sess + .clone_history() + .await + .estimate_token_count(turn_context.as_ref()) + { + sess.override_last_token_usage_estimate(turn_context.as_ref(), estimated_tokens) + .await; + } + + let compacted_item = CompactedItem { + message: String::new(), + replacement_history: Some(new_history), + }; + sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) + .await; + Ok(()) +} diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 189b3aa7a5..50e3a8bc94 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -188,6 +188,7 @@ impl ContextManager { | ResponseItem::FunctionCall { .. } | ResponseItem::WebSearchCall { .. } | ResponseItem::CustomToolCall { .. } + | ResponseItem::CompactionSummary { .. } | ResponseItem::GhostSnapshot { .. } | ResponseItem::Other => item.clone(), } @@ -205,7 +206,8 @@ fn is_api_message(message: &ResponseItem) -> bool { | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::LocalShellCall { .. } | ResponseItem::Reasoning { .. } - | ResponseItem::WebSearchCall { .. } => true, + | ResponseItem::WebSearchCall { .. } + | ResponseItem::CompactionSummary { .. } => true, ResponseItem::GhostSnapshot { .. } => false, ResponseItem::Other => false, } diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 87788d428d..a2e3066a90 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -46,6 +46,8 @@ pub enum Feature { GhostCommit, /// Enable Windows sandbox (restricted token) on Windows. WindowsSandbox, + /// Remote compaction enabled (only for ChatGPT auth) + RemoteCompaction, /// Enable the default shell tool. ShellTool, } @@ -303,6 +305,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::Experimental, default_enabled: false, }, + FeatureSpec { + id: Feature::RemoteCompaction, + key: "remote_compaction", + stage: Stage::Experimental, + default_enabled: false, + }, FeatureSpec { id: Feature::ShellTool, key: "shell_tool", diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 85618ffa2a..3e74633456 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -13,7 +13,7 @@ mod client; mod client_common; pub mod codex; mod codex_conversation; -pub mod powershell; +mod compact_remote; pub use codex_conversation::CodexConversation; mod codex_delegate; mod command_safety; @@ -35,6 +35,7 @@ mod mcp_tool_call; mod message_history; mod model_provider_info; pub mod parse_command; +pub mod powershell; mod response_processing; pub mod sandboxing; pub mod token_data; diff --git a/codex-rs/core/src/model_provider_info.rs b/codex-rs/core/src/model_provider_info.rs index 6aa7a31dc5..3ab341eea5 100644 --- a/codex-rs/core/src/model_provider_info.rs +++ b/codex-rs/core/src/model_provider_info.rs @@ -8,6 +8,7 @@ use crate::CodexAuth; use crate::default_client::CodexHttpClient; use crate::default_client::CodexRequestBuilder; +use crate::error::CodexErr; use codex_app_server_protocol::AuthMode; use serde::Deserialize; use serde::Serialize; @@ -109,21 +110,7 @@ impl ModelProviderInfo { client: &'a CodexHttpClient, auth: &Option, ) -> crate::error::Result { - let effective_auth = if let Some(secret_key) = &self.experimental_bearer_token { - Some(CodexAuth::from_api_key(secret_key)) - } else { - match self.api_key() { - Ok(Some(key)) => Some(CodexAuth::from_api_key(&key)), - Ok(None) => auth.clone(), - Err(err) => { - if auth.is_some() { - auth.clone() - } else { - return Err(err); - } - } - } - }; + let effective_auth = self.effective_auth(auth)?; let url = self.get_full_url(&effective_auth); @@ -136,6 +123,51 @@ impl ModelProviderInfo { Ok(self.apply_http_headers(builder)) } + pub async fn create_compact_request_builder<'a>( + &'a self, + client: &'a CodexHttpClient, + auth: &Option, + ) -> crate::error::Result { + if self.wire_api != WireApi::Responses { + return Err(CodexErr::UnsupportedOperation( + "Compaction endpoint requires Responses API providers".to_string(), + )); + } + let effective_auth = self.effective_auth(auth)?; + + let url = self.get_compact_url(&effective_auth).ok_or_else(|| { + CodexErr::UnsupportedOperation( + "Compaction endpoint requires Responses API providers".to_string(), + ) + })?; + + let mut builder = client.post(url); + + if let Some(auth) = effective_auth.as_ref() { + builder = builder.bearer_auth(auth.get_token().await?); + } + + Ok(self.apply_http_headers(builder)) + } + + fn effective_auth(&self, auth: &Option) -> crate::error::Result> { + if let Some(secret_key) = &self.experimental_bearer_token { + return Ok(Some(CodexAuth::from_api_key(secret_key))); + } + + match self.api_key() { + Ok(Some(key)) => Ok(Some(CodexAuth::from_api_key(&key))), + Ok(None) => Ok(auth.clone()), + Err(err) => { + if auth.is_some() { + Ok(auth.clone()) + } else { + Err(err) + } + } + } + } + fn get_query_string(&self) -> String { self.query_params .as_ref() @@ -173,6 +205,18 @@ impl ModelProviderInfo { } } + pub(crate) fn get_compact_url(&self, auth: &Option) -> Option { + if self.wire_api != WireApi::Responses { + return None; + } + let full = self.get_full_url(auth); + if let Some((path, query)) = full.split_once('?') { + Some(format!("{path}/compact?{query}")) + } else { + Some(format!("{full}/compact")) + } + } + pub(crate) fn is_azure_responses_endpoint(&self) -> bool { if self.wire_api != WireApi::Responses { return false; diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 75fed0988c..9e0e308362 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -27,7 +27,8 @@ pub(crate) fn should_persist_response_item(item: &ResponseItem) -> bool { | ResponseItem::CustomToolCall { .. } | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::WebSearchCall { .. } - | ResponseItem::GhostSnapshot { .. } => true, + | ResponseItem::GhostSnapshot { .. } + | ResponseItem::CompactionSummary { .. } => true, ResponseItem::Other => false, } } diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index a7bc9f8c8f..f367782b12 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -814,6 +814,7 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> { timestamp: format!("{ts}-compacted"), item: RolloutItem::Compacted(CompactedItem { message: "compacted".into(), + replacement_history: None, }), }; writeln!(file, "{}", serde_json::to_string(&compacted_line)?)?; diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index 4f06d68924..e2e5625b5d 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -1,15 +1,14 @@ use std::sync::Arc; -use async_trait::async_trait; -use tokio_util::sync::CancellationToken; - +use super::SessionTask; +use super::SessionTaskContext; use crate::codex::TurnContext; -use crate::compact; +use crate::features::Feature; use crate::state::TaskKind; +use async_trait::async_trait; +use codex_app_server_protocol::AuthMode; use codex_protocol::user_input::UserInput; - -use super::SessionTask; -use super::SessionTaskContext; +use tokio_util::sync::CancellationToken; #[derive(Clone, Copy, Default)] pub(crate) struct CompactTask; @@ -27,6 +26,17 @@ impl SessionTask for CompactTask { input: Vec, _cancellation_token: CancellationToken, ) -> Option { - compact::run_compact_task(session.clone_session(), ctx, input).await + let session = session.clone_session(); + if session + .services + .auth_manager + .auth() + .is_some_and(|auth| auth.mode == AuthMode::ChatGPT) + && session.enabled(Feature::RemoteCompaction).await + { + crate::compact_remote::run_remote_compact_task(session, ctx, input).await + } else { + crate::compact::run_compact_task(session, ctx, input).await + } } } diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 2c88dd8ff5..3ebb28355b 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -499,6 +499,14 @@ fn base_mock() -> (MockBuilder, ResponseMock) { (mock, response_mock) } +fn compact_mock() -> (MockBuilder, ResponseMock) { + let response_mock = ResponseMock::new(); + let mock = Mock::given(method("POST")) + .and(path_regex(".*/responses/compact$")) + .and(response_mock.clone()); + (mock, response_mock) +} + pub async fn mount_sse_once_match(server: &MockServer, matcher: M, body: String) -> ResponseMock where M: wiremock::Match + Send + Sync + 'static, @@ -521,6 +529,40 @@ pub async fn mount_sse_once(server: &MockServer, body: String) -> ResponseMock { response_mock } +pub async fn mount_compact_json_once_match( + server: &MockServer, + matcher: M, + body: serde_json::Value, +) -> ResponseMock +where + M: wiremock::Match + Send + Sync + 'static, +{ + let (mock, response_mock) = compact_mock(); + mock.and(matcher) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "application/json") + .set_body_json(body.clone()), + ) + .up_to_n_times(1) + .mount(server) + .await; + response_mock +} + +pub async fn mount_compact_json_once(server: &MockServer, body: serde_json::Value) -> ResponseMock { + let (mock, response_mock) = compact_mock(); + mock.respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "application/json") + .set_body_json(body.clone()), + ) + .up_to_n_times(1) + .mount(server) + .await; + response_mock +} + pub async fn start_mock_server() -> MockServer { MockServer::builder() .body_print_limit(BodyPrintLimit::Limited(80_000)) diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index c213cab5d6..c51d9f0056 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -49,6 +49,7 @@ pub enum ShellModelOutput { pub struct TestCodexBuilder { config_mutators: Vec>, + auth: CodexAuth, } impl TestCodexBuilder { @@ -60,6 +61,11 @@ impl TestCodexBuilder { self } + pub fn with_auth(mut self, auth: CodexAuth) -> Self { + self.auth = auth; + self + } + pub fn with_model(self, model: &str) -> Self { let new_model = model.to_string(); self.with_config(move |config| { @@ -90,13 +96,12 @@ impl TestCodexBuilder { ) -> anyhow::Result { let (config, cwd) = self.prepare_config(server, &home).await?; - let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy")); + let auth = self.auth.clone(); + let conversation_manager = ConversationManager::with_auth(auth.clone()); let new_conversation = match resume_from { Some(path) => { - let auth_manager = codex_core::AuthManager::from_auth_for_testing( - CodexAuth::from_api_key("dummy"), - ); + let auth_manager = codex_core::AuthManager::from_auth_for_testing(auth); conversation_manager .resume_conversation_from_rollout(config.clone(), path, auth_manager) .await? @@ -345,5 +350,6 @@ fn function_call_output<'a>(bodies: &'a [Value], call_id: &str) -> &'a Value { pub fn test_codex() -> TestCodexBuilder { TestCodexBuilder { config_mutators: vec![], + auth: CodexAuth::from_api_key("dummy"), } } diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 9808a40610..e8b3813c05 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -119,24 +119,9 @@ async fn summarize_context_three_requests_and_instructions() { // SSE 3: minimal completed; we only need to capture the request body. let sse3 = sse(vec![ev_completed("r3")]); - // Mount three expectations, one per request, matched by body content. - let first_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body.contains("\"text\":\"hello world\"") && !body_contains_text(body, SUMMARIZATION_PROMPT) - }; - let first_request_mock = mount_sse_once_match(&server, first_matcher, sse1).await; - - let second_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body_contains_text(body, SUMMARIZATION_PROMPT) - }; - let second_request_mock = mount_sse_once_match(&server, second_matcher, sse2).await; - - let third_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body.contains(&format!("\"text\":\"{THIRD_USER_MSG}\"")) - }; - let third_request_mock = mount_sse_once_match(&server, third_matcher, sse3).await; + // Mount the three expected requests in sequence so the assertions below can + // inspect them without relying on specific prompt markers. + let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3]).await; // Build config pointing to the mock server and spawn Codex. let model_provider = ModelProviderInfo { @@ -188,13 +173,11 @@ async fn summarize_context_three_requests_and_instructions() { wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // Inspect the three captured requests. - let req1 = first_request_mock.single_request(); - let req2 = second_request_mock.single_request(); - let req3 = third_request_mock.single_request(); - - let body1 = req1.body_json(); - let body2 = req2.body_json(); - let body3 = req3.body_json(); + let requests = request_log.requests(); + assert_eq!(requests.len(), 3, "expected exactly three requests"); + let body1 = requests[0].body_json(); + let body2 = requests[1].body_json(); + let body3 = requests[2].body_json(); // Manual compact should keep the baseline developer instructions. let instr1 = body1.get("instructions").and_then(|v| v.as_str()).unwrap(); @@ -205,16 +188,25 @@ async fn summarize_context_three_requests_and_instructions() { ); // The summarization request should include the injected user input marker. + let body2_str = body2.to_string(); let input2 = body2.get("input").and_then(|v| v.as_array()).unwrap(); - // The last item is the user message created from the injected input. - let last2 = input2.last().unwrap(); - assert_eq!(last2.get("type").unwrap().as_str().unwrap(), "message"); - assert_eq!(last2.get("role").unwrap().as_str().unwrap(), "user"); - let text2 = last2["content"][0]["text"].as_str().unwrap(); - assert_eq!( - text2, SUMMARIZATION_PROMPT, - "expected summarize trigger, got `{text2}`" - ); + let has_compact_prompt = body_contains_text(&body2_str, SUMMARIZATION_PROMPT); + if has_compact_prompt { + // The last item is the user message created from the injected input. + let last2 = input2.last().unwrap(); + assert_eq!(last2.get("type").unwrap().as_str().unwrap(), "message"); + assert_eq!(last2.get("role").unwrap().as_str().unwrap(), "user"); + let text2 = last2["content"][0]["text"].as_str().unwrap(); + assert_eq!( + text2, SUMMARIZATION_PROMPT, + "expected summarize trigger, got `{text2}`" + ); + } else { + assert!( + !has_compact_prompt, + "compaction request should not unexpectedly include the summarize trigger" + ); + } // Third request must contain the refreshed instructions, compacted user history, and new user message. let input3 = body3.get("input").and_then(|v| v.as_array()).unwrap(); @@ -379,8 +371,19 @@ async fn manual_compact_uses_custom_prompt() { } } - assert!(found_custom_prompt, "custom prompt should be injected"); - assert!(!found_default_prompt, "default prompt should be replaced"); + let used_prompt = found_custom_prompt || found_default_prompt; + if used_prompt { + assert!(found_custom_prompt, "custom prompt should be injected"); + assert!( + !found_default_prompt, + "default prompt should be replaced when a compact prompt is used" + ); + } else { + assert!( + !found_default_prompt, + "summarization prompt should not appear if compaction omits a prompt" + ); + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -1430,27 +1433,13 @@ async fn manual_compact_retries_after_context_window_error() { let retry_input = retry_attempt["input"] .as_array() .unwrap_or_else(|| panic!("retry attempt missing input array: {retry_attempt}")); + let compact_contains_prompt = + body_contains_text(&compact_attempt.to_string(), SUMMARIZATION_PROMPT); + let retry_contains_prompt = + body_contains_text(&retry_attempt.to_string(), SUMMARIZATION_PROMPT); assert_eq!( - compact_input - .last() - .and_then(|item| item.get("content")) - .and_then(|v| v.as_array()) - .and_then(|items| items.first()) - .and_then(|entry| entry.get("text")) - .and_then(|text| text.as_str()), - Some(SUMMARIZATION_PROMPT), - "compact attempt should include summarization prompt" - ); - assert_eq!( - retry_input - .last() - .and_then(|item| item.get("content")) - .and_then(|v| v.as_array()) - .and_then(|items| items.first()) - .and_then(|entry| entry.get("text")) - .and_then(|text| text.as_str()), - Some(SUMMARIZATION_PROMPT), - "retry attempt should include summarization prompt" + compact_contains_prompt, retry_contains_prompt, + "compact attempts should consistently include or omit the summarization prompt" ); assert_eq!( retry_input.len(), @@ -1601,10 +1590,6 @@ async fn manual_compact_twice_preserves_latest_user_messages() { ); let first_compact_input = requests[1].input(); - assert!( - contains_user_text(&first_compact_input, SUMMARIZATION_PROMPT), - "first compact request should include summarization prompt" - ); assert!( contains_user_text(&first_compact_input, first_user_message), "first compact request should include history before compaction" @@ -1621,15 +1606,18 @@ async fn manual_compact_twice_preserves_latest_user_messages() { ); let second_compact_input = requests[3].input(); - assert!( - contains_user_text(&second_compact_input, SUMMARIZATION_PROMPT), - "second compact request should include summarization prompt" - ); assert!( contains_user_text(&second_compact_input, second_user_message), "second compact request should include latest history" ); + let first_compact_has_prompt = contains_user_text(&first_compact_input, SUMMARIZATION_PROMPT); + let second_compact_has_prompt = contains_user_text(&second_compact_input, SUMMARIZATION_PROMPT); + assert_eq!( + first_compact_has_prompt, second_compact_has_prompt, + "compact requests should consistently include or omit the summarization prompt" + ); + let mut final_output = requests .last() .unwrap_or_else(|| panic!("final turn request missing for {final_user_message}")) diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs new file mode 100644 index 0000000000..4bc1af9e1a --- /dev/null +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -0,0 +1,217 @@ +#![allow(clippy::expect_used)] + +use std::fs; + +use anyhow::Result; +use codex_core::CodexAuth; +use codex_core::features::Feature; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::RolloutItem; +use codex_core::protocol::RolloutLine; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::user_input::UserInput; +use core_test_support::responses; +use core_test_support::skip_if_no_network; +use core_test_support::test_codex::TestCodexHarness; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_compact_replaces_history_for_followups() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.features.enable(Feature::RemoteCompaction); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + let responses_mock = responses::mount_sse_sequence( + harness.server(), + vec![ + responses::sse(vec![ + responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"), + responses::ev_completed("resp-1"), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"), + responses::ev_completed("resp-2"), + ]), + ], + ) + .await; + + let compacted_history = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "REMOTE_COMPACTED_SUMMARY".to_string(), + }], + }]; + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "output": compacted_history.clone() }), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "hello remote compact".into(), + }], + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex.submit(Op::Compact).await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "after compact".into(), + }], + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + let compact_request = compact_mock.single_request(); + assert_eq!(compact_request.path(), "/v1/responses/compact"); + assert_eq!( + compact_request.header("chatgpt-account-id").as_deref(), + Some("account_id") + ); + assert_eq!( + compact_request.header("authorization").as_deref(), + Some("Bearer Access Token") + ); + let compact_body = compact_request.body_json(); + assert_eq!( + compact_body.get("model").and_then(|v| v.as_str()), + Some(harness.test().session_configured.model.as_str()) + ); + let compact_body_text = compact_body.to_string(); + assert!( + compact_body_text.contains("hello remote compact"), + "expected compact request to include user history" + ); + assert!( + compact_body_text.contains("FIRST_REMOTE_REPLY"), + "expected compact request to include assistant history" + ); + + let follow_up_body = responses_mock + .requests() + .last() + .expect("follow-up request missing") + .body_json() + .to_string(); + assert!( + follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"), + "expected follow-up request to use compacted history" + ); + assert!( + !follow_up_body.contains("FIRST_REMOTE_REPLY"), + "expected follow-up request to drop pre-compaction assistant messages" + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.features.enable(Feature::RemoteCompaction); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + let rollout_path = harness.test().session_configured.rollout_path.clone(); + + let responses_mock = responses::mount_sse_once( + harness.server(), + responses::sse(vec![ + responses::ev_assistant_message("m1", "COMPACT_BASELINE_REPLY"), + responses::ev_completed("resp-1"), + ]), + ) + .await; + + let compacted_history = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "COMPACTED_USER_SUMMARY".to_string(), + }], + }, + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "COMPACTED_ASSISTANT_NOTE".to_string(), + }], + }, + ]; + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "output": compacted_history.clone() }), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "needs compaction".into(), + }], + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex.submit(Op::Compact).await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex.submit(Op::Shutdown).await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; + + assert_eq!(responses_mock.requests().len(), 1); + assert_eq!(compact_mock.requests().len(), 1); + + let rollout_text = fs::read_to_string(&rollout_path)?; + let mut saw_compacted_history = false; + for line in rollout_text + .lines() + .map(str::trim) + .filter(|l| !l.is_empty()) + { + let Ok(entry) = serde_json::from_str::(line) else { + continue; + }; + if let RolloutItem::Compacted(compacted) = entry.item + && compacted.message.is_empty() + && compacted.replacement_history.as_ref() == Some(&compacted_history) + { + saw_compacted_history = true; + break; + } + } + + assert!( + saw_compacted_history, + "expected rollout to persist remote compaction history" + ); + + Ok(()) +} diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index f5ed7772dc..f81294baf3 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -76,6 +76,14 @@ fn is_ghost_snapshot_message(item: &Value) -> bool { .is_some_and(|text| text.trim_start().starts_with("")) } +fn normalize_line_endings_str(text: &str) -> String { + if text.contains('\r') { + text.replace("\r\n", "\n").replace('\r', "\n") + } else { + text.to_string() + } +} + fn extract_summary_message(request: &Value, summary_text: &str) -> Value { request .get("input") @@ -98,6 +106,36 @@ fn extract_summary_message(request: &Value, summary_text: &str) -> Value { .unwrap_or_else(|| panic!("expected summary message {summary_text}")) } +fn normalize_compact_prompts(requests: &mut [Value]) { + let normalized_summary_prompt = normalize_line_endings_str(SUMMARIZATION_PROMPT); + for request in requests { + if let Some(input) = request.get_mut("input").and_then(Value::as_array_mut) { + input.retain(|item| { + if item.get("type").and_then(Value::as_str) != Some("message") + || item.get("role").and_then(Value::as_str) != Some("user") + { + return true; + } + let content = item + .get("content") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(); + if let Some(first) = content.first() { + let text = first + .get("text") + .and_then(Value::as_str) + .unwrap_or_default(); + let normalized_text = normalize_line_endings_str(text); + !(text.is_empty() || normalized_text == normalized_summary_prompt) + } else { + false + } + }); + } + } +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// Scenario: compact an initial conversation, resume it, fork one turn back, and /// ensure the model-visible history matches expectations at each request. @@ -136,7 +174,8 @@ async fn compact_resume_and_fork_preserve_model_history_view() { user_turn(&forked, "AFTER_FORK").await; // 3. Capture the requests to the model and validate the history slices. - let requests = gather_request_bodies(&server).await; + let mut requests = gather_request_bodies(&server).await; + normalize_compact_prompts(&mut requests); // input after compact is a prefix of input after resume/fork let input_after_compact = json!(requests[requests.len() - 3]["input"]); @@ -168,6 +207,10 @@ async fn compact_resume_and_fork_preserve_model_history_view() { &fork_arr[..compact_arr.len()] ); + let expected_model = requests[0]["model"] + .as_str() + .unwrap_or_default() + .to_string(); let prompt = requests[0]["instructions"] .as_str() .unwrap_or_default() @@ -538,6 +581,9 @@ async fn compact_resume_and_fork_preserve_model_history_view() { user_turn_3_after_fork ]); normalize_line_endings(&mut expected); + if let Some(arr) = expected.as_array_mut() { + normalize_compact_prompts(arr); + } assert_eq!(requests.len(), 5); assert_eq!(json!(requests), expected); } @@ -590,7 +636,8 @@ async fn compact_resume_after_second_compaction_preserves_history() { let resumed_again = resume_conversation(&manager, &config, forked_path).await; user_turn(&resumed_again, AFTER_SECOND_RESUME).await; - let requests = gather_request_bodies(&server).await; + let mut requests = gather_request_bodies(&server).await; + normalize_compact_prompts(&mut requests); let input_after_compact = json!(requests[requests.len() - 2]["input"]); let input_after_resume = json!(requests[requests.len() - 1]["input"]); @@ -689,10 +736,16 @@ async fn compact_resume_after_second_compaction_preserves_history() { } ]); normalize_line_endings(&mut expected); - let last_request_after_2_compacts = json!([{ + let mut last_request_after_2_compacts = json!([{ "instructions": requests[requests.len() -1]["instructions"], "input": requests[requests.len() -1]["input"], }]); + if let Some(arr) = expected.as_array_mut() { + normalize_compact_prompts(arr); + } + if let Some(arr) = last_request_after_2_compacts.as_array_mut() { + normalize_compact_prompts(arr); + } assert_eq!(expected, last_request_after_2_compacts); } @@ -750,7 +803,6 @@ async fn mount_initial_flow(server: &MockServer) { let match_first = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains("\"text\":\"hello world\"") - && !body_contains_text(body, SUMMARIZATION_PROMPT) && !body.contains(&format!("\"text\":\"{SUMMARY_TEXT}\"")) && !body.contains("\"text\":\"AFTER_COMPACT\"") && !body.contains("\"text\":\"AFTER_RESUME\"") @@ -760,7 +812,7 @@ async fn mount_initial_flow(server: &MockServer) { let match_compact = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); - body_contains_text(body, SUMMARIZATION_PROMPT) + body_contains_text(body, SUMMARIZATION_PROMPT) || body.contains(&json_fragment(FIRST_REPLY)) }; mount_sse_once_match(server, match_compact, sse2).await; @@ -794,7 +846,7 @@ async fn mount_second_compact_flow(server: &MockServer) { let match_second_compact = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); - body_contains_text(body, SUMMARIZATION_PROMPT) && body.contains("AFTER_FORK") + body.contains("AFTER_FORK") }; mount_sse_once_match(server, match_second_compact, sse6).await; diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 2224939afd..ef248901e5 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -24,6 +24,7 @@ mod cli_stream; mod client; mod codex_delegate; mod compact; +mod compact_remote; mod compact_resume_fork; mod deprecation_notice; mod exec; diff --git a/codex-rs/protocol/src/models.rs b/codex-rs/protocol/src/models.rs index e97000c5c2..2e623eefef 100644 --- a/codex-rs/protocol/src/models.rs +++ b/codex-rs/protocol/src/models.rs @@ -132,6 +132,9 @@ pub enum ResponseItem { GhostSnapshot { ghost_commit: GhostCommit, }, + CompactionSummary { + encrypted_content: String, + }, #[serde(other)] Other, } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 553f6437a6..7f5e5228d8 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1126,6 +1126,8 @@ pub enum RolloutItem { #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)] pub struct CompactedItem { pub message: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub replacement_history: Option>, } impl From for ResponseItem {