Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion codex-rs/core/src/chat_completions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub(crate) async fn stream_chat_completions(
ResponseItem::CustomToolCallOutput { .. } => {}
ResponseItem::WebSearchCall { .. } => {}
ResponseItem::GhostSnapshot { .. } => {}
ResponseItem::CompactionSummary { .. } => {}
}
}

Expand Down Expand Up @@ -320,7 +321,8 @@ pub(crate) async fn stream_chat_completions(
}
ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::Other => {
| ResponseItem::Other
| ResponseItem::CompactionSummary { .. } => {
// Omit these items from the conversation history.
continue;
}
Expand Down
77 changes: 77 additions & 0 deletions codex-rs/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::io::ReaderStream;
use tracing::debug;
use tracing::enabled;
use tracing::trace;
use tracing::warn;

Expand Down Expand Up @@ -78,6 +79,18 @@ struct Error {
resets_at: Option<i64>,
}

#[derive(Debug, Serialize)]
struct CompactHistoryRequest<'a> {
model: &'a str,
input: &'a [ResponseItem],
instructions: &'a str,
}

#[derive(Debug, Deserialize)]
struct CompactHistoryResponse {
output: Vec<ResponseItem>,
}

#[derive(Debug, Clone)]
pub struct ModelClient {
config: Arc<Config>,
Expand Down Expand Up @@ -507,6 +520,70 @@ impl ModelClient {
pub fn get_auth_manager(&self) -> Option<Arc<AuthManager>> {
self.auth_manager.clone()
}

pub async fn compact_conversation_history(&self, prompt: &Prompt) -> Result<Vec<ResponseItem>> {
if prompt.input.is_empty() {
return Ok(Vec::new());
}
let auth_manager = self.auth_manager.clone();
let auth = auth_manager.as_ref().and_then(|m| m.auth());
let mut req_builder = self
.provider
.create_compact_request_builder(&self.client, &auth)
.await?;
if let SessionSource::SubAgent(sub) = &self.session_source {
let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub {
label.clone()
} else {
serde_json::to_value(sub)
.ok()
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
.unwrap_or_else(|| "other".to_string())
};
req_builder = req_builder.header("x-openai-subagent", subagent);
}
if let Some(auth) = auth.as_ref()
&& auth.mode == AuthMode::ChatGPT
&& let Some(account_id) = auth.get_account_id()
{
req_builder = req_builder.header("chatgpt-account-id", account_id);
}
Comment on lines +528 to +550
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: share request building

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will refactor this in a dedicated PR. This code is way too messy

let payload = CompactHistoryRequest {
model: &self.config.model,
input: &prompt.input,
instructions: &prompt.get_full_instructions(&self.config.model_family),
};

if enabled!(tracing::Level::TRACE) {
trace!(
"POST to {}: {}",
self.provider
.get_compact_url(&auth)
.unwrap_or("<none>".to_string()),
serde_json::to_value(&payload).unwrap_or_default()
);
}

let response = req_builder
.json(&payload)
.send()
.await
.map_err(|source| CodexErr::ConnectionFailed(ConnectionFailedError { source }))?;
let status = response.status();
let body = response
.text()
.await
.map_err(|source| CodexErr::ConnectionFailed(ConnectionFailedError { source }))?;
if !status.is_success() {
return Err(CodexErr::UnexpectedStatus(UnexpectedResponseError {
status,
body,
request_id: None,
}));
}
let CompactHistoryResponse { output } = serde_json::from_str(&body)?;
Ok(output)
}
}

enum StreamAttemptError {
Expand Down
63 changes: 31 additions & 32 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::ModelProviderInfo;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::compact::collect_user_messages;
use crate::config::Config;
use crate::config::types::ShellEnvironmentPolicy;
use crate::context_manager::ContextManager;
Expand All @@ -63,10 +64,6 @@ use crate::error::CodexErr;
use crate::error::Result as CodexResult;
#[cfg(test)]
use crate::exec::StreamOutput;
// Removed: legacy executor wiring replaced by ToolOrchestrator flows.
// legacy normalize_exec_result no longer used after orchestrator migration
use crate::compact::build_compacted_history;
use crate::compact::collect_user_messages;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::model_family::find_family_for_model;
Expand Down Expand Up @@ -956,13 +953,18 @@ impl Session {
}
RolloutItem::Compacted(compacted) => {
let snapshot = history.get_history();
let user_messages = collect_user_messages(&snapshot);
let rebuilt = build_compacted_history(
self.build_initial_context(turn_context),
&user_messages,
&compacted.message,
);
history.replace(rebuilt);
// TODO(jif) clean
if let Some(replacement) = &compacted.replacement_history {
history.replace(replacement.clone());
} else {
let user_messages = collect_user_messages(&snapshot);
let rebuilt = compact::build_compacted_history(
self.build_initial_context(turn_context),
&user_messages,
&compacted.message,
);
history.replace(rebuilt);
}
}
_ => {}
}
Expand Down Expand Up @@ -990,6 +992,15 @@ impl Session {
self.persist_rollout_items(&rollout_items).await;
}

pub async fn enabled(&self, feature: Feature) -> bool {
self.state
.lock()
.await
.session_configuration
.features
.enabled(feature)
}

async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
for item in items {
self.send_event(
Expand Down Expand Up @@ -1167,14 +1178,7 @@ impl Session {
turn_context: Arc<TurnContext>,
cancellation_token: CancellationToken,
) {
if !self
.state
.lock()
.await
.session_configuration
.features
.enabled(Feature::GhostCommit)
{
if !self.enabled(Feature::GhostCommit).await {
return;
}
let token = match turn_context.tool_call_gate.subscribe().await {
Expand Down Expand Up @@ -1415,7 +1419,7 @@ mod handlers {
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::user_input::UserInput;

use std::sync::Arc;
use tracing::info;
use tracing::warn;
Expand Down Expand Up @@ -1623,16 +1627,9 @@ mod handlers {
let turn_context = sess
.new_turn_with_sub_id(sub_id, SessionSettingsUpdate::default())
.await;
// Attempt to inject input into current task
if let Err(items) = sess
.inject_input(vec![UserInput::Text {
text: turn_context.compact_prompt().to_string(),
}])
.await
{
sess.spawn_task(Arc::clone(&turn_context), items, CompactTask)
.await;
}

sess.spawn_task(Arc::clone(&turn_context), vec![], CompactTask)
.await;
}

pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
Expand Down Expand Up @@ -2896,14 +2893,15 @@ mod tests {
let summary1 = "summary one";
let snapshot1 = live_history.get_history();
let user_messages1 = collect_user_messages(&snapshot1);
let rebuilt1 = build_compacted_history(
let rebuilt1 = compact::build_compacted_history(
session.build_initial_context(turn_context),
&user_messages1,
summary1,
);
live_history.replace(rebuilt1);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
message: summary1.to_string(),
replacement_history: None,
}));

let user2 = ResponseItem::Message {
Expand All @@ -2929,14 +2927,15 @@ mod tests {
let summary2 = "summary two";
let snapshot2 = live_history.get_history();
let user_messages2 = collect_user_messages(&snapshot2);
let rebuilt2 = build_compacted_history(
let rebuilt2 = compact::build_compacted_history(
session.build_initial_context(turn_context),
&user_messages2,
summary2,
);
live_history.replace(rebuilt2);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
message: summary2.to_string(),
replacement_history: None,
}));

let user3 = ResponseItem::Message {
Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ async fn run_compact_task_inner(

let rollout_item = RolloutItem::Compacted(CompactedItem {
message: summary_text.clone(),
replacement_history: None,
});
sess.persist_rollout_items(&[rollout_item]).await;

Expand Down
97 changes: 97 additions & 0 deletions codex-rs/core/src/compact_remote.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use std::sync::Arc;

use crate::Prompt;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::error::Result as CodexResult;
use crate::protocol::AgentMessageEvent;
use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
use crate::protocol::EventMsg;
use crate::protocol::RolloutItem;
use crate::protocol::TaskStartedEvent;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;

pub(crate) async fn run_remote_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) -> Option<String> {
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, start_event).await;

match run_remote_compact_task_inner(&sess, &turn_context, input).await {
Ok(()) => {
let event = EventMsg::AgentMessage(AgentMessageEvent {
message: "Compact task completed".to_string(),
});
sess.send_event(&turn_context, event).await;
}
Err(err) => {
let event = EventMsg::Error(ErrorEvent {
message: err.to_string(),
});
sess.send_event(&turn_context, event).await;
}
}

None
}

async fn run_remote_compact_task_inner(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
input: Vec<UserInput>,
) -> CodexResult<()> {
let mut history = sess.clone_history().await;
if !input.is_empty() {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
history.record_items(&[initial_input_for_turn.into()]);
}

let prompt = Prompt {
input: history.get_history_for_prompt(),
tools: vec![],
parallel_tool_calls: false,
base_instructions_override: turn_context.base_instructions.clone(),
output_schema: None,
};

let mut new_history = turn_context
.client
.compact_conversation_history(&prompt)
.await?;
// Required to keep `/undo` available after compaction
let ghost_snapshots: Vec<ResponseItem> = history
.get_history()
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned()
.collect();

if !ghost_snapshots.is_empty() {
new_history.extend(ghost_snapshots);
}
sess.replace_history(new_history.clone()).await;

if let Some(estimated_tokens) = sess
.clone_history()
.await
.estimate_token_count(turn_context.as_ref())
{
sess.override_last_token_usage_estimate(turn_context.as_ref(), estimated_tokens)
.await;
}

let compacted_item = CompactedItem {
message: String::new(),
replacement_history: Some(new_history),
};
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
Ok(())
}
4 changes: 3 additions & 1 deletion codex-rs/core/src/context_manager/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl ContextManager {
| ResponseItem::FunctionCall { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CompactionSummary { .. }
| ResponseItem::GhostSnapshot { .. }
| ResponseItem::Other => item.clone(),
}
Expand All @@ -205,7 +206,8 @@ fn is_api_message(message: &ResponseItem) -> bool {
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => true,
| ResponseItem::WebSearchCall { .. }
| ResponseItem::CompactionSummary { .. } => true,
ResponseItem::GhostSnapshot { .. } => false,
ResponseItem::Other => false,
}
Expand Down
Loading
Loading