Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions codex-rs/app-server/src/attestation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::sync::Weak;

use axum::http::HeaderValue;
use codex_app_server_protocol::AttestationGenerateParams;
Expand All @@ -22,13 +23,13 @@ pub(crate) fn app_server_attestation_provider(
thread_state_manager: ThreadStateManager,
) -> Arc<dyn AttestationProvider> {
Arc::new(AppServerAttestationProvider {
outgoing,
outgoing: Arc::downgrade(&outgoing),
thread_state_manager,
})
}

struct AppServerAttestationProvider {
outgoing: Arc<OutgoingMessageSender>,
outgoing: Weak<OutgoingMessageSender>,
thread_state_manager: ThreadStateManager,
}

Expand All @@ -42,7 +43,9 @@ impl std::fmt::Debug for AppServerAttestationProvider {

impl AttestationProvider for AppServerAttestationProvider {
fn header_for_request(&self, context: AttestationContext) -> GenerateAttestationFuture<'_> {
let outgoing = self.outgoing.clone();
let Some(outgoing) = self.outgoing.upgrade() else {
return Box::pin(async { None });
};
let thread_state_manager = self.thread_state_manager.clone();
Box::pin(async move {
request_attestation_header_value_with_timeout(
Expand Down
7 changes: 7 additions & 0 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use tokio::sync::broadcast;
use tokio::sync::watch;
use tokio::time::Duration;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -160,6 +161,7 @@ impl ExternalAuth for ExternalAuthRefreshBridge {

pub(crate) struct MessageProcessor {
outgoing: Arc<OutgoingMessageSender>,
skills_watcher: Arc<SkillsWatcher>,
account_processor: AccountRequestProcessor,
apps_processor: AppsRequestProcessor,
catalog_processor: CatalogRequestProcessor,
Expand Down Expand Up @@ -349,6 +351,7 @@ impl MessageProcessor {
let thread_list_state_permit = Arc::new(Semaphore::new(/*permits*/ 1));
let workspace_settings_cache =
Arc::new(workspace_settings::WorkspaceSettingsCache::default());
let app_list_shutdown_token = CancellationToken::new();
let account_processor = AccountRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
Expand All @@ -362,6 +365,7 @@ impl MessageProcessor {
outgoing.clone(),
config_manager.clone(),
Arc::clone(&workspace_settings_cache),
app_list_shutdown_token,
);
let catalog_processor = CatalogRequestProcessor::new(
auth_manager.clone(),
Expand Down Expand Up @@ -499,6 +503,7 @@ impl MessageProcessor {

Self {
outgoing,
skills_watcher,
account_processor,
apps_processor,
catalog_processor,
Expand Down Expand Up @@ -527,6 +532,8 @@ impl MessageProcessor {

pub(crate) fn clear_runtime_references(&self) {
self.account_processor.clear_external_auth();
self.apps_processor.shutdown();
self.skills_watcher.shutdown();
}

pub(crate) async fn process_request(
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/src/request_processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ use tokio::sync::broadcast;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tokio_util::sync::DropGuard;
use tokio_util::task::TaskTracker;
use toml::Value as TomlValue;
use tracing::Instrument;
Expand Down
17 changes: 15 additions & 2 deletions codex-rs/app-server/src/request_processors/apps_processor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use super::*;

#[derive(Clone)]
pub(crate) struct AppsRequestProcessor {
auth_manager: Arc<AuthManager>,
thread_manager: Arc<ThreadManager>,
outgoing: Arc<OutgoingMessageSender>,
config_manager: ConfigManager,
workspace_settings_cache: Arc<workspace_settings::WorkspaceSettingsCache>,
shutdown_token: CancellationToken,
_shutdown_drop_guard: DropGuard,
}

impl AppsRequestProcessor {
Expand All @@ -16,13 +17,17 @@ impl AppsRequestProcessor {
outgoing: Arc<OutgoingMessageSender>,
config_manager: ConfigManager,
workspace_settings_cache: Arc<workspace_settings::WorkspaceSettingsCache>,
shutdown_token: CancellationToken,
) -> Self {
let shutdown_drop_guard = shutdown_token.clone().drop_guard();
Self {
auth_manager,
thread_manager,
outgoing,
config_manager,
workspace_settings_cache,
shutdown_token,
_shutdown_drop_guard: shutdown_drop_guard,
}
}

Expand Down Expand Up @@ -83,12 +88,20 @@ impl AppsRequestProcessor {
let request = request_id.clone();
let outgoing = Arc::clone(&self.outgoing);
let environment_manager = self.thread_manager.environment_manager();
let shutdown_token = self.shutdown_token.child_token();
tokio::spawn(async move {
Self::apps_list_task(outgoing, request, params, config, environment_manager).await;
tokio::select! {
_ = shutdown_token.cancelled() => {}
_ = Self::apps_list_task(outgoing, request, params, config, environment_manager) => {}
}
});
Ok(None)
}

pub(crate) fn shutdown(&self) {
Comment thread
fcoury-oai marked this conversation as resolved.
self.shutdown_token.cancel();
}

async fn apps_list_task(
outgoing: Arc<OutgoingMessageSender>,
request_id: ConnectionRequestId,
Expand Down
28 changes: 25 additions & 3 deletions codex-rs/app-server/src/skills_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use codex_file_watcher::ThrottledWatchReceiver;
use codex_file_watcher::WatchPath;
use codex_file_watcher::WatchRegistration;
use codex_protocol::protocol::TurnEnvironmentSelection;
use tokio_util::sync::CancellationToken;
use tokio_util::sync::DropGuard;
use tracing::warn;

#[cfg(not(test))]
Expand All @@ -24,6 +26,8 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50);

pub(crate) struct SkillsWatcher {
subscriber: FileWatcherSubscriber,
shutdown_token: CancellationToken,
Comment thread
fcoury-oai marked this conversation as resolved.
_shutdown_drop_guard: DropGuard,
}

impl SkillsWatcher {
Expand All @@ -39,8 +43,18 @@ impl SkillsWatcher {
}
};
let (subscriber, rx) = file_watcher.add_subscriber();
Self::spawn_event_loop(rx, skills_manager, outgoing);
Arc::new(Self { subscriber })
let shutdown_token = CancellationToken::new();
let shutdown_drop_guard = shutdown_token.clone().drop_guard();
Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_token.child_token());
Arc::new(Self {
subscriber,
shutdown_token,
_shutdown_drop_guard: shutdown_drop_guard,
})
}

pub(crate) fn shutdown(&self) {
self.shutdown_token.cancel();
}

pub(crate) async fn register_thread_config(
Expand Down Expand Up @@ -92,14 +106,22 @@ impl SkillsWatcher {
rx: Receiver,
skills_manager: Arc<SkillsManager>,
outgoing: Arc<OutgoingMessageSender>,
shutdown_token: CancellationToken,
) {
let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL);
let Ok(handle) = tokio::runtime::Handle::try_current() else {
warn!("skills watcher listener skipped: no Tokio runtime available");
return;
};
handle.spawn(async move {
while rx.recv().await.is_some() {
loop {
let event = tokio::select! {
_ = shutdown_token.cancelled() => break,
event = rx.recv() => event,
};
if event.is_none() {
break;
}
skills_manager.clear_cache();
outgoing
.send_server_notification(ServerNotification::SkillsChanged(
Expand Down
Loading