From 20c36b7e4d99730ae4c3a4697da16a6e166889c9 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Mon, 25 May 2026 11:22:38 -0700 Subject: [PATCH 1/7] remote control websocket stall diagnostics --- .../remote_control/client_tracker.rs | 60 ++++- .../src/transport/remote_control/mod.rs | 101 ++++++++- .../src/transport/remote_control/websocket.rs | 208 ++++++++++++++++-- 3 files changed, 341 insertions(+), 28 deletions(-) diff --git a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs index 4639942b080..fc1b699f591 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs @@ -18,10 +18,16 @@ use tokio::sync::watch; use tokio::task::JoinSet; use tokio::time::Duration; use tokio::time::Instant; +use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use tracing::warn; const REMOTE_CONTROL_CLIENT_IDLE_TIMEOUT: Duration = Duration::from_secs(10 * 60); pub(crate) const REMOTE_CONTROL_IDLE_SWEEP_INTERVAL: Duration = Duration::from_secs(30); +#[cfg(not(test))] +const REMOTE_CONTROL_TRANSPORT_EVENT_SEND_TIMEOUT: Duration = Duration::from_secs(5); +#[cfg(test)] +const REMOTE_CONTROL_TRANSPORT_EVENT_SEND_TIMEOUT: Duration = Duration::from_millis(10); #[derive(Debug)] pub(crate) struct Stopped; @@ -308,10 +314,32 @@ impl ClientTracker { } async fn send_transport_event(&self, event: TransportEvent) -> Result<(), Stopped> { - self.transport_event_tx - .send(event) - .await - .map_err(|_| Stopped) + let event_name = transport_event_name(&event); + match timeout( + REMOTE_CONTROL_TRANSPORT_EVENT_SEND_TIMEOUT, + self.transport_event_tx.send(event), + ) + .await + { + Ok(Ok(())) => Ok(()), + Ok(Err(_)) => Err(Stopped), + Err(_) => { + warn!( + transport_event = event_name, + timeout = ?REMOTE_CONTROL_TRANSPORT_EVENT_SEND_TIMEOUT, + "timed out forwarding remote control transport event" + ); + Err(Stopped) + } + } + } +} + +fn transport_event_name(event: &TransportEvent) -> &'static str { + match event { + TransportEvent::ConnectionOpened { .. } => "connection_opened", + TransportEvent::ConnectionClosed { .. } => "connection_closed", + TransportEvent::IncomingMessage { .. } => "incoming_message", } } @@ -487,6 +515,30 @@ mod tests { .expect("shutdown should not hang on blocked server forwarding"); } + #[tokio::test] + async fn transport_event_send_times_out_when_queue_stays_full() { + let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); + let (transport_event_tx, _transport_event_rx) = mpsc::channel(1); + let shutdown_token = CancellationToken::new(); + let client_tracker = + ClientTracker::new(server_event_tx, transport_event_tx.clone(), &shutdown_token); + + transport_event_tx + .send(TransportEvent::ConnectionClosed { + connection_id: next_connection_id(), + }) + .await + .expect("transport event queue should accept prefill"); + + let send_result = client_tracker + .send_transport_event(TransportEvent::ConnectionClosed { + connection_id: next_connection_id(), + }) + .await; + + assert!(send_result.is_err()); + } + #[tokio::test] async fn initialize_with_new_stream_id_opens_new_connection_for_same_client() { let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); diff --git a/codex-rs/app-server-transport/src/transport/remote_control/mod.rs b/codex-rs/app-server-transport/src/transport/remote_control/mod.rs index ae6d79462c4..357a98a30de 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/mod.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/mod.rs @@ -19,16 +19,20 @@ use codex_app_server_protocol::RemoteControlConnectionStatus; use codex_app_server_protocol::RemoteControlStatusChangedNotification; use codex_login::AuthManager; use codex_state::StateRuntime; +use futures::FutureExt; use gethostname::gethostname; use std::error::Error; use std::fmt; use std::io; +use std::panic::AssertUnwindSafe; use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::watch; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use tracing::error; +use tracing::info; use tracing::warn; pub struct RemoteControlStartConfig { @@ -73,13 +77,21 @@ impl RemoteControlHandle { return Err(RemoteControlUnavailable); } - self.enabled_tx.send_if_modified(|state| { + let enabled_changed = self.enabled_tx.send_if_modified(|state| { let changed = !*state; *state = true; changed }); let status = self.status(); + info!( + enabled_changed, + current_status = ?status.status, + environment_id = ?status.environment_id, + installation_id = %status.installation_id, + server_name = %status.server_name, + "remote control enable requested" + ); if matches!( status.status, RemoteControlConnectionStatus::Connected | RemoteControlConnectionStatus::Connecting @@ -91,12 +103,21 @@ impl RemoteControlHandle { } pub fn disable(&self) -> RemoteControlStatusChangedNotification { - self.enabled_tx.send_if_modified(|state| { + let enabled_changed = self.enabled_tx.send_if_modified(|state| { let changed = *state; *state = false; changed }); + let status = self.status(); + info!( + enabled_changed, + current_status = ?status.status, + environment_id = ?status.environment_id, + installation_id = %status.installation_id, + server_name = %status.server_name, + "remote control disable requested" + ); self.publish_status(RemoteControlConnectionStatus::Disabled) } @@ -112,6 +133,7 @@ impl RemoteControlHandle { &self, connection_status: RemoteControlConnectionStatus, ) -> RemoteControlStatusChangedNotification { + let mut status_change = None; self.status_tx.send_if_modified(|status| { let next_status = remote_control_status_with_connection_status(status, connection_status); @@ -119,9 +141,21 @@ impl RemoteControlHandle { return false; } + status_change = Some((status.clone(), next_status.clone())); *status = next_status; true }); + if let Some((previous_status, next_status)) = status_change { + info!( + previous_status = ?previous_status.status, + next_status = ?next_status.status, + previous_environment_id = ?previous_status.environment_id, + next_environment_id = ?next_status.environment_id, + installation_id = %next_status.installation_id, + server_name = %next_status.server_name, + "remote control handle status changed" + ); + } self.status() } } @@ -165,6 +199,8 @@ pub async fn start_remote_control( let (enabled_tx, enabled_rx) = watch::channel(initial_enabled); let server_name = gethostname().to_string_lossy().trim().to_string(); + let remote_control_url = config.remote_control_url; + let installation_id = config.installation_id; let initial_status = RemoteControlStatusChangedNotification { status: if initial_enabled { RemoteControlConnectionStatus::Connecting @@ -172,16 +208,35 @@ pub async fn start_remote_control( RemoteControlConnectionStatus::Disabled }, server_name: server_name.clone(), - installation_id: config.installation_id.clone(), + installation_id: installation_id.clone(), environment_id: None, }; let (status_tx, _status_rx) = watch::channel(initial_status); let status_publisher = RemoteControlStatusPublisher::new(status_tx.clone()); + info!( + remote_control_url = %remote_control_url, + installation_id = %installation_id, + server_name = %server_name, + state_db_available, + initial_enabled, + "starting app-server remote control websocket task" + ); + let remote_control_url_for_log = remote_control_url.clone(); + let installation_id_for_log = installation_id.clone(); + let server_name_for_log = server_name.clone(); + let shutdown_token_for_log = shutdown_token.clone(); let join_handle = tokio::spawn(async move { - RemoteControlWebsocket::new( + info!( + remote_control_url = %remote_control_url_for_log, + installation_id = %installation_id_for_log, + server_name = %server_name_for_log, + initial_enabled, + "app-server remote control websocket task started" + ); + let websocket_task = RemoteControlWebsocket::new( websocket::RemoteControlWebsocketConfig { - remote_control_url: config.remote_control_url, - installation_id: config.installation_id, + remote_control_url, + installation_id, remote_control_target, server_name, }, @@ -194,8 +249,38 @@ pub async fn start_remote_control( shutdown_token, enabled_rx, ) - .run(app_server_client_name_rx) - .await; + .run(app_server_client_name_rx); + match AssertUnwindSafe(websocket_task).catch_unwind().await { + Ok(()) => { + let shutdown_requested = shutdown_token_for_log.is_cancelled(); + if shutdown_requested { + info!( + remote_control_url = %remote_control_url_for_log, + installation_id = %installation_id_for_log, + server_name = %server_name_for_log, + shutdown_requested, + "app-server remote control websocket task exited" + ); + } else { + warn!( + remote_control_url = %remote_control_url_for_log, + installation_id = %installation_id_for_log, + server_name = %server_name_for_log, + shutdown_requested, + "app-server remote control websocket task exited without shutdown" + ); + } + } + Err(panic) => { + error!( + remote_control_url = %remote_control_url_for_log, + installation_id = %installation_id_for_log, + server_name = %server_name_for_log, + "app-server remote control websocket task panicked" + ); + std::panic::resume_unwind(panic); + } + } }); Ok(( diff --git a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs index 1e696c2e462..5d41d5c44d2 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs @@ -66,6 +66,10 @@ const REMOTE_CONTROL_ACCOUNT_ID_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1); const REMOTE_CONTROL_RECONNECT_BACKOFF_CAP: std::time::Duration = std::time::Duration::from_secs(30); +const REMOTE_CONTROL_WEBSOCKET_CONNECT_TIMEOUT: std::time::Duration = + std::time::Duration::from_secs(30); +const REMOTE_CONTROL_CONNECTION_SHUTDOWN_TIMEOUT: std::time::Duration = + std::time::Duration::from_secs(5); struct BoundedOutboundBuffer { buffer_by_stream: HashMap<(ClientId, StreamId), VecDeque>, @@ -255,6 +259,14 @@ enum ConnectOutcome { Shutdown, } +#[derive(Debug, Clone, Copy)] +enum ConnectionEndReason { + Shutdown, + Disabled, + EnabledWatchClosed, + ConnectionWorkerStopped, +} + pub(super) struct RemoteControlChannels { pub(super) transport_event_tx: mpsc::Sender, pub(super) status_publisher: RemoteControlStatusPublisher, @@ -270,7 +282,12 @@ impl RemoteControlStatusPublisher { Self { tx } } + fn status(&self) -> RemoteControlStatusChangedNotification { + self.tx.borrow().clone() + } + fn publish_status(&self, connection_status: RemoteControlConnectionStatus) { + let mut status_change = None; self.tx.send_if_modified(|status| { let next_status = remote_control_status_with_connection_status(status, connection_status); @@ -278,12 +295,25 @@ impl RemoteControlStatusPublisher { return false; } + status_change = Some((status.clone(), next_status.clone())); *status = next_status; true }); + if let Some((previous_status, next_status)) = status_change { + info!( + previous_status = ?previous_status.status, + next_status = ?next_status.status, + previous_environment_id = ?previous_status.environment_id, + next_environment_id = ?next_status.environment_id, + installation_id = %next_status.installation_id, + server_name = %next_status.server_name, + "remote control websocket status changed" + ); + } } fn publish_environment_id(&self, environment_id: Option) { + let mut status_change = None; self.tx.send_if_modified(|status| { if status.status == RemoteControlConnectionStatus::Disabled { return false; @@ -298,9 +328,20 @@ impl RemoteControlStatusPublisher { return false; } + status_change = Some((status.clone(), next_status.clone())); *status = next_status; true }); + if let Some((previous_status, next_status)) = status_change { + info!( + status = ?next_status.status, + previous_environment_id = ?previous_status.environment_id, + next_environment_id = ?next_status.environment_id, + installation_id = %next_status.installation_id, + server_name = %next_status.server_name, + "remote control websocket environment changed" + ); + } } } @@ -367,12 +408,26 @@ impl RemoteControlWebsocket { mut self, app_server_client_name_rx: Option>, ) { + info!( + remote_control_url = %self.remote_control_url, + installation_id = %self.installation_id, + server_name = %self.server_name, + initial_enabled = *self.enabled_rx.borrow(), + "app-server remote control websocket loop started" + ); let app_server_client_name = match self .wait_for_app_server_client_name(app_server_client_name_rx) .await { Ok(app_server_client_name) => app_server_client_name, Err(_) => { + warn!( + remote_control_url = %self.remote_control_url, + installation_id = %self.installation_id, + server_name = %self.server_name, + shutdown_requested = self.shutdown_token.is_cancelled(), + "app-server remote control websocket loop stopped before client name was ready" + ); self.client_tracker.lock().await.shutdown().await; return; } @@ -380,9 +435,27 @@ impl RemoteControlWebsocket { loop { if !self.wait_until_enabled().await { + info!( + remote_control_url = %self.remote_control_url, + installation_id = %self.installation_id, + server_name = %self.server_name, + shutdown_requested = self.shutdown_token.is_cancelled(), + current_status = ?self.status_publisher.status().status, + "app-server remote control websocket loop exiting while waiting for enablement" + ); break; } + let status = self.status_publisher.status(); + info!( + remote_control_url = %self.remote_control_url, + installation_id = %self.installation_id, + server_name = %self.server_name, + reconnect_attempt = self.reconnect_attempt.saturating_add(1), + current_status = ?status.status, + environment_id = ?status.environment_id, + "starting app-server remote control websocket connection cycle" + ); let shutdown_token = self.shutdown_token.child_token(); let websocket_connection = match self .connect(&shutdown_token, app_server_client_name.as_deref()) @@ -397,11 +470,30 @@ impl RemoteControlWebsocket { ConnectOutcome::Shutdown => break, }; - self.run_connection(websocket_connection, shutdown_token) + let connection_end_reason = self + .run_connection(websocket_connection, shutdown_token) .await; + let status = self.status_publisher.status(); + info!( + remote_control_url = %self.remote_control_url, + installation_id = %self.installation_id, + server_name = %self.server_name, + connection_end_reason = ?connection_end_reason, + current_status = ?status.status, + environment_id = ?status.environment_id, + enabled = *self.enabled_rx.borrow(), + "app-server remote control websocket connection cycle ended" + ); } self.client_tracker.lock().await.shutdown().await; + info!( + remote_control_url = %self.remote_control_url, + installation_id = %self.installation_id, + server_name = %self.server_name, + shutdown_requested = self.shutdown_token.is_cancelled(), + "app-server remote control websocket loop exited" + ); } async fn wait_for_app_server_client_name( @@ -462,6 +554,19 @@ impl RemoteControlWebsocket { loop { let subscribe_cursor = self.state.lock().await.subscribe_cursor.clone(); + let enrollment = self.enrollment.as_ref(); + info!( + websocket_url = %remote_control_target.websocket_url, + installation_id = %self.installation_id, + server_name = %self.server_name, + reconnect_attempt = self.reconnect_attempt.saturating_add(1), + has_enrollment = enrollment.is_some(), + server_id = ?enrollment.map(|enrollment| enrollment.server_id.as_str()), + environment_id = ?enrollment.map(|enrollment| enrollment.environment_id.as_str()), + subscribe_cursor_present = subscribe_cursor.is_some(), + app_server_client_name = ?app_server_client_name, + "connecting to app-server remote control websocket" + ); let connect_options = RemoteControlConnectOptions { installation_id: &self.installation_id, server_name: &self.server_name, @@ -500,10 +605,16 @@ impl RemoteControlWebsocket { self.auth_recovery = self.auth_manager.unauthorized_recovery(); self.status_publisher .publish_status(RemoteControlConnectionStatus::Connected); + let enrollment = self.enrollment.as_ref(); info!( - "connected to app-server remote control websocket: {}, {}", - remote_control_target.websocket_url, - format_headers(response.headers()) + websocket_url = %remote_control_target.websocket_url, + installation_id = %self.installation_id, + server_name = %self.server_name, + server_id = ?enrollment.map(|enrollment| enrollment.server_id.as_str()), + environment_id = ?enrollment.map(|enrollment| enrollment.environment_id.as_str()), + subscribe_cursor_present = subscribe_cursor.is_some(), + response_headers = %format_headers(response.headers()), + "connected to app-server remote control websocket" ); return ConnectOutcome::Connected(Box::new(websocket_connection)); } @@ -519,12 +630,20 @@ impl RemoteControlWebsocket { let reconnect_attempt = self.reconnect_attempt.saturating_add(1); let (reconnect_delay, reconnect_backoff_reset) = next_reconnect_delay(&mut self.reconnect_attempt); + let enrollment = self.enrollment.as_ref(); warn!( websocket_url = %remote_control_target.websocket_url, + installation_id = %self.installation_id, + server_name = %self.server_name, error = %err, + error_kind = ?err.kind(), reconnect_attempt, reconnect_delay = ?reconnect_delay, reconnect_backoff_reset, + has_enrollment = enrollment.is_some(), + server_id = ?enrollment.map(|enrollment| enrollment.server_id.as_str()), + environment_id = ?enrollment.map(|enrollment| enrollment.environment_id.as_str()), + subscribe_cursor_present = subscribe_cursor.is_some(), "failed to connect to app-server remote control websocket" ); if reconnect_backoff_reset { @@ -562,7 +681,7 @@ impl RemoteControlWebsocket { &self, websocket_connection: WebSocketStream>, shutdown_token: CancellationToken, - ) { + ) -> ConnectionEndReason { let (websocket_writer, websocket_reader) = websocket_connection.split(); let mut join_set = tokio::task::JoinSet::new(); @@ -583,19 +702,48 @@ impl RemoteControlWebsocket { )); let mut enabled_rx = self.enabled_rx.clone(); - tokio::select! { - _ = shutdown_token.cancelled() => {} + let connection_end_reason = tokio::select! { + _ = shutdown_token.cancelled() => ConnectionEndReason::Shutdown, changed = enabled_rx.wait_for(|enabled| !*enabled) => { if changed.is_ok() { self.status_publisher .publish_status(RemoteControlConnectionStatus::Disabled); + ConnectionEndReason::Disabled + } else { + ConnectionEndReason::EnabledWatchClosed } } - _ = join_set.join_next() => {} + _ = join_set.join_next() => ConnectionEndReason::ConnectionWorkerStopped, }; shutdown_token.cancel(); - join_set.join_all().await; + Self::join_connection_workers(&mut join_set, REMOTE_CONTROL_CONNECTION_SHUTDOWN_TIMEOUT) + .await; + connection_end_reason + } + + async fn join_connection_workers( + join_set: &mut tokio::task::JoinSet<()>, + shutdown_timeout: std::time::Duration, + ) { + if tokio::time::timeout(shutdown_timeout, Self::drain_join_set(join_set)) + .await + .is_ok() + { + return; + } + + warn!( + shutdown_timeout = ?shutdown_timeout, + remaining_workers = join_set.len(), + "timed out waiting for remote control connection workers to stop; aborting" + ); + join_set.abort_all(); + Self::drain_join_set(join_set).await; + } + + async fn drain_join_set(join_set: &mut tokio::task::JoinSet<()>) { + while join_set.join_next().await.is_some() {} } async fn run_server_writer( @@ -675,12 +823,14 @@ impl RemoteControlWebsocket { let queued_server_envelope = tokio::select! { _ = shutdown_token.cancelled() => return Ok(()), _ = ping_interval.tick() => { - if let Err(err) = websocket_writer - .send(tungstenite::Message::Ping(Vec::new().into())) - .await - { - return Err(io::Error::other(err)); - } + tokio::select! { + _ = shutdown_token.cancelled() => return Ok(()), + send_result = websocket_writer.send(tungstenite::Message::Ping(Vec::new().into())) => { + if let Err(err) = send_result { + return Err(io::Error::other(err)); + } + } + }; continue; } wait_result = used_rx.changed(), if !outbound_has_capacity => @@ -1190,7 +1340,22 @@ pub(super) async fn connect_remote_control_websocket( connect_options.subscribe_cursor, )?; - match connect_async(request).await { + let websocket_connect_result = tokio::time::timeout( + REMOTE_CONTROL_WEBSOCKET_CONNECT_TIMEOUT, + connect_async(request), + ) + .await + .map_err(|_| { + io::Error::new( + ErrorKind::TimedOut, + format!( + "timed out connecting to remote control websocket at `{}` after {:?}", + remote_control_target.websocket_url, REMOTE_CONTROL_WEBSOCKET_CONNECT_TIMEOUT + ), + ) + })?; + + match websocket_connect_result { Ok((websocket_stream, response)) => Ok((websocket_stream, response.map(|_| ()))), Err(err) => { match &err { @@ -2013,6 +2178,17 @@ mod tests { .expect("writer should stop cleanly"); } + #[tokio::test] + async fn join_connection_workers_aborts_stuck_worker_after_timeout() { + let mut join_set = tokio::task::JoinSet::new(); + join_set.spawn(futures::future::pending::<()>()); + + RemoteControlWebsocket::join_connection_workers(&mut join_set, Duration::from_millis(10)) + .await; + + assert!(join_set.is_empty()); + } + #[tokio::test] async fn run_server_writer_inner_assigns_contiguous_seq_ids_per_stream() { let (client_stream, mut server_stream) = connected_websocket_pair().await; From b7fde572aeafd11b9196174ea4324576349ba9c2 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Mon, 25 May 2026 11:38:54 -0700 Subject: [PATCH 2/7] preserve remote control close events --- .../remote_control/client_tracker.rs | 89 ++++++++++++++++++- 1 file changed, 87 insertions(+), 2 deletions(-) diff --git a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs index fc1b699f591..c63102b605c 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs @@ -314,6 +314,15 @@ impl ClientTracker { } async fn send_transport_event(&self, event: TransportEvent) -> Result<(), Stopped> { + // Dropping a close after removing the tracked client leaves app-server state stale. + if matches!(&event, TransportEvent::ConnectionClosed { .. }) { + return self + .transport_event_tx + .send(event) + .await + .map_err(|_| Stopped); + } + let event_name = transport_event_name(&event); match timeout( REMOTE_CONTROL_TRANSPORT_EVENT_SEND_TIMEOUT, @@ -516,7 +525,7 @@ mod tests { } #[tokio::test] - async fn transport_event_send_times_out_when_queue_stays_full() { + async fn non_close_transport_event_send_times_out_when_queue_stays_full() { let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); let (transport_event_tx, _transport_event_rx) = mpsc::channel(1); let shutdown_token = CancellationToken::new(); @@ -531,14 +540,90 @@ mod tests { .expect("transport event queue should accept prefill"); let send_result = client_tracker - .send_transport_event(TransportEvent::ConnectionClosed { + .send_transport_event(TransportEvent::IncomingMessage { connection_id: next_connection_id(), + message: JSONRPCMessage::Notification( + codex_app_server_protocol::JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }, + ), }) .await; assert!(send_result.is_err()); } + #[tokio::test] + async fn close_client_waits_for_transport_event_queue_capacity() { + let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); + let (transport_event_tx, mut transport_event_rx) = mpsc::channel(2); + let shutdown_token = CancellationToken::new(); + let mut client_tracker = + ClientTracker::new(server_event_tx, transport_event_tx, &shutdown_token); + + client_tracker + .handle_message(initialize_envelope_with_stream_id( + "client-1", + Some("stream-1"), + )) + .await + .expect("initialize should open client"); + let connection_id = match transport_event_rx.recv().await.expect("open event") { + TransportEvent::ConnectionOpened { connection_id, .. } => connection_id, + other => panic!("expected connection opened, got {other:?}"), + }; + let _ = transport_event_rx.recv().await.expect("initialize event"); + + for _ in 0..2 { + client_tracker + .transport_event_tx + .send(TransportEvent::IncomingMessage { + connection_id, + message: JSONRPCMessage::Notification( + codex_app_server_protocol::JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }, + ), + }) + .await + .expect("transport event queue should accept prefill"); + } + + let client_key = ( + ClientId("client-1".to_string()), + StreamId("stream-1".to_string()), + ); + let close_client = client_tracker.close_client(&client_key); + tokio::pin!(close_client); + assert!( + timeout(Duration::from_millis(20), &mut close_client) + .await + .is_err() + ); + + for _ in 0..2 { + match transport_event_rx.recv().await.expect("prefilled event") { + TransportEvent::IncomingMessage { + connection_id: queued_connection_id, + .. + } => assert_eq!(queued_connection_id, connection_id), + other => panic!("expected incoming message, got {other:?}"), + } + } + + close_client + .await + .expect("close should forward after queue drains"); + match transport_event_rx.recv().await.expect("close event") { + TransportEvent::ConnectionClosed { + connection_id: closed_connection_id, + } => assert_eq!(closed_connection_id, connection_id), + other => panic!("expected connection closed, got {other:?}"), + } + } + #[tokio::test] async fn initialize_with_new_stream_id_opens_new_connection_for_same_client() { let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); From 09aadb6dd63a5d879585672cef721c950ae18685 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Mon, 25 May 2026 11:52:38 -0700 Subject: [PATCH 3/7] preserve remote control inbound delivery --- .../remote_control/client_tracker.rs | 232 +++++++++++++++--- 1 file changed, 204 insertions(+), 28 deletions(-) diff --git a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs index c63102b605c..ea19b70c09e 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs @@ -144,9 +144,6 @@ impl ClientTracker { if let Some(connection_id) = self.clients.get_mut(&client_key).map(|client| { client.last_activity_at = Instant::now(); - if let Some(seq_id) = seq_id { - client.last_inbound_seq_id = Some(seq_id); - } client.connection_id }) { self.send_transport_event(TransportEvent::IncomingMessage { @@ -154,6 +151,11 @@ impl ClientTracker { message, }) .await?; + if let Some(seq_id) = seq_id + && let Some(client) = self.clients.get_mut(&client_key) + { + client.last_inbound_seq_id = Some(seq_id); + } return Ok(()); } @@ -183,23 +185,35 @@ impl ClientTracker { disconnect_token.clone(), )); self.clients.insert( - client_key, + client_key.clone(), ClientState { connection_id, disconnect_token, last_activity_at: Instant::now(), - last_inbound_seq_id: if is_legacy_stream_id { None } else { seq_id }, + last_inbound_seq_id: None, status_tx, }, ); if is_legacy_stream_id { self.legacy_stream_ids.insert(client_id.clone(), stream_id); } - self.send_transport_event(TransportEvent::IncomingMessage { - connection_id, - message, - }) - .await + if let Err(err) = self + .send_transport_event(TransportEvent::IncomingMessage { + connection_id, + message, + }) + .await + { + self.close_client(&client_key).await?; + return Err(err); + } + if !is_legacy_stream_id + && let Some(seq_id) = seq_id + && let Some(client) = self.clients.get_mut(&client_key) + { + client.last_inbound_seq_id = Some(seq_id); + } + Ok(()) } ClientEvent::ClientMessageChunk { .. } | ClientEvent::Ack { .. } => Ok(()), ClientEvent::Ping => { @@ -314,13 +328,14 @@ impl ClientTracker { } async fn send_transport_event(&self, event: TransportEvent) -> Result<(), Stopped> { - // Dropping a close after removing the tracked client leaves app-server state stale. + // Close owns app-server cleanup, so keep delivery alive if the caller is aborted. if matches!(&event, TransportEvent::ConnectionClosed { .. }) { - return self - .transport_event_tx - .send(event) - .await - .map_err(|_| Stopped); + return tokio::spawn({ + let transport_event_tx = self.transport_event_tx.clone(); + async move { transport_event_tx.send(event).await.map_err(|_| Stopped) } + }) + .await + .map_err(|_| Stopped)?; } let event_name = transport_event_name(&event); @@ -407,6 +422,13 @@ mod tests { } } + fn initialized_notification() -> JSONRPCMessage { + JSONRPCMessage::Notification(codex_app_server_protocol::JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }) + } + #[tokio::test] async fn cancelled_outbound_task_emits_connection_closed() { let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -542,18 +564,116 @@ mod tests { let send_result = client_tracker .send_transport_event(TransportEvent::IncomingMessage { connection_id: next_connection_id(), - message: JSONRPCMessage::Notification( - codex_app_server_protocol::JSONRPCNotification { - method: "initialized".to_string(), - params: None, - }, - ), + message: initialized_notification(), }) .await; assert!(send_result.is_err()); } + #[tokio::test] + async fn incoming_message_timeout_does_not_advance_seq_id() { + let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); + let (transport_event_tx, mut transport_event_rx) = mpsc::channel(2); + let shutdown_token = CancellationToken::new(); + let mut client_tracker = + ClientTracker::new(server_event_tx, transport_event_tx.clone(), &shutdown_token); + + client_tracker + .handle_message(initialize_envelope_with_stream_id( + "client-1", + Some("stream-1"), + )) + .await + .expect("initialize should open client"); + let connection_id = match transport_event_rx.recv().await.expect("open event") { + TransportEvent::ConnectionOpened { connection_id, .. } => connection_id, + other => panic!("expected connection opened, got {other:?}"), + }; + let _ = transport_event_rx.recv().await.expect("initialize event"); + + for _ in 0..2 { + transport_event_tx + .send(TransportEvent::ConnectionClosed { + connection_id: next_connection_id(), + }) + .await + .expect("transport event queue should accept prefill"); + } + + let retry_envelope = ClientEnvelope { + event: ClientEvent::ClientMessage { + message: initialized_notification(), + }, + client_id: ClientId("client-1".to_string()), + stream_id: Some(StreamId("stream-1".to_string())), + seq_id: Some(1), + cursor: None, + }; + assert!( + client_tracker + .handle_message(retry_envelope.clone()) + .await + .is_err() + ); + for _ in 0..2 { + let _ = transport_event_rx.recv().await.expect("prefilled event"); + } + + client_tracker + .handle_message(retry_envelope) + .await + .expect("retry should forward after timeout"); + match transport_event_rx.recv().await.expect("retried event") { + TransportEvent::IncomingMessage { + connection_id: queued_connection_id, + .. + } => assert_eq!(queued_connection_id, connection_id), + other => panic!("expected incoming message, got {other:?}"), + } + } + + #[tokio::test] + async fn initialize_timeout_closes_open_connection() { + let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); + let (transport_event_tx, mut transport_event_rx) = mpsc::channel(1); + let shutdown_token = CancellationToken::new(); + let client_tracker = + ClientTracker::new(server_event_tx, transport_event_tx, &shutdown_token); + let mut handle_message = tokio::spawn(async move { + let mut client_tracker = client_tracker; + client_tracker + .handle_message(initialize_envelope_with_stream_id( + "client-1", + Some("stream-1"), + )) + .await + }); + + assert!( + timeout(Duration::from_millis(50), &mut handle_message) + .await + .is_err() + ); + let connection_id = match transport_event_rx.recv().await.expect("open event") { + TransportEvent::ConnectionOpened { connection_id, .. } => connection_id, + other => panic!("expected connection opened, got {other:?}"), + }; + + assert!( + handle_message + .await + .expect("handle message task should not panic") + .is_err() + ); + match transport_event_rx.recv().await.expect("close event") { + TransportEvent::ConnectionClosed { + connection_id: closed_connection_id, + } => assert_eq!(closed_connection_id, connection_id), + other => panic!("expected connection closed, got {other:?}"), + } + } + #[tokio::test] async fn close_client_waits_for_transport_event_queue_capacity() { let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -580,12 +700,7 @@ mod tests { .transport_event_tx .send(TransportEvent::IncomingMessage { connection_id, - message: JSONRPCMessage::Notification( - codex_app_server_protocol::JSONRPCNotification { - method: "initialized".to_string(), - params: None, - }, - ), + message: initialized_notification(), }) .await .expect("transport event queue should accept prefill"); @@ -624,6 +739,67 @@ mod tests { } } + #[tokio::test] + async fn close_client_keeps_forwarding_after_caller_is_aborted() { + let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); + let (transport_event_tx, mut transport_event_rx) = mpsc::channel(2); + let shutdown_token = CancellationToken::new(); + let mut client_tracker = + ClientTracker::new(server_event_tx, transport_event_tx, &shutdown_token); + + client_tracker + .handle_message(initialize_envelope_with_stream_id( + "client-1", + Some("stream-1"), + )) + .await + .expect("initialize should open client"); + let connection_id = match transport_event_rx.recv().await.expect("open event") { + TransportEvent::ConnectionOpened { connection_id, .. } => connection_id, + other => panic!("expected connection opened, got {other:?}"), + }; + let _ = transport_event_rx.recv().await.expect("initialize event"); + + for _ in 0..2 { + client_tracker + .transport_event_tx + .send(TransportEvent::IncomingMessage { + connection_id, + message: initialized_notification(), + }) + .await + .expect("transport event queue should accept prefill"); + } + + let client_key = ( + ClientId("client-1".to_string()), + StreamId("stream-1".to_string()), + ); + let mut close_client = + tokio::spawn(async move { client_tracker.close_client(&client_key).await }); + assert!( + timeout(Duration::from_millis(20), &mut close_client) + .await + .is_err() + ); + close_client.abort(); + let _ = close_client.await; + + for _ in 0..2 { + let _ = transport_event_rx.recv().await.expect("prefilled event"); + } + match timeout(Duration::from_secs(1), transport_event_rx.recv()) + .await + .expect("close should be delivered") + .expect("close event") + { + TransportEvent::ConnectionClosed { + connection_id: closed_connection_id, + } => assert_eq!(closed_connection_id, connection_id), + other => panic!("expected connection closed, got {other:?}"), + } + } + #[tokio::test] async fn initialize_with_new_stream_id_opens_new_connection_for_same_client() { let (server_event_tx, _server_event_rx) = mpsc::channel(CHANNEL_CAPACITY); From ecb13f6ff1e3d47b7618e369ff041677275d8dc1 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Mon, 25 May 2026 12:16:52 -0700 Subject: [PATCH 4/7] delay remote control replay advancement --- .../remote_control/client_tracker.rs | 56 ++++++--- .../src/transport/remote_control/websocket.rs | 112 ++++++++++++++---- 2 files changed, 126 insertions(+), 42 deletions(-) diff --git a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs index ea19b70c09e..dd79774e1e6 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs @@ -151,11 +151,7 @@ impl ClientTracker { message, }) .await?; - if let Some(seq_id) = seq_id - && let Some(client) = self.clients.get_mut(&client_key) - { - client.last_inbound_seq_id = Some(seq_id); - } + self.record_inbound_message_delivery(&client_key, seq_id); return Ok(()); } @@ -207,11 +203,8 @@ impl ClientTracker { self.close_client(&client_key).await?; return Err(err); } - if !is_legacy_stream_id - && let Some(seq_id) = seq_id - && let Some(client) = self.clients.get_mut(&client_key) - { - client.last_inbound_seq_id = Some(seq_id); + if !is_legacy_stream_id { + self.record_inbound_message_delivery(&client_key, seq_id); } Ok(()) } @@ -328,15 +321,12 @@ impl ClientTracker { } async fn send_transport_event(&self, event: TransportEvent) -> Result<(), Stopped> { - // Close owns app-server cleanup, so keep delivery alive if the caller is aborted. - if matches!(&event, TransportEvent::ConnectionClosed { .. }) { - return tokio::spawn({ - let transport_event_tx = self.transport_event_tx.clone(); - async move { transport_event_tx.send(event).await.map_err(|_| Stopped) } - }) - .await - .map_err(|_| Stopped)?; - } + let event = match event { + TransportEvent::ConnectionClosed { connection_id } => { + return self.send_connection_closed(connection_id).await; + } + event => event, + }; let event_name = transport_event_name(&event); match timeout( @@ -357,6 +347,34 @@ impl ClientTracker { } } } + + fn record_inbound_message_delivery( + &mut self, + client_key: &(ClientId, StreamId), + seq_id: Option, + ) { + // Timed forwarding can fail, so only dedupe retries after app-server receives it. + if let Some(seq_id) = seq_id + && let Some(client) = self.clients.get_mut(client_key) + { + client.last_inbound_seq_id = Some(seq_id); + } + } + + async fn send_connection_closed(&self, connection_id: ConnectionId) -> Result<(), Stopped> { + // Worker shutdown can abort the caller; detach the cleanup event before awaiting it. + tokio::spawn({ + let transport_event_tx = self.transport_event_tx.clone(); + async move { + transport_event_tx + .send(TransportEvent::ConnectionClosed { connection_id }) + .await + .map_err(|_| Stopped) + } + }) + .await + .map_err(|_| Stopped)? + } } fn transport_event_name(event: &TransportEvent) -> &'static str { diff --git a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs index 5d41d5c44d2..ff5624a8337 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs @@ -184,13 +184,32 @@ impl WebsocketState { } let observation = self.client_segment_reassembler.observe(client_envelope); - if matches!(observation, ClientSegmentObservation::Forward(_)) - && let Some((key, seq_id)) = client_message_key - { + observation + } + + fn record_client_message_delivery( + &mut self, + client_envelope: &ClientEnvelope, + client_message_key: Option<((ClientId, Option), u64)>, + ) { + if let Some(cursor) = client_envelope.cursor.as_deref() { + self.subscribe_cursor = Some(cursor.to_string()); + } + if let Some((key, seq_id)) = client_message_key { self.last_completed_client_chunk_seq_id_by_stream .insert(key, seq_id); } - observation + if let ClientEvent::Ack { segment_id } = &client_envelope.event + && let Some(acked_seq_id) = client_envelope.seq_id + && let Some(stream_id) = client_envelope.stream_id.as_ref() + { + self.outbound_buffer.ack( + &client_envelope.client_id, + stream_id, + acked_seq_id, + *segment_id, + ); + } } fn invalidate_client_message_stream(&mut self, client_id: &ClientId, stream_id: &StreamId) { @@ -1036,6 +1055,7 @@ impl RemoteControlWebsocket { } }; + let client_message_key = WebsocketState::client_message_key(&client_envelope); let observation = { let mut websocket_state = state.lock().await; websocket_state.observe_client_message(client_envelope, wire_size_bytes) @@ -1045,24 +1065,6 @@ impl RemoteControlWebsocket { ClientSegmentObservation::Pending | ClientSegmentObservation::Dropped => continue, }; - { - let mut websocket_state = state.lock().await; - if let Some(cursor) = client_envelope.cursor.as_deref() { - websocket_state.subscribe_cursor = Some(cursor.to_string()); - } - if let ClientEvent::Ack { segment_id } = &client_envelope.event - && let Some(acked_seq_id) = client_envelope.seq_id - && let Some(stream_id) = client_envelope.stream_id.as_ref() - { - websocket_state.outbound_buffer.ack( - &client_envelope.client_id, - stream_id, - acked_seq_id, - *segment_id, - ); - } - } - let closed_client = matches!(&client_envelope.event, ClientEvent::ClientClosed).then(|| { ( @@ -1070,6 +1072,7 @@ impl RemoteControlWebsocket { client_envelope.stream_id.clone(), ) }); + let delivered_client_envelope = client_envelope.clone(); if client_tracker .handle_message(client_envelope) .await @@ -1077,6 +1080,10 @@ impl RemoteControlWebsocket { { return Ok(()); } + state + .lock() + .await + .record_client_message_delivery(&delivered_client_envelope, client_message_key); if let Some((client_id, stream_id)) = closed_client { let mut websocket_state = state.lock().await; if let Some(stream_id) = stream_id { @@ -2526,6 +2533,65 @@ mod tests { &raw[split..], ); + assert!(matches!( + observe_client_message(&mut state, first_chunk.clone()), + ClientSegmentObservation::Pending + )); + let completed_envelope = match observe_client_message(&mut state, second_chunk) { + ClientSegmentObservation::Forward(client_envelope) => *client_envelope, + _ => panic!("expected completed client message"), + }; + state.record_client_message_delivery( + &completed_envelope, + Some(( + ( + ClientId("client-1".to_string()), + Some(StreamId("stream-1".to_string())), + ), + 4, + )), + ); + assert!(matches!( + observe_client_message(&mut state, first_chunk), + ClientSegmentObservation::Dropped + )); + } + + #[test] + fn websocket_state_allows_replay_before_completed_chunk_delivery() { + let (outbound_buffer, _used_rx) = BoundedOutboundBuffer::new(); + let mut state = WebsocketState { + outbound_buffer, + subscribe_cursor: None, + next_seq_id_by_stream: HashMap::new(), + last_completed_client_chunk_seq_id_by_stream: HashMap::new(), + client_segment_reassembler: ClientSegmentReassembler::default(), + }; + let message = JSONRPCMessage::Notification(JSONRPCNotification { + method: "initialized".to_string(), + params: None, + }); + let raw = serde_json::to_vec(&message).expect("message should serialize"); + let split = raw.len() / 2; + let first_chunk = client_chunk_envelope( + "client-1", + "stream-1", + /*seq_id*/ 4, + /*segment_id*/ 0, + /*segment_count*/ 2, + raw.len(), + &raw[..split], + ); + let second_chunk = client_chunk_envelope( + "client-1", + "stream-1", + /*seq_id*/ 4, + /*segment_id*/ 1, + /*segment_count*/ 2, + raw.len(), + &raw[split..], + ); + assert!(matches!( observe_client_message(&mut state, first_chunk.clone()), ClientSegmentObservation::Pending @@ -2536,7 +2602,7 @@ mod tests { )); assert!(matches!( observe_client_message(&mut state, first_chunk), - ClientSegmentObservation::Dropped + ClientSegmentObservation::Pending )); } From 176f13201a0b103de914b5070a9d8690c6bcd664 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Mon, 25 May 2026 12:23:29 -0700 Subject: [PATCH 5/7] log remote control close forwarding --- .../remote_control/client_tracker.rs | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs index dd79774e1e6..0cb8f1a27b2 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs @@ -20,6 +20,7 @@ use tokio::time::Duration; use tokio::time::Instant; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use tracing::info; use tracing::warn; const REMOTE_CONTROL_CLIENT_IDLE_TIMEOUT: Duration = Duration::from_secs(10 * 60); @@ -336,7 +337,13 @@ impl ClientTracker { .await { Ok(Ok(())) => Ok(()), - Ok(Err(_)) => Err(Stopped), + Ok(Err(_)) => { + warn!( + transport_event = event_name, + "remote control transport event receiver dropped" + ); + Err(Stopped) + } Err(_) => { warn!( transport_event = event_name, @@ -363,7 +370,11 @@ impl ClientTracker { async fn send_connection_closed(&self, connection_id: ConnectionId) -> Result<(), Stopped> { // Worker shutdown can abort the caller; detach the cleanup event before awaiting it. - tokio::spawn({ + info!( + connection_id = ?connection_id, + "forwarding remote control connection closed transport event" + ); + match tokio::spawn({ let transport_event_tx = self.transport_event_tx.clone(); async move { transport_event_tx @@ -373,7 +384,24 @@ impl ClientTracker { } }) .await - .map_err(|_| Stopped)? + { + Ok(Ok(())) => Ok(()), + Ok(Err(_)) => { + warn!( + transport_event = "connection_closed", + "remote control transport event receiver dropped" + ); + Err(Stopped) + } + Err(err) => { + warn!( + transport_event = "connection_closed", + ?err, + "remote control transport event forwarding task failed" + ); + Err(Stopped) + } + } } } From 1ed8d48939c8c3b7e22517b0782d95df0cd58c05 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Mon, 25 May 2026 12:28:49 -0700 Subject: [PATCH 6/7] avoid blocking initialize rollback --- .../remote_control/client_tracker.rs | 81 ++++++++++--------- 1 file changed, 45 insertions(+), 36 deletions(-) diff --git a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs index 0cb8f1a27b2..22e5e74eda2 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/client_tracker.rs @@ -15,6 +15,7 @@ use codex_app_server_protocol::JSONRPCMessage; use std::collections::HashMap; use tokio::sync::mpsc; use tokio::sync::watch; +use tokio::task::JoinHandle; use tokio::task::JoinSet; use tokio::time::Duration; use tokio::time::Instant; @@ -201,7 +202,12 @@ impl ClientTracker { }) .await { - self.close_client(&client_key).await?; + if let Some(client) = self.remove_client(&client_key) { + client.disconnect_token.cancel(); + // The initialize send already timed out on this queue; preserve close + // delivery without blocking reconnect on the same backpressure. + drop(self.spawn_connection_closed(client.connection_id)); + } return Err(err); } if !is_legacy_stream_id { @@ -304,9 +310,18 @@ impl ClientTracker { &mut self, client_key: &(ClientId, StreamId), ) -> Result<(), Stopped> { - let Some(client) = self.clients.remove(client_key) else { + let Some(client) = self.remove_client(client_key) else { return Ok(()); }; + client.disconnect_token.cancel(); + self.send_transport_event(TransportEvent::ConnectionClosed { + connection_id: client.connection_id, + }) + .await + } + + fn remove_client(&mut self, client_key: &(ClientId, StreamId)) -> Option { + let client = self.clients.remove(client_key)?; if self .legacy_stream_ids .get(&client_key.0) @@ -314,11 +329,7 @@ impl ClientTracker { { self.legacy_stream_ids.remove(&client_key.0); } - client.disconnect_token.cancel(); - self.send_transport_event(TransportEvent::ConnectionClosed { - connection_id: client.connection_id, - }) - .await + Some(client) } async fn send_transport_event(&self, event: TransportEvent) -> Result<(), Stopped> { @@ -370,29 +381,8 @@ impl ClientTracker { async fn send_connection_closed(&self, connection_id: ConnectionId) -> Result<(), Stopped> { // Worker shutdown can abort the caller; detach the cleanup event before awaiting it. - info!( - connection_id = ?connection_id, - "forwarding remote control connection closed transport event" - ); - match tokio::spawn({ - let transport_event_tx = self.transport_event_tx.clone(); - async move { - transport_event_tx - .send(TransportEvent::ConnectionClosed { connection_id }) - .await - .map_err(|_| Stopped) - } - }) - .await - { - Ok(Ok(())) => Ok(()), - Ok(Err(_)) => { - warn!( - transport_event = "connection_closed", - "remote control transport event receiver dropped" - ); - Err(Stopped) - } + match self.spawn_connection_closed(connection_id).await { + Ok(result) => result, Err(err) => { warn!( transport_event = "connection_closed", @@ -403,6 +393,29 @@ impl ClientTracker { } } } + + fn spawn_connection_closed( + &self, + connection_id: ConnectionId, + ) -> JoinHandle> { + info!( + connection_id = ?connection_id, + "forwarding remote control connection closed transport event" + ); + let transport_event_tx = self.transport_event_tx.clone(); + tokio::spawn(async move { + transport_event_tx + .send(TransportEvent::ConnectionClosed { connection_id }) + .await + .map_err(|_| { + warn!( + transport_event = "connection_closed", + "remote control transport event receiver dropped" + ); + Stopped + }) + }) + } } fn transport_event_name(event: &TransportEvent) -> &'static str { @@ -699,6 +712,8 @@ mod tests { assert!( timeout(Duration::from_millis(50), &mut handle_message) .await + .expect("initialize timeout rollback should not wait for close delivery") + .expect("handle message task should not panic") .is_err() ); let connection_id = match transport_event_rx.recv().await.expect("open event") { @@ -706,12 +721,6 @@ mod tests { other => panic!("expected connection opened, got {other:?}"), }; - assert!( - handle_message - .await - .expect("handle message task should not panic") - .is_err() - ); match transport_event_rx.recv().await.expect("close event") { TransportEvent::ConnectionClosed { connection_id: closed_connection_id, From af5d171e161d14cae489f7bafd9d7a72d11f84c5 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Mon, 25 May 2026 12:39:46 -0700 Subject: [PATCH 7/7] codex: fix CI failure on PR #24473 --- .../src/transport/remote_control/websocket.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs index ff5624a8337..6358d592e42 100644 --- a/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs +++ b/codex-rs/app-server-transport/src/transport/remote_control/websocket.rs @@ -183,8 +183,7 @@ impl WebsocketState { return ClientSegmentObservation::Dropped; } - let observation = self.client_segment_reassembler.observe(client_envelope); - observation + self.client_segment_reassembler.observe(client_envelope) } fn record_client_message_delivery(