Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix filewatching setup cookie path #6183

Merged
merged 5 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 9 additions & 3 deletions crates/turborepo-filewatch/src/globwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,9 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
Expand Down Expand Up @@ -500,7 +502,9 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
Expand Down Expand Up @@ -588,7 +592,9 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
Expand Down
98 changes: 78 additions & 20 deletions crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::{
time::Duration,
};

// windows -> no recursive watch, watch ancestors
// linux -> recursive watch, watch ancestors
// macos -> custom watcher impl in fsevents, no recursive watch, no watching ancestors
#[cfg(target_os = "macos")]
use fsevent::FsEventWatcher;
Expand All @@ -21,11 +23,7 @@ use notify::{Event, EventHandler, RecursiveMode, Watcher};
use thiserror::Error;
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, warn};
// windows -> no recursive watch, watch ancestors
// linux -> recursive watch, watch ancestors
#[cfg(feature = "watch_ancestors")]
use turbopath::PathRelation;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};
#[cfg(feature = "manual_recursive_watch")]
use {
notify::{
Expand Down Expand Up @@ -86,10 +84,30 @@ pub struct FileSystemWatcher {
// dropping the other sender for the broadcast channel, causing all receivers
// to be notified of a close.
_exit_ch: tokio::sync::oneshot::Sender<()>,
pub cookie_dir: AbsoluteSystemPathBuf,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we only expect callers to query this field (and I think that's what we want) we should keep this private and provide a

pub fn cookie_dir(&self) -> &AbsoluteSystemPath {
    &self.cookie_dir
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

impl FileSystemWatcher {
pub async fn new(root: &AbsoluteSystemPath) -> Result<Self, WatchError> {
pub async fn new_with_default_cookie_dir(
root: &AbsoluteSystemPath,
) -> Result<Self, WatchError> {
// We already store logs in .turbo and recommend it be gitignore'd.
// Watchman uses .git, but we can't guarantee that git is present _or_
// that the turbo root is the same as the git root.
Self::new(root, &root.join_components(&[".turbo", "cookies"])).await
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need a new method instead of just inlining into new()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The filewatcher is somewhat generic so I didn't want to fully hard-code where the cookie files should go.


pub async fn new(
root: &AbsoluteSystemPath,
cookie_dir: &AbsoluteSystemPath,
) -> Result<Self, WatchError> {
if root.relation_to_path(cookie_dir) != PathRelation::Parent {
return Err(WatchError::Setup(format!(
"Invalid cookie directory: {} does not contain {}",
root, cookie_dir
)));
}
setup_cookie_dir(cookie_dir)?;
let (sender, _) = broadcast::channel(1024);
let (send_file_events, mut recv_file_events) = mpsc::channel(1024);
let watch_root = root.to_owned();
Expand All @@ -99,7 +117,7 @@ impl FileSystemWatcher {
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel();
// Ensure we are ready to receive new events, not events for existing state
debug!("waiting for initial filesystem cookie");
wait_for_cookie(root, &mut recv_file_events).await?;
wait_for_cookie(cookie_dir, &mut recv_file_events).await?;
tokio::task::spawn(watch_events(
watcher,
watch_root,
Expand All @@ -111,6 +129,7 @@ impl FileSystemWatcher {
Ok(Self {
sender,
_exit_ch: exit_ch,
cookie_dir: cookie_dir.to_owned(),
})
}

Expand All @@ -119,6 +138,20 @@ impl FileSystemWatcher {
}
}

fn setup_cookie_dir(cookie_dir: &AbsoluteSystemPath) -> Result<(), WatchError> {
// We need to ensure that the cookie directory is cleared out first so
// that we can start over with cookies.
if cookie_dir.exists() {
cookie_dir.remove_dir_all().map_err(|e| {
WatchError::Setup(format!("failed to clear cookie dir {}: {}", cookie_dir, e))
})?;
}
cookie_dir.create_dir_all().map_err(|e| {
WatchError::Setup(format!("failed to setup cookie dir {}: {}", cookie_dir, e))
})?;
Ok(())
}

#[cfg(not(any(feature = "watch_ancestors", feature = "manual_recursive_watch")))]
async fn watch_events(
_watcher: Backend,
Expand Down Expand Up @@ -333,10 +366,13 @@ fn make_watcher<F: EventHandler>(event_handler: F) -> Result<Backend, notify::Er
/// This ensures that we are ready to receive *new* filesystem events, rather
/// than receiving events from existing state, which some backends can do.
async fn wait_for_cookie(
root: &AbsoluteSystemPath,
cookie_dir: &AbsoluteSystemPath,
recv: &mut mpsc::Receiver<EventResult>,
) -> Result<(), WatchError> {
let cookie_path = root.join_component(".turbo-cookie");
// TODO: should this be passed in? Currently the caller guarantees that the
// directory is empty, but it could be the responsibility of the
// filewatcher...
let cookie_path = cookie_dir.join_component(".turbo-cookie");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe out of scope here, but can we call this something more descriptive than .turbo-cookie if there is more than one cookie?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other cookies (one per RPC), that are numbered serially.

cookie_path.create_with_contents("cookie").map_err(|e| {
WatchError::Setup(format!("failed to write cookie to {}: {}", cookie_path, e))
})?;
Expand Down Expand Up @@ -441,7 +477,9 @@ mod test {
let sibling_path = parent_path.join_component("sibling");
sibling_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();

expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;
Expand Down Expand Up @@ -499,7 +537,9 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();

expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;
Expand Down Expand Up @@ -541,7 +581,9 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -570,7 +612,9 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -602,7 +646,9 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -633,7 +679,9 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -674,7 +722,9 @@ mod test {
let symlink_path = repo_root.join_component("symlink");
symlink_path.symlink_to_dir(child_path.as_str()).unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -713,7 +763,9 @@ mod test {
let symlink_path = repo_root.join_component("symlink");
symlink_path.symlink_to_dir(child_path.as_str()).unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -757,7 +809,9 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand Down Expand Up @@ -795,7 +849,9 @@ mod test {
let child_path = parent_path.join_component("child");
child_path.create_dir_all().unwrap();

let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let mut recv = watcher.subscribe();
expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;

Expand All @@ -816,7 +872,9 @@ mod test {
let mut recv = {
// create and immediately drop the watcher, which should trigger the exit
// channel
let watcher = FileSystemWatcher::new(&repo_root).await.unwrap();
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
watcher.subscribe()
};

Expand Down
15 changes: 0 additions & 15 deletions crates/turborepo-lib/src/commands/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,8 @@ pub async fn daemon_server(
}
CloseReason::Interrupt
});
// We already store logs in .turbo and recommend it be gitignore'd.
// Watchman uses .git, but we can't guarantee that git is present _or_
// that the turbo root is the same as the git root.
let cookie_dir = base.repo_root.join_components(&[".turbo", "cookies"]);
// We need to ensure that the cookie directory is cleared out first so
// that we can start over with cookies.
if cookie_dir.exists() {
cookie_dir
.remove_dir_all()
.map_err(|e| DaemonError::CookieDir(e, cookie_dir.clone()))?;
}
cookie_dir
.create_dir_all()
.map_err(|e| DaemonError::CookieDir(e, cookie_dir.clone()))?;
let reason = crate::daemon::serve(
&base.repo_root,
cookie_dir,
&daemon_root,
log_file,
timeout,
Expand Down
21 changes: 7 additions & 14 deletions crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,14 @@ impl From<RpcError> for tonic::Status {

async fn start_filewatching(
repo_root: AbsoluteSystemPathBuf,
cookie_dir: AbsoluteSystemPathBuf,
watcher_tx: watch::Sender<Option<Arc<FileWatching>>>,
) -> Result<(), WatchError> {
let watcher = FileSystemWatcher::new(&repo_root).await?;
let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_millis(100), watcher.subscribe());
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).await?;
let cookie_jar = CookieJar::new(
&watcher.cookie_dir,
Duration::from_millis(100),
watcher.subscribe(),
);
let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
// We can ignore failures here, it means the server is shutting down and
// receivers have gone out of scope.
Expand All @@ -113,7 +116,6 @@ const REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
/// to be wired to signal handling.
pub async fn serve<S>(
repo_root: &AbsoluteSystemPath,
cookie_dir: AbsoluteSystemPathBuf,
daemon_root: &AbsoluteSystemPath,
log_file: AbsoluteSystemPathBuf,
timeout: Duration,
Expand Down Expand Up @@ -143,7 +145,7 @@ where
// all references are dropped.
let fw_shutdown = trigger_shutdown.clone();
let fw_handle = tokio::task::spawn(async move {
if let Err(e) = start_filewatching(watcher_repo_root, cookie_dir, watcher_tx).await {
if let Err(e) = start_filewatching(watcher_repo_root, watcher_tx).await {
error!("filewatching failed to start: {}", e);
let _ = fw_shutdown.send(()).await;
}
Expand Down Expand Up @@ -436,8 +438,6 @@ mod test {
.unwrap();

let repo_root = path.join_component("repo");
let cookie_dir = repo_root.join_component(".git");
cookie_dir.create_dir_all().unwrap();
let daemon_root = path.join_component("daemon");
let log_file = daemon_root.join_component("log");
tracing::info!("start");
Expand All @@ -449,7 +449,6 @@ mod test {
let handle = tokio::task::spawn(async move {
serve(
&repo_root,
cookie_dir,
&daemon_root,
log_file,
Duration::from_secs(60 * 60),
Expand Down Expand Up @@ -484,8 +483,6 @@ mod test {
.unwrap();

let repo_root = path.join_component("repo");
let cookie_dir = repo_root.join_component(".git");
cookie_dir.create_dir_all().unwrap();
let daemon_root = path.join_component("daemon");
let log_file = daemon_root.join_component("log");

Expand All @@ -496,7 +493,6 @@ mod test {
let exit_signal = rx.map(|_result| CloseReason::Interrupt);
let close_reason = serve(
&repo_root,
cookie_dir,
&daemon_root,
log_file,
Duration::from_millis(5),
Expand Down Expand Up @@ -526,8 +522,6 @@ mod test {
.unwrap();

let repo_root = path.join_component("repo");
let cookie_dir = repo_root.join_component(".git");
cookie_dir.create_dir_all().unwrap();
let daemon_root = path.join_component("daemon");
daemon_root.create_dir_all().unwrap();
let log_file = daemon_root.join_component("log");
Expand All @@ -540,7 +534,6 @@ mod test {
let repo_root = server_repo_root;
serve(
&repo_root,
cookie_dir,
&daemon_root,
log_file,
Duration::from_secs(60 * 60),
Expand Down
4 changes: 4 additions & 0 deletions crates/turborepo-paths/src/absolute_system_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ impl AbsoluteSystemPath {
self.0.as_str().as_bytes()
}

pub fn exists(&self) -> bool {
self.0.exists()
}

pub fn ancestors(&self) -> impl Iterator<Item = &AbsoluteSystemPath> {
self.0.ancestors().map(Self::new_unchecked)
}
Expand Down
4 changes: 0 additions & 4 deletions crates/turborepo-paths/src/absolute_system_path_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,6 @@ impl AbsoluteSystemPathBuf {
self.0.file_name()
}

pub fn exists(&self) -> bool {
self.0.exists()
}

pub fn try_exists(&self) -> Result<bool, PathError> {
// try_exists is an experimental API and not yet in fs_err
Ok(std::fs::try_exists(&self.0)?)
Expand Down