From e78c9345f275d041bc660f19ee881aa8f7e9affc Mon Sep 17 00:00:00 2001 From: celia-oai Date: Mon, 24 Nov 2025 15:41:16 -0800 Subject: [PATCH 1/2] changes --- .../src/protocol/common.rs | 1 + .../app-server-protocol/src/protocol/v2.rs | 59 +++++++ codex-rs/app-server/README.md | 2 +- .../app-server/src/bespoke_event_handling.rs | 166 +++++++++++++++++- 4 files changed, 218 insertions(+), 10 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 885cfae26c..bc7141f008 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -502,6 +502,7 @@ server_notification_definitions! { /// NEW NOTIFICATIONS Error => "error" (v2::ErrorNotification), ThreadStarted => "thread/started" (v2::ThreadStartedNotification), + ThreadTokenUsageUpdated => "thread/tokenUsage/updated" (v2::ThreadTokenUsageUpdatedNotification), TurnStarted => "turn/started" (v2::TurnStartedNotification), TurnCompleted => "turn/completed" (v2::TurnCompletedNotification), TurnDiffUpdated => "turn/diff/updated" (v2::TurnDiffUpdatedNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 8c8e71a2ca..0cd7c5e9de 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -16,6 +16,8 @@ use codex_protocol::protocol::CreditsSnapshot as CoreCreditsSnapshot; use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot; use codex_protocol::protocol::RateLimitWindow as CoreRateLimitWindow; use codex_protocol::protocol::SessionSource as CoreSessionSource; +use codex_protocol::protocol::TokenUsage as CoreTokenUsage; +use codex_protocol::protocol::TokenUsageInfo as CoreTokenUsageInfo; use codex_protocol::user_input::UserInput as CoreUserInput; use mcp_types::ContentBlock as McpContentBlock; use schemars::JsonSchema; @@ -780,6 +782,63 @@ pub struct AccountUpdatedNotification { pub auth_mode: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadTokenUsageUpdatedNotification { + pub thread_id: String, + pub turn_id: String, + pub token_usage: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadTokenUsage { + pub total: TokenUsageBreakdown, + pub last: TokenUsageBreakdown, + #[ts(type = "number | null")] + pub model_context_window: Option, +} + +impl From for ThreadTokenUsage { + fn from(value: CoreTokenUsageInfo) -> Self { + Self { + total: value.total_token_usage.into(), + last: value.last_token_usage.into(), + model_context_window: value.model_context_window, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct TokenUsageBreakdown { + #[ts(type = "number")] + pub total_tokens: i64, + #[ts(type = "number")] + pub input_tokens: i64, + #[ts(type = "number")] + pub cached_input_tokens: i64, + #[ts(type = "number")] + pub output_tokens: i64, + #[ts(type = "number")] + pub reasoning_output_tokens: i64, +} + +impl From for TokenUsageBreakdown { + fn from(value: CoreTokenUsage) -> Self { + Self { + total_tokens: value.total_tokens, + input_tokens: value.input_tokens, + cached_input_tokens: value.cached_input_tokens, + output_tokens: value.output_tokens, + reasoning_output_tokens: value.reasoning_output_tokens, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index fa0a4baaa0..fb31f083ea 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -240,7 +240,7 @@ Event notifications are the server-initiated event stream for thread lifecycles, ### Turn events -The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with `turn/started` (initial `turn`) and ends with `turn/completed` (final `turn` plus token `usage`), and clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`. +The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with `turn/started` (initial `turn`) and ends with `turn/completed` (final `turn` status). Token usage deltas stream separately via `thread/tokenUsage/updated`. Clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`. - `turn/started` — `{ turn }` with the turn id, empty `items`, and `status: "inProgress"`. - `turn/completed` — `{ turn }` where `turn.status` is `completed`, `interrupted`, or `failed`; failures carry `{ error: { message, codexErrorInfo? } }`. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index d5037358b3..92a05d11d2 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -36,6 +36,7 @@ use codex_app_server_protocol::SandboxCommandAssessment as V2SandboxCommandAsses use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::ThreadTokenUsageUpdatedNotification; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnDiffUpdatedNotification; @@ -54,6 +55,7 @@ use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::Op; use codex_core::protocol::ReviewDecision; +use codex_core::protocol::TokenCountEvent; use codex_core::protocol::TurnDiffEvent; use codex_core::review_format::format_review_findings_block; use codex_protocol::ConversationId; @@ -295,15 +297,7 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::TokenCount(token_count_event) => { - if let Some(rate_limits) = token_count_event.rate_limits { - outgoing - .send_server_notification(ServerNotification::AccountRateLimitsUpdated( - AccountRateLimitsUpdatedNotification { - rate_limits: rate_limits.into(), - }, - )) - .await; - } + handle_token_count_event(conversation_id, event_id, token_count_event, &outgoing).await; } EventMsg::Error(ev) => { let turn_error = TurnError { @@ -694,6 +688,33 @@ async fn handle_turn_interrupted( .await; } +async fn handle_token_count_event( + conversation_id: ConversationId, + turn_id: String, + token_count_event: TokenCountEvent, + outgoing: &OutgoingMessageSender, +) { + let TokenCountEvent { info, rate_limits } = token_count_event; + let notification = ThreadTokenUsageUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id, + token_usage: info.map(Into::into), + }; + outgoing + .send_server_notification(ServerNotification::ThreadTokenUsageUpdated(notification)) + .await; + + if let Some(rate_limits) = rate_limits { + outgoing + .send_server_notification(ServerNotification::AccountRateLimitsUpdated( + AccountRateLimitsUpdatedNotification { + rate_limits: rate_limits.into(), + }, + )) + .await; + } +} + async fn handle_error( conversation_id: ConversationId, error: TurnError, @@ -1059,7 +1080,12 @@ mod tests { use anyhow::Result; use anyhow::anyhow; use anyhow::bail; + use codex_core::protocol::CreditsSnapshot; use codex_core::protocol::McpInvocation; + use codex_core::protocol::RateLimitSnapshot; + use codex_core::protocol::RateLimitWindow; + use codex_core::protocol::TokenUsage; + use codex_core::protocol::TokenUsageInfo; use mcp_types::CallToolResult; use mcp_types::ContentBlock; use mcp_types::TextContent; @@ -1219,6 +1245,128 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_handle_token_count_event_emits_usage_and_rate_limits() -> Result<()> { + let conversation_id = ConversationId::new(); + let turn_id = "turn-123".to_string(); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + + let info = TokenUsageInfo { + total_token_usage: TokenUsage { + input_tokens: 100, + cached_input_tokens: 25, + output_tokens: 50, + reasoning_output_tokens: 9, + total_tokens: 200, + }, + last_token_usage: TokenUsage { + input_tokens: 10, + cached_input_tokens: 5, + output_tokens: 7, + reasoning_output_tokens: 1, + total_tokens: 23, + }, + model_context_window: Some(4096), + }; + let rate_limits = RateLimitSnapshot { + primary: Some(RateLimitWindow { + used_percent: 42.5, + window_minutes: Some(15), + resets_at: Some(1700000000), + }), + secondary: None, + credits: Some(CreditsSnapshot { + has_credits: true, + unlimited: false, + balance: Some("5".to_string()), + }), + }; + + handle_token_count_event( + conversation_id, + turn_id.clone(), + TokenCountEvent { + info: Some(info), + rate_limits: Some(rate_limits), + }, + &outgoing, + ) + .await; + + let first = rx + .recv() + .await + .ok_or_else(|| anyhow!("expected usage notification"))?; + match first { + OutgoingMessage::AppServerNotification( + ServerNotification::ThreadTokenUsageUpdated(payload), + ) => { + assert_eq!(payload.thread_id, conversation_id.to_string()); + assert_eq!(payload.turn_id, turn_id); + let usage = payload + .token_usage + .expect("token usage should be present in notification"); + assert_eq!(usage.total.total_tokens, 200); + assert_eq!(usage.total.cached_input_tokens, 25); + assert_eq!(usage.last.output_tokens, 7); + assert_eq!(usage.model_context_window, Some(4096)); + } + other => bail!("unexpected notification: {other:?}"), + } + + let second = rx + .recv() + .await + .ok_or_else(|| anyhow!("expected rate limit notification"))?; + match second { + OutgoingMessage::AppServerNotification( + ServerNotification::AccountRateLimitsUpdated(payload), + ) => { + assert!(payload.rate_limits.primary.is_some()); + assert!(payload.rate_limits.credits.is_some()); + } + other => bail!("unexpected notification: {other:?}"), + } + Ok(()) + } + + #[tokio::test] + async fn test_handle_token_count_event_without_usage_info() -> Result<()> { + let conversation_id = ConversationId::new(); + let turn_id = "turn-456".to_string(); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + + handle_token_count_event( + conversation_id, + turn_id.clone(), + TokenCountEvent { + info: None, + rate_limits: None, + }, + &outgoing, + ) + .await; + + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("expected usage notification even without info"))?; + match msg { + OutgoingMessage::AppServerNotification( + ServerNotification::ThreadTokenUsageUpdated(payload), + ) => { + assert_eq!(payload.thread_id, conversation_id.to_string()); + assert_eq!(payload.turn_id, turn_id); + assert!(payload.token_usage.is_none()); + } + other => bail!("unexpected notification: {other:?}"), + } + assert!(rx.try_recv().is_err(), "no extra messages expected"); + Ok(()) + } + #[tokio::test] async fn test_construct_mcp_tool_call_begin_notification_with_args() { let begin_event = McpToolCallBeginEvent { From c63b23ba99e0391fda90b261e71672c28f73458a Mon Sep 17 00:00:00 2001 From: celia-oai Date: Tue, 25 Nov 2025 10:45:30 -0800 Subject: [PATCH 2/2] comment --- .../app-server-protocol/src/protocol/v2.rs | 2 +- codex-rs/app-server/README.md | 2 +- .../app-server/src/bespoke_event_handling.rs | 179 ++++++++++-------- 3 files changed, 99 insertions(+), 84 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 0cd7c5e9de..3ee502ebd5 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -788,7 +788,7 @@ pub struct AccountUpdatedNotification { pub struct ThreadTokenUsageUpdatedNotification { pub thread_id: String, pub turn_id: String, - pub token_usage: Option, + pub token_usage: ThreadTokenUsage, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index fb31f083ea..176cd27339 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -240,7 +240,7 @@ Event notifications are the server-initiated event stream for thread lifecycles, ### Turn events -The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with `turn/started` (initial `turn`) and ends with `turn/completed` (final `turn` status). Token usage deltas stream separately via `thread/tokenUsage/updated`. Clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`. +The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with `turn/started` (initial `turn`) and ends with `turn/completed` (final `turn` status). Token usage events stream separately via `thread/tokenUsage/updated`. Clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`. - `turn/started` — `{ turn }` with the turn id, empty `items`, and `status: "inProgress"`. - `turn/completed` — `{ turn }` where `turn.status` is `completed`, `interrupted`, or `failed`; failures carry `{ error: { message, codexErrorInfo? } }`. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 92a05d11d2..4b4e40d0c6 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -36,6 +36,7 @@ use codex_app_server_protocol::SandboxCommandAssessment as V2SandboxCommandAsses use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::ThreadTokenUsage; use codex_app_server_protocol::ThreadTokenUsageUpdatedNotification; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnCompletedNotification; @@ -78,10 +79,19 @@ pub(crate) async fn apply_bespoke_event_handling( turn_summary_store: TurnSummaryStore, api_version: ApiVersion, ) { - let Event { id: event_id, msg } = event; + let Event { + id: event_turn_id, + msg, + } = event; match msg { EventMsg::TaskComplete(_ev) => { - handle_turn_complete(conversation_id, event_id, &outgoing, &turn_summary_store).await; + handle_turn_complete( + conversation_id, + event_turn_id, + &outgoing, + &turn_summary_store, + ) + .await; } EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, @@ -102,7 +112,7 @@ pub(crate) async fn apply_bespoke_event_handling( .send_request(ServerRequestPayload::ApplyPatchApproval(params)) .await; tokio::spawn(async move { - on_patch_approval_response(event_id, rx, conversation).await; + on_patch_approval_response(event_turn_id, rx, conversation).await; }); } ApiVersion::V2 => { @@ -124,7 +134,7 @@ pub(crate) async fn apply_bespoke_event_handling( }; let notification = ItemStartedNotification { thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), item, }; outgoing @@ -144,7 +154,7 @@ pub(crate) async fn apply_bespoke_event_handling( .await; tokio::spawn(async move { on_file_change_request_approval_response( - event_id, + event_turn_id, conversation_id, item_id, patch_changes, @@ -180,7 +190,7 @@ pub(crate) async fn apply_bespoke_event_handling( .send_request(ServerRequestPayload::ExecCommandApproval(params)) .await; tokio::spawn(async move { - on_exec_approval_response(event_id, rx, conversation).await; + on_exec_approval_response(event_turn_id, rx, conversation).await; }); } ApiVersion::V2 => { @@ -208,7 +218,7 @@ pub(crate) async fn apply_bespoke_event_handling( .await; tokio::spawn(async move { on_command_execution_request_approval_response( - event_id, + event_turn_id, conversation_id, item_id, command_string, @@ -227,7 +237,7 @@ pub(crate) async fn apply_bespoke_event_handling( let notification = construct_mcp_tool_call_notification( begin_event, conversation_id.to_string(), - event_id.clone(), + event_turn_id.clone(), ) .await; outgoing @@ -238,7 +248,7 @@ pub(crate) async fn apply_bespoke_event_handling( let notification = construct_mcp_tool_call_end_notification( end_event, conversation_id.to_string(), - event_id.clone(), + event_turn_id.clone(), ) .await; outgoing @@ -257,7 +267,7 @@ pub(crate) async fn apply_bespoke_event_handling( EventMsg::ContextCompacted(..) => { let notification = ContextCompactedNotification { thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), }; outgoing .send_server_notification(ServerNotification::ContextCompacted(notification)) @@ -297,7 +307,8 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::TokenCount(token_count_event) => { - handle_token_count_event(conversation_id, event_id, token_count_event, &outgoing).await; + handle_token_count_event(conversation_id, event_turn_id, token_count_event, &outgoing) + .await; } EventMsg::Error(ev) => { let turn_error = TurnError { @@ -309,7 +320,7 @@ pub(crate) async fn apply_bespoke_event_handling( .send_server_notification(ServerNotification::Error(ErrorNotification { error: turn_error, thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), })) .await; } @@ -324,16 +335,16 @@ pub(crate) async fn apply_bespoke_event_handling( .send_server_notification(ServerNotification::Error(ErrorNotification { error: turn_error, thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), })) .await; } EventMsg::EnteredReviewMode(review_request) => { let notification = ItemStartedNotification { thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), item: ThreadItem::CodeReview { - id: event_id.clone(), + id: event_turn_id.clone(), review: review_request.user_facing_hint, }, }; @@ -345,7 +356,7 @@ pub(crate) async fn apply_bespoke_event_handling( let item: ThreadItem = item_started_event.item.clone().into(); let notification = ItemStartedNotification { thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), item, }; outgoing @@ -356,7 +367,7 @@ pub(crate) async fn apply_bespoke_event_handling( let item: ThreadItem = item_completed_event.item.clone().into(); let notification = ItemCompletedNotification { thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), item, }; outgoing @@ -368,10 +379,10 @@ pub(crate) async fn apply_bespoke_event_handling( Some(output) => render_review_output_text(&output), None => REVIEW_FALLBACK_MESSAGE.to_string(), }; - let review_item_id = event_id.clone(); + let review_item_id = event_turn_id.clone(); let notification = ItemCompletedNotification { thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), item: ThreadItem::CodeReview { id: review_item_id, review: review_text, @@ -399,7 +410,7 @@ pub(crate) async fn apply_bespoke_event_handling( }; let notification = ItemStartedNotification { thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), item, }; outgoing @@ -423,7 +434,7 @@ pub(crate) async fn apply_bespoke_event_handling( item_id, changes, status, - event_id.clone(), + event_turn_id.clone(), outgoing.as_ref(), &turn_summary_store, ) @@ -451,7 +462,7 @@ pub(crate) async fn apply_bespoke_event_handling( }; let notification = ItemStartedNotification { thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), item, }; outgoing @@ -512,7 +523,7 @@ pub(crate) async fn apply_bespoke_event_handling( let notification = ItemCompletedNotification { thread_id: conversation_id.to_string(), - turn_id: event_id.clone(), + turn_id: event_turn_id.clone(), item, }; outgoing @@ -542,11 +553,22 @@ pub(crate) async fn apply_bespoke_event_handling( } } - handle_turn_interrupted(conversation_id, event_id, &outgoing, &turn_summary_store) - .await; + handle_turn_interrupted( + conversation_id, + event_turn_id, + &outgoing, + &turn_summary_store, + ) + .await; } EventMsg::TurnDiff(turn_diff_event) => { - handle_turn_diff(&event_id, turn_diff_event, api_version, outgoing.as_ref()).await; + handle_turn_diff( + &event_turn_id, + turn_diff_event, + api_version, + outgoing.as_ref(), + ) + .await; } _ => {} @@ -554,14 +576,14 @@ pub(crate) async fn apply_bespoke_event_handling( } async fn handle_turn_diff( - event_id: &str, + event_turn_id: &str, turn_diff_event: TurnDiffEvent, api_version: ApiVersion, outgoing: &OutgoingMessageSender, ) { if let ApiVersion::V2 = api_version { let notification = TurnDiffUpdatedNotification { - turn_id: event_id.to_string(), + turn_id: event_turn_id.to_string(), diff: turn_diff_event.unified_diff, }; outgoing @@ -572,14 +594,14 @@ async fn handle_turn_diff( async fn emit_turn_completed_with_status( conversation_id: ConversationId, - event_id: String, + event_turn_id: String, status: TurnStatus, outgoing: &OutgoingMessageSender, ) { let notification = TurnCompletedNotification { thread_id: conversation_id.to_string(), turn: Turn { - id: event_id, + id: event_turn_id, items: vec![], status, }, @@ -661,7 +683,7 @@ async fn find_and_remove_turn_summary( async fn handle_turn_complete( conversation_id: ConversationId, - event_id: String, + event_turn_id: String, outgoing: &OutgoingMessageSender, turn_summary_store: &TurnSummaryStore, ) { @@ -673,19 +695,24 @@ async fn handle_turn_complete( TurnStatus::Completed }; - emit_turn_completed_with_status(conversation_id, event_id, status, outgoing).await; + emit_turn_completed_with_status(conversation_id, event_turn_id, status, outgoing).await; } async fn handle_turn_interrupted( conversation_id: ConversationId, - event_id: String, + event_turn_id: String, outgoing: &OutgoingMessageSender, turn_summary_store: &TurnSummaryStore, ) { find_and_remove_turn_summary(conversation_id, turn_summary_store).await; - emit_turn_completed_with_status(conversation_id, event_id, TurnStatus::Interrupted, outgoing) - .await; + emit_turn_completed_with_status( + conversation_id, + event_turn_id, + TurnStatus::Interrupted, + outgoing, + ) + .await; } async fn handle_token_count_event( @@ -695,15 +722,16 @@ async fn handle_token_count_event( outgoing: &OutgoingMessageSender, ) { let TokenCountEvent { info, rate_limits } = token_count_event; - let notification = ThreadTokenUsageUpdatedNotification { - thread_id: conversation_id.to_string(), - turn_id, - token_usage: info.map(Into::into), - }; - outgoing - .send_server_notification(ServerNotification::ThreadTokenUsageUpdated(notification)) - .await; - + if let Some(token_usage) = info.map(ThreadTokenUsage::from) { + let notification = ThreadTokenUsageUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id, + token_usage, + }; + outgoing + .send_server_notification(ServerNotification::ThreadTokenUsageUpdated(notification)) + .await; + } if let Some(rate_limits) = rate_limits { outgoing .send_server_notification(ServerNotification::AccountRateLimitsUpdated( @@ -725,7 +753,7 @@ async fn handle_error( } async fn on_patch_approval_response( - event_id: String, + event_turn_id: String, receiver: oneshot::Receiver, codex: Arc, ) { @@ -736,7 +764,7 @@ async fn on_patch_approval_response( error!("request failed: {err:?}"); if let Err(submit_err) = codex .submit(Op::PatchApproval { - id: event_id.clone(), + id: event_turn_id.clone(), decision: ReviewDecision::Denied, }) .await @@ -757,7 +785,7 @@ async fn on_patch_approval_response( if let Err(err) = codex .submit(Op::PatchApproval { - id: event_id, + id: event_turn_id, decision: response.decision, }) .await @@ -767,7 +795,7 @@ async fn on_patch_approval_response( } async fn on_exec_approval_response( - event_id: String, + event_turn_id: String, receiver: oneshot::Receiver, conversation: Arc, ) { @@ -793,7 +821,7 @@ async fn on_exec_approval_response( if let Err(err) = conversation .submit(Op::ExecApproval { - id: event_id, + id: event_turn_id, decision: response.decision, }) .await @@ -866,7 +894,7 @@ fn format_file_change_diff(change: &CoreFileChange) -> String { #[allow(clippy::too_many_arguments)] async fn on_file_change_request_approval_response( - event_id: String, + event_turn_id: String, conversation_id: ConversationId, item_id: String, changes: Vec, @@ -911,7 +939,7 @@ async fn on_file_change_request_approval_response( item_id, changes, status, - event_id.clone(), + event_turn_id.clone(), outgoing.as_ref(), &turn_summary_store, ) @@ -920,7 +948,7 @@ async fn on_file_change_request_approval_response( if let Err(err) = codex .submit(Op::PatchApproval { - id: event_id, + id: event_turn_id, decision, }) .await @@ -931,7 +959,7 @@ async fn on_file_change_request_approval_response( #[allow(clippy::too_many_arguments)] async fn on_command_execution_request_approval_response( - event_id: String, + event_turn_id: String, conversation_id: ConversationId, item_id: String, command: String, @@ -983,7 +1011,7 @@ async fn on_command_execution_request_approval_response( if let Some(status) = completion_status { complete_command_execution_item( conversation_id, - event_id.clone(), + event_turn_id.clone(), item_id.clone(), command.clone(), cwd.clone(), @@ -996,7 +1024,7 @@ async fn on_command_execution_request_approval_response( if let Err(err) = conversation .submit(Op::ExecApproval { - id: event_id, + id: event_turn_id, decision, }) .await @@ -1129,14 +1157,14 @@ mod tests { #[tokio::test] async fn test_handle_turn_complete_emits_completed_without_error() -> Result<()> { let conversation_id = ConversationId::new(); - let event_id = "complete1".to_string(); + let event_turn_id = "complete1".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); let turn_summary_store = new_turn_summary_store(); handle_turn_complete( conversation_id, - event_id.clone(), + event_turn_id.clone(), &outgoing, &turn_summary_store, ) @@ -1148,7 +1176,7 @@ mod tests { .ok_or_else(|| anyhow!("should send one notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { - assert_eq!(n.turn.id, event_id); + assert_eq!(n.turn.id, event_turn_id); assert_eq!(n.turn.status, TurnStatus::Completed); } other => bail!("unexpected message: {other:?}"), @@ -1160,7 +1188,7 @@ mod tests { #[tokio::test] async fn test_handle_turn_interrupted_emits_interrupted_with_error() -> Result<()> { let conversation_id = ConversationId::new(); - let event_id = "interrupt1".to_string(); + let event_turn_id = "interrupt1".to_string(); let turn_summary_store = new_turn_summary_store(); handle_error( conversation_id, @@ -1176,7 +1204,7 @@ mod tests { handle_turn_interrupted( conversation_id, - event_id.clone(), + event_turn_id.clone(), &outgoing, &turn_summary_store, ) @@ -1188,7 +1216,7 @@ mod tests { .ok_or_else(|| anyhow!("should send one notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { - assert_eq!(n.turn.id, event_id); + assert_eq!(n.turn.id, event_turn_id); assert_eq!(n.turn.status, TurnStatus::Interrupted); } other => bail!("unexpected message: {other:?}"), @@ -1200,7 +1228,7 @@ mod tests { #[tokio::test] async fn test_handle_turn_complete_emits_failed_with_error() -> Result<()> { let conversation_id = ConversationId::new(); - let event_id = "complete_err1".to_string(); + let event_turn_id = "complete_err1".to_string(); let turn_summary_store = new_turn_summary_store(); handle_error( conversation_id, @@ -1216,7 +1244,7 @@ mod tests { handle_turn_complete( conversation_id, - event_id.clone(), + event_turn_id.clone(), &outgoing, &turn_summary_store, ) @@ -1228,7 +1256,7 @@ mod tests { .ok_or_else(|| anyhow!("should send one notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { - assert_eq!(n.turn.id, event_id); + assert_eq!(n.turn.id, event_turn_id); assert_eq!( n.turn.status, TurnStatus::Failed { @@ -1304,9 +1332,7 @@ mod tests { ) => { assert_eq!(payload.thread_id, conversation_id.to_string()); assert_eq!(payload.turn_id, turn_id); - let usage = payload - .token_usage - .expect("token usage should be present in notification"); + let usage = payload.token_usage; assert_eq!(usage.total.total_tokens, 200); assert_eq!(usage.total.cached_input_tokens, 25); assert_eq!(usage.last.output_tokens, 7); @@ -1349,21 +1375,10 @@ mod tests { ) .await; - let msg = rx - .recv() - .await - .ok_or_else(|| anyhow!("expected usage notification even without info"))?; - match msg { - OutgoingMessage::AppServerNotification( - ServerNotification::ThreadTokenUsageUpdated(payload), - ) => { - assert_eq!(payload.thread_id, conversation_id.to_string()); - assert_eq!(payload.turn_id, turn_id); - assert!(payload.token_usage.is_none()); - } - other => bail!("unexpected notification: {other:?}"), - } - assert!(rx.try_recv().is_err(), "no extra messages expected"); + assert!( + rx.try_recv().is_err(), + "no notifications should be emitted when token usage info is absent" + ); Ok(()) }