From 0f4fd5ad24d62143984354a86795bdb78c2982df Mon Sep 17 00:00:00 2001 From: celia-oai Date: Thu, 13 Nov 2025 18:07:34 -0800 Subject: [PATCH 1/3] changes --- .../app-server/src/bespoke_event_handling.rs | 323 ++++++++++++++++++ .../app-server/src/codex_message_processor.rs | 253 +------------- codex-rs/app-server/src/lib.rs | 1 + 3 files changed, 327 insertions(+), 250 deletions(-) create mode 100644 codex-rs/app-server/src/bespoke_event_handling.rs diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs new file mode 100644 index 0000000000..d1176c2a84 --- /dev/null +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -0,0 +1,323 @@ +use crate::codex_message_processor::ApiVersion; +use crate::codex_message_processor::PendingInterrupts; +use crate::outgoing_message::OutgoingMessageSender; +use codex_app_server_protocol::AccountRateLimitsUpdatedNotification; +use codex_app_server_protocol::AgentMessageDeltaNotification; +use codex_app_server_protocol::ApplyPatchApprovalParams; +use codex_app_server_protocol::ApplyPatchApprovalResponse; +use codex_app_server_protocol::ExecCommandApprovalParams; +use codex_app_server_protocol::ExecCommandApprovalResponse; +use codex_app_server_protocol::InterruptConversationResponse; +use codex_app_server_protocol::ItemCompletedNotification; +use codex_app_server_protocol::ItemStartedNotification; +use codex_app_server_protocol::ReasoningSummaryPartAddedNotification; +use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; +use codex_app_server_protocol::ReasoningTextDeltaNotification; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequestPayload; +use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::TurnInterruptResponse; +use codex_core::CodexConversation; +use codex_core::protocol::ApplyPatchApprovalRequestEvent; +use codex_core::protocol::Event; +use codex_core::protocol::EventMsg; +use codex_core::protocol::ExecApprovalRequestEvent; +use codex_core::protocol::Op; +use codex_core::protocol::ReviewDecision; +use codex_protocol::ConversationId; +use std::sync::Arc; +use tokio::sync::oneshot; +use tracing::error; + +type JsonRpcResult = serde_json::Value; + +pub(crate) async fn apply_bespoke_event_handling( + event: Event, + conversation_id: ConversationId, + conversation: Arc, + outgoing: Arc, + pending_interrupts: PendingInterrupts, +) { + let Event { id: event_id, msg } = event; + match msg { + EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { + call_id, + changes, + reason, + grant_root, + }) => { + let params = ApplyPatchApprovalParams { + conversation_id, + call_id, + file_changes: changes, + reason, + grant_root, + }; + let rx = outgoing + .send_request(ServerRequestPayload::ApplyPatchApproval(params)) + .await; + // TODO(mbolin): Enforce a timeout so this task does not live indefinitely? + tokio::spawn(async move { + on_patch_approval_response(event_id, rx, conversation).await; + }); + } + EventMsg::McpToolCallBegin(begin_event) => { + let item = ThreadItem::McpToolCall { + id: begin_event.call_id, + server: begin_event.invocation.server, + tool: begin_event.invocation.tool, + status: codex_app_server_protocol::McpToolCallStatus::InProgress, + arguments: begin_event + .invocation + .arguments + .unwrap_or(serde_json::Value::Null), + result: None, + error: None, + }; + let notification = ItemStartedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemStarted(notification)) + .await; + } + EventMsg::McpToolCallEnd(end_event) => { + let status = if end_event.is_success() { + codex_app_server_protocol::McpToolCallStatus::Completed + } else { + codex_app_server_protocol::McpToolCallStatus::Failed + }; + + let (result, error) = match &end_event.result { + Ok(value) => ( + Some(codex_app_server_protocol::McpToolCallResult { + content: value.content.clone(), + structured_content: value + .structured_content + .clone() + .unwrap_or(serde_json::Value::Null), + }), + None, + ), + Err(message) => ( + None, + Some(codex_app_server_protocol::McpToolCallError { + message: message.clone(), + }), + ), + }; + + let item = ThreadItem::McpToolCall { + id: end_event.call_id, + server: end_event.invocation.server, + tool: end_event.invocation.tool, + status, + arguments: end_event + .invocation + .arguments + .clone() + .unwrap_or(serde_json::Value::Null), + result, + error, + }; + let notification = ItemCompletedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemCompleted(notification)) + .await; + } + EventMsg::AgentMessageContentDelta(event) => { + let notification = AgentMessageDeltaNotification { + item_id: event.item_id, + delta: event.delta, + }; + outgoing + .send_server_notification(ServerNotification::AgentMessageDelta(notification)) + .await; + } + EventMsg::ReasoningContentDelta(event) => { + let notification = ReasoningSummaryTextDeltaNotification { + item_id: event.item_id, + delta: event.delta, + summary_index: event.summary_index, + }; + outgoing + .send_server_notification(ServerNotification::ReasoningSummaryTextDelta( + notification, + )) + .await; + } + EventMsg::ReasoningRawContentDelta(event) => { + let notification = ReasoningTextDeltaNotification { + item_id: event.item_id, + delta: event.delta, + content_index: event.content_index, + }; + outgoing + .send_server_notification(ServerNotification::ReasoningTextDelta(notification)) + .await; + } + EventMsg::AgentReasoningSectionBreak(event) => { + let notification = ReasoningSummaryPartAddedNotification { + item_id: event.item_id, + summary_index: event.summary_index, + }; + outgoing + .send_server_notification(ServerNotification::ReasoningSummaryPartAdded( + notification, + )) + .await; + } + EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { + call_id, + command, + cwd, + reason, + risk, + parsed_cmd, + }) => { + let params = ExecCommandApprovalParams { + conversation_id, + call_id, + command, + cwd, + reason, + risk, + parsed_cmd, + }; + let rx = outgoing + .send_request(ServerRequestPayload::ExecCommandApproval(params)) + .await; + + // TODO(mbolin): Enforce a timeout so this task does not live indefinitely? + tokio::spawn(async move { + on_exec_approval_response(event_id, rx, conversation).await; + }); + } + EventMsg::TokenCount(token_count_event) => { + if let Some(rate_limits) = token_count_event.rate_limits { + outgoing + .send_server_notification(ServerNotification::AccountRateLimitsUpdated( + AccountRateLimitsUpdatedNotification { + rate_limits: rate_limits.into(), + }, + )) + .await; + } + } + EventMsg::ItemStarted(item_started_event) => { + let item: ThreadItem = item_started_event.item.clone().into(); + let notification = ItemStartedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemStarted(notification)) + .await; + } + EventMsg::ItemCompleted(item_completed_event) => { + let item: ThreadItem = item_completed_event.item.clone().into(); + let notification = ItemCompletedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemCompleted(notification)) + .await; + } + // If this is a TurnAborted, reply to any pending interrupt requests. + EventMsg::TurnAborted(turn_aborted_event) => { + let pending = { + let mut map = pending_interrupts.lock().await; + map.remove(&conversation_id).unwrap_or_default() + }; + if !pending.is_empty() { + for (rid, ver) in pending { + match ver { + ApiVersion::V1 => { + let response = InterruptConversationResponse { + abort_reason: turn_aborted_event.reason.clone(), + }; + outgoing.send_response(rid, response).await; + } + ApiVersion::V2 => { + let response = TurnInterruptResponse {}; + outgoing.send_response(rid, response).await; + } + } + } + } + } + + _ => {} + } +} + +async fn on_patch_approval_response( + event_id: String, + receiver: oneshot::Receiver, + codex: Arc, +) { + let response = receiver.await; + let value = match response { + Ok(value) => value, + Err(err) => { + error!("request failed: {err:?}"); + if let Err(submit_err) = codex + .submit(Op::PatchApproval { + id: event_id.clone(), + decision: ReviewDecision::Denied, + }) + .await + { + error!("failed to submit denied PatchApproval after request failure: {submit_err}"); + } + return; + } + }; + + let response = + serde_json::from_value::(value).unwrap_or_else(|err| { + error!("failed to deserialize ApplyPatchApprovalResponse: {err}"); + ApplyPatchApprovalResponse { + decision: ReviewDecision::Denied, + } + }); + + if let Err(err) = codex + .submit(Op::PatchApproval { + id: event_id, + decision: response.decision, + }) + .await + { + error!("failed to submit PatchApproval: {err}"); + } +} + +async fn on_exec_approval_response( + event_id: String, + receiver: oneshot::Receiver, + conversation: Arc, +) { + let response = receiver.await; + let value = match response { + Ok(value) => value, + Err(err) => { + error!("request failed: {err:?}"); + return; + } + }; + + // Try to deserialize `value` and then make the appropriate call to `codex`. + let response = + serde_json::from_value::(value).unwrap_or_else(|err| { + error!("failed to deserialize ExecCommandApprovalResponse: {err}"); + // If we cannot deserialize the response, we deny the request to be + // conservative. + ExecCommandApprovalResponse { + decision: ReviewDecision::Denied, + } + }); + + if let Err(err) = conversation + .submit(Op::ExecApproval { + id: event_id, + decision: response.decision, + }) + .await + { + error!("failed to submit ExecApproval: {err}"); + } +} diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index b012458da9..c694486187 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1,3 +1,4 @@ +use crate::bespoke_event_handling::apply_bespoke_event_handling; use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; use crate::fuzzy_file_search::run_fuzzy_file_search; @@ -8,13 +9,9 @@ use chrono::DateTime; use chrono::Utc; use codex_app_server_protocol::Account; use codex_app_server_protocol::AccountLoginCompletedNotification; -use codex_app_server_protocol::AccountRateLimitsUpdatedNotification; use codex_app_server_protocol::AccountUpdatedNotification; use codex_app_server_protocol::AddConversationListenerParams; use codex_app_server_protocol::AddConversationSubscriptionResponse; -use codex_app_server_protocol::AgentMessageDeltaNotification; -use codex_app_server_protocol::ApplyPatchApprovalParams; -use codex_app_server_protocol::ApplyPatchApprovalResponse; use codex_app_server_protocol::ArchiveConversationParams; use codex_app_server_protocol::ArchiveConversationResponse; use codex_app_server_protocol::AskForApproval; @@ -26,8 +23,6 @@ use codex_app_server_protocol::CancelLoginChatGptResponse; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ConversationGitInfo; use codex_app_server_protocol::ConversationSummary; -use codex_app_server_protocol::ExecCommandApprovalParams; -use codex_app_server_protocol::ExecCommandApprovalResponse; use codex_app_server_protocol::ExecOneOffCommandParams; use codex_app_server_protocol::ExecOneOffCommandResponse; use codex_app_server_protocol::FeedbackUploadParams; @@ -46,9 +41,6 @@ use codex_app_server_protocol::GetUserSavedConfigResponse; use codex_app_server_protocol::GitDiffToRemoteResponse; use codex_app_server_protocol::InputItem as WireInputItem; use codex_app_server_protocol::InterruptConversationParams; -use codex_app_server_protocol::InterruptConversationResponse; -use codex_app_server_protocol::ItemCompletedNotification; -use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::ListConversationsParams; use codex_app_server_protocol::ListConversationsResponse; @@ -63,13 +55,9 @@ use codex_app_server_protocol::ModelListParams; use codex_app_server_protocol::ModelListResponse; use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::NewConversationResponse; -use codex_app_server_protocol::ReasoningSummaryPartAddedNotification; -use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; -use codex_app_server_protocol::ReasoningTextDeltaNotification; use codex_app_server_protocol::RemoveConversationListenerParams; use codex_app_server_protocol::RemoveConversationSubscriptionResponse; use codex_app_server_protocol::RequestId; -use codex_app_server_protocol::Result as JsonRpcResult; use codex_app_server_protocol::ResumeConversationParams; use codex_app_server_protocol::ResumeConversationResponse; use codex_app_server_protocol::SandboxMode; @@ -78,7 +66,6 @@ use codex_app_server_protocol::SendUserMessageResponse; use codex_app_server_protocol::SendUserTurnParams; use codex_app_server_protocol::SendUserTurnResponse; use codex_app_server_protocol::ServerNotification; -use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::SessionConfiguredNotification; use codex_app_server_protocol::SetDefaultModelParams; use codex_app_server_protocol::SetDefaultModelResponse; @@ -95,7 +82,6 @@ use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnInterruptParams; -use codex_app_server_protocol::TurnInterruptResponse; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStartedNotification; @@ -127,12 +113,8 @@ use codex_core::find_conversation_path_by_id_str; use codex_core::get_platform_sandbox; use codex_core::git_info::git_diff_to_remote; use codex_core::parse_cursor; -use codex_core::protocol::ApplyPatchApprovalRequestEvent; -use codex_core::protocol::Event; use codex_core::protocol::EventMsg; -use codex_core::protocol::ExecApprovalRequestEvent; use codex_core::protocol::Op; -use codex_core::protocol::ReviewDecision; use codex_core::read_head_for_summary; use codex_feedback::CodexFeedback; use codex_login::ServerOptions as LoginServerOptions; @@ -167,7 +149,7 @@ use tracing::warn; use uuid::Uuid; type PendingInterruptQueue = Vec<(RequestId, ApiVersion)>; -type PendingInterrupts = Arc>>; +pub(crate) type PendingInterrupts = Arc>>; // Duration before a ChatGPT login attempt is abandoned. const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60); @@ -198,7 +180,7 @@ pub(crate) struct CodexMessageProcessor { } #[derive(Clone, Copy, Debug)] -enum ApiVersion { +pub(crate) enum ApiVersion { V1, V2, } @@ -2655,157 +2637,6 @@ impl CodexMessageProcessor { } } -async fn apply_bespoke_event_handling( - event: Event, - conversation_id: ConversationId, - conversation: Arc, - outgoing: Arc, - pending_interrupts: PendingInterrupts, -) { - let Event { id: event_id, msg } = event; - match msg { - EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { - call_id, - changes, - reason, - grant_root, - }) => { - let params = ApplyPatchApprovalParams { - conversation_id, - call_id, - file_changes: changes, - reason, - grant_root, - }; - let rx = outgoing - .send_request(ServerRequestPayload::ApplyPatchApproval(params)) - .await; - // TODO(mbolin): Enforce a timeout so this task does not live indefinitely? - tokio::spawn(async move { - on_patch_approval_response(event_id, rx, conversation).await; - }); - } - EventMsg::AgentMessageContentDelta(event) => { - let notification = AgentMessageDeltaNotification { - item_id: event.item_id, - delta: event.delta, - }; - outgoing - .send_server_notification(ServerNotification::AgentMessageDelta(notification)) - .await; - } - EventMsg::ReasoningContentDelta(event) => { - let notification = ReasoningSummaryTextDeltaNotification { - item_id: event.item_id, - delta: event.delta, - summary_index: event.summary_index, - }; - outgoing - .send_server_notification(ServerNotification::ReasoningSummaryTextDelta( - notification, - )) - .await; - } - EventMsg::ReasoningRawContentDelta(event) => { - let notification = ReasoningTextDeltaNotification { - item_id: event.item_id, - delta: event.delta, - content_index: event.content_index, - }; - outgoing - .send_server_notification(ServerNotification::ReasoningTextDelta(notification)) - .await; - } - EventMsg::AgentReasoningSectionBreak(event) => { - let notification = ReasoningSummaryPartAddedNotification { - item_id: event.item_id, - summary_index: event.summary_index, - }; - outgoing - .send_server_notification(ServerNotification::ReasoningSummaryPartAdded( - notification, - )) - .await; - } - EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { - call_id, - command, - cwd, - reason, - risk, - parsed_cmd, - }) => { - let params = ExecCommandApprovalParams { - conversation_id, - call_id, - command, - cwd, - reason, - risk, - parsed_cmd, - }; - let rx = outgoing - .send_request(ServerRequestPayload::ExecCommandApproval(params)) - .await; - - // TODO(mbolin): Enforce a timeout so this task does not live indefinitely? - tokio::spawn(async move { - on_exec_approval_response(event_id, rx, conversation).await; - }); - } - EventMsg::TokenCount(token_count_event) => { - if let Some(rate_limits) = token_count_event.rate_limits { - outgoing - .send_server_notification(ServerNotification::AccountRateLimitsUpdated( - AccountRateLimitsUpdatedNotification { - rate_limits: rate_limits.into(), - }, - )) - .await; - } - } - EventMsg::ItemStarted(item_started_event) => { - let item: ThreadItem = item_started_event.item.clone().into(); - let notification = ItemStartedNotification { item }; - outgoing - .send_server_notification(ServerNotification::ItemStarted(notification)) - .await; - } - EventMsg::ItemCompleted(item_completed_event) => { - let item: ThreadItem = item_completed_event.item.clone().into(); - let notification = ItemCompletedNotification { item }; - outgoing - .send_server_notification(ServerNotification::ItemCompleted(notification)) - .await; - } - // If this is a TurnAborted, reply to any pending interrupt requests. - EventMsg::TurnAborted(turn_aborted_event) => { - let pending = { - let mut map = pending_interrupts.lock().await; - map.remove(&conversation_id).unwrap_or_default() - }; - if !pending.is_empty() { - for (rid, ver) in pending { - match ver { - ApiVersion::V1 => { - let response = InterruptConversationResponse { - abort_reason: turn_aborted_event.reason.clone(), - }; - outgoing.send_response(rid, response).await; - } - ApiVersion::V2 => { - let response = TurnInterruptResponse {}; - outgoing.send_response(rid, response).await; - } - } - } - } - } - - _ => {} - } -} - async fn derive_config_from_params( overrides: ConfigOverrides, cli_overrides: Option>, @@ -2819,84 +2650,6 @@ async fn derive_config_from_params( Config::load_with_cli_overrides(cli_overrides, overrides).await } -async fn on_patch_approval_response( - event_id: String, - receiver: oneshot::Receiver, - codex: Arc, -) { - let response = receiver.await; - let value = match response { - Ok(value) => value, - Err(err) => { - error!("request failed: {err:?}"); - if let Err(submit_err) = codex - .submit(Op::PatchApproval { - id: event_id.clone(), - decision: ReviewDecision::Denied, - }) - .await - { - error!("failed to submit denied PatchApproval after request failure: {submit_err}"); - } - return; - } - }; - - let response = - serde_json::from_value::(value).unwrap_or_else(|err| { - error!("failed to deserialize ApplyPatchApprovalResponse: {err}"); - ApplyPatchApprovalResponse { - decision: ReviewDecision::Denied, - } - }); - - if let Err(err) = codex - .submit(Op::PatchApproval { - id: event_id, - decision: response.decision, - }) - .await - { - error!("failed to submit PatchApproval: {err}"); - } -} - -async fn on_exec_approval_response( - event_id: String, - receiver: oneshot::Receiver, - conversation: Arc, -) { - let response = receiver.await; - let value = match response { - Ok(value) => value, - Err(err) => { - error!("request failed: {err:?}"); - return; - } - }; - - // Try to deserialize `value` and then make the appropriate call to `codex`. - let response = - serde_json::from_value::(value).unwrap_or_else(|err| { - error!("failed to deserialize ExecCommandApprovalResponse: {err}"); - // If we cannot deserialize the response, we deny the request to be - // conservative. - ExecCommandApprovalResponse { - decision: ReviewDecision::Denied, - } - }); - - if let Err(err) = conversation - .submit(Op::ExecApproval { - id: event_id, - decision: response.decision, - }) - .await - { - error!("failed to submit ExecApproval: {err}"); - } -} - async fn read_summary_from_rollout( path: &Path, fallback_provider: &str, diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 6ef986919f..4b65e66d2c 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -28,6 +28,7 @@ use tracing_subscriber::filter::Targets; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; +mod bespoke_event_handling; mod codex_message_processor; mod error_code; mod fuzzy_file_search; From b6a62795d15025084516d8e1fdb3d297b388f8ee Mon Sep 17 00:00:00 2001 From: celia-oai Date: Thu, 13 Nov 2025 21:18:30 -0800 Subject: [PATCH 2/3] refactor --- .../app-server/src/bespoke_event_handling.rs | 123 ++++++++++-------- 1 file changed, 71 insertions(+), 52 deletions(-) diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index d1176c2a84..0d70fe211c 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -10,6 +10,9 @@ use codex_app_server_protocol::ExecCommandApprovalResponse; use codex_app_server_protocol::InterruptConversationResponse; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; +use codex_app_server_protocol::McpToolCallError; +use codex_app_server_protocol::McpToolCallResult; +use codex_app_server_protocol::McpToolCallStatus; use codex_app_server_protocol::ReasoningSummaryPartAddedNotification; use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; use codex_app_server_protocol::ReasoningTextDeltaNotification; @@ -22,6 +25,8 @@ use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::ExecApprovalRequestEvent; +use codex_core::protocol::McpToolCallBeginEvent; +use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::Op; use codex_core::protocol::ReviewDecision; use codex_protocol::ConversationId; @@ -61,64 +66,15 @@ pub(crate) async fn apply_bespoke_event_handling( on_patch_approval_response(event_id, rx, conversation).await; }); } + // TODO(celia): properly construct McpToolCall TurnItem in core. EventMsg::McpToolCallBegin(begin_event) => { - let item = ThreadItem::McpToolCall { - id: begin_event.call_id, - server: begin_event.invocation.server, - tool: begin_event.invocation.tool, - status: codex_app_server_protocol::McpToolCallStatus::InProgress, - arguments: begin_event - .invocation - .arguments - .unwrap_or(serde_json::Value::Null), - result: None, - error: None, - }; - let notification = ItemStartedNotification { item }; + let notification = construct_mcp_tool_call_notification(begin_event).await; outgoing .send_server_notification(ServerNotification::ItemStarted(notification)) .await; } EventMsg::McpToolCallEnd(end_event) => { - let status = if end_event.is_success() { - codex_app_server_protocol::McpToolCallStatus::Completed - } else { - codex_app_server_protocol::McpToolCallStatus::Failed - }; - - let (result, error) = match &end_event.result { - Ok(value) => ( - Some(codex_app_server_protocol::McpToolCallResult { - content: value.content.clone(), - structured_content: value - .structured_content - .clone() - .unwrap_or(serde_json::Value::Null), - }), - None, - ), - Err(message) => ( - None, - Some(codex_app_server_protocol::McpToolCallError { - message: message.clone(), - }), - ), - }; - - let item = ThreadItem::McpToolCall { - id: end_event.call_id, - server: end_event.invocation.server, - tool: end_event.invocation.tool, - status, - arguments: end_event - .invocation - .arguments - .clone() - .unwrap_or(serde_json::Value::Null), - result, - error, - }; - let notification = ItemCompletedNotification { item }; + let notification = construct_mcp_tool_call_end_notification(end_event).await; outgoing .send_server_notification(ServerNotification::ItemCompleted(notification)) .await; @@ -321,3 +277,66 @@ async fn on_exec_approval_response( error!("failed to submit ExecApproval: {err}"); } } + +/// similar to handle_mcp_tool_call_begin in exec +async fn construct_mcp_tool_call_notification( + begin_event: McpToolCallBeginEvent, +) -> ItemStartedNotification { + let item = ThreadItem::McpToolCall { + id: begin_event.call_id, + server: begin_event.invocation.server, + tool: begin_event.invocation.tool, + status: McpToolCallStatus::InProgress, + arguments: begin_event + .invocation + .arguments + .unwrap_or(JsonRpcResult::Null), + result: None, + error: None, + }; + ItemStartedNotification { item } +} + +/// simiilar to handle_mcp_tool_call_end in exec +async fn construct_mcp_tool_call_end_notification( + end_event: McpToolCallEndEvent, +) -> ItemCompletedNotification { + let status = if end_event.is_success() { + McpToolCallStatus::Completed + } else { + McpToolCallStatus::Failed + }; + + let (result, error) = match &end_event.result { + Ok(value) => ( + Some(McpToolCallResult { + content: value.content.clone(), + structured_content: value + .structured_content + .clone() + .unwrap_or(JsonRpcResult::Null), + }), + None, + ), + Err(message) => ( + None, + Some(McpToolCallError { + message: message.clone(), + }), + ), + }; + + let item = ThreadItem::McpToolCall { + id: end_event.call_id, + server: end_event.invocation.server, + tool: end_event.invocation.tool, + status, + arguments: end_event + .invocation + .arguments + .unwrap_or(JsonRpcResult::Null), + result, + error, + }; + ItemCompletedNotification { item } +} From cfcbbae71b79d7d0627abeef4cfde3900779cc3b Mon Sep 17 00:00:00 2001 From: celia-oai Date: Thu, 13 Nov 2025 21:24:56 -0800 Subject: [PATCH 3/3] tests --- codex-rs/Cargo.lock | 1 + .../app-server-protocol/src/protocol/v2.rs | 2 +- codex-rs/app-server/Cargo.toml | 1 + .../app-server/src/bespoke_event_handling.rs | 149 +++++++++++++++++- 4 files changed, 148 insertions(+), 5 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index d288c0f668..7ae5f48601 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -850,6 +850,7 @@ dependencies = [ "codex-protocol", "codex-utils-json-to-toml", "core_test_support", + "mcp-types", "opentelemetry-appender-tracing", "os_info", "pretty_assertions", diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index cc74a8de72..1c2c0b46c9 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -640,7 +640,7 @@ pub enum McpToolCallStatus { #[ts(export_to = "v2/")] pub struct McpToolCallResult { pub content: Vec, - pub structured_content: JsonValue, + pub structured_content: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index d693e7bb7f..96f64afdf5 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -46,6 +46,7 @@ app_test_support = { workspace = true } assert_cmd = { workspace = true } base64 = { workspace = true } core_test_support = { workspace = true } +mcp-types = { workspace = true } os_info = { workspace = true } pretty_assertions = { workspace = true } serial_test = { workspace = true } diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 0d70fe211c..3f6abb0976 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -311,10 +311,7 @@ async fn construct_mcp_tool_call_end_notification( Ok(value) => ( Some(McpToolCallResult { content: value.content.clone(), - structured_content: value - .structured_content - .clone() - .unwrap_or(JsonRpcResult::Null), + structured_content: value.structured_content.clone(), }), None, ), @@ -340,3 +337,147 @@ async fn construct_mcp_tool_call_end_notification( }; ItemCompletedNotification { item } } + +#[cfg(test)] +mod tests { + use super::*; + use codex_core::protocol::McpInvocation; + use mcp_types::CallToolResult; + use mcp_types::ContentBlock; + use mcp_types::TextContent; + use pretty_assertions::assert_eq; + use serde_json::Value as JsonValue; + use std::time::Duration; + + #[tokio::test] + async fn test_construct_mcp_tool_call_begin_notification_with_args() { + let begin_event = McpToolCallBeginEvent { + call_id: "call_123".to_string(), + invocation: McpInvocation { + server: "codex".to_string(), + tool: "list_mcp_resources".to_string(), + arguments: Some(serde_json::json!({"server": ""})), + }, + }; + + let notification = construct_mcp_tool_call_notification(begin_event.clone()).await; + + let expected = ItemStartedNotification { + item: ThreadItem::McpToolCall { + id: begin_event.call_id, + server: begin_event.invocation.server, + tool: begin_event.invocation.tool, + status: McpToolCallStatus::InProgress, + arguments: serde_json::json!({"server": ""}), + result: None, + error: None, + }, + }; + + assert_eq!(notification, expected); + } + + #[tokio::test] + async fn test_construct_mcp_tool_call_begin_notification_without_args() { + let begin_event = McpToolCallBeginEvent { + call_id: "call_456".to_string(), + invocation: McpInvocation { + server: "codex".to_string(), + tool: "list_mcp_resources".to_string(), + arguments: None, + }, + }; + + let notification = construct_mcp_tool_call_notification(begin_event.clone()).await; + + let expected = ItemStartedNotification { + item: ThreadItem::McpToolCall { + id: begin_event.call_id, + server: begin_event.invocation.server, + tool: begin_event.invocation.tool, + status: McpToolCallStatus::InProgress, + arguments: JsonValue::Null, + result: None, + error: None, + }, + }; + + assert_eq!(notification, expected); + } + + #[tokio::test] + async fn test_construct_mcp_tool_call_end_notification_success() { + let content = vec![ContentBlock::TextContent(TextContent { + annotations: None, + text: "{\"resources\":[]}".to_string(), + r#type: "text".to_string(), + })]; + let result = CallToolResult { + content: content.clone(), + is_error: Some(false), + structured_content: None, + }; + + let end_event = McpToolCallEndEvent { + call_id: "call_789".to_string(), + invocation: McpInvocation { + server: "codex".to_string(), + tool: "list_mcp_resources".to_string(), + arguments: Some(serde_json::json!({"server": ""})), + }, + duration: Duration::from_nanos(92708), + result: Ok(result), + }; + + let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await; + + let expected = ItemCompletedNotification { + item: ThreadItem::McpToolCall { + id: end_event.call_id, + server: end_event.invocation.server, + tool: end_event.invocation.tool, + status: McpToolCallStatus::Completed, + arguments: serde_json::json!({"server": ""}), + result: Some(McpToolCallResult { + content, + structured_content: None, + }), + error: None, + }, + }; + + assert_eq!(notification, expected); + } + + #[tokio::test] + async fn test_construct_mcp_tool_call_end_notification_error() { + let end_event = McpToolCallEndEvent { + call_id: "call_err".to_string(), + invocation: McpInvocation { + server: "codex".to_string(), + tool: "list_mcp_resources".to_string(), + arguments: None, + }, + duration: Duration::from_millis(1), + result: Err("boom".to_string()), + }; + + let notification = construct_mcp_tool_call_end_notification(end_event.clone()).await; + + let expected = ItemCompletedNotification { + item: ThreadItem::McpToolCall { + id: end_event.call_id, + server: end_event.invocation.server, + tool: end_event.invocation.tool, + status: McpToolCallStatus::Failed, + arguments: JsonValue::Null, + result: None, + error: Some(McpToolCallError { + message: "boom".to_string(), + }), + }, + }; + + assert_eq!(notification, expected); + } +}