Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 4 additions & 44 deletions codex-rs/app-server/src/fs_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,66 +8,24 @@ use codex_app_server_protocol::FsWatchParams;
use codex_app_server_protocol::FsWatchResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::ServerNotification;
use codex_file_watcher::DebouncedWatchReceiver;
use codex_file_watcher::FileWatcher;
use codex_file_watcher::FileWatcherEvent;
use codex_file_watcher::FileWatcherSubscriber;
use codex_file_watcher::Receiver;
use codex_file_watcher::WatchPath;
use codex_file_watcher::WatchRegistration;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use std::hash::Hash;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex as AsyncMutex;
#[cfg(test)]
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tracing::warn;

const FS_CHANGED_NOTIFICATION_DEBOUNCE: Duration = Duration::from_millis(200);

struct DebouncedReceiver {
rx: Receiver,
interval: Duration,
changed_paths: HashSet<PathBuf>,
next_allowance: Option<Instant>,
}

impl DebouncedReceiver {
fn new(rx: Receiver, interval: Duration) -> Self {
Self {
rx,
interval,
changed_paths: HashSet::new(),
next_allowance: None,
}
}

async fn recv(&mut self) -> Option<FileWatcherEvent> {
while self.changed_paths.is_empty() {
self.changed_paths.extend(self.rx.recv().await?.paths);
}
let next_allowance = *self
.next_allowance
.get_or_insert_with(|| Instant::now() + self.interval);

loop {
tokio::select! {
event = self.rx.recv() => self.changed_paths.extend(event?.paths),
_ = tokio::time::sleep_until(next_allowance) => break,
}
}

Some(FileWatcherEvent {
paths: self.changed_paths.drain().collect(),
})
}
}

#[derive(Clone)]
pub(crate) struct FsWatchManager {
outgoing: Arc<OutgoingMessageSender>,
Expand Down Expand Up @@ -151,7 +109,7 @@ impl FsWatchManager {

let task_watch_id = watch_id.clone();
tokio::spawn(async move {
let mut rx = DebouncedReceiver::new(rx, FS_CHANGED_NOTIFICATION_DEBOUNCE);
let mut rx = DebouncedWatchReceiver::new(rx, FS_CHANGED_NOTIFICATION_DEBOUNCE);
tokio::pin!(terminate_rx);
loop {
let event = tokio::select! {
Expand Down Expand Up @@ -219,6 +177,8 @@ mod tests {
use super::*;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use std::collections::HashSet;
use std::path::PathBuf;
use tempfile::TempDir;

fn absolute_path(path: PathBuf) -> AbsolutePathBuf {
Expand Down
56 changes: 56 additions & 0 deletions codex-rs/file-watcher/src/file_watcher_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,62 @@ async fn throttled_receiver_flushes_pending_on_shutdown() {
assert_eq!(closed, None);
}

#[tokio::test]
async fn debounced_receiver_coalesces_each_event_batch() {
let (tx, rx) = watch_channel();
let mut debounced = DebouncedWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);

tx.add_changed_paths(&[path("a")]).await;
let first = timeout(TEST_THROTTLE_INTERVAL * 2, debounced.recv())
.await
.expect("first emit timeout");
assert_eq!(
first,
Some(FileWatcherEvent {
paths: vec![path("a")],
})
);

tx.add_changed_paths(&[path("c")]).await;
let blocked = timeout(TEST_THROTTLE_INTERVAL / 2, debounced.recv()).await;
assert_eq!(blocked.is_err(), true);

tx.add_changed_paths(&[path("d")]).await;
let second = timeout(TEST_THROTTLE_INTERVAL * 2, debounced.recv())
.await
.expect("second emit timeout");
assert_eq!(
second,
Some(FileWatcherEvent {
paths: vec![path("c"), path("d")],
})
);
}

#[tokio::test]
async fn debounced_receiver_flushes_pending_on_shutdown() {
let (tx, rx) = watch_channel();
let mut debounced = DebouncedWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL);

tx.add_changed_paths(&[path("a")]).await;
drop(tx);

let flushed = timeout(Duration::from_secs(1), debounced.recv())
.await
.expect("shutdown flush timeout");
assert_eq!(
flushed,
Some(FileWatcherEvent {
paths: vec![path("a")],
})
);

let closed = timeout(Duration::from_secs(1), debounced.recv())
.await
.expect("closed recv timeout");
assert_eq!(closed, None);
}

#[test]
fn is_mutating_event_filters_non_mutating_event_kinds() {
assert_eq!(
Expand Down
43 changes: 43 additions & 0 deletions codex-rs/file-watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,49 @@ impl ThrottledWatchReceiver {
}
}

/// Coalesces file watcher notifications that arrive within a fixed debounce
/// window after the first event in each batch.
pub struct DebouncedWatchReceiver {
rx: Receiver,
interval: Duration,
changed_paths: BTreeSet<PathBuf>,
}

impl DebouncedWatchReceiver {
/// Creates a debouncing wrapper around a raw watcher [`Receiver`].
pub fn new(rx: Receiver, interval: Duration) -> Self {
Self {
rx,
interval,
changed_paths: BTreeSet::new(),
}
}

/// Receives the next debounced event batch.
pub async fn recv(&mut self) -> Option<FileWatcherEvent> {
while self.changed_paths.is_empty() {
self.changed_paths.extend(self.rx.recv().await?.paths);
}
let deadline = Instant::now() + self.interval;

loop {
tokio::select! {
event = self.rx.recv() => match event {
Some(event) => self.changed_paths.extend(event.paths),
None => break,
},
_ = sleep_until(deadline) => break,
}
}

Some(FileWatcherEvent {
paths: std::mem::take(&mut self.changed_paths)
.into_iter()
.collect(),
})
}
}

/// Handle used to register watched paths for one logical consumer.
pub struct FileWatcherSubscriber {
id: SubscriberId,
Expand Down
Loading