Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(daemon): kill daemon when root is removed #5038

Merged
merged 7 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/turborepo-globwatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ pub enum ConfigError {
ServerStopped,
/// Watch error
WatchError(Vec<notify::Error>),
/// The server has already been consumed.
WatchingAlready,
}

impl<T: Watcher> WatchConfig<T> {
Expand Down Expand Up @@ -287,6 +289,24 @@ impl<T: Watcher> WatchConfig<T> {
.map_err(ConfigError::WatchError)
}

/// Register a single path to be included by the watcher.
pub async fn include_path(&self, path: &Path) -> Result<(), ConfigError> {
trace!("watching {:?}", path);
// Windows doesn't create an event when a watched directory itself is deleted
// we watch the parent directory instead.
// More information at https://github.com/notify-rs/notify/issues/403
#[cfg(windows)]
let watched_path = path.parent().expect("turbo is unusable at filesytem root");
#[cfg(not(windows))]
let watched_path = path;

self.watcher
.lock()
.expect("watcher lock poisoned")
.watch(watched_path, notify::RecursiveMode::NonRecursive)
.map_err(|e| ConfigError::WatchError(vec![e]))
}

/// Register a glob to be excluded by the watcher.
#[tracing::instrument(skip(self))]
pub async fn exclude(&self, relative_to: &Path, glob: &str) {
Expand Down
10 changes: 9 additions & 1 deletion crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,15 @@ impl<T: Watcher + Send + 'static> DaemonServer<T> {
None => CloseReason::ServerClosed,
}
},
_ = watcher_fut => CloseReason::WatcherClosed,
watch_res = watcher_fut => {
match watch_res {
Ok(()) => CloseReason::WatcherClosed,
Err(e) => {
error!("Globwatch config error: {:?}", e);
CloseReason::WatcherClosed
},
}
},
}

// here the stop token is dropped, and the pid lock is dropped
Expand Down
50 changes: 46 additions & 4 deletions crates/turborepo-lib/src/globwatcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use futures::{stream::iter, StreamExt};
use globwatch::{ConfigError, GlobWatcher, StopToken, WatchConfig, Watcher};
use itertools::Itertools;
use notify::RecommendedWatcher;
use notify::{EventKind, RecommendedWatcher};
use tokio::time::timeout;
use tracing::{trace, warn};
use turbopath::AbsoluteSystemPathBuf;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<T: Watcher> HashGlobWatcher<T> {
/// Watches a given path, using the flush_folder as temporary storage to
/// make sure that file events are handled in the appropriate order.
#[tracing::instrument(skip(self, token))]
pub async fn watch(&self, token: StopToken) {
pub async fn watch(&self, token: StopToken) -> Result<(), ConfigError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do other callers need to handle this error as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, the only caller happens in a macro that ended up suppressing the lint.

let start_globs = {
let lock = self.hash_globs.lock().expect("only fails if poisoned");
lock.iter()
Expand All @@ -79,17 +79,25 @@ impl<T: Watcher> HashGlobWatcher<T> {
Some(watcher) => watcher.into_stream(token),
None => {
warn!("watcher already consumed");
return;
return Err(ConfigError::WatchingAlready);
}
};

// watch the root of the repo to shut down if the folder is deleted
self.config.include_path(&self.relative_to).await?;

// watch all the globs currently in the map
for glob in start_globs {
self.config.include(&self.relative_to, &glob).await.ok();
}

while let Some(Ok(event)) = stream.next().await {
trace!("processing event: {:?}", event);
if event.paths.contains(&self.relative_to) && matches!(event.kind, EventKind::Remove(_))
{
// if the root of the repo is deleted, we shut down
trace!("repo root was removed, shutting down");
break;
}

let repo_relative_paths = event
.paths
Expand All @@ -115,6 +123,8 @@ impl<T: Watcher> HashGlobWatcher<T> {
self.config.exclude(&self.relative_to, &glob).await;
}
}

Ok(())
}

/// registers a hash with a set of globs to watch for changes
Expand Down Expand Up @@ -657,4 +667,36 @@ mod test {
watcher.glob_statuses.lock().unwrap()
);
}

#[tokio::test]
#[tracing_test::traced_test]
async fn delete_root_kill_daemon() {
let dir = setup();
let flush = tempdir::TempDir::new("globwatch-flush").unwrap();
let watcher = Arc::new(
super::HashGlobWatcher::new(
AbsoluteSystemPathBuf::new(dir.path()).unwrap(),
flush.path().to_path_buf(),
)
.unwrap(),
);

let stop = StopSource::new();

let task_watcher = watcher.clone();
let token = stop.token();

// dropped when the test ends
let task = tokio::task::spawn(async move { task_watcher.watch(token).await });

watcher.config.flush().await.unwrap();
std::fs::remove_dir_all(dir.path()).unwrap();

// it should shut down
let finish = task.await;
assert!(
finish.is_ok(),
"expected task to finish when root is deleted"
);
}
}
Loading