From 8db539461d73a3eb637aaa34d51541dbf6cc5a4d Mon Sep 17 00:00:00 2001 From: celia-oai Date: Tue, 18 Nov 2025 17:41:45 -0800 Subject: [PATCH] changes --- .../app-server-protocol/src/protocol/v2.rs | 12 +- codex-rs/app-server-test-client/src/main.rs | 4 +- .../app-server/src/bespoke_event_handling.rs | 306 ++++++++++++++++++ .../app-server/src/codex_message_processor.rs | 22 +- .../tests/suite/v2/turn_interrupt.rs | 16 +- .../app-server/tests/suite/v2/turn_start.rs | 24 +- 6 files changed, 362 insertions(+), 22 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index f4a6d636aa..05e0e3560e 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -519,9 +519,11 @@ pub struct AccountUpdatedNotification { #[ts(export_to = "v2/")] pub struct Turn { pub id: String, + /// This is currently only populated for resumed threads. + /// TODO: properly populate items for all turns. pub items: Vec, + #[serde(flatten)] pub status: TurnStatus, - pub error: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -532,12 +534,12 @@ pub struct TurnError { } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] -#[serde(rename_all = "camelCase")] -#[ts(export_to = "v2/")] +#[serde(tag = "status", rename_all = "camelCase")] +#[ts(tag = "status", export_to = "v2/")] pub enum TurnStatus { Completed, Interrupted, - Failed, + Failed { error: TurnError }, InProgress, } @@ -853,8 +855,6 @@ pub struct Usage { #[ts(export_to = "v2/")] pub struct TurnCompletedNotification { pub turn: Turn, - // TODO: should usage be stored on the Turn object, and we return that instead? - pub usage: Usage, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server-test-client/src/main.rs b/codex-rs/app-server-test-client/src/main.rs index a243937b28..a2363200f6 100644 --- a/codex-rs/app-server-test-client/src/main.rs +++ b/codex-rs/app-server-test-client/src/main.rs @@ -40,6 +40,7 @@ use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput as V2UserInput; use codex_protocol::ConversationId; use codex_protocol::protocol::Event; @@ -502,10 +503,9 @@ impl CodexClient { ServerNotification::TurnCompleted(payload) => { if payload.turn.id == turn_id { println!("\n< turn/completed notification: {:?}", payload.turn.status); - if let Some(error) = payload.turn.error { + if let TurnStatus::Failed { error } = &payload.turn.status { println!("[turn error] {}", error.message); } - println!("< usage: {:?}", payload.usage); break; } } diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index ce4f7590e2..b4fdbe3e03 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1,5 +1,7 @@ use crate::codex_message_processor::ApiVersion; use crate::codex_message_processor::PendingInterrupts; +use crate::codex_message_processor::TurnSummary; +use crate::codex_message_processor::TurnSummaryStore; use crate::outgoing_message::OutgoingMessageSender; use codex_app_server_protocol::AccountRateLimitsUpdatedNotification; use codex_app_server_protocol::AgentMessageDeltaNotification; @@ -26,7 +28,11 @@ use codex_app_server_protocol::SandboxCommandAssessment as V2SandboxCommandAsses use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::Turn; +use codex_app_server_protocol::TurnCompletedNotification; +use codex_app_server_protocol::TurnError; use codex_app_server_protocol::TurnInterruptResponse; +use codex_app_server_protocol::TurnStatus; use codex_core::CodexConversation; use codex_core::parse_command::shlex_join; use codex_core::protocol::ApplyPatchApprovalRequestEvent; @@ -54,10 +60,14 @@ pub(crate) async fn apply_bespoke_event_handling( conversation: Arc, outgoing: Arc, pending_interrupts: PendingInterrupts, + turn_summary_store: TurnSummaryStore, api_version: ApiVersion, ) { let Event { id: event_id, msg } = event; match msg { + EventMsg::TaskComplete(_ev) => { + handle_turn_complete(conversation_id, event_id, &outgoing, &turn_summary_store).await; + } EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, changes, @@ -191,6 +201,9 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } } + EventMsg::Error(ev) => { + handle_error(conversation_id, ev.message, &turn_summary_store).await; + } EventMsg::EnteredReviewMode(review_request) => { let notification = ItemStartedNotification { item: ThreadItem::CodeReview { @@ -326,12 +339,79 @@ pub(crate) async fn apply_bespoke_event_handling( } } } + + handle_turn_interrupted(conversation_id, event_id, &outgoing, &turn_summary_store) + .await; } _ => {} } } +async fn emit_turn_completed_with_status( + event_id: String, + status: TurnStatus, + outgoing: &OutgoingMessageSender, +) { + let notification = TurnCompletedNotification { + turn: Turn { + id: event_id, + items: vec![], + status, + }, + }; + outgoing + .send_server_notification(ServerNotification::TurnCompleted(notification)) + .await; +} + +async fn find_and_remove_turn_summary( + conversation_id: ConversationId, + turn_summary_store: &TurnSummaryStore, +) -> TurnSummary { + let mut map = turn_summary_store.lock().await; + map.remove(&conversation_id).unwrap_or_default() +} + +async fn handle_turn_complete( + conversation_id: ConversationId, + event_id: String, + outgoing: &OutgoingMessageSender, + turn_summary_store: &TurnSummaryStore, +) { + let turn_summary = find_and_remove_turn_summary(conversation_id, turn_summary_store).await; + + let status = if let Some(message) = turn_summary.last_error_message { + TurnStatus::Failed { + error: TurnError { message }, + } + } else { + TurnStatus::Completed + }; + + emit_turn_completed_with_status(event_id, status, outgoing).await; +} + +async fn handle_turn_interrupted( + conversation_id: ConversationId, + event_id: String, + outgoing: &OutgoingMessageSender, + turn_summary_store: &TurnSummaryStore, +) { + find_and_remove_turn_summary(conversation_id, turn_summary_store).await; + + emit_turn_completed_with_status(event_id, TurnStatus::Interrupted, outgoing).await; +} + +async fn handle_error( + conversation_id: ConversationId, + message: String, + turn_summary_store: &TurnSummaryStore, +) { + let mut map = turn_summary_store.lock().await; + map.entry(conversation_id).or_default().last_error_message = Some(message); +} + async fn on_patch_approval_response( event_id: String, receiver: oneshot::Receiver, @@ -536,13 +616,140 @@ async fn construct_mcp_tool_call_end_notification( #[cfg(test)] mod tests { use super::*; + use crate::CHANNEL_CAPACITY; + use crate::outgoing_message::OutgoingMessage; + use crate::outgoing_message::OutgoingMessageSender; + use anyhow::Result; + use anyhow::anyhow; + use anyhow::bail; use codex_core::protocol::McpInvocation; use mcp_types::CallToolResult; use mcp_types::ContentBlock; use mcp_types::TextContent; use pretty_assertions::assert_eq; use serde_json::Value as JsonValue; + use std::collections::HashMap; use std::time::Duration; + use tokio::sync::Mutex; + use tokio::sync::mpsc; + + fn new_turn_summary_store() -> TurnSummaryStore { + Arc::new(Mutex::new(HashMap::new())) + } + + #[tokio::test] + async fn test_handle_error_records_message() -> Result<()> { + let conversation_id = ConversationId::new(); + let turn_summary_store = new_turn_summary_store(); + + handle_error(conversation_id, "boom".to_string(), &turn_summary_store).await; + + let turn_summary = find_and_remove_turn_summary(conversation_id, &turn_summary_store).await; + assert_eq!(turn_summary.last_error_message, Some("boom".to_string())); + Ok(()) + } + + #[tokio::test] + async fn test_handle_turn_complete_emits_completed_without_error() -> Result<()> { + let conversation_id = ConversationId::new(); + let event_id = "complete1".to_string(); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let turn_summary_store = new_turn_summary_store(); + + handle_turn_complete( + conversation_id, + event_id.clone(), + &outgoing, + &turn_summary_store, + ) + .await; + + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send one notification"))?; + match msg { + OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { + assert_eq!(n.turn.id, event_id); + assert_eq!(n.turn.status, TurnStatus::Completed); + } + other => bail!("unexpected message: {other:?}"), + } + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } + + #[tokio::test] + async fn test_handle_turn_interrupted_emits_interrupted_with_error() -> Result<()> { + let conversation_id = ConversationId::new(); + let event_id = "interrupt1".to_string(); + let turn_summary_store = new_turn_summary_store(); + handle_error(conversation_id, "oops".to_string(), &turn_summary_store).await; + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + + handle_turn_interrupted( + conversation_id, + event_id.clone(), + &outgoing, + &turn_summary_store, + ) + .await; + + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send one notification"))?; + match msg { + OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { + assert_eq!(n.turn.id, event_id); + assert_eq!(n.turn.status, TurnStatus::Interrupted); + } + other => bail!("unexpected message: {other:?}"), + } + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } + + #[tokio::test] + async fn test_handle_turn_complete_emits_failed_with_error() -> Result<()> { + let conversation_id = ConversationId::new(); + let event_id = "complete_err1".to_string(); + let turn_summary_store = new_turn_summary_store(); + handle_error(conversation_id, "bad".to_string(), &turn_summary_store).await; + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + + handle_turn_complete( + conversation_id, + event_id.clone(), + &outgoing, + &turn_summary_store, + ) + .await; + + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send one notification"))?; + match msg { + OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { + assert_eq!(n.turn.id, event_id); + assert_eq!( + n.turn.status, + TurnStatus::Failed { + error: TurnError { + message: "bad".to_string(), + } + } + ); + } + other => bail!("unexpected message: {other:?}"), + } + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } #[tokio::test] async fn test_construct_mcp_tool_call_begin_notification_with_args() { @@ -572,6 +779,105 @@ mod tests { assert_eq!(notification, expected); } + #[tokio::test] + async fn test_handle_turn_complete_emits_error_multiple_turns() -> Result<()> { + // Conversation A will have two turns; Conversation B will have one turn. + let conversation_a = ConversationId::new(); + let conversation_b = ConversationId::new(); + let turn_summary_store = new_turn_summary_store(); + + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + + // Turn 1 on conversation A + let a_turn1 = "a_turn1".to_string(); + handle_error(conversation_a, "a1".to_string(), &turn_summary_store).await; + handle_turn_complete( + conversation_a, + a_turn1.clone(), + &outgoing, + &turn_summary_store, + ) + .await; + + // Turn 1 on conversation B + let b_turn1 = "b_turn1".to_string(); + handle_error(conversation_b, "b1".to_string(), &turn_summary_store).await; + handle_turn_complete( + conversation_b, + b_turn1.clone(), + &outgoing, + &turn_summary_store, + ) + .await; + + // Turn 2 on conversation A + let a_turn2 = "a_turn2".to_string(); + handle_turn_complete( + conversation_a, + a_turn2.clone(), + &outgoing, + &turn_summary_store, + ) + .await; + + // Verify: A turn 1 + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send first notification"))?; + match msg { + OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { + assert_eq!(n.turn.id, a_turn1); + assert_eq!( + n.turn.status, + TurnStatus::Failed { + error: TurnError { + message: "a1".to_string(), + } + } + ); + } + other => bail!("unexpected message: {other:?}"), + } + + // Verify: B turn 1 + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send second notification"))?; + match msg { + OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { + assert_eq!(n.turn.id, b_turn1); + assert_eq!( + n.turn.status, + TurnStatus::Failed { + error: TurnError { + message: "b1".to_string(), + } + } + ); + } + other => bail!("unexpected message: {other:?}"), + } + + // Verify: A turn 2 + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send third notification"))?; + match msg { + OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { + assert_eq!(n.turn.id, a_turn2); + assert_eq!(n.turn.status, TurnStatus::Completed); + } + other => bail!("unexpected message: {other:?}"), + } + + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } + #[tokio::test] async fn test_construct_mcp_tool_call_begin_notification_without_args() { let begin_event = McpToolCallBeginEvent { diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 0172458074..6191fdf4a7 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -154,6 +154,14 @@ use uuid::Uuid; type PendingInterruptQueue = Vec<(RequestId, ApiVersion)>; pub(crate) type PendingInterrupts = Arc>>; +/// Per-conversation accumulation of the latest states e.g. error message while a turn runs. +#[derive(Default, Clone)] +pub(crate) struct TurnSummary { + pub(crate) last_error_message: Option, +} + +pub(crate) type TurnSummaryStore = Arc>>; + // Duration before a ChatGPT login attempt is abandoned. const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60); struct ActiveLogin { @@ -178,6 +186,7 @@ pub(crate) struct CodexMessageProcessor { active_login: Arc>>, // Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives. pending_interrupts: PendingInterrupts, + turn_summary_store: TurnSummaryStore, pending_fuzzy_searches: Arc>>>, feedback: CodexFeedback, } @@ -230,6 +239,7 @@ impl CodexMessageProcessor { conversation_listeners: HashMap::new(), active_login: Arc::new(Mutex::new(None)), pending_interrupts: Arc::new(Mutex::new(HashMap::new())), + turn_summary_store: Arc::new(Mutex::new(HashMap::new())), pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())), feedback, } @@ -2363,9 +2373,6 @@ impl CodexMessageProcessor { } }; - // Keep a copy of v2 inputs for the notification payload. - let v2_inputs_for_notif = params.input.clone(); - // Map v2 input items to core input items. let mapped_items: Vec = params .input @@ -2405,12 +2412,8 @@ impl CodexMessageProcessor { Ok(turn_id) => { let turn = Turn { id: turn_id.clone(), - items: vec![ThreadItem::UserMessage { - id: turn_id, - content: v2_inputs_for_notif, - }], + items: vec![], status: TurnStatus::InProgress, - error: None, }; let response = TurnStartResponse { turn: turn.clone() }; @@ -2471,7 +2474,6 @@ impl CodexMessageProcessor { id: turn_id.clone(), items, status: TurnStatus::InProgress, - error: None, }; let response = TurnStartResponse { turn: turn.clone() }; self.outgoing.send_response(request_id, response).await; @@ -2591,6 +2593,7 @@ impl CodexMessageProcessor { let outgoing_for_task = self.outgoing.clone(); let pending_interrupts = self.pending_interrupts.clone(); + let turn_summary_store = self.turn_summary_store.clone(); let api_version_for_task = api_version; tokio::spawn(async move { loop { @@ -2647,6 +2650,7 @@ impl CodexMessageProcessor { conversation.clone(), outgoing_for_task.clone(), pending_interrupts.clone(), + turn_summary_store.clone(), api_version_for_task, ) .await; diff --git a/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs index d1deb60801..aa140c9e7f 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs @@ -5,14 +5,17 @@ use app_test_support::McpProcess; use app_test_support::create_mock_chat_completions_server; use app_test_support::create_shell_sse_response; use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnInterruptParams; use codex_app_server_protocol::TurnInterruptResponse; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput as V2UserInput; use tempfile::TempDir; use tokio::time::timeout; @@ -99,7 +102,18 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> { .await??; let _resp: TurnInterruptResponse = to_response::(interrupt_resp)?; - // No fields to assert on; successful deserialization confirms proper response shape. + let completed_notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + let completed: TurnCompletedNotification = serde_json::from_value( + completed_notif + .params + .expect("turn/completed params must be present"), + )?; + assert_eq!(completed.turn.status, TurnStatus::Interrupted); + Ok(()) } diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 433c7b4486..753e33c243 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -14,9 +14,11 @@ use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStartedNotification; +use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput as V2UserInput; use codex_core::protocol_config_types::ReasoningEffort; use codex_core::protocol_config_types::ReasoningSummary; @@ -118,13 +120,17 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<( ) .await??; - // And we should ultimately get a task_complete without having to add a - // legacy conversation listener explicitly (auto-attached by thread/start). - let _task_complete: JSONRPCNotification = timeout( + let completed_notif: JSONRPCNotification = timeout( DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/task_complete"), + mcp.read_stream_until_notification_message("turn/completed"), ) .await??; + let completed: TurnCompletedNotification = serde_json::from_value( + completed_notif + .params + .expect("turn/completed params must be present"), + )?; + assert_eq!(completed.turn.status, TurnStatus::Completed); Ok(()) } @@ -274,6 +280,11 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> { mcp.read_stream_until_notification_message("codex/event/task_complete"), ) .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; // Second turn with approval_policy=never should not elicit approval let second_turn_id = mcp @@ -302,6 +313,11 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> { mcp.read_stream_until_notification_message("codex/event/task_complete"), ) .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; Ok(()) }