diff --git a/codex-rs/app-server/src/attestation.rs b/codex-rs/app-server/src/attestation.rs index 17bb10c38c76..206c38cee17e 100644 --- a/codex-rs/app-server/src/attestation.rs +++ b/codex-rs/app-server/src/attestation.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::sync::Weak; use axum::http::HeaderValue; use codex_app_server_protocol::AttestationGenerateParams; @@ -22,13 +23,13 @@ pub(crate) fn app_server_attestation_provider( thread_state_manager: ThreadStateManager, ) -> Arc { Arc::new(AppServerAttestationProvider { - outgoing, + outgoing: Arc::downgrade(&outgoing), thread_state_manager, }) } struct AppServerAttestationProvider { - outgoing: Arc, + outgoing: Weak, thread_state_manager: ThreadStateManager, } @@ -42,7 +43,9 @@ impl std::fmt::Debug for AppServerAttestationProvider { impl AttestationProvider for AppServerAttestationProvider { fn header_for_request(&self, context: AttestationContext) -> GenerateAttestationFuture<'_> { - let outgoing = self.outgoing.clone(); + let Some(outgoing) = self.outgoing.upgrade() else { + return Box::pin(async { None }); + }; let thread_state_manager = self.thread_state_manager.clone(); Box::pin(async move { request_attestation_header_value_with_timeout( diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index fdcd44d01aea..4b630d2ac37b 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -86,6 +86,7 @@ use tokio::sync::broadcast; use tokio::sync::watch; use tokio::time::Duration; use tokio::time::timeout; +use tokio_util::sync::CancellationToken; use tracing::Instrument; const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10); @@ -160,6 +161,7 @@ impl ExternalAuth for ExternalAuthRefreshBridge { pub(crate) struct MessageProcessor { outgoing: Arc, + skills_watcher: Arc, account_processor: AccountRequestProcessor, apps_processor: AppsRequestProcessor, catalog_processor: CatalogRequestProcessor, @@ -349,6 +351,7 @@ impl MessageProcessor { let thread_list_state_permit = Arc::new(Semaphore::new(/*permits*/ 1)); let workspace_settings_cache = Arc::new(workspace_settings::WorkspaceSettingsCache::default()); + let app_list_shutdown_token = CancellationToken::new(); let account_processor = AccountRequestProcessor::new( auth_manager.clone(), Arc::clone(&thread_manager), @@ -362,6 +365,7 @@ impl MessageProcessor { outgoing.clone(), config_manager.clone(), Arc::clone(&workspace_settings_cache), + app_list_shutdown_token, ); let catalog_processor = CatalogRequestProcessor::new( auth_manager.clone(), @@ -499,6 +503,7 @@ impl MessageProcessor { Self { outgoing, + skills_watcher, account_processor, apps_processor, catalog_processor, @@ -527,6 +532,8 @@ impl MessageProcessor { pub(crate) fn clear_runtime_references(&self) { self.account_processor.clear_external_auth(); + self.apps_processor.shutdown(); + self.skills_watcher.shutdown(); } pub(crate) async fn process_request( diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 3e84c397f833..1dd8684ea6f9 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -423,6 +423,7 @@ use tokio::sync::broadcast; use tokio::sync::oneshot; use tokio::sync::watch; use tokio_util::sync::CancellationToken; +use tokio_util::sync::DropGuard; use tokio_util::task::TaskTracker; use toml::Value as TomlValue; use tracing::Instrument; diff --git a/codex-rs/app-server/src/request_processors/apps_processor.rs b/codex-rs/app-server/src/request_processors/apps_processor.rs index caf60a91a014..5a1c9ffe1c4e 100644 --- a/codex-rs/app-server/src/request_processors/apps_processor.rs +++ b/codex-rs/app-server/src/request_processors/apps_processor.rs @@ -1,12 +1,13 @@ use super::*; -#[derive(Clone)] pub(crate) struct AppsRequestProcessor { auth_manager: Arc, thread_manager: Arc, outgoing: Arc, config_manager: ConfigManager, workspace_settings_cache: Arc, + shutdown_token: CancellationToken, + _shutdown_drop_guard: DropGuard, } impl AppsRequestProcessor { @@ -16,13 +17,17 @@ impl AppsRequestProcessor { outgoing: Arc, config_manager: ConfigManager, workspace_settings_cache: Arc, + shutdown_token: CancellationToken, ) -> Self { + let shutdown_drop_guard = shutdown_token.clone().drop_guard(); Self { auth_manager, thread_manager, outgoing, config_manager, workspace_settings_cache, + shutdown_token, + _shutdown_drop_guard: shutdown_drop_guard, } } @@ -83,12 +88,20 @@ impl AppsRequestProcessor { let request = request_id.clone(); let outgoing = Arc::clone(&self.outgoing); let environment_manager = self.thread_manager.environment_manager(); + let shutdown_token = self.shutdown_token.child_token(); tokio::spawn(async move { - Self::apps_list_task(outgoing, request, params, config, environment_manager).await; + tokio::select! { + _ = shutdown_token.cancelled() => {} + _ = Self::apps_list_task(outgoing, request, params, config, environment_manager) => {} + } }); Ok(None) } + pub(crate) fn shutdown(&self) { + self.shutdown_token.cancel(); + } + async fn apps_list_task( outgoing: Arc, request_id: ConnectionRequestId, diff --git a/codex-rs/app-server/src/skills_watcher.rs b/codex-rs/app-server/src/skills_watcher.rs index d20628d3866d..9e34cdac8e3d 100644 --- a/codex-rs/app-server/src/skills_watcher.rs +++ b/codex-rs/app-server/src/skills_watcher.rs @@ -15,6 +15,8 @@ use codex_file_watcher::ThrottledWatchReceiver; use codex_file_watcher::WatchPath; use codex_file_watcher::WatchRegistration; use codex_protocol::protocol::TurnEnvironmentSelection; +use tokio_util::sync::CancellationToken; +use tokio_util::sync::DropGuard; use tracing::warn; #[cfg(not(test))] @@ -24,6 +26,8 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); pub(crate) struct SkillsWatcher { subscriber: FileWatcherSubscriber, + shutdown_token: CancellationToken, + _shutdown_drop_guard: DropGuard, } impl SkillsWatcher { @@ -39,8 +43,18 @@ impl SkillsWatcher { } }; let (subscriber, rx) = file_watcher.add_subscriber(); - Self::spawn_event_loop(rx, skills_manager, outgoing); - Arc::new(Self { subscriber }) + let shutdown_token = CancellationToken::new(); + let shutdown_drop_guard = shutdown_token.clone().drop_guard(); + Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_token.child_token()); + Arc::new(Self { + subscriber, + shutdown_token, + _shutdown_drop_guard: shutdown_drop_guard, + }) + } + + pub(crate) fn shutdown(&self) { + self.shutdown_token.cancel(); } pub(crate) async fn register_thread_config( @@ -92,6 +106,7 @@ impl SkillsWatcher { rx: Receiver, skills_manager: Arc, outgoing: Arc, + shutdown_token: CancellationToken, ) { let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); let Ok(handle) = tokio::runtime::Handle::try_current() else { @@ -99,7 +114,14 @@ impl SkillsWatcher { return; }; handle.spawn(async move { - while rx.recv().await.is_some() { + loop { + let event = tokio::select! { + _ = shutdown_token.cancelled() => break, + event = rx.recv() => event, + }; + if event.is_none() { + break; + } skills_manager.clear_cache(); outgoing .send_server_notification(ServerNotification::SkillsChanged(