Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl ExternalAuth for ExternalAuthRefreshBridge {

pub(crate) struct MessageProcessor {
outgoing: Arc<OutgoingMessageSender>,
skills_watcher: Arc<SkillsWatcher>,
account_processor: AccountRequestProcessor,
apps_processor: AppsRequestProcessor,
catalog_processor: CatalogRequestProcessor,
Expand Down Expand Up @@ -499,6 +500,7 @@ impl MessageProcessor {

Self {
outgoing,
skills_watcher,
account_processor,
apps_processor,
catalog_processor,
Expand Down Expand Up @@ -527,6 +529,7 @@ impl MessageProcessor {

pub(crate) fn clear_runtime_references(&self) {
self.account_processor.clear_external_auth();
self.skills_watcher.shutdown();
}

pub(crate) async fn process_request(
Expand Down
30 changes: 27 additions & 3 deletions codex-rs/app-server/src/skills_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use codex_file_watcher::ThrottledWatchReceiver;
use codex_file_watcher::WatchPath;
use codex_file_watcher::WatchRegistration;
use codex_protocol::protocol::TurnEnvironmentSelection;
use tokio::sync::oneshot;
use tracing::warn;

#[cfg(not(test))]
Expand All @@ -24,6 +25,7 @@ const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50);

pub(crate) struct SkillsWatcher {
subscriber: FileWatcherSubscriber,
shutdown_tx: std::sync::Mutex<Option<oneshot::Sender<()>>>,
}

impl SkillsWatcher {
Expand All @@ -39,8 +41,22 @@ impl SkillsWatcher {
}
};
let (subscriber, rx) = file_watcher.add_subscriber();
Self::spawn_event_loop(rx, skills_manager, outgoing);
Arc::new(Self { subscriber })
let (shutdown_tx, shutdown_rx) = oneshot::channel();
Self::spawn_event_loop(rx, skills_manager, outgoing, shutdown_rx);
Arc::new(Self {
subscriber,
shutdown_tx: std::sync::Mutex::new(Some(shutdown_tx)),
})
}

pub(crate) fn shutdown(&self) {
let mut shutdown_tx = self
.shutdown_tx
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(shutdown_tx) = shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}

pub(crate) async fn register_thread_config(
Expand Down Expand Up @@ -92,14 +108,22 @@ impl SkillsWatcher {
rx: Receiver,
skills_manager: Arc<SkillsManager>,
outgoing: Arc<OutgoingMessageSender>,
mut shutdown_rx: oneshot::Receiver<()>,
) {
let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL);
let Ok(handle) = tokio::runtime::Handle::try_current() else {
warn!("skills watcher listener skipped: no Tokio runtime available");
return;
};
handle.spawn(async move {
while rx.recv().await.is_some() {
loop {
let event = tokio::select! {
_ = &mut shutdown_rx => break,
event = rx.recv() => event,
};
if event.is_none() {
break;
}
skills_manager.clear_cache();
outgoing
.send_server_notification(ServerNotification::SkillsChanged(
Expand Down
Loading