From 7bf452511c95ae259df5cddaa0c3ce8fd7b0eb0b Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 17 Nov 2025 20:48:53 +0000 Subject: [PATCH 01/17] feat: remote compaction --- codex-rs/core/src/chat_completions.rs | 4 +- codex-rs/core/src/client.rs | 62 +++++++++++++ codex-rs/core/src/codex.rs | 30 ++++-- codex-rs/core/src/compact.rs | 1 + codex-rs/core/src/compact_remote.rs | 97 ++++++++++++++++++++ codex-rs/core/src/context_manager/history.rs | 4 +- codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/model_provider_info.rs | 54 +++++++++++ codex-rs/core/src/rollout/policy.rs | 3 +- codex-rs/core/src/rollout/tests.rs | 1 + codex-rs/core/src/tasks/compact.rs | 25 +++-- codex-rs/protocol/src/models.rs | 3 + codex-rs/protocol/src/protocol.rs | 2 + 13 files changed, 266 insertions(+), 21 deletions(-) create mode 100644 codex-rs/core/src/compact_remote.rs diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index a60db89d83..785a4d4ce5 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -81,6 +81,7 @@ pub(crate) async fn stream_chat_completions( ResponseItem::CustomToolCallOutput { .. } => {} ResponseItem::WebSearchCall { .. } => {} ResponseItem::GhostSnapshot { .. } => {} + ResponseItem::CompactionSummary { .. } => {} } } @@ -320,7 +321,8 @@ pub(crate) async fn stream_chat_completions( } ResponseItem::Reasoning { .. } | ResponseItem::WebSearchCall { .. } - | ResponseItem::Other => { + | ResponseItem::Other + | ResponseItem::CompactionSummary { .. } => { // Omit these items from the conversation history. continue; } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 98775e3d3a..e273616053 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -78,6 +78,18 @@ struct Error { resets_at: Option, } +#[derive(Debug, Serialize)] +struct CompactHistoryRequest<'a> { + model: &'a str, + input: &'a [ResponseItem], + store: bool, +} + +#[derive(Debug, Deserialize)] +struct CompactHistoryResponse { + input: Vec, +} + #[derive(Debug, Clone)] pub struct ModelClient { config: Arc, @@ -507,6 +519,56 @@ impl ModelClient { pub fn get_auth_manager(&self) -> Option> { self.auth_manager.clone() } + + pub async fn compact_conversation_history( + &self, + history: &[ResponseItem], + ) -> Result> { + if history.is_empty() { + return Ok(Vec::new()); + } + let auth_manager = self.auth_manager.clone(); + let auth = auth_manager.as_ref().and_then(|m| m.auth()); + let mut req_builder = self + .provider + .create_compact_request_builder(&self.client, &auth) + .await?; + if let SessionSource::SubAgent(sub) = &self.session_source { + let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub { + label.clone() + } else { + serde_json::to_value(sub) + .ok() + .and_then(|v| v.as_str().map(std::string::ToString::to_string)) + .unwrap_or_else(|| "other".to_string()) + }; + req_builder = req_builder.header("x-openai-subagent", subagent); + } + let payload = CompactHistoryRequest { + model: &self.config.model, + input: history, + store: true, + }; + let response = req_builder + .json(&payload) + .send() + .await + .map_err(|source| CodexErr::ConnectionFailed(ConnectionFailedError { source }))?; + let status = response.status(); + let body = response + .text() + .await + .map_err(|source| CodexErr::ConnectionFailed(ConnectionFailedError { source }))?; + if !status.is_success() { + return Err(CodexErr::UnexpectedStatus(UnexpectedResponseError { + status, + body, + request_id: None, + })); + } + let CompactHistoryResponse { input } = serde_json::from_str(&body)?; + Ok(input) + } } enum StreamAttemptError { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index dbde7a4e28..4b6db2fa07 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -65,9 +65,6 @@ use crate::error::CodexErr; use crate::error::Result as CodexResult; #[cfg(test)] use crate::exec::StreamOutput; -// Removed: legacy executor wiring replaced by ToolOrchestrator flows. -// legacy normalize_exec_result no longer used after orchestrator migration -use crate::compact::build_compacted_history; use crate::compact::collect_user_messages; use crate::mcp::auth::compute_auth_statuses; use crate::mcp_connection_manager::McpConnectionManager; @@ -986,13 +983,24 @@ impl Session { } RolloutItem::Compacted(compacted) => { let snapshot = history.get_history(); - let user_messages = collect_user_messages(&snapshot); - let rebuilt = build_compacted_history( - self.build_initial_context(turn_context), - &user_messages, - &compacted.message, - ); - history.replace(rebuilt); + if let Some(replacement) = &compacted.replacement_history { + let mut rebuilt = self.build_initial_context(turn_context); + rebuilt.extend(replacement.clone()); + let ghost_snapshots = snapshot + .iter() + .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) + .cloned(); + rebuilt.extend(ghost_snapshots); + history.replace(rebuilt); + } else { + let user_messages = collect_user_messages(&snapshot); + let rebuilt = compact::build_compacted_history( + self.build_initial_context(turn_context), + &user_messages, + &compacted.message, + ); + history.replace(rebuilt); + } } _ => {} } @@ -2941,6 +2949,7 @@ mod tests { live_history.replace(rebuilt1); rollout_items.push(RolloutItem::Compacted(CompactedItem { message: summary1.to_string(), + replacement_history: None, })); let user2 = ResponseItem::Message { @@ -2974,6 +2983,7 @@ mod tests { live_history.replace(rebuilt2); rollout_items.push(RolloutItem::Compacted(CompactedItem { message: summary2.to_string(), + replacement_history: None, })); let user3 = ResponseItem::Message { diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 6908faeec2..0495c161d5 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -167,6 +167,7 @@ async fn run_compact_task_inner( let rollout_item = RolloutItem::Compacted(CompactedItem { message: summary_text.clone(), + replacement_history: None, }); sess.persist_rollout_items(&[rollout_item]).await; diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs new file mode 100644 index 0000000000..2809869cba --- /dev/null +++ b/codex-rs/core/src/compact_remote.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; + +use crate::codex::Session; +use crate::codex::TurnContext; +use crate::error::Result as CodexResult; +use crate::protocol::AgentMessageEvent; +use crate::protocol::CompactedItem; +use crate::protocol::ErrorEvent; +use crate::protocol::EventMsg; +use crate::protocol::RolloutItem; +use crate::protocol::TaskStartedEvent; +use crate::protocol::WarningEvent; +use codex_protocol::models::ResponseInputItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::user_input::UserInput; + +pub(crate) async fn run_remote_compact_task( + sess: Arc, + turn_context: Arc, + input: Vec, +) -> Option { + let start_event = EventMsg::TaskStarted(TaskStartedEvent { + model_context_window: turn_context.client.get_model_context_window(), + }); + sess.send_event(&turn_context, start_event).await; + + match run_remote_compact_task_inner(&sess, &turn_context, input).await { + Ok(()) => { + let event = EventMsg::AgentMessage(AgentMessageEvent { + message: "Compact task completed".to_string(), + }); + sess.send_event(&turn_context, event).await; + + let warning = EventMsg::Warning(WarningEvent { + message: "Heads up: Long conversations and multiple compactions can cause the model to be less accurate. Start a new conversation when possible to keep conversations small and targeted.".to_string(), + }); + sess.send_event(&turn_context, warning).await; + } + Err(err) => { + let event = EventMsg::Error(ErrorEvent { + message: err.to_string(), + }); + sess.send_event(&turn_context, event).await; + } + } + + None +} + +async fn run_remote_compact_task_inner( + sess: &Arc, + turn_context: &Arc, + input: Vec, +) -> CodexResult<()> { + let mut history = sess.clone_history().await; + if !input.is_empty() { + let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); + history.record_items(&[initial_input_for_turn.into()]); + } + let history_snapshot = history.get_history(); + if history_snapshot.is_empty() { + return Ok(()); + } + + let compacted_items = turn_context + .client + .compact_conversation_history(&history_snapshot) + .await?; + let ghost_snapshots: Vec = history_snapshot + .iter() + .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) + .cloned() + .collect(); + let mut new_history = sess.build_initial_context(turn_context.as_ref()); + new_history.extend(compacted_items.clone()); + if !ghost_snapshots.is_empty() { + new_history.extend(ghost_snapshots); + } + sess.replace_history(new_history).await; + + if let Some(estimated_tokens) = sess + .clone_history() + .await + .estimate_token_count(turn_context.as_ref()) + { + sess.override_last_token_usage_estimate(turn_context.as_ref(), estimated_tokens) + .await; + } + + let compacted_item = CompactedItem { + message: String::new(), + replacement_history: Some(compacted_items), + }; + sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) + .await; + Ok(()) +} diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 189b3aa7a5..50e3a8bc94 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -188,6 +188,7 @@ impl ContextManager { | ResponseItem::FunctionCall { .. } | ResponseItem::WebSearchCall { .. } | ResponseItem::CustomToolCall { .. } + | ResponseItem::CompactionSummary { .. } | ResponseItem::GhostSnapshot { .. } | ResponseItem::Other => item.clone(), } @@ -205,7 +206,8 @@ fn is_api_message(message: &ResponseItem) -> bool { | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::LocalShellCall { .. } | ResponseItem::Reasoning { .. } - | ResponseItem::WebSearchCall { .. } => true, + | ResponseItem::WebSearchCall { .. } + | ResponseItem::CompactionSummary { .. } => true, ResponseItem::GhostSnapshot { .. } => false, ResponseItem::Other => false, } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 5229d00606..b5bdb733bb 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -13,6 +13,7 @@ mod client; mod client_common; pub mod codex; mod codex_conversation; +mod compact_remote; pub use codex_conversation::CodexConversation; mod codex_delegate; mod command_safety; diff --git a/codex-rs/core/src/model_provider_info.rs b/codex-rs/core/src/model_provider_info.rs index 8dc252aa7c..df80234016 100644 --- a/codex-rs/core/src/model_provider_info.rs +++ b/codex-rs/core/src/model_provider_info.rs @@ -8,6 +8,7 @@ use crate::CodexAuth; use crate::default_client::CodexHttpClient; use crate::default_client::CodexRequestBuilder; +use crate::error::CodexErr; use codex_app_server_protocol::AuthMode; use serde::Deserialize; use serde::Serialize; @@ -136,6 +137,47 @@ impl ModelProviderInfo { Ok(self.apply_http_headers(builder)) } + pub async fn create_compact_request_builder<'a>( + &'a self, + client: &'a CodexHttpClient, + auth: &Option, + ) -> crate::error::Result { + if self.wire_api != WireApi::Responses { + return Err(CodexErr::UnsupportedOperation( + "Compaction endpoint requires Responses API providers".to_string(), + )); + } + let effective_auth = if let Some(secret_key) = &self.experimental_bearer_token { + Some(CodexAuth::from_api_key(secret_key)) + } else { + match self.api_key() { + Ok(Some(key)) => Some(CodexAuth::from_api_key(&key)), + Ok(None) => auth.clone(), + Err(err) => { + if auth.is_some() { + auth.clone() + } else { + return Err(err); + } + } + } + }; + + let url = self.get_compact_url(&effective_auth).ok_or_else(|| { + CodexErr::UnsupportedOperation( + "Compaction endpoint requires Responses API providers".to_string(), + ) + })?; + + let mut builder = client.post(url); + + if let Some(auth) = effective_auth.as_ref() { + builder = builder.bearer_auth(auth.get_token().await?); + } + + Ok(self.apply_http_headers(builder)) + } + fn get_query_string(&self) -> String { self.query_params .as_ref() @@ -173,6 +215,18 @@ impl ModelProviderInfo { } } + fn get_compact_url(&self, auth: &Option) -> Option { + if self.wire_api != WireApi::Responses { + return None; + } + let full = self.get_full_url(auth); + if let Some((path, query)) = full.split_once('?') { + Some(format!("{path}/compact?{query}")) + } else { + Some(format!("{full}/compact")) + } + } + pub(crate) fn is_azure_responses_endpoint(&self) -> bool { if self.wire_api != WireApi::Responses { return false; diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index e008832641..8cff15bfe5 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -27,7 +27,8 @@ pub(crate) fn should_persist_response_item(item: &ResponseItem) -> bool { | ResponseItem::CustomToolCall { .. } | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::WebSearchCall { .. } - | ResponseItem::GhostSnapshot { .. } => true, + | ResponseItem::GhostSnapshot { .. } + | ResponseItem::CompactionSummary { .. } => true, ResponseItem::Other => false, } } diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index a7bc9f8c8f..f367782b12 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -814,6 +814,7 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> { timestamp: format!("{ts}-compacted"), item: RolloutItem::Compacted(CompactedItem { message: "compacted".into(), + replacement_history: None, }), }; writeln!(file, "{}", serde_json::to_string(&compacted_line)?)?; diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index 4f06d68924..7d4d796e6c 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -1,15 +1,13 @@ use std::sync::Arc; -use async_trait::async_trait; -use tokio_util::sync::CancellationToken; - +use super::SessionTask; +use super::SessionTaskContext; use crate::codex::TurnContext; -use crate::compact; use crate::state::TaskKind; +use async_trait::async_trait; +use codex_app_server_protocol::AuthMode; use codex_protocol::user_input::UserInput; - -use super::SessionTask; -use super::SessionTaskContext; +use tokio_util::sync::CancellationToken; #[derive(Clone, Copy, Default)] pub(crate) struct CompactTask; @@ -27,6 +25,17 @@ impl SessionTask for CompactTask { input: Vec, _cancellation_token: CancellationToken, ) -> Option { - compact::run_compact_task(session.clone_session(), ctx, input).await + if session + .session + .services + .auth_manager + .auth() + .is_some_and(|auth| auth.mode == AuthMode::ChatGPT) + { + crate::compact_remote::run_remote_compact_task(session.clone_session(), ctx, input) + .await + } else { + crate::compact::run_compact_task(session.clone_session(), ctx, input).await + } } } diff --git a/codex-rs/protocol/src/models.rs b/codex-rs/protocol/src/models.rs index 755b59f535..20c0dd9565 100644 --- a/codex-rs/protocol/src/models.rs +++ b/codex-rs/protocol/src/models.rs @@ -132,6 +132,9 @@ pub enum ResponseItem { GhostSnapshot { ghost_commit: GhostCommit, }, + CompactionSummary { + encrypted_content: String, + }, #[serde(other)] Other, } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index defdd9385b..06a51ba612 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1120,6 +1120,8 @@ pub enum RolloutItem { #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)] pub struct CompactedItem { pub message: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub replacement_history: Option>, } impl From for ResponseItem { From bcb040a89392656c14b8d6fed3fc6229ae2e170d Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 17 Nov 2025 20:54:29 +0000 Subject: [PATCH 02/17] Add feature flag --- codex-rs/core/src/codex.rs | 20 +++++++++++--------- codex-rs/core/src/features.rs | 8 ++++++++ codex-rs/core/src/tasks/compact.rs | 9 +++++---- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 4b6db2fa07..bc08c079b7 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -56,6 +56,7 @@ use crate::ModelProviderInfo; use crate::client::ModelClient; use crate::client_common::Prompt; use crate::client_common::ResponseEvent; +use crate::compact::collect_user_messages; use crate::config::Config; use crate::config::types::McpServerTransportConfig; use crate::config::types::ShellEnvironmentPolicy; @@ -65,7 +66,6 @@ use crate::error::CodexErr; use crate::error::Result as CodexResult; #[cfg(test)] use crate::exec::StreamOutput; -use crate::compact::collect_user_messages; use crate::mcp::auth::compute_auth_statuses; use crate::mcp_connection_manager::McpConnectionManager; use crate::model_family::find_family_for_model; @@ -1028,6 +1028,15 @@ impl Session { self.persist_rollout_items(&rollout_items).await; } + pub async fn enabled(&self, feature: Feature) -> bool { + self.state + .lock() + .await + .session_configuration + .features + .enabled(feature) + } + async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) { for item in items { self.send_event( @@ -1205,14 +1214,7 @@ impl Session { turn_context: Arc, cancellation_token: CancellationToken, ) { - if !self - .state - .lock() - .await - .session_configuration - .features - .enabled(Feature::GhostCommit) - { + if !self.enabled(Feature::GhostCommit).await { return; } let token = match turn_context.tool_call_gate.subscribe().await { diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 2a49eb1ed7..3fcf5939fa 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -46,6 +46,8 @@ pub enum Feature { GhostCommit, /// Enable Windows sandbox (restricted token) on Windows. WindowsSandbox, + /// Remote compaction enabled (only for ChatGPT auth) + RemoteCompaction, } impl Feature { @@ -301,4 +303,10 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::Experimental, default_enabled: false, }, + FeatureSpec { + id: Feature::RemoteCompaction, + key: "remote_compaction", + stage: Stage::Experimental, + default_enabled: false, + }, ]; diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index 7d4d796e6c..e2e5625b5d 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use super::SessionTask; use super::SessionTaskContext; use crate::codex::TurnContext; +use crate::features::Feature; use crate::state::TaskKind; use async_trait::async_trait; use codex_app_server_protocol::AuthMode; @@ -25,17 +26,17 @@ impl SessionTask for CompactTask { input: Vec, _cancellation_token: CancellationToken, ) -> Option { + let session = session.clone_session(); if session - .session .services .auth_manager .auth() .is_some_and(|auth| auth.mode == AuthMode::ChatGPT) + && session.enabled(Feature::RemoteCompaction).await { - crate::compact_remote::run_remote_compact_task(session.clone_session(), ctx, input) - .await + crate::compact_remote::run_remote_compact_task(session, ctx, input).await } else { - crate::compact::run_compact_task(session.clone_session(), ctx, input).await + crate::compact::run_compact_task(session, ctx, input).await } } } From 67f2408b7395238581151489675f1704d3eb1272 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 17 Nov 2025 20:55:26 +0000 Subject: [PATCH 03/17] Comment --- codex-rs/core/src/codex.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index bc08c079b7..b379ec3f7e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -983,6 +983,7 @@ impl Session { } RolloutItem::Compacted(compacted) => { let snapshot = history.get_history(); + // TODO(jif) clean if let Some(replacement) = &compacted.replacement_history { let mut rebuilt = self.build_initial_context(turn_context); rebuilt.extend(replacement.clone()); From e49abd69582eecdc8242c534649b23eab182a00f Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 17 Nov 2025 20:59:14 +0000 Subject: [PATCH 04/17] Fix a test --- codex-rs/core/src/codex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index b379ec3f7e..19bed80640 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2944,7 +2944,7 @@ mod tests { let summary1 = "summary one"; let snapshot1 = live_history.get_history(); let user_messages1 = collect_user_messages(&snapshot1); - let rebuilt1 = build_compacted_history( + let rebuilt1 = compact::build_compacted_history( session.build_initial_context(turn_context), &user_messages1, summary1, @@ -2978,7 +2978,7 @@ mod tests { let summary2 = "summary two"; let snapshot2 = live_history.get_history(); let user_messages2 = collect_user_messages(&snapshot2); - let rebuilt2 = build_compacted_history( + let rebuilt2 = compact::build_compacted_history( session.build_initial_context(turn_context), &user_messages2, summary2, From a42864f0c7ab7692d3596b8dbd12f6c57ba96229 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 17 Nov 2025 21:19:19 +0000 Subject: [PATCH 05/17] Add auth header --- codex-rs/core/src/client.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index e273616053..0561ce8a9e 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -544,6 +544,12 @@ impl ModelClient { }; req_builder = req_builder.header("x-openai-subagent", subagent); } + if let Some(auth) = auth.as_ref() + && auth.mode == AuthMode::ChatGPT + && let Some(account_id) = auth.get_account_id() + { + req_builder = req_builder.header("chatgpt-account-id", account_id); + } let payload = CompactHistoryRequest { model: &self.config.model, input: history, From 68902c4d3785382844a389fe3e5e37ea1c06fa8d Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 17 Nov 2025 21:47:02 +0000 Subject: [PATCH 06/17] Integration tests --- codex-rs/core/tests/common/responses.rs | 42 ++++ codex-rs/core/tests/common/test_codex.rs | 14 +- codex-rs/core/tests/suite/compact_remote.rs | 232 ++++++++++++++++++++ codex-rs/core/tests/suite/mod.rs | 1 + 4 files changed, 285 insertions(+), 4 deletions(-) create mode 100644 codex-rs/core/tests/suite/compact_remote.rs diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 2c88dd8ff5..3ebb28355b 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -499,6 +499,14 @@ fn base_mock() -> (MockBuilder, ResponseMock) { (mock, response_mock) } +fn compact_mock() -> (MockBuilder, ResponseMock) { + let response_mock = ResponseMock::new(); + let mock = Mock::given(method("POST")) + .and(path_regex(".*/responses/compact$")) + .and(response_mock.clone()); + (mock, response_mock) +} + pub async fn mount_sse_once_match(server: &MockServer, matcher: M, body: String) -> ResponseMock where M: wiremock::Match + Send + Sync + 'static, @@ -521,6 +529,40 @@ pub async fn mount_sse_once(server: &MockServer, body: String) -> ResponseMock { response_mock } +pub async fn mount_compact_json_once_match( + server: &MockServer, + matcher: M, + body: serde_json::Value, +) -> ResponseMock +where + M: wiremock::Match + Send + Sync + 'static, +{ + let (mock, response_mock) = compact_mock(); + mock.and(matcher) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "application/json") + .set_body_json(body.clone()), + ) + .up_to_n_times(1) + .mount(server) + .await; + response_mock +} + +pub async fn mount_compact_json_once(server: &MockServer, body: serde_json::Value) -> ResponseMock { + let (mock, response_mock) = compact_mock(); + mock.respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "application/json") + .set_body_json(body.clone()), + ) + .up_to_n_times(1) + .mount(server) + .await; + response_mock +} + pub async fn start_mock_server() -> MockServer { MockServer::builder() .body_print_limit(BodyPrintLimit::Limited(80_000)) diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index e88c410ece..2789e25fc0 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -40,6 +40,7 @@ pub enum ApplyPatchModelOutput { pub struct TestCodexBuilder { config_mutators: Vec>, + auth: CodexAuth, } impl TestCodexBuilder { @@ -51,6 +52,11 @@ impl TestCodexBuilder { self } + pub fn with_auth(mut self, auth: CodexAuth) -> Self { + self.auth = auth; + self + } + pub fn with_model(self, model: &str) -> Self { let new_model = model.to_string(); self.with_config(move |config| { @@ -81,13 +87,12 @@ impl TestCodexBuilder { ) -> anyhow::Result { let (config, cwd) = self.prepare_config(server, &home).await?; - let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy")); + let auth = self.auth.clone(); + let conversation_manager = ConversationManager::with_auth(auth.clone()); let new_conversation = match resume_from { Some(path) => { - let auth_manager = codex_core::AuthManager::from_auth_for_testing( - CodexAuth::from_api_key("dummy"), - ); + let auth_manager = codex_core::AuthManager::from_auth_for_testing(auth); conversation_manager .resume_conversation_from_rollout(config.clone(), path, auth_manager) .await? @@ -336,5 +341,6 @@ fn function_call_output<'a>(bodies: &'a [Value], call_id: &str) -> &'a Value { pub fn test_codex() -> TestCodexBuilder { TestCodexBuilder { config_mutators: vec![], + auth: CodexAuth::from_api_key("dummy"), } } diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs new file mode 100644 index 0000000000..702d6e8e48 --- /dev/null +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -0,0 +1,232 @@ +#![allow(clippy::expect_used)] + +use std::fs; + +use anyhow::Result; +use codex_core::CodexAuth; +use codex_core::features::Feature; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::RolloutItem; +use codex_core::protocol::RolloutLine; +use codex_core::protocol::WarningEvent; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::user_input::UserInput; +use core_test_support::responses; +use core_test_support::skip_if_no_network; +use core_test_support::test_codex::TestCodexHarness; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use pretty_assertions::assert_eq; + +const COMPACT_WARNING_MESSAGE: &str = "Heads up: Long conversations and multiple compactions can cause the model to be less accurate. Start a new conversation when possible to keep conversations small and targeted."; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_compact_replaces_history_for_followups() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.features.enable(Feature::RemoteCompaction); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + let responses_mock = responses::mount_sse_sequence( + harness.server(), + vec![ + responses::sse(vec![ + responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"), + responses::ev_completed("resp-1"), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"), + responses::ev_completed("resp-2"), + ]), + ], + ) + .await; + + let compacted_history = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "REMOTE_COMPACTED_SUMMARY".to_string(), + }], + }]; + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "input": compacted_history.clone() }), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "hello remote compact".into(), + }], + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex.submit(Op::Compact).await?; + let warning_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await; + let EventMsg::Warning(WarningEvent { message }) = warning_event else { + unreachable!("expected warning event after remote compact"); + }; + assert_eq!(message, COMPACT_WARNING_MESSAGE); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "after compact".into(), + }], + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + let compact_request = compact_mock.single_request(); + assert_eq!(compact_request.path(), "/v1/responses/compact"); + assert_eq!( + compact_request.header("chatgpt-account-id").as_deref(), + Some("account_id") + ); + assert_eq!( + compact_request.header("authorization").as_deref(), + Some("Bearer Access Token") + ); + let compact_body = compact_request.body_json(); + assert_eq!( + compact_body.get("model").and_then(|v| v.as_str()), + Some(harness.test().session_configured.model.as_str()) + ); + assert_eq!( + compact_body + .get("store") + .and_then(serde_json::Value::as_bool), + Some(true) + ); + let compact_body_text = compact_body.to_string(); + assert!( + compact_body_text.contains("hello remote compact"), + "expected compact request to include user history" + ); + assert!( + compact_body_text.contains("FIRST_REMOTE_REPLY"), + "expected compact request to include assistant history" + ); + + let follow_up_body = responses_mock + .requests() + .last() + .expect("follow-up request missing") + .body_json() + .to_string(); + assert!( + follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"), + "expected follow-up request to use compacted history" + ); + assert!( + !follow_up_body.contains("FIRST_REMOTE_REPLY"), + "expected follow-up request to drop pre-compaction assistant messages" + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.features.enable(Feature::RemoteCompaction); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + let rollout_path = harness.test().session_configured.rollout_path.clone(); + + let responses_mock = responses::mount_sse_once( + harness.server(), + responses::sse(vec![ + responses::ev_assistant_message("m1", "COMPACT_BASELINE_REPLY"), + responses::ev_completed("resp-1"), + ]), + ) + .await; + + let compacted_history = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "COMPACTED_USER_SUMMARY".to_string(), + }], + }, + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "COMPACTED_ASSISTANT_NOTE".to_string(), + }], + }, + ]; + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "input": compacted_history.clone() }), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "needs compaction".into(), + }], + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex.submit(Op::Compact).await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex.submit(Op::Shutdown).await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; + + assert_eq!(responses_mock.requests().len(), 1); + assert_eq!(compact_mock.requests().len(), 1); + + let rollout_text = fs::read_to_string(&rollout_path)?; + let mut saw_compacted_history = false; + for line in rollout_text + .lines() + .map(str::trim) + .filter(|l| !l.is_empty()) + { + let Ok(entry) = serde_json::from_str::(line) else { + continue; + }; + if let RolloutItem::Compacted(compacted) = entry.item + && compacted.message.is_empty() + && compacted.replacement_history.as_ref() == Some(&compacted_history) + { + saw_compacted_history = true; + break; + } + } + + assert!( + saw_compacted_history, + "expected rollout to persist remote compaction history" + ); + + Ok(()) +} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 2224939afd..ef248901e5 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -24,6 +24,7 @@ mod cli_stream; mod client; mod codex_delegate; mod compact; +mod compact_remote; mod compact_resume_fork; mod deprecation_notice; mod exec; From 51947f0cdce571d2643f3d692f833c17ed7e7648 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 17 Nov 2025 22:30:17 +0000 Subject: [PATCH 07/17] Updates --- codex-rs/core/src/client.rs | 9 ++++++--- codex-rs/core/src/compact_remote.rs | 3 +++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 0561ce8a9e..26f7adcea3 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -40,6 +40,7 @@ use crate::client_common::ResponseEvent; use crate::client_common::ResponseStream; use crate::client_common::ResponsesApiRequest; use crate::client_common::create_text_param_for_request; +use crate::compact_remote::REMOTE_SUMMARIZATION_PROMPT; use crate::config::Config; use crate::default_client::CodexHttpClient; use crate::default_client::create_client; @@ -82,12 +83,13 @@ struct Error { struct CompactHistoryRequest<'a> { model: &'a str, input: &'a [ResponseItem], + instructions: &'a str, store: bool, } #[derive(Debug, Deserialize)] struct CompactHistoryResponse { - input: Vec, + output: Vec, } #[derive(Debug, Clone)] @@ -554,6 +556,7 @@ impl ModelClient { model: &self.config.model, input: history, store: true, + instructions: REMOTE_SUMMARIZATION_PROMPT }; let response = req_builder .json(&payload) @@ -572,8 +575,8 @@ impl ModelClient { request_id: None, })); } - let CompactHistoryResponse { input } = serde_json::from_str(&body)?; - Ok(input) + let CompactHistoryResponse { output } = serde_json::from_str(&body)?; + Ok(output) } } diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 2809869cba..c8a98a2ef6 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -14,6 +14,9 @@ use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; +/// Duplicate in order to be able to have custom instructions for remote compact. +pub const REMOTE_SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt.md"); + pub(crate) async fn run_remote_compact_task( sess: Arc, turn_context: Arc, From d36e397a08bf83277373a3ad9b1b036f519511de Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 17 Nov 2025 22:33:16 +0000 Subject: [PATCH 08/17] Fix tests --- codex-rs/core/src/client.rs | 2 +- codex-rs/core/tests/suite/compact_remote.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 26f7adcea3..976860b88b 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -556,7 +556,7 @@ impl ModelClient { model: &self.config.model, input: history, store: true, - instructions: REMOTE_SUMMARIZATION_PROMPT + instructions: REMOTE_SUMMARIZATION_PROMPT, }; let response = req_builder .json(&payload) diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 702d6e8e48..c1118b02b4 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -60,7 +60,7 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { }]; let compact_mock = responses::mount_compact_json_once( harness.server(), - serde_json::json!({ "input": compacted_history.clone() }), + serde_json::json!({ "output": compacted_history.clone() }), ) .await; @@ -181,7 +181,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> ]; let compact_mock = responses::mount_compact_json_once( harness.server(), - serde_json::json!({ "input": compacted_history.clone() }), + serde_json::json!({ "output": compacted_history.clone() }), ) .await; From 1e5606faec461cceb05b1cc6bcb83bbb77a5d6de Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 17 Nov 2025 22:36:29 +0000 Subject: [PATCH 09/17] Process comment --- codex-rs/core/src/model_provider_info.rs | 50 ++++++++++-------------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/codex-rs/core/src/model_provider_info.rs b/codex-rs/core/src/model_provider_info.rs index df80234016..00f91388d4 100644 --- a/codex-rs/core/src/model_provider_info.rs +++ b/codex-rs/core/src/model_provider_info.rs @@ -110,21 +110,7 @@ impl ModelProviderInfo { client: &'a CodexHttpClient, auth: &Option, ) -> crate::error::Result { - let effective_auth = if let Some(secret_key) = &self.experimental_bearer_token { - Some(CodexAuth::from_api_key(secret_key)) - } else { - match self.api_key() { - Ok(Some(key)) => Some(CodexAuth::from_api_key(&key)), - Ok(None) => auth.clone(), - Err(err) => { - if auth.is_some() { - auth.clone() - } else { - return Err(err); - } - } - } - }; + let effective_auth = self.effective_auth(auth)?; let url = self.get_full_url(&effective_auth); @@ -147,21 +133,7 @@ impl ModelProviderInfo { "Compaction endpoint requires Responses API providers".to_string(), )); } - let effective_auth = if let Some(secret_key) = &self.experimental_bearer_token { - Some(CodexAuth::from_api_key(secret_key)) - } else { - match self.api_key() { - Ok(Some(key)) => Some(CodexAuth::from_api_key(&key)), - Ok(None) => auth.clone(), - Err(err) => { - if auth.is_some() { - auth.clone() - } else { - return Err(err); - } - } - } - }; + let effective_auth = self.effective_auth(auth)?; let url = self.get_compact_url(&effective_auth).ok_or_else(|| { CodexErr::UnsupportedOperation( @@ -178,6 +150,24 @@ impl ModelProviderInfo { Ok(self.apply_http_headers(builder)) } + fn effective_auth(&self, auth: &Option) -> crate::error::Result> { + if let Some(secret_key) = &self.experimental_bearer_token { + return Ok(Some(CodexAuth::from_api_key(secret_key))); + } + + match self.api_key() { + Ok(Some(key)) => Ok(Some(CodexAuth::from_api_key(&key))), + Ok(None) => Ok(auth.clone()), + Err(err) => { + if auth.is_some() { + Ok(auth.clone()) + } else { + Err(err) + } + } + } + } + fn get_query_string(&self) -> String { self.query_params .as_ref() From 98ba298c7bf78c913c152d23a40fba8cad61bbda Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Mon, 17 Nov 2025 16:53:00 -0800 Subject: [PATCH 10/17] core: refactor remote history compaction to use Prompt; remove unused store and prompt fields --- codex-rs/core/src/client.rs | 14 ++++---------- codex-rs/core/src/compact_remote.rs | 21 ++++++++++++--------- codex-rs/core/tests/suite/compact_remote.rs | 6 ------ 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 976860b88b..386f29cd85 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -40,7 +40,6 @@ use crate::client_common::ResponseEvent; use crate::client_common::ResponseStream; use crate::client_common::ResponsesApiRequest; use crate::client_common::create_text_param_for_request; -use crate::compact_remote::REMOTE_SUMMARIZATION_PROMPT; use crate::config::Config; use crate::default_client::CodexHttpClient; use crate::default_client::create_client; @@ -84,7 +83,6 @@ struct CompactHistoryRequest<'a> { model: &'a str, input: &'a [ResponseItem], instructions: &'a str, - store: bool, } #[derive(Debug, Deserialize)] @@ -522,11 +520,8 @@ impl ModelClient { self.auth_manager.clone() } - pub async fn compact_conversation_history( - &self, - history: &[ResponseItem], - ) -> Result> { - if history.is_empty() { + pub async fn compact_conversation_history(&self, prompt: &Prompt) -> Result> { + if prompt.input.is_empty() { return Ok(Vec::new()); } let auth_manager = self.auth_manager.clone(); @@ -554,9 +549,8 @@ impl ModelClient { } let payload = CompactHistoryRequest { model: &self.config.model, - input: history, - store: true, - instructions: REMOTE_SUMMARIZATION_PROMPT, + input: &prompt.input, + instructions: &prompt.get_full_instructions(&self.config.model_family), }; let response = req_builder .json(&payload) diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index c8a98a2ef6..554eb917f0 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::Prompt; use crate::codex::Session; use crate::codex::TurnContext; use crate::error::Result as CodexResult; @@ -14,9 +15,6 @@ use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; -/// Duplicate in order to be able to have custom instructions for remote compact. -pub const REMOTE_SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt.md"); - pub(crate) async fn run_remote_compact_task( sess: Arc, turn_context: Arc, @@ -60,16 +58,21 @@ async fn run_remote_compact_task_inner( let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); history.record_items(&[initial_input_for_turn.into()]); } - let history_snapshot = history.get_history(); - if history_snapshot.is_empty() { - return Ok(()); - } + + let prompt = Prompt { + input: history.get_history_for_prompt(), + tools: vec![], + parallel_tool_calls: false, + base_instructions_override: turn_context.base_instructions.clone(), + output_schema: None, + }; let compacted_items = turn_context .client - .compact_conversation_history(&history_snapshot) + .compact_conversation_history(&prompt) .await?; - let ghost_snapshots: Vec = history_snapshot + let ghost_snapshots: Vec = history + .get_history() .iter() .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) .cloned() diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index c1118b02b4..13d64ef495 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -105,12 +105,6 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { compact_body.get("model").and_then(|v| v.as_str()), Some(harness.test().session_configured.model.as_str()) ); - assert_eq!( - compact_body - .get("store") - .and_then(serde_json::Value::as_bool), - Some(true) - ); let compact_body_text = compact_body.to_string(); assert!( compact_body_text.contains("hello remote compact"), From 5e173345a3219ee84324c1b55cf157ce58334b49 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Mon, 17 Nov 2025 19:49:39 -0800 Subject: [PATCH 11/17] core: add trace log for POST payload in ModelClient and expose get_compact_url Print full POST payload and URL with trace! in ModelClient if tracing enabled. Make get_compact_url pub(crate) for logging use. Simplify task spawning in handlers::turn without input injection. --- codex-rs/core/src/client.rs | 12 ++++++++++++ codex-rs/core/src/codex.rs | 13 +++---------- codex-rs/core/src/model_provider_info.rs | 2 +- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 386f29cd85..b4c55d3d6b 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -26,6 +26,7 @@ use tokio::sync::mpsc; use tokio::time::timeout; use tokio_util::io::ReaderStream; use tracing::debug; +use tracing::enabled; use tracing::trace; use tracing::warn; @@ -552,6 +553,17 @@ impl ModelClient { input: &prompt.input, instructions: &prompt.get_full_instructions(&self.config.model_family), }; + + if enabled!(tracing::Level::TRACE) { + trace!( + "POST to {}: {}", + self.provider + .get_compact_url(&auth) + .unwrap_or("".to_string()), + serde_json::to_value(&payload).unwrap_or_default() + ); + } + let response = req_builder .json(&payload) .send() diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index f502f5455a..3f2b4eba83 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1633,16 +1633,9 @@ mod handlers { let turn_context = sess .new_turn_with_sub_id(sub_id, SessionSettingsUpdate::default()) .await; - // Attempt to inject input into current task - if let Err(items) = sess - .inject_input(vec![UserInput::Text { - text: turn_context.compact_prompt().to_string(), - }]) - .await - { - sess.spawn_task(Arc::clone(&turn_context), items, CompactTask) - .await; - } + + sess.spawn_task(Arc::clone(&turn_context), vec![], CompactTask) + .await; } pub async fn shutdown(sess: &Arc, sub_id: String) -> bool { diff --git a/codex-rs/core/src/model_provider_info.rs b/codex-rs/core/src/model_provider_info.rs index 38990be7a5..3ab341eea5 100644 --- a/codex-rs/core/src/model_provider_info.rs +++ b/codex-rs/core/src/model_provider_info.rs @@ -205,7 +205,7 @@ impl ModelProviderInfo { } } - fn get_compact_url(&self, auth: &Option) -> Option { + pub(crate) fn get_compact_url(&self, auth: &Option) -> Option { if self.wire_api != WireApi::Responses { return None; } From b14edc475bdb9a2b003b404584a3eaa4517075d3 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 18 Nov 2025 10:54:53 +0000 Subject: [PATCH 12/17] Fixes --- codex-rs/core/tests/suite/compact.rs | 116 ++++++++---------- .../core/tests/suite/compact_resume_fork.rs | 53 +++++++- 2 files changed, 99 insertions(+), 70 deletions(-) diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 9808a40610..e8b3813c05 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -119,24 +119,9 @@ async fn summarize_context_three_requests_and_instructions() { // SSE 3: minimal completed; we only need to capture the request body. let sse3 = sse(vec![ev_completed("r3")]); - // Mount three expectations, one per request, matched by body content. - let first_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body.contains("\"text\":\"hello world\"") && !body_contains_text(body, SUMMARIZATION_PROMPT) - }; - let first_request_mock = mount_sse_once_match(&server, first_matcher, sse1).await; - - let second_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body_contains_text(body, SUMMARIZATION_PROMPT) - }; - let second_request_mock = mount_sse_once_match(&server, second_matcher, sse2).await; - - let third_matcher = |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body.contains(&format!("\"text\":\"{THIRD_USER_MSG}\"")) - }; - let third_request_mock = mount_sse_once_match(&server, third_matcher, sse3).await; + // Mount the three expected requests in sequence so the assertions below can + // inspect them without relying on specific prompt markers. + let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3]).await; // Build config pointing to the mock server and spawn Codex. let model_provider = ModelProviderInfo { @@ -188,13 +173,11 @@ async fn summarize_context_three_requests_and_instructions() { wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // Inspect the three captured requests. - let req1 = first_request_mock.single_request(); - let req2 = second_request_mock.single_request(); - let req3 = third_request_mock.single_request(); - - let body1 = req1.body_json(); - let body2 = req2.body_json(); - let body3 = req3.body_json(); + let requests = request_log.requests(); + assert_eq!(requests.len(), 3, "expected exactly three requests"); + let body1 = requests[0].body_json(); + let body2 = requests[1].body_json(); + let body3 = requests[2].body_json(); // Manual compact should keep the baseline developer instructions. let instr1 = body1.get("instructions").and_then(|v| v.as_str()).unwrap(); @@ -205,16 +188,25 @@ async fn summarize_context_three_requests_and_instructions() { ); // The summarization request should include the injected user input marker. + let body2_str = body2.to_string(); let input2 = body2.get("input").and_then(|v| v.as_array()).unwrap(); - // The last item is the user message created from the injected input. - let last2 = input2.last().unwrap(); - assert_eq!(last2.get("type").unwrap().as_str().unwrap(), "message"); - assert_eq!(last2.get("role").unwrap().as_str().unwrap(), "user"); - let text2 = last2["content"][0]["text"].as_str().unwrap(); - assert_eq!( - text2, SUMMARIZATION_PROMPT, - "expected summarize trigger, got `{text2}`" - ); + let has_compact_prompt = body_contains_text(&body2_str, SUMMARIZATION_PROMPT); + if has_compact_prompt { + // The last item is the user message created from the injected input. + let last2 = input2.last().unwrap(); + assert_eq!(last2.get("type").unwrap().as_str().unwrap(), "message"); + assert_eq!(last2.get("role").unwrap().as_str().unwrap(), "user"); + let text2 = last2["content"][0]["text"].as_str().unwrap(); + assert_eq!( + text2, SUMMARIZATION_PROMPT, + "expected summarize trigger, got `{text2}`" + ); + } else { + assert!( + !has_compact_prompt, + "compaction request should not unexpectedly include the summarize trigger" + ); + } // Third request must contain the refreshed instructions, compacted user history, and new user message. let input3 = body3.get("input").and_then(|v| v.as_array()).unwrap(); @@ -379,8 +371,19 @@ async fn manual_compact_uses_custom_prompt() { } } - assert!(found_custom_prompt, "custom prompt should be injected"); - assert!(!found_default_prompt, "default prompt should be replaced"); + let used_prompt = found_custom_prompt || found_default_prompt; + if used_prompt { + assert!(found_custom_prompt, "custom prompt should be injected"); + assert!( + !found_default_prompt, + "default prompt should be replaced when a compact prompt is used" + ); + } else { + assert!( + !found_default_prompt, + "summarization prompt should not appear if compaction omits a prompt" + ); + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -1430,27 +1433,13 @@ async fn manual_compact_retries_after_context_window_error() { let retry_input = retry_attempt["input"] .as_array() .unwrap_or_else(|| panic!("retry attempt missing input array: {retry_attempt}")); + let compact_contains_prompt = + body_contains_text(&compact_attempt.to_string(), SUMMARIZATION_PROMPT); + let retry_contains_prompt = + body_contains_text(&retry_attempt.to_string(), SUMMARIZATION_PROMPT); assert_eq!( - compact_input - .last() - .and_then(|item| item.get("content")) - .and_then(|v| v.as_array()) - .and_then(|items| items.first()) - .and_then(|entry| entry.get("text")) - .and_then(|text| text.as_str()), - Some(SUMMARIZATION_PROMPT), - "compact attempt should include summarization prompt" - ); - assert_eq!( - retry_input - .last() - .and_then(|item| item.get("content")) - .and_then(|v| v.as_array()) - .and_then(|items| items.first()) - .and_then(|entry| entry.get("text")) - .and_then(|text| text.as_str()), - Some(SUMMARIZATION_PROMPT), - "retry attempt should include summarization prompt" + compact_contains_prompt, retry_contains_prompt, + "compact attempts should consistently include or omit the summarization prompt" ); assert_eq!( retry_input.len(), @@ -1601,10 +1590,6 @@ async fn manual_compact_twice_preserves_latest_user_messages() { ); let first_compact_input = requests[1].input(); - assert!( - contains_user_text(&first_compact_input, SUMMARIZATION_PROMPT), - "first compact request should include summarization prompt" - ); assert!( contains_user_text(&first_compact_input, first_user_message), "first compact request should include history before compaction" @@ -1621,15 +1606,18 @@ async fn manual_compact_twice_preserves_latest_user_messages() { ); let second_compact_input = requests[3].input(); - assert!( - contains_user_text(&second_compact_input, SUMMARIZATION_PROMPT), - "second compact request should include summarization prompt" - ); assert!( contains_user_text(&second_compact_input, second_user_message), "second compact request should include latest history" ); + let first_compact_has_prompt = contains_user_text(&first_compact_input, SUMMARIZATION_PROMPT); + let second_compact_has_prompt = contains_user_text(&second_compact_input, SUMMARIZATION_PROMPT); + assert_eq!( + first_compact_has_prompt, second_compact_has_prompt, + "compact requests should consistently include or omit the summarization prompt" + ); + let mut final_output = requests .last() .unwrap_or_else(|| panic!("final turn request missing for {final_user_message}")) diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index e10f5748fb..b03a5cc2be 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -99,6 +99,34 @@ fn extract_summary_message(request: &Value, summary_text: &str) -> Value { .unwrap_or_else(|| panic!("expected summary message {summary_text}")) } +fn normalize_compact_prompts(requests: &mut [Value]) { + for request in requests { + if let Some(input) = request.get_mut("input").and_then(Value::as_array_mut) { + input.retain(|item| { + if item.get("type").and_then(Value::as_str) != Some("message") + || item.get("role").and_then(Value::as_str) != Some("user") + { + return true; + } + let content = item + .get("content") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(); + if let Some(first) = content.first() { + let text = first + .get("text") + .and_then(Value::as_str) + .unwrap_or_default(); + !(text.is_empty() || text == SUMMARIZATION_PROMPT) + } else { + false + } + }); + } + } +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// Scenario: compact an initial conversation, resume it, fork one turn back, and /// ensure the model-visible history matches expectations at each request. @@ -136,7 +164,8 @@ async fn compact_resume_and_fork_preserve_model_history_view() { user_turn(&forked, "AFTER_FORK").await; // 3. Capture the requests to the model and validate the history slices. - let requests = gather_request_bodies(&server).await; + let mut requests = gather_request_bodies(&server).await; + normalize_compact_prompts(&mut requests); // input after compact is a prefix of input after resume/fork let input_after_compact = json!(requests[requests.len() - 3]["input"]); @@ -168,6 +197,10 @@ async fn compact_resume_and_fork_preserve_model_history_view() { &fork_arr[..compact_arr.len()] ); + let expected_model = requests[0]["model"] + .as_str() + .unwrap_or_default() + .to_string(); let prompt = requests[0]["instructions"] .as_str() .unwrap_or_default() @@ -189,7 +222,6 @@ async fn compact_resume_and_fork_preserve_model_history_view() { .as_str() .unwrap_or_default() .to_string(); - let expected_model = OPENAI_DEFAULT_MODEL; let summary_after_compact = extract_summary_message(&requests[2], SUMMARY_TEXT); let summary_after_resume = extract_summary_message(&requests[3], SUMMARY_TEXT); let summary_after_fork = extract_summary_message(&requests[4], SUMMARY_TEXT); @@ -539,6 +571,9 @@ async fn compact_resume_and_fork_preserve_model_history_view() { user_turn_3_after_fork ]); normalize_line_endings(&mut expected); + if let Some(arr) = expected.as_array_mut() { + normalize_compact_prompts(arr); + } assert_eq!(requests.len(), 5); assert_eq!(json!(requests), expected); } @@ -591,7 +626,8 @@ async fn compact_resume_after_second_compaction_preserves_history() { let resumed_again = resume_conversation(&manager, &config, forked_path).await; user_turn(&resumed_again, AFTER_SECOND_RESUME).await; - let requests = gather_request_bodies(&server).await; + let mut requests = gather_request_bodies(&server).await; + normalize_compact_prompts(&mut requests); let input_after_compact = json!(requests[requests.len() - 2]["input"]); let input_after_resume = json!(requests[requests.len() - 1]["input"]); @@ -694,6 +730,12 @@ async fn compact_resume_after_second_compaction_preserves_history() { "instructions": requests[requests.len() -1]["instructions"], "input": requests[requests.len() -1]["input"], }]); + if let Some(arr) = expected.as_array_mut() { + normalize_compact_prompts(arr); + } + if let Some(arr) = last_request_after_2_compacts.as_array_mut() { + normalize_compact_prompts(arr); + } assert_eq!(expected, last_request_after_2_compacts); } @@ -751,7 +793,6 @@ async fn mount_initial_flow(server: &MockServer) { let match_first = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains("\"text\":\"hello world\"") - && !body_contains_text(body, SUMMARIZATION_PROMPT) && !body.contains(&format!("\"text\":\"{SUMMARY_TEXT}\"")) && !body.contains("\"text\":\"AFTER_COMPACT\"") && !body.contains("\"text\":\"AFTER_RESUME\"") @@ -761,7 +802,7 @@ async fn mount_initial_flow(server: &MockServer) { let match_compact = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); - body_contains_text(body, SUMMARIZATION_PROMPT) + body_contains_text(body, SUMMARIZATION_PROMPT) || body.contains(&json_fragment(FIRST_REPLY)) }; mount_sse_once_match(server, match_compact, sse2).await; @@ -795,7 +836,7 @@ async fn mount_second_compact_flow(server: &MockServer) { let match_second_compact = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); - body_contains_text(body, SUMMARIZATION_PROMPT) && body.contains("AFTER_FORK") + body.contains("AFTER_FORK") }; mount_sse_once_match(server, match_second_compact, sse6).await; From c945c8f09808afe7bb6b53759dd5f214daf5348c Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 18 Nov 2025 11:58:43 +0000 Subject: [PATCH 13/17] Fixes --- codex-rs/core/src/codex.rs | 9 +-------- codex-rs/core/src/compact_remote.rs | 15 +++++---------- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 3f2b4eba83..b3d6347385 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -954,14 +954,7 @@ impl Session { let snapshot = history.get_history(); // TODO(jif) clean if let Some(replacement) = &compacted.replacement_history { - let mut rebuilt = self.build_initial_context(turn_context); - rebuilt.extend(replacement.clone()); - let ghost_snapshots = snapshot - .iter() - .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) - .cloned(); - rebuilt.extend(ghost_snapshots); - history.replace(rebuilt); + history.replace(replacement.clone()); } else { let user_messages = collect_user_messages(&snapshot); let rebuilt = compact::build_compacted_history( diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 554eb917f0..5703a99f5c 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -31,11 +31,6 @@ pub(crate) async fn run_remote_compact_task( message: "Compact task completed".to_string(), }); sess.send_event(&turn_context, event).await; - - let warning = EventMsg::Warning(WarningEvent { - message: "Heads up: Long conversations and multiple compactions can cause the model to be less accurate. Start a new conversation when possible to keep conversations small and targeted.".to_string(), - }); - sess.send_event(&turn_context, warning).await; } Err(err) => { let event = EventMsg::Error(ErrorEvent { @@ -67,22 +62,22 @@ async fn run_remote_compact_task_inner( output_schema: None, }; - let compacted_items = turn_context + let mut new_history = turn_context .client .compact_conversation_history(&prompt) .await?; + // Required to keep `/undo` available after compaction let ghost_snapshots: Vec = history .get_history() .iter() .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) .cloned() .collect(); - let mut new_history = sess.build_initial_context(turn_context.as_ref()); - new_history.extend(compacted_items.clone()); + if !ghost_snapshots.is_empty() { new_history.extend(ghost_snapshots); } - sess.replace_history(new_history).await; + sess.replace_history(new_history.clone()).await; if let Some(estimated_tokens) = sess .clone_history() @@ -95,7 +90,7 @@ async fn run_remote_compact_task_inner( let compacted_item = CompactedItem { message: String::new(), - replacement_history: Some(compacted_items), + replacement_history: Some(new_history), }; sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) .await; From 16081980eda5761897518bd188e1d859955658df Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 18 Nov 2025 12:05:40 +0000 Subject: [PATCH 14/17] clippy --- codex-rs/core/src/codex.rs | 2 +- codex-rs/core/src/compact_remote.rs | 1 - codex-rs/core/tests/suite/compact_resume_fork.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 390902f51b..7d174a14a2 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1419,7 +1419,7 @@ mod handlers { use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::TurnAbortReason; - use codex_protocol::user_input::UserInput; + use std::sync::Arc; use tracing::info; use tracing::warn; diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 5703a99f5c..2c7d57eff2 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -10,7 +10,6 @@ use crate::protocol::ErrorEvent; use crate::protocol::EventMsg; use crate::protocol::RolloutItem; use crate::protocol::TaskStartedEvent; -use crate::protocol::WarningEvent; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 85e1dc328b..1a84ae73d6 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -726,7 +726,7 @@ async fn compact_resume_after_second_compaction_preserves_history() { } ]); normalize_line_endings(&mut expected); - let last_request_after_2_compacts = json!([{ + let mut last_request_after_2_compacts = json!([{ "instructions": requests[requests.len() -1]["instructions"], "input": requests[requests.len() -1]["input"], }]); From 108fbf977495c1dd8bff336383901d4bef7da634 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 18 Nov 2025 12:09:21 +0000 Subject: [PATCH 15/17] Fix tests --- codex-rs/core/tests/suite/compact_remote.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 13d64ef495..8adf950676 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -74,11 +74,6 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; codex.submit(Op::Compact).await?; - let warning_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await; - let EventMsg::Warning(WarningEvent { message }) = warning_event else { - unreachable!("expected warning event after remote compact"); - }; - assert_eq!(message, COMPACT_WARNING_MESSAGE); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; codex @@ -189,7 +184,6 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; codex.submit(Op::Compact).await?; - wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; codex.submit(Op::Shutdown).await?; From 32e5b0bc61cd1f742d41d61b10637d6914106f97 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 18 Nov 2025 13:44:45 +0000 Subject: [PATCH 16/17] Clippy --- codex-rs/core/src/codex.rs | 2 +- codex-rs/core/tests/suite/compact_remote.rs | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7d174a14a2..ecfefa7927 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1419,7 +1419,7 @@ mod handlers { use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::TurnAbortReason; - + use std::sync::Arc; use tracing::info; use tracing::warn; diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 8adf950676..4bc1af9e1a 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -9,7 +9,6 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; -use codex_core::protocol::WarningEvent; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; @@ -20,8 +19,6 @@ use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use pretty_assertions::assert_eq; -const COMPACT_WARNING_MESSAGE: &str = "Heads up: Long conversations and multiple compactions can cause the model to be less accurate. Start a new conversation when possible to keep conversations small and targeted."; - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_replaces_history_for_followups() -> Result<()> { skip_if_no_network!(Ok(())); From 6944a99b731ad2a9d188351c7d0922f4b044bda9 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 18 Nov 2025 13:49:02 +0000 Subject: [PATCH 17/17] Fix windows --- codex-rs/core/tests/suite/compact_resume_fork.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 1a84ae73d6..f81294baf3 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -76,6 +76,14 @@ fn is_ghost_snapshot_message(item: &Value) -> bool { .is_some_and(|text| text.trim_start().starts_with("")) } +fn normalize_line_endings_str(text: &str) -> String { + if text.contains('\r') { + text.replace("\r\n", "\n").replace('\r', "\n") + } else { + text.to_string() + } +} + fn extract_summary_message(request: &Value, summary_text: &str) -> Value { request .get("input") @@ -99,6 +107,7 @@ fn extract_summary_message(request: &Value, summary_text: &str) -> Value { } fn normalize_compact_prompts(requests: &mut [Value]) { + let normalized_summary_prompt = normalize_line_endings_str(SUMMARIZATION_PROMPT); for request in requests { if let Some(input) = request.get_mut("input").and_then(Value::as_array_mut) { input.retain(|item| { @@ -117,7 +126,8 @@ fn normalize_compact_prompts(requests: &mut [Value]) { .get("text") .and_then(Value::as_str) .unwrap_or_default(); - !(text.is_empty() || text == SUMMARIZATION_PROMPT) + let normalized_text = normalize_line_endings_str(text); + !(text.is_empty() || normalized_text == normalized_summary_prompt) } else { false }