From f1dedf9614a2d22770428a7b17b0c617fda888d8 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Tue, 19 May 2026 18:25:00 -0300 Subject: [PATCH] fix(app-server): stop skills watcher on shutdown --- codex-rs/app-server/src/message_processor.rs | 3 ++ codex-rs/app-server/src/skills_watcher.rs | 30 ++++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index fdcd44d01aea..d26529095d34 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -160,6 +160,7 @@ impl ExternalAuth for ExternalAuthRefreshBridge { pub(crate) struct MessageProcessor { outgoing: Arc, + skills_watcher: Arc, account_processor: AccountRequestProcessor, apps_processor: AppsRequestProcessor, catalog_processor: CatalogRequestProcessor, @@ -499,6 +500,7 @@ impl MessageProcessor { Self { outgoing, + skills_watcher, account_processor, apps_processor, catalog_processor, @@ -527,6 +529,7 @@ impl MessageProcessor { pub(crate) fn clear_runtime_references(&self) { self.account_processor.clear_external_auth(); + self.skills_watcher.shutdown(); } pub(crate) async fn process_request( diff --git a/codex-rs/app-server/src/skills_watcher.rs b/codex-rs/app-server/src/skills_watcher.rs index d20628d3866d..e40f945c268c 100644 --- a/codex-rs/app-server/src/skills_watcher.rs +++ b/codex-rs/app-server/src/skills_watcher.rs @@ -15,6 +15,7 @@ use codex_file_watcher::ThrottledWatchReceiver; use codex_file_watcher::WatchPath; use codex_file_watcher::WatchRegistration; use codex_protocol::protocol::TurnEnvironmentSelection; +use tokio::sync::oneshot; use tracing::warn; #[cfg(not(test))] @@ -24,6 +25,7 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); pub(crate) struct SkillsWatcher { subscriber: FileWatcherSubscriber, + shutdown_tx: std::sync::Mutex>>, } impl SkillsWatcher { @@ -39,8 +41,22 @@ impl SkillsWatcher { } }; let (subscriber, rx) = file_watcher.add_subscriber(); - Self::spawn_event_loop(rx, skills_manager, outgoing); - Arc::new(Self { subscriber }) + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_rx); + Arc::new(Self { + subscriber, + shutdown_tx: std::sync::Mutex::new(Some(shutdown_tx)), + }) + } + + pub(crate) fn shutdown(&self) { + let mut shutdown_tx = self + .shutdown_tx + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if let Some(shutdown_tx) = shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } } pub(crate) async fn register_thread_config( @@ -92,6 +108,7 @@ impl SkillsWatcher { rx: Receiver, skills_manager: Arc, outgoing: Arc, + mut shutdown_rx: oneshot::Receiver<()>, ) { let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); let Ok(handle) = tokio::runtime::Handle::try_current() else { @@ -99,7 +116,14 @@ impl SkillsWatcher { return; }; handle.spawn(async move { - while rx.recv().await.is_some() { + loop { + let event = tokio::select! { + _ = &mut shutdown_rx => break, + event = rx.recv() => event, + }; + if event.is_none() { + break; + } skills_manager.clear_cache(); outgoing .send_server_notification(ServerNotification::SkillsChanged(