From 3c14267ac0aecae6c305f845cada0f1088d43344 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 18 Nov 2025 22:24:12 -0800 Subject: [PATCH 1/5] Run remote auto compaction --- codex-rs/core/src/codex.rs | 13 ++++ codex-rs/core/src/compact.rs | 15 ++++- codex-rs/core/src/compact_remote.rs | 15 +++-- codex-rs/core/src/tasks/compact.rs | 12 +--- codex-rs/core/tests/common/responses.rs | 7 +++ codex-rs/core/tests/suite/compact_remote.rs | 66 +++++++++++++++++++++ 6 files changed, 113 insertions(+), 15 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7cf90ad1e342..c5e679c30d65 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1890,6 +1890,19 @@ pub(crate) async fn run_task( // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. if token_limit_reached { + if crate::compact::should_use_remote_compact_task(&sess).await { + crate::compact_remote::run_inline_remote_auto_compact_task( + sess.clone(), + turn_context.clone(), + ) + .await; + } else { + crate::compact::run_inline_auto_compact_task( + sess.clone(), + turn_context.clone(), + ) + .await; + } compact::run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await; continue; } diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 33d38091f6fa..b5ece9089206 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -7,6 +7,7 @@ use crate::codex::TurnContext; use crate::codex::get_last_assistant_message_from_turn; use crate::error::CodexErr; use crate::error::Result as CodexResult; +use crate::features::Feature; use crate::protocol::AgentMessageEvent; use crate::protocol::CompactedItem; use crate::protocol::ErrorEvent; @@ -18,6 +19,7 @@ use crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; use crate::truncate::truncate_text; use crate::util::backoff; +use codex_app_server_protocol::AuthMode; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; @@ -31,12 +33,22 @@ pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md"); const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000; +pub(crate) async fn should_use_remote_compact_task(session: &Session) -> bool { + session + .services + .auth_manager + .auth() + .is_some_and(|auth| auth.mode == AuthMode::ChatGPT) + && session.enabled(Feature::RemoteCompaction).await +} + pub(crate) async fn run_inline_auto_compact_task( sess: Arc, turn_context: Arc, ) { let prompt = turn_context.compact_prompt().to_string(); let input = vec![UserInput::Text { text: prompt }]; + run_compact_task_inner(sess, turn_context, input).await; } @@ -44,13 +56,12 @@ pub(crate) async fn run_compact_task( sess: Arc, turn_context: Arc, input: Vec, -) -> Option { +) { let start_event = EventMsg::TaskStarted(TaskStartedEvent { model_context_window: turn_context.client.get_model_context_window(), }); sess.send_event(&turn_context, start_event).await; run_compact_task_inner(sess.clone(), turn_context, input).await; - None } async fn run_compact_task_inner( diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 75b81d408c1d..582644173961 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -12,10 +12,19 @@ use crate::protocol::RolloutItem; use crate::protocol::TaskStartedEvent; use codex_protocol::models::ResponseItem; -pub(crate) async fn run_remote_compact_task( +pub(crate) async fn run_inline_remote_auto_compact_task( sess: Arc, turn_context: Arc, -) -> Option { +) { + if let Err(err) = run_remote_compact_task_inner(&sess, &turn_context).await { + let event = EventMsg::Error(ErrorEvent { + message: format!("Error running remote compact task: {err}"), + }); + sess.send_event(&turn_context, event).await; + } +} + +pub(crate) async fn run_remote_compact_task(sess: Arc, turn_context: Arc) { let start_event = EventMsg::TaskStarted(TaskStartedEvent { model_context_window: turn_context.client.get_model_context_window(), }); @@ -35,8 +44,6 @@ pub(crate) async fn run_remote_compact_task( sess.send_event(&turn_context, event).await; } } - - None } async fn run_remote_compact_task_inner( diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index 4f161267a879..893c0c476a10 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -3,10 +3,8 @@ use std::sync::Arc; use super::SessionTask; use super::SessionTaskContext; use crate::codex::TurnContext; -use crate::features::Feature; use crate::state::TaskKind; use async_trait::async_trait; -use codex_app_server_protocol::AuthMode; use codex_protocol::user_input::UserInput; use tokio_util::sync::CancellationToken; @@ -27,16 +25,12 @@ impl SessionTask for CompactTask { _cancellation_token: CancellationToken, ) -> Option { let session = session.clone_session(); - if session - .services - .auth_manager - .auth() - .is_some_and(|auth| auth.mode == AuthMode::ChatGPT) - && session.enabled(Feature::RemoteCompaction).await - { + if crate::compact::should_use_remote_compact_task(&session).await { crate::compact_remote::run_remote_compact_task(session, ctx).await } else { crate::compact::run_compact_task(session, ctx, input).await } + + None } } diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 3ebb28355b12..7c5a103f424d 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -460,6 +460,13 @@ pub fn ev_apply_patch_function_call(call_id: &str, patch: &str) -> Value { }) } +pub fn ev_shell_command_call(call_id: &str, command: &str) -> Value { + let args = serde_json::json!({ "command": command }); + let arguments = serde_json::to_string(&args).expect("serialize shell arguments"); + + ev_function_call(call_id, "shell_command", &arguments) +} + pub fn ev_apply_patch_shell_call(call_id: &str, patch: &str) -> Value { let args = serde_json::json!({ "command": ["apply_patch", patch] }); let arguments = serde_json::to_string(&args).expect("serialize apply_patch arguments"); diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 4bc1af9e1a5a..883a1772910c 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -13,6 +13,9 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; use core_test_support::responses; +use core_test_support::responses::mount_sse_once; +use core_test_support::responses::mount_sse_once_match; +use core_test_support::responses::sse; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodexHarness; use core_test_support::test_codex::test_codex; @@ -125,6 +128,69 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_compact_runs_automatically() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.features.enable(Feature::RemoteCompaction); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + mount_sse_once( + harness.server(), + sse(vec![ + responses::ev_shell_command_call("m1", "echo 'hi'"), + responses::ev_completed_with_tokens("resp-1", 100000000), // over token limit + ]), + ) + .await; + let responses_mock = mount_sse_once( + harness.server(), + responses::sse(vec![ + responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"), + responses::ev_completed("resp-2"), + ]), + ) + .await; + + let compacted_history = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "REMOTE_COMPACTED_SUMMARY".to_string(), + }], + }]; + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "output": compacted_history.clone() }), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "hello remote compact".into(), + }], + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + assert_eq!(compact_mock.requests().len(), 1); + let follow_up_body = responses_mock.single_request().body_json().to_string(); + assert!( + follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"), + "expected follow-up request to use compacted history" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> { skip_if_no_network!(Ok(())); From 7909bd42be2d7eeff01dee9ece37ea80f68d4166 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 18 Nov 2025 23:15:26 -0800 Subject: [PATCH 2/5] compact: remove unused auto compact call and test helper import --- codex-rs/core/src/codex.rs | 1 - codex-rs/core/tests/suite/compact_remote.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index c5e679c30d65..200d1fe0aa79 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1903,7 +1903,6 @@ pub(crate) async fn run_task( ) .await; } - compact::run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await; continue; } diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 883a1772910c..28a8a65e41ba 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -14,7 +14,6 @@ use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::responses::mount_sse_once; -use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::sse; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodexHarness; From 12ba9c50807b5985542ed14238249df02e8f7619 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 19 Nov 2025 00:07:08 -0800 Subject: [PATCH 3/5] compact_remote: centralize status events and simplify calls Inline compact tasks no longer emit events directly; instead `run_remote_compact_task_inner` now wraps an impl function to standardize success/error events. Update callers to use imported helpers and adjust tests to assert the completion message. --- codex-rs/core/src/codex.rs | 9 +++-- codex-rs/core/src/compact_remote.rs | 40 ++++++++++----------- codex-rs/core/tests/suite/compact_remote.rs | 10 ++++-- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 200d1fe0aa79..1eff9c2c60d2 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -7,6 +7,9 @@ use std::sync::atomic::AtomicU64; use crate::AuthManager; use crate::client_common::REVIEW_PROMPT; use crate::compact; +use crate::compact::run_inline_auto_compact_task; +use crate::compact::should_use_remote_compact_task; +use crate::compact_remote::run_inline_remote_auto_compact_task; use crate::features::Feature; use crate::function_tool::FunctionCallError; use crate::parse_command::parse_command; @@ -1890,14 +1893,14 @@ pub(crate) async fn run_task( // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. if token_limit_reached { - if crate::compact::should_use_remote_compact_task(&sess).await { - crate::compact_remote::run_inline_remote_auto_compact_task( + if should_use_remote_compact_task(&sess).await { + run_inline_remote_auto_compact_task( sess.clone(), turn_context.clone(), ) .await; } else { - crate::compact::run_inline_auto_compact_task( + run_inline_auto_compact_task( sess.clone(), turn_context.clone(), ) diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 582644173961..c5fcdda40cbe 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -16,12 +16,7 @@ pub(crate) async fn run_inline_remote_auto_compact_task( sess: Arc, turn_context: Arc, ) { - if let Err(err) = run_remote_compact_task_inner(&sess, &turn_context).await { - let event = EventMsg::Error(ErrorEvent { - message: format!("Error running remote compact task: {err}"), - }); - sess.send_event(&turn_context, event).await; - } + run_remote_compact_task_inner(&sess, &turn_context).await; } pub(crate) async fn run_remote_compact_task(sess: Arc, turn_context: Arc) { @@ -30,25 +25,24 @@ pub(crate) async fn run_remote_compact_task(sess: Arc, turn_context: Ar }); sess.send_event(&turn_context, start_event).await; - match run_remote_compact_task_inner(&sess, &turn_context).await { - Ok(()) => { - let event = EventMsg::AgentMessage(AgentMessageEvent { - message: "Compact task completed".to_string(), - }); - sess.send_event(&turn_context, event).await; - } - Err(err) => { - let event = EventMsg::Error(ErrorEvent { - message: err.to_string(), - }); - sess.send_event(&turn_context, event).await; - } - } + run_remote_compact_task_inner(&sess, &turn_context).await; } async fn run_remote_compact_task_inner( sess: &Arc, turn_context: &Arc, +) { + if let Err(err) = run_remote_compact_task_inner_impl(&sess, &turn_context).await { + let event = EventMsg::Error(ErrorEvent { + message: format!("Error running remote compact task: {err}"), + }); + sess.send_event(&turn_context, event).await; + } +} + +async fn run_remote_compact_task_inner_impl( + sess: &Arc, + turn_context: &Arc, ) -> CodexResult<()> { let mut history = sess.clone_history().await; let prompt = Prompt { @@ -91,5 +85,11 @@ async fn run_remote_compact_task_inner( }; sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) .await; + + let event = EventMsg::AgentMessage(AgentMessageEvent { + message: "Compact task completed".to_string(), + }); + sess.send_event(&turn_context, event).await; + Ok(()) } diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 28a8a65e41ba..e4a72e673c2c 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -19,6 +19,7 @@ use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodexHarness; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; +use core_test_support::wait_for_event_match; use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -178,13 +179,18 @@ async fn remote_compact_runs_automatically() -> Result<()> { }], }) .await?; + let message = wait_for_event_match(&codex, |ev| match ev { + EventMsg::AgentMessage(ev) => Some(ev.message.clone()), + _ => None, + }) + .await; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + assert_eq!(message, "Compact task completed"); assert_eq!(compact_mock.requests().len(), 1); let follow_up_body = responses_mock.single_request().body_json().to_string(); assert!( - follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"), - "expected follow-up request to use compacted history" + follow_up_body.contains("REMOTE_COMPACTED_SUMMARY") ); Ok(()) From 731139d8e8dda4c824052d4cc03e15b691325237 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 19 Nov 2025 00:13:22 -0800 Subject: [PATCH 4/5] compact_remote: remove redundant reference passing to sess methods --- codex-rs/core/src/compact_remote.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index c5fcdda40cbe..a0ffb9f16816 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -32,11 +32,11 @@ async fn run_remote_compact_task_inner( sess: &Arc, turn_context: &Arc, ) { - if let Err(err) = run_remote_compact_task_inner_impl(&sess, &turn_context).await { + if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await { let event = EventMsg::Error(ErrorEvent { message: format!("Error running remote compact task: {err}"), }); - sess.send_event(&turn_context, event).await; + sess.send_event(turn_context, event).await; } } @@ -89,7 +89,7 @@ async fn run_remote_compact_task_inner_impl( let event = EventMsg::AgentMessage(AgentMessageEvent { message: "Compact task completed".to_string(), }); - sess.send_event(&turn_context, event).await; + sess.send_event(turn_context, event).await; Ok(()) } From 8f4f9edb74ca4829b886b4cff69e63db82137b30 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 19 Nov 2025 00:17:06 -0800 Subject: [PATCH 5/5] core: clean up formatting in remote compact logic and tests --- codex-rs/core/src/codex.rs | 13 +++---------- codex-rs/core/src/compact_remote.rs | 7 ++----- codex-rs/core/tests/suite/compact_remote.rs | 4 +--- 3 files changed, 6 insertions(+), 18 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 1eff9c2c60d2..5156ed645328 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1894,17 +1894,10 @@ pub(crate) async fn run_task( // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. if token_limit_reached { if should_use_remote_compact_task(&sess).await { - run_inline_remote_auto_compact_task( - sess.clone(), - turn_context.clone(), - ) - .await; + run_inline_remote_auto_compact_task(sess.clone(), turn_context.clone()) + .await; } else { - run_inline_auto_compact_task( - sess.clone(), - turn_context.clone(), - ) - .await; + run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await; } continue; } diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index a0ffb9f16816..51c35baf37f0 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -28,10 +28,7 @@ pub(crate) async fn run_remote_compact_task(sess: Arc, turn_context: Ar run_remote_compact_task_inner(&sess, &turn_context).await; } -async fn run_remote_compact_task_inner( - sess: &Arc, - turn_context: &Arc, -) { +async fn run_remote_compact_task_inner(sess: &Arc, turn_context: &Arc) { if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await { let event = EventMsg::Error(ErrorEvent { message: format!("Error running remote compact task: {err}"), @@ -90,6 +87,6 @@ async fn run_remote_compact_task_inner_impl( message: "Compact task completed".to_string(), }); sess.send_event(turn_context, event).await; - + Ok(()) } diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index e4a72e673c2c..dc88bc574760 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -189,9 +189,7 @@ async fn remote_compact_runs_automatically() -> Result<()> { assert_eq!(message, "Compact task completed"); assert_eq!(compact_mock.requests().len(), 1); let follow_up_body = responses_mock.single_request().body_json().to_string(); - assert!( - follow_up_body.contains("REMOTE_COMPACTED_SUMMARY") - ); + assert!(follow_up_body.contains("REMOTE_COMPACTED_SUMMARY")); Ok(()) }