From f1dedf9614a2d22770428a7b17b0c617fda888d8 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Tue, 19 May 2026 18:25:00 -0300 Subject: [PATCH 1/7] fix(app-server): stop skills watcher on shutdown --- codex-rs/app-server/src/message_processor.rs | 3 ++ codex-rs/app-server/src/skills_watcher.rs | 30 ++++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index fdcd44d01aea..d26529095d34 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -160,6 +160,7 @@ impl ExternalAuth for ExternalAuthRefreshBridge { pub(crate) struct MessageProcessor { outgoing: Arc, + skills_watcher: Arc, account_processor: AccountRequestProcessor, apps_processor: AppsRequestProcessor, catalog_processor: CatalogRequestProcessor, @@ -499,6 +500,7 @@ impl MessageProcessor { Self { outgoing, + skills_watcher, account_processor, apps_processor, catalog_processor, @@ -527,6 +529,7 @@ impl MessageProcessor { pub(crate) fn clear_runtime_references(&self) { self.account_processor.clear_external_auth(); + self.skills_watcher.shutdown(); } pub(crate) async fn process_request( diff --git a/codex-rs/app-server/src/skills_watcher.rs b/codex-rs/app-server/src/skills_watcher.rs index d20628d3866d..e40f945c268c 100644 --- a/codex-rs/app-server/src/skills_watcher.rs +++ b/codex-rs/app-server/src/skills_watcher.rs @@ -15,6 +15,7 @@ use codex_file_watcher::ThrottledWatchReceiver; use codex_file_watcher::WatchPath; use codex_file_watcher::WatchRegistration; use codex_protocol::protocol::TurnEnvironmentSelection; +use tokio::sync::oneshot; use tracing::warn; #[cfg(not(test))] @@ -24,6 +25,7 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); pub(crate) struct SkillsWatcher { subscriber: FileWatcherSubscriber, + shutdown_tx: std::sync::Mutex>>, } impl SkillsWatcher { @@ -39,8 +41,22 @@ impl SkillsWatcher { } }; let (subscriber, rx) = file_watcher.add_subscriber(); - Self::spawn_event_loop(rx, skills_manager, outgoing); - Arc::new(Self { subscriber }) + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_rx); + Arc::new(Self { + subscriber, + shutdown_tx: std::sync::Mutex::new(Some(shutdown_tx)), + }) + } + + pub(crate) fn shutdown(&self) { + let mut shutdown_tx = self + .shutdown_tx + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if let Some(shutdown_tx) = shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } } pub(crate) async fn register_thread_config( @@ -92,6 +108,7 @@ impl SkillsWatcher { rx: Receiver, skills_manager: Arc, outgoing: Arc, + mut shutdown_rx: oneshot::Receiver<()>, ) { let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); let Ok(handle) = tokio::runtime::Handle::try_current() else { @@ -99,7 +116,14 @@ impl SkillsWatcher { return; }; handle.spawn(async move { - while rx.recv().await.is_some() { + loop { + let event = tokio::select! { + _ = &mut shutdown_rx => break, + event = rx.recv() => event, + }; + if event.is_none() { + break; + } skills_manager.clear_cache(); outgoing .send_server_notification(ServerNotification::SkillsChanged( From be2961f36218192e475b24d4a74a5da08ca99477 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Tue, 19 May 2026 20:30:50 -0300 Subject: [PATCH 2/7] fix(app-server): avoid attestation shutdown hold --- codex-rs/app-server/src/attestation.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/codex-rs/app-server/src/attestation.rs b/codex-rs/app-server/src/attestation.rs index 17bb10c38c76..695f4b72df4e 100644 --- a/codex-rs/app-server/src/attestation.rs +++ b/codex-rs/app-server/src/attestation.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::sync::Weak; use axum::http::HeaderValue; use codex_app_server_protocol::AttestationGenerateParams; @@ -22,13 +23,13 @@ pub(crate) fn app_server_attestation_provider( thread_state_manager: ThreadStateManager, ) -> Arc { Arc::new(AppServerAttestationProvider { - outgoing, + outgoing: Arc::downgrade(&outgoing), thread_state_manager, }) } struct AppServerAttestationProvider { - outgoing: Arc, + outgoing: Weak, thread_state_manager: ThreadStateManager, } @@ -42,9 +43,12 @@ impl std::fmt::Debug for AppServerAttestationProvider { impl AttestationProvider for AppServerAttestationProvider { fn header_for_request(&self, context: AttestationContext) -> GenerateAttestationFuture<'_> { - let outgoing = self.outgoing.clone(); + let outgoing = self.outgoing.upgrade(); let thread_state_manager = self.thread_state_manager.clone(); Box::pin(async move { + let Some(outgoing) = outgoing else { + return None; + }; request_attestation_header_value_with_timeout( outgoing, thread_state_manager, From 13f5908ff3cbad434ed4410236e0e00725155812 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Tue, 19 May 2026 20:43:10 -0300 Subject: [PATCH 3/7] fix(app-server): satisfy attestation clippy --- codex-rs/app-server/src/attestation.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/codex-rs/app-server/src/attestation.rs b/codex-rs/app-server/src/attestation.rs index 695f4b72df4e..f0ecb29ab22e 100644 --- a/codex-rs/app-server/src/attestation.rs +++ b/codex-rs/app-server/src/attestation.rs @@ -46,9 +46,7 @@ impl AttestationProvider for AppServerAttestationProvider { let outgoing = self.outgoing.upgrade(); let thread_state_manager = self.thread_state_manager.clone(); Box::pin(async move { - let Some(outgoing) = outgoing else { - return None; - }; + let outgoing = outgoing?; request_attestation_header_value_with_timeout( outgoing, thread_state_manager, From 15e0fecd0489b617b94a294e2112c4e0e509ca2d Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Tue, 19 May 2026 22:50:34 -0300 Subject: [PATCH 4/7] fix(app-server): cancel apps list on shutdown --- codex-rs/app-server/src/message_processor.rs | 4 ++++ .../src/request_processors/apps_processor.rs | 13 ++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index d26529095d34..4b630d2ac37b 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -86,6 +86,7 @@ use tokio::sync::broadcast; use tokio::sync::watch; use tokio::time::Duration; use tokio::time::timeout; +use tokio_util::sync::CancellationToken; use tracing::Instrument; const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10); @@ -350,6 +351,7 @@ impl MessageProcessor { let thread_list_state_permit = Arc::new(Semaphore::new(/*permits*/ 1)); let workspace_settings_cache = Arc::new(workspace_settings::WorkspaceSettingsCache::default()); + let app_list_shutdown_token = CancellationToken::new(); let account_processor = AccountRequestProcessor::new( auth_manager.clone(), Arc::clone(&thread_manager), @@ -363,6 +365,7 @@ impl MessageProcessor { outgoing.clone(), config_manager.clone(), Arc::clone(&workspace_settings_cache), + app_list_shutdown_token, ); let catalog_processor = CatalogRequestProcessor::new( auth_manager.clone(), @@ -529,6 +532,7 @@ impl MessageProcessor { pub(crate) fn clear_runtime_references(&self) { self.account_processor.clear_external_auth(); + self.apps_processor.shutdown(); self.skills_watcher.shutdown(); } diff --git a/codex-rs/app-server/src/request_processors/apps_processor.rs b/codex-rs/app-server/src/request_processors/apps_processor.rs index caf60a91a014..7f44df1b7572 100644 --- a/codex-rs/app-server/src/request_processors/apps_processor.rs +++ b/codex-rs/app-server/src/request_processors/apps_processor.rs @@ -7,6 +7,7 @@ pub(crate) struct AppsRequestProcessor { outgoing: Arc, config_manager: ConfigManager, workspace_settings_cache: Arc, + shutdown_token: CancellationToken, } impl AppsRequestProcessor { @@ -16,6 +17,7 @@ impl AppsRequestProcessor { outgoing: Arc, config_manager: ConfigManager, workspace_settings_cache: Arc, + shutdown_token: CancellationToken, ) -> Self { Self { auth_manager, @@ -23,6 +25,7 @@ impl AppsRequestProcessor { outgoing, config_manager, workspace_settings_cache, + shutdown_token, } } @@ -83,12 +86,20 @@ impl AppsRequestProcessor { let request = request_id.clone(); let outgoing = Arc::clone(&self.outgoing); let environment_manager = self.thread_manager.environment_manager(); + let shutdown_token = self.shutdown_token.child_token(); tokio::spawn(async move { - Self::apps_list_task(outgoing, request, params, config, environment_manager).await; + tokio::select! { + _ = shutdown_token.cancelled() => {} + _ = Self::apps_list_task(outgoing, request, params, config, environment_manager) => {} + } }); Ok(None) } + pub(crate) fn shutdown(&self) { + self.shutdown_token.cancel(); + } + async fn apps_list_task( outgoing: Arc, request_id: ConnectionRequestId, From dc25d80d2d79aa717bbdf530c30c47840a1cad9b Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Wed, 20 May 2026 00:00:04 -0300 Subject: [PATCH 5/7] fix(app-server): address shutdown review feedback --- codex-rs/app-server/src/attestation.rs | 5 +++-- codex-rs/app-server/src/skills_watcher.rs | 22 ++++++++-------------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/codex-rs/app-server/src/attestation.rs b/codex-rs/app-server/src/attestation.rs index f0ecb29ab22e..206c38cee17e 100644 --- a/codex-rs/app-server/src/attestation.rs +++ b/codex-rs/app-server/src/attestation.rs @@ -43,10 +43,11 @@ impl std::fmt::Debug for AppServerAttestationProvider { impl AttestationProvider for AppServerAttestationProvider { fn header_for_request(&self, context: AttestationContext) -> GenerateAttestationFuture<'_> { - let outgoing = self.outgoing.upgrade(); + let Some(outgoing) = self.outgoing.upgrade() else { + return Box::pin(async { None }); + }; let thread_state_manager = self.thread_state_manager.clone(); Box::pin(async move { - let outgoing = outgoing?; request_attestation_header_value_with_timeout( outgoing, thread_state_manager, diff --git a/codex-rs/app-server/src/skills_watcher.rs b/codex-rs/app-server/src/skills_watcher.rs index e40f945c268c..d8e54b217be5 100644 --- a/codex-rs/app-server/src/skills_watcher.rs +++ b/codex-rs/app-server/src/skills_watcher.rs @@ -15,7 +15,7 @@ use codex_file_watcher::ThrottledWatchReceiver; use codex_file_watcher::WatchPath; use codex_file_watcher::WatchRegistration; use codex_protocol::protocol::TurnEnvironmentSelection; -use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use tracing::warn; #[cfg(not(test))] @@ -25,7 +25,7 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); pub(crate) struct SkillsWatcher { subscriber: FileWatcherSubscriber, - shutdown_tx: std::sync::Mutex>>, + shutdown_token: CancellationToken, } impl SkillsWatcher { @@ -41,22 +41,16 @@ impl SkillsWatcher { } }; let (subscriber, rx) = file_watcher.add_subscriber(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_rx); + let shutdown_token = CancellationToken::new(); + Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_token.child_token()); Arc::new(Self { subscriber, - shutdown_tx: std::sync::Mutex::new(Some(shutdown_tx)), + shutdown_token, }) } pub(crate) fn shutdown(&self) { - let mut shutdown_tx = self - .shutdown_tx - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - if let Some(shutdown_tx) = shutdown_tx.take() { - let _ = shutdown_tx.send(()); - } + self.shutdown_token.cancel(); } pub(crate) async fn register_thread_config( @@ -108,7 +102,7 @@ impl SkillsWatcher { rx: Receiver, skills_manager: Arc, outgoing: Arc, - mut shutdown_rx: oneshot::Receiver<()>, + shutdown_token: CancellationToken, ) { let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); let Ok(handle) = tokio::runtime::Handle::try_current() else { @@ -118,7 +112,7 @@ impl SkillsWatcher { handle.spawn(async move { loop { let event = tokio::select! { - _ = &mut shutdown_rx => break, + _ = shutdown_token.cancelled() => break, event = rx.recv() => event, }; if event.is_none() { From 8416b3970d3889c9e3f7fa2d4078cdf4db8bf210 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Wed, 20 May 2026 12:51:52 -0300 Subject: [PATCH 6/7] fix(app-server): cancel shutdown tasks on drop --- .../app-server/src/request_processors/apps_processor.rs | 6 ++++++ codex-rs/app-server/src/skills_watcher.rs | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/codex-rs/app-server/src/request_processors/apps_processor.rs b/codex-rs/app-server/src/request_processors/apps_processor.rs index 7f44df1b7572..6563c06ed901 100644 --- a/codex-rs/app-server/src/request_processors/apps_processor.rs +++ b/codex-rs/app-server/src/request_processors/apps_processor.rs @@ -379,3 +379,9 @@ async fn send_app_list_updated_notification( )) .await; } + +impl Drop for AppsRequestProcessor { + fn drop(&mut self) { + self.shutdown(); + } +} diff --git a/codex-rs/app-server/src/skills_watcher.rs b/codex-rs/app-server/src/skills_watcher.rs index d8e54b217be5..7823f12c32bb 100644 --- a/codex-rs/app-server/src/skills_watcher.rs +++ b/codex-rs/app-server/src/skills_watcher.rs @@ -128,3 +128,9 @@ impl SkillsWatcher { }); } } + +impl Drop for SkillsWatcher { + fn drop(&mut self) { + self.shutdown(); + } +} From 5fcbb088f71f8a1581b25edeca85f6fe2b9d868e Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Wed, 20 May 2026 14:07:51 -0300 Subject: [PATCH 7/7] fix(app-server): use shutdown drop guards --- codex-rs/app-server/src/request_processors.rs | 1 + .../src/request_processors/apps_processor.rs | 10 +++------- codex-rs/app-server/src/skills_watcher.rs | 10 ++++------ 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 3e84c397f833..1dd8684ea6f9 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -423,6 +423,7 @@ use tokio::sync::broadcast; use tokio::sync::oneshot; use tokio::sync::watch; use tokio_util::sync::CancellationToken; +use tokio_util::sync::DropGuard; use tokio_util::task::TaskTracker; use toml::Value as TomlValue; use tracing::Instrument; diff --git a/codex-rs/app-server/src/request_processors/apps_processor.rs b/codex-rs/app-server/src/request_processors/apps_processor.rs index 6563c06ed901..5a1c9ffe1c4e 100644 --- a/codex-rs/app-server/src/request_processors/apps_processor.rs +++ b/codex-rs/app-server/src/request_processors/apps_processor.rs @@ -1,6 +1,5 @@ use super::*; -#[derive(Clone)] pub(crate) struct AppsRequestProcessor { auth_manager: Arc, thread_manager: Arc, @@ -8,6 +7,7 @@ pub(crate) struct AppsRequestProcessor { config_manager: ConfigManager, workspace_settings_cache: Arc, shutdown_token: CancellationToken, + _shutdown_drop_guard: DropGuard, } impl AppsRequestProcessor { @@ -19,6 +19,7 @@ impl AppsRequestProcessor { workspace_settings_cache: Arc, shutdown_token: CancellationToken, ) -> Self { + let shutdown_drop_guard = shutdown_token.clone().drop_guard(); Self { auth_manager, thread_manager, @@ -26,6 +27,7 @@ impl AppsRequestProcessor { config_manager, workspace_settings_cache, shutdown_token, + _shutdown_drop_guard: shutdown_drop_guard, } } @@ -379,9 +381,3 @@ async fn send_app_list_updated_notification( )) .await; } - -impl Drop for AppsRequestProcessor { - fn drop(&mut self) { - self.shutdown(); - } -} diff --git a/codex-rs/app-server/src/skills_watcher.rs b/codex-rs/app-server/src/skills_watcher.rs index 7823f12c32bb..9e34cdac8e3d 100644 --- a/codex-rs/app-server/src/skills_watcher.rs +++ b/codex-rs/app-server/src/skills_watcher.rs @@ -16,6 +16,7 @@ use codex_file_watcher::WatchPath; use codex_file_watcher::WatchRegistration; use codex_protocol::protocol::TurnEnvironmentSelection; use tokio_util::sync::CancellationToken; +use tokio_util::sync::DropGuard; use tracing::warn; #[cfg(not(test))] @@ -26,6 +27,7 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); pub(crate) struct SkillsWatcher { subscriber: FileWatcherSubscriber, shutdown_token: CancellationToken, + _shutdown_drop_guard: DropGuard, } impl SkillsWatcher { @@ -42,10 +44,12 @@ impl SkillsWatcher { }; let (subscriber, rx) = file_watcher.add_subscriber(); let shutdown_token = CancellationToken::new(); + let shutdown_drop_guard = shutdown_token.clone().drop_guard(); Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_token.child_token()); Arc::new(Self { subscriber, shutdown_token, + _shutdown_drop_guard: shutdown_drop_guard, }) } @@ -128,9 +132,3 @@ impl SkillsWatcher { }); } } - -impl Drop for SkillsWatcher { - fn drop(&mut self) { - self.shutdown(); - } -}