Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use std::sync::atomic::AtomicU64;
use crate::AuthManager;
use crate::client_common::REVIEW_PROMPT;
use crate::compact;
use crate::compact::run_inline_auto_compact_task;
use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::features::Feature;
use crate::function_tool::FunctionCallError;
use crate::parse_command::parse_command;
Expand Down Expand Up @@ -1890,7 +1893,12 @@ pub(crate) async fn run_task(

// As long as compaction reliably brings us well below the token limit, we should not need to worry about an infinite loop here.
if token_limit_reached {
compact::run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await;
if should_use_remote_compact_task(&sess).await {
run_inline_remote_auto_compact_task(sess.clone(), turn_context.clone())
.await;
} else {
run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await;
}
continue;
}

Expand Down
15 changes: 13 additions & 2 deletions codex-rs/core/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::codex::TurnContext;
use crate::codex::get_last_assistant_message_from_turn;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::protocol::AgentMessageEvent;
use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
Expand All @@ -18,6 +19,7 @@ use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
use crate::util::backoff;
use codex_app_server_protocol::AuthMode;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
Expand All @@ -31,26 +33,35 @@ pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;

/// Decides whether compaction should take the remote compaction path.
///
/// Returns `true` only when the session's current auth is in
/// `AuthMode::ChatGPT` mode and `Feature::RemoteCompaction` is enabled for the
/// session. When there is no auth, or the auth mode differs, the feature flag
/// is not consulted at all.
pub(crate) async fn should_use_remote_compact_task(session: &Session) -> bool {
    let has_chatgpt_auth = session
        .services
        .auth_manager
        .auth()
        .is_some_and(|auth| auth.mode == AuthMode::ChatGPT);

    // Guard clause: mirrors the short-circuit of `&&` so the feature lookup is
    // only awaited once the auth check has passed.
    if !has_chatgpt_auth {
        return false;
    }

    session.enabled(Feature::RemoteCompaction).await
}

/// Runs a compaction pass inline, driven by the turn's own compact prompt.
///
/// The prompt returned by `turn_context.compact_prompt()` is wrapped as the
/// single `UserInput::Text` item and handed to `run_compact_task_inner`.
pub(crate) async fn run_inline_auto_compact_task(
    sess: Arc<Session>,
    turn_context: Arc<TurnContext>,
) {
    // Build the input before `turn_context` is moved into the inner call.
    let compact_input = vec![UserInput::Text {
        text: turn_context.compact_prompt().to_string(),
    }];

    run_compact_task_inner(sess, turn_context, compact_input).await;
}

/// Announces a compaction task and then runs it.
///
/// Sends a `TaskStarted` event carrying the model context window reported by
/// the turn's client, then delegates the compaction itself to
/// `run_compact_task_inner` with the supplied `input`.
pub(crate) async fn run_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) -> Option<String> {
) {
// Emit the start event before any compaction work begins.
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, start_event).await;
// `sess` is cloned for the inner call; `turn_context` and `input` are moved.
run_compact_task_inner(sess.clone(), turn_context, input).await;
None
}

async fn run_compact_task_inner(
Expand Down
40 changes: 22 additions & 18 deletions codex-rs/core/src/compact_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,32 @@ use crate::protocol::RolloutItem;
use crate::protocol::TaskStartedEvent;
use codex_protocol::models::ResponseItem;

pub(crate) async fn run_remote_compact_task(
/// Runs remote compaction inline by delegating to
/// `run_remote_compact_task_inner`.
///
/// This function sends no `TaskStarted` event of its own; its body is only the
/// delegated call.
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) -> Option<String> {
) {
run_remote_compact_task_inner(&sess, &turn_context).await;
}

pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Arc<TurnContext>) {
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, start_event).await;

match run_remote_compact_task_inner(&sess, &turn_context).await {
Ok(()) => {
let event = EventMsg::AgentMessage(AgentMessageEvent {
message: "Compact task completed".to_string(),
});
sess.send_event(&turn_context, event).await;
}
Err(err) => {
let event = EventMsg::Error(ErrorEvent {
message: err.to_string(),
});
sess.send_event(&turn_context, event).await;
}
}
run_remote_compact_task_inner(&sess, &turn_context).await;
}

None
async fn run_remote_compact_task_inner(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
let event = EventMsg::Error(ErrorEvent {
message: format!("Error running remote compact task: {err}"),
});
sess.send_event(turn_context, event).await;
}
}

async fn run_remote_compact_task_inner(
async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
Expand Down Expand Up @@ -84,5 +82,11 @@ async fn run_remote_compact_task_inner(
};
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;

let event = EventMsg::AgentMessage(AgentMessageEvent {
message: "Compact task completed".to_string(),
});
sess.send_event(turn_context, event).await;

Ok(())
}
12 changes: 3 additions & 9 deletions codex-rs/core/src/tasks/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
use crate::codex::TurnContext;
use crate::features::Feature;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_app_server_protocol::AuthMode;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;

Expand All @@ -27,16 +25,12 @@ impl SessionTask for CompactTask {
_cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();
if session
.services
.auth_manager
.auth()
.is_some_and(|auth| auth.mode == AuthMode::ChatGPT)
&& session.enabled(Feature::RemoteCompaction).await
{
if crate::compact::should_use_remote_compact_task(&session).await {
crate::compact_remote::run_remote_compact_task(session, ctx).await
} else {
crate::compact::run_compact_task(session, ctx, input).await
}

None
}
}
7 changes: 7 additions & 0 deletions codex-rs/core/tests/common/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,13 @@ pub fn ev_apply_patch_function_call(call_id: &str, patch: &str) -> Value {
})
}

/// Builds a `shell_command` function-call event for the given `call_id`.
///
/// The call's arguments are the JSON object `{"command": <command>}`,
/// serialized to a string and passed to `ev_function_call`.
pub fn ev_shell_command_call(call_id: &str, command: &str) -> Value {
    let encoded_args = serde_json::to_string(&serde_json::json!({ "command": command }))
        .expect("serialize shell arguments");

    ev_function_call(call_id, "shell_command", &encoded_args)
}

pub fn ev_apply_patch_shell_call(call_id: &str, patch: &str) -> Value {
let args = serde_json::json!({ "command": ["apply_patch", patch] });
let arguments = serde_json::to_string(&args).expect("serialize apply_patch arguments");
Expand Down
69 changes: 69 additions & 0 deletions codex-rs/core/tests/suite/compact_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodexHarness;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down Expand Up @@ -125,6 +128,72 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
Ok(())
}

/// Verifies that remote compaction is triggered automatically when a turn
/// exceeds the token limit.
///
/// Scenario: with ChatGPT auth and `Feature::RemoteCompaction` enabled, the
/// first model response reports a token count over the limit. The test then
/// expects a "Compact task completed" agent message, exactly one request to
/// the compact endpoint, and a follow-up model request whose body contains the
/// remotely compacted summary.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_runs_automatically() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let harness = TestCodexHarness::with_builder(
        test_codex()
            .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
            .with_config(|config| {
                config.features.enable(Feature::RemoteCompaction);
            }),
    )
    .await?;
    let codex = harness.test().codex.clone();

    // First response: a shell call plus a usage report far above the token
    // limit, which is what should trigger auto-compaction.
    mount_sse_once(
        harness.server(),
        sse(vec![
            responses::ev_shell_command_call("m1", "echo 'hi'"),
            responses::ev_completed_with_tokens("resp-1", 100_000_000), // over token limit
        ]),
    )
    .await;
    // Second response: served to the request issued after compaction.
    let responses_mock = mount_sse_once(
        harness.server(),
        sse(vec![
            responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
            responses::ev_completed("resp-2"),
        ]),
    )
    .await;

    let compacted_history = vec![ResponseItem::Message {
        id: None,
        role: "user".to_string(),
        content: vec![ContentItem::InputText {
            text: "REMOTE_COMPACTED_SUMMARY".to_string(),
        }],
    }];
    // `json!` serializes the interpolated value by reference, so no clone of
    // the history is needed here.
    let compact_mock = responses::mount_compact_json_once(
        harness.server(),
        serde_json::json!({ "output": compacted_history }),
    )
    .await;

    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "hello remote compact".into(),
            }],
        })
        .await?;
    // Capture the first agent message observed, then wait for the task to end.
    let message = wait_for_event_match(&codex, |ev| match ev {
        EventMsg::AgentMessage(ev) => Some(ev.message.clone()),
        _ => None,
    })
    .await;
    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;

    assert_eq!(message, "Compact task completed");
    assert_eq!(compact_mock.requests().len(), 1);
    let follow_up_body = responses_mock.single_request().body_json().to_string();
    assert!(follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"));

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> {
skip_if_no_network!(Ok(()));
Expand Down
Loading