From 048d5f5ef313a705b22a37e3e83ff30952569faf Mon Sep 17 00:00:00 2001 From: Alexander Lyon Date: Thu, 8 Feb 2024 14:38:20 +0000 Subject: [PATCH] refactor: make all init synchronous to clean up data initialization This also makes generous use of a new type called an OptionalWatch which allows downstream data dependencies to wait for a resource to become available. All data dependencies are wrapped in an OptionalWatch, and all initialization is strictly synchronous. --- Cargo.lock | 3 + crates/turborepo-filewatch/Cargo.toml | 4 + crates/turborepo-filewatch/src/cookies.rs | 64 +- crates/turborepo-filewatch/src/globwatcher.rs | 50 +- crates/turborepo-filewatch/src/lib.rs | 182 +++-- .../turborepo-filewatch/src/optional_watch.rs | 68 ++ .../src/package_watcher.rs | 712 ++++++++++++------ crates/turborepo-lib/src/daemon/server.rs | 297 ++++---- 8 files changed, 919 insertions(+), 461 deletions(-) create mode 100644 crates/turborepo-filewatch/src/optional_watch.rs diff --git a/Cargo.lock b/Cargo.lock index 43d13f51de9db..460b2a463f040 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11655,10 +11655,13 @@ dependencies = [ "itertools 0.10.5", "libc", "notify 6.1.1", + "serde", + "serde_json", "tempfile", "thiserror", "tokio", "tokio-scoped", + "tokio-stream", "tracing", "tracing-test", "turbopath", diff --git a/crates/turborepo-filewatch/Cargo.toml b/crates/turborepo-filewatch/Cargo.toml index 4bee1f236901f..847a5b189401d 100644 --- a/crates/turborepo-filewatch/Cargo.toml +++ b/crates/turborepo-filewatch/Cargo.toml @@ -15,8 +15,11 @@ dashmap = { workspace = true } futures = { version = "0.3.26" } itertools = { workspace = true } notify = "6.0.1" +serde = { version = "1.0.190", features = ["derive"] } +serde_json = "1.0.106" thiserror = "1.0.38" tokio = { workspace = true, features = ["full", "time"] } +tokio-stream = "0.1.14" tracing = "0.1.37" tracing-test = "0.2.4" turbopath = { workspace = true } @@ -36,6 +39,7 @@ version = "0.2.4" [dev-dependencies] tempfile = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt"] } tokio-scoped = "0.2.0" [features] diff --git a/crates/turborepo-filewatch/src/cookies.rs b/crates/turborepo-filewatch/src/cookies.rs index 41058b5f16bc2..f38f336fc028c 100644 --- a/crates/turborepo-filewatch/src/cookies.rs +++ b/crates/turborepo-filewatch/src/cookies.rs @@ -3,12 +3,14 @@ use std::{collections::BinaryHeap, fs::OpenOptions, time::Duration}; use notify::EventKind; use thiserror::Error; use tokio::{ - sync::{mpsc, oneshot}, + sync::{broadcast, mpsc, oneshot}, time::error::Elapsed, }; use tracing::trace; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation}; +use crate::{NotifyError, OptionalWatch}; + #[derive(Debug, Error)] pub enum CookieError { #[error("cookie timeout expired")] @@ -22,6 +24,8 @@ pub enum CookieError { io_err: std::io::Error, path: AbsoluteSystemPathBuf, }, + #[error("cookie queue is not available")] + Unavailable, } /// CookieWriter is responsible for assigning filesystem cookies to a request @@ -30,7 +34,7 @@ pub enum CookieError { pub struct CookieWriter { root: AbsoluteSystemPathBuf, timeout: Duration, - cookie_request_tx: mpsc::Sender>>, + cookie_request_tx: OptionalWatch>>>, // _exit_ch exists to trigger a close on the receiver when all instances // of this struct are dropped. 
The task that is receiving events will exit, // dropping the other sender for the broadcast channel, causing all receivers @@ -138,18 +142,40 @@ impl CookieWatcher { } impl CookieWriter { - pub fn new(root: &AbsoluteSystemPath, timeout: Duration) -> Self { + pub fn new( + root: &AbsoluteSystemPath, + timeout: Duration, + mut recv: OptionalWatch>>, + ) -> Self { + let (cookie_request_sender_tx, cookie_request_sender_rx) = OptionalWatch::new(); let (exit_ch, exit_signal) = mpsc::channel(16); - let (cookie_requests_tx, cookie_requests_rx) = mpsc::channel(16); - tokio::spawn(watch_cookies( - root.to_owned(), - cookie_requests_rx, - exit_signal, - )); + tokio::spawn({ + let root = root.to_owned(); + async move { + if recv.get().await.is_err() { + // here we need to wait for confirmation that the watching end is ready + // before we start sending requests. this has the side effect of not + // enabling the cookie writing mechanism until the watcher is ready + return; + } + + let (cookie_requests_tx, cookie_requests_rx) = mpsc::channel(16); + + if cookie_request_sender_tx + .send(Some(cookie_requests_tx)) + .is_err() + { + // the receiver has already been dropped + tracing::debug!("nobody listening for cookie requests, exiting"); + return; + }; + watch_cookies(root.to_owned(), cookie_requests_rx, exit_signal).await; + } + }); Self { root: root.to_owned(), timeout, - cookie_request_tx: cookie_requests_tx, + cookie_request_tx: cookie_request_sender_rx, _exit_ch: exit_ch, } } @@ -164,7 +190,15 @@ impl CookieWriter { ) -> Result, CookieError> { // we need to write the cookie from a single task so as to serialize them let (resp_tx, resp_rx) = oneshot::channel(); - self.cookie_request_tx.clone().send(resp_tx).await?; + + // make sure the cookie writer is ready + let mut cookie_request_tx = self.cookie_request_tx.clone(); + let Ok(cookie_request_tx) = cookie_request_tx.get().await.map(|s| s.to_owned()) else { + // the cookie queue is not ready and will never be ready + return Err(CookieError::Unavailable); + }; + + cookie_request_tx.send(resp_tx).await?; let serial = tokio::time::timeout(self.timeout, resp_rx).await???; Ok(CookiedRequest { request, serial }) } @@ -224,7 +258,7 @@ mod test { use turbopath::AbsoluteSystemPathBuf; use super::{CookieWatcher, CookiedRequest}; - use crate::{cookies::CookieWriter, NotifyError}; + use crate::{cookies::CookieWriter, NotifyError, OptionalWatch}; struct TestQuery { resp: oneshot::Sender<()>, @@ -288,8 +322,9 @@ mod test { .unwrap(); let (send_file_events, file_events) = broadcast::channel(16); + let recv = OptionalWatch::once(file_events.resubscribe()); let (reqs_tx, reqs_rx) = mpsc::channel(16); - let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2)); + let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2), recv); let (exit_tx, exit_rx) = oneshot::channel(); let service = TestService { @@ -351,8 +386,9 @@ mod test { .unwrap(); let (send_file_events, file_events) = broadcast::channel(16); + let recv = OptionalWatch::once(file_events.resubscribe()); let (reqs_tx, reqs_rx) = mpsc::channel(16); - let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2)); + let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2), recv); let (exit_tx, exit_rx) = oneshot::channel(); let service = TestService { diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index 8ac2cf7518a4e..3719fc1f339e7 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ 
b/crates/turborepo-filewatch/src/globwatcher.rs @@ -10,12 +10,12 @@ use notify::Event; use thiserror::Error; use tokio::sync::{broadcast, mpsc, oneshot}; use tracing::warn; -use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPath}; +use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPath}; use wax::{Any, Glob, Program}; use crate::{ cookies::{CookieError, CookieWatcher, CookieWriter, CookiedRequest}, - NotifyError, + NotifyError, OptionalWatch, }; type Hash = String; @@ -146,16 +146,24 @@ struct GlobTracker { impl GlobWatcher { pub fn new( - root: &AbsoluteSystemPath, + root: AbsoluteSystemPathBuf, cookie_jar: CookieWriter, - recv: broadcast::Receiver>, + mut recv: OptionalWatch>>, ) -> Self { let (exit_ch, exit_signal) = tokio::sync::oneshot::channel(); let (query_ch, query_recv) = mpsc::channel(256); let cookie_root = cookie_jar.root().to_owned(); - tokio::task::spawn( - GlobTracker::new(root.to_owned(), cookie_root, exit_signal, recv, query_recv).watch(), - ); + tokio::task::spawn(async move { + let Ok(recv) = recv.get().await.map(|r| r.resubscribe()) else { + // if this fails, it means that the filewatcher is not available + // so starting the glob tracker is pointless + return; + }; + + GlobTracker::new(root, cookie_root, exit_signal, recv, query_recv) + .watch() + .await + }); Self { cookie_jar, _exit_ch: exit_ch, @@ -452,12 +460,10 @@ mod test { setup(&repo_root); let cookie_dir = repo_root.join_component(".git"); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2)); - - let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let recv = watcher.watch(); + let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone()); + let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv); let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"]; let raw_excludes = ["my-pkg/.next/cache/**"]; @@ -536,12 +542,11 @@ mod test { setup(&repo_root); let cookie_dir = repo_root.join_component(".git"); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2)); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let recv = watcher.watch(); + let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone()); - let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv); let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"]; let raw_excludes: [&str; 0] = []; @@ -630,12 +635,11 @@ mod test { setup(&repo_root); let cookie_dir = repo_root.join_component(".git"); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2)); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let recv = watcher.watch(); + let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone()); - let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv); // On windows, we expect different sanitization before the // globs are 
passed in, due to alternative data streams in files. diff --git a/crates/turborepo-filewatch/src/lib.rs b/crates/turborepo-filewatch/src/lib.rs index 3a376abedcfbf..594aa23cb5418 100644 --- a/crates/turborepo-filewatch/src/lib.rs +++ b/crates/turborepo-filewatch/src/lib.rs @@ -21,7 +21,7 @@ use notify::event::EventKind; use notify::{Config, RecommendedWatcher}; use notify::{Event, EventHandler, RecursiveMode, Watcher}; use thiserror::Error; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, watch::error::RecvError}; use tracing::{debug, warn}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation}; #[cfg(feature = "manual_recursive_watch")] @@ -39,8 +39,11 @@ pub mod cookies; #[cfg(target_os = "macos")] mod fsevent; pub mod globwatcher; +mod optional_watch; pub mod package_watcher; +pub use optional_watch::OptionalWatch; + #[cfg(not(target_os = "macos"))] type Backend = RecommendedWatcher; #[cfg(target_os = "macos")] @@ -79,7 +82,7 @@ impl Display for NotifyError { } pub struct FileSystemWatcher { - sender: broadcast::Sender>, + receiver: OptionalWatch>>, // _exit_ch exists to trigger a close on the receiver when an instance // of this struct is dropped. The task that is receiving events will exit, // dropping the other sender for the broadcast channel, causing all receivers @@ -89,53 +92,96 @@ pub struct FileSystemWatcher { } impl FileSystemWatcher { - pub async fn new_with_default_cookie_dir( - root: &AbsoluteSystemPath, - ) -> Result { + pub fn new_with_default_cookie_dir(root: &AbsoluteSystemPath) -> Result { // We already store logs in .turbo and recommend it be gitignore'd. // Watchman uses .git, but we can't guarantee that git is present _or_ // that the turbo root is the same as the git root. - Self::new(root, &root.join_components(&[".turbo", "cookies"])).await + Self::new(root, root.join_components(&[".turbo", "cookies"])) } - pub async fn new( + pub fn new( root: &AbsoluteSystemPath, - cookie_dir: &AbsoluteSystemPath, + cookie_dir: AbsoluteSystemPathBuf, ) -> Result { - if root.relation_to_path(cookie_dir) != PathRelation::Parent { + tracing::debug!("initing file-system watcher"); + + if root.relation_to_path(&cookie_dir) != PathRelation::Parent { return Err(WatchError::Setup(format!( "Invalid cookie directory: {} does not contain {}", root, cookie_dir ))); } - setup_cookie_dir(cookie_dir)?; - let (sender, _) = broadcast::channel(1024); + + let (sender_tx, sender_rx) = OptionalWatch::new(); + let (receiver_tx, receiver_rx) = OptionalWatch::new(); let (send_file_events, mut recv_file_events) = mpsc::channel(1024); - let watch_root = root.to_owned(); - let broadcast_sender = sender.clone(); - debug!("starting filewatcher"); - let watcher = run_watcher(&watch_root, send_file_events)?; let (exit_ch, exit_signal) = tokio::sync::oneshot::channel(); - // Ensure we are ready to receive new events, not events for existing state - debug!("waiting for initial filesystem cookie"); - wait_for_cookie(cookie_dir, &mut recv_file_events).await?; - tokio::task::spawn(watch_events( - watcher, - watch_root, - recv_file_events, - exit_signal, - broadcast_sender, - )); - debug!("filewatching ready"); + + tokio::task::spawn({ + let cookie_dir = cookie_dir.clone(); + let watch_root = root.to_owned(); + async move { + // this task never yields, so run it in the blocking threadpool + let watch_root_task = watch_root.clone(); + let cookie_dir_task = cookie_dir.clone(); + let task = tokio::task::spawn_blocking(move || { + setup_cookie_dir(&cookie_dir_task)?; + 
run_watcher(&watch_root_task, send_file_events)
+                });
+
+                let Ok(Ok(watcher)) = task.await else {
+                    // if the watcher fails, just return. we don't set the event sender, and other
+                    // services will never start
+                    return;
+                };
+
+                // Ensure we are ready to receive new events, not events for existing state
+                debug!("waiting for initial filesystem cookie");
+                if let Err(e) = wait_for_cookie(&cookie_dir, &mut recv_file_events).await {
+                    // if we can't get a cookie here, we should not make the file
+                    // watching available to downstream services
+                    warn!("failed to wait for initial filesystem cookie: {}", e);
+                    return;
+                }
+                debug!("filewatching ready");
+
+                let (sender, receiver) = broadcast::channel(1024);
+
+                // if either of these sends fails, it means that we either won't receive
+                // events or have no downstream listeners. In either case, we should not
+                // run the watcher
+                _ = sender_tx.send(Some(sender.clone()));
+                _ = receiver_tx.send(Some(receiver));
+
+                watch_events(
+                    watcher,
+                    watch_root,
+                    recv_file_events,
+                    exit_signal,
+                    sender_rx,
+                )
+                .await;
+            }
+        });
+
         Ok(Self {
-            sender,
+            receiver: receiver_rx,
             _exit_ch: exit_ch,
-            cookie_dir: cookie_dir.to_owned(),
+            cookie_dir,
         })
     }
 
-    pub fn subscribe(&self) -> broadcast::Receiver<Result<Event, NotifyError>> {
-        self.sender.subscribe()
+    /// A convenience method around the receiver watch that waits for file
+    /// watching to be ready and then returns a handle to the file event stream.
+    pub async fn subscribe(
+        &self,
+    ) -> Result<broadcast::Receiver<Result<Event, NotifyError>>, RecvError> {
+        let mut receiver = self.receiver.clone();
+        receiver.get().await.map(|r| r.resubscribe())
+    }
+
+    pub fn watch(&self) -> OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>> {
+        self.receiver.clone()
     }
 
     pub fn cookie_dir(&self) -> &AbsoluteSystemPath {
@@ -146,6 +192,8 @@ impl FileSystemWatcher {
 fn setup_cookie_dir(cookie_dir: &AbsoluteSystemPath) -> Result<(), WatchError> {
     // We need to ensure that the cookie directory is cleared out first so
     // that we can start over with cookies.
+    tracing::debug!("setting up the cookie dir");
+
     if cookie_dir.exists() {
         cookie_dir.remove_dir_all().map_err(|e| {
             WatchError::Setup(format!("failed to clear cookie dir {}: {}", cookie_dir, e))
@@ -184,8 +232,14 @@ async fn watch_events(
     watch_root: AbsoluteSystemPathBuf,
     mut recv_file_events: mpsc::Receiver<EventResult>,
     exit_signal: tokio::sync::oneshot::Receiver<()>,
-    broadcast_sender: broadcast::Sender<Result<Event, NotifyError>>,
+    mut broadcast_sender: OptionalWatch<broadcast::Sender<Result<Event, NotifyError>>>,
 ) {
+    let Ok(broadcast_sender) = broadcast_sender.get().await.map(|b| b.clone()) else {
+        // if we are never sent a sender, we should not run the watcher
+        tracing::debug!("no downstream listeners, exiting");
+        return;
+    };
+
     let mut exit_signal = exit_signal;
     'outer: loop {
         tokio::select! 
{ @@ -482,10 +536,8 @@ mod test { let sibling_path = parent_path.join_component("sibling"); sibling_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; let foo_path = child_path.join_component("foo"); @@ -542,10 +594,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; @@ -586,10 +636,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; repo_root.remove_dir_all().unwrap(); @@ -617,10 +665,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; let new_parent = repo_root.join_component("new_parent"); @@ -651,10 +697,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; let new_repo_root = repo_root.parent().unwrap().join_component("new_repo_root"); @@ -684,10 +728,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; // Create symlink during file watching @@ -727,10 +769,8 @@ mod test { let symlink_path = repo_root.join_component("symlink"); symlink_path.symlink_to_dir(child_path.as_str()).unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut 
recv, &[&repo_root, &parent_path, &child_path]).await;
 
         // Delete symlink during file watching
@@ -768,10 +808,8 @@ mod test {
         let symlink_path = repo_root.join_component("symlink");
         symlink_path.symlink_to_dir(child_path.as_str()).unwrap();
 
-        let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
-            .await
-            .unwrap();
-        let mut recv = watcher.subscribe();
+        let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
+        let mut recv = watcher.subscribe().await.unwrap();
         expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;
 
         // Delete symlink during file watching
@@ -814,10 +852,8 @@ mod test {
         let child_path = parent_path.join_component("child");
         child_path.create_dir_all().unwrap();
 
-        let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
-            .await
-            .unwrap();
-        let mut recv = watcher.subscribe();
+        let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
+        let mut recv = watcher.subscribe().await.unwrap();
         expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;
 
         let repo_parent = repo_root.parent().unwrap();
@@ -854,10 +890,8 @@ mod test {
         let child_path = parent_path.join_component("child");
         child_path.create_dir_all().unwrap();
 
-        let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
-            .await
-            .unwrap();
-        let mut recv = watcher.subscribe();
+        let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
+        let mut recv = watcher.subscribe().await.unwrap();
         expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await;
 
         let repo_parent = repo_root.parent().unwrap();
@@ -877,10 +911,8 @@
         let mut recv = {
             // create and immediately drop the watcher, which should trigger the exit
             // channel
-            let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
-                .await
-                .unwrap();
-            watcher.subscribe()
+            let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
+            watcher.subscribe().await.unwrap()
         };
 
         // There may be spurious events, but we should expect a close in short order
diff --git a/crates/turborepo-filewatch/src/optional_watch.rs b/crates/turborepo-filewatch/src/optional_watch.rs
new file mode 100644
index 0000000000000..7c8d6961901fa
--- /dev/null
+++ b/crates/turborepo-filewatch/src/optional_watch.rs
@@ -0,0 +1,68 @@
+use tokio::sync::watch::{self, error::RecvError, Ref};
+
+#[derive(Debug)]
+pub struct OptionalWatch<T>(watch::Receiver<Option<T>>);
+
+impl<T> Clone for OptionalWatch<T> {
+    fn clone(&self) -> Self {
+        Self(self.0.clone())
+    }
+}
+
+/// A handy wrapper around types that are watched and may be `None`.
+/// `SomeRef` is a reference type that is known to be `Some`.
+impl<T> OptionalWatch<T> {
+    /// Create a new `OptionalWatch` with no initial value.
+    ///
+    /// Keep in mind that when the sender is dropped, downstream
+    /// dependencies will get a `RecvError`.
+    pub fn new() -> (watch::Sender<Option<T>>, OptionalWatch<T>) {
+        let (tx, rx) = watch::channel(None);
+        (tx, OptionalWatch(rx))
+    }
+
+    /// Create a new `OptionalWatch` with an initial, unchanging value.
+    pub fn once(init: T) -> Self {
+        let (_tx, rx) = watch::channel(Some(init));
+        OptionalWatch(rx)
+    }
+
+    /// Wait for the value to be available and then return it.
+    ///
+    /// If you receive a `RecvError`, the sender has been dropped, meaning you
+    /// will not receive any more updates. For efficiency's sake, you should
+    /// exit the task and drop any senders to other dependencies so that the
+    /// exit can propagate up the chain.
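+    ///
+    /// A minimal usage sketch (hypothetical consumer; assumes a running
+    /// `tokio` runtime, with `u32` standing in for a real resource):
+    ///
+    /// ```ignore
+    /// let (tx, mut rx) = OptionalWatch::<u32>::new();
+    /// tokio::spawn(async move {
+    ///     // parks here until a producer sends `Some(_)`, or errors if `tx` drops
+    ///     match rx.get().await {
+    ///         Ok(value) => println!("resource ready: {}", *value),
+    ///         Err(_) => { /* sender dropped; propagate the exit */ }
+    ///     }
+    /// });
+    /// let _ = tx.send(Some(42));
+    /// ```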
+    pub async fn get(&mut self) -> Result<SomeRef<'_, T>, RecvError> {
+        let recv = self.0.wait_for(|f| f.is_some()).await?;
+        Ok(SomeRef(recv))
+    }
+}
+
+pub struct SomeRef<'a, T>(Ref<'a, Option<T>>);
+
+impl<'a, T> std::ops::Deref for SomeRef<'a, T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        self.0.as_ref().expect("checked")
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use futures::FutureExt;
+
+    /// Futures have a method that allows you to fetch the value of a future
+    /// if it is immediately available. This is useful, for example, for
+    /// letting consumers poll a future and take the value if it is
+    /// available, but otherwise just continue on, rather than wait.
+    #[tokio::test]
+    pub async fn now_or_never_works() {
+        let (tx, mut rx) = super::OptionalWatch::new();
+
+        tx.send(Some(42)).unwrap();
+
+        assert_eq!(*rx.get().now_or_never().unwrap().unwrap(), 42);
+    }
+}
diff --git a/crates/turborepo-filewatch/src/package_watcher.rs b/crates/turborepo-filewatch/src/package_watcher.rs
index 8e284caa1491b..8bb948f258ce7 100644
--- a/crates/turborepo-filewatch/src/package_watcher.rs
+++ b/crates/turborepo-filewatch/src/package_watcher.rs
@@ -1,28 +1,67 @@
 //! This module hosts the `PackageWatcher` type, which is used to watch the
 //! filesystem for changes to packages.
 
-use std::{
-    collections::HashMap,
-    future::IntoFuture,
-    path::PathBuf,
-    sync::{Arc, Mutex},
-};
+use std::{collections::HashMap, sync::Arc};
 
+use futures::FutureExt;
 use notify::Event;
 use tokio::{
     join,
     sync::{
         broadcast::{self, error::RecvError},
-        oneshot, watch,
+        oneshot, watch, Mutex as AsyncMutex,
     },
 };
 use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
 use turborepo_repository::{
-    discovery::{PackageDiscovery, WorkspaceData},
+    discovery::{self, DiscoveryResponse, PackageDiscovery, WorkspaceData},
     package_manager::{self, Error, PackageManager, WorkspaceGlobs},
 };
 
-use crate::NotifyError;
+use crate::{optional_watch::OptionalWatch, NotifyError};
+
+/// A package discovery strategy that watches the file system for changes. Basic
+/// idea:
+/// - Set up a watcher on file changes on the relevant workspace file for the
+///   package manager
+/// - When the workspace globs change, re-discover the workspace
+/// - When a package.json changes, re-discover the workspace
+/// - Keep an in-memory cache of the workspace
+pub struct WatchingPackageDiscovery {
+    /// file watching may not be ready yet so we store a watcher
+    /// through which we can get the file watching stack
+    watcher: Arc<PackageWatcher>,
+}
+
+impl WatchingPackageDiscovery {
+    pub fn new(watcher: Arc<PackageWatcher>) -> Self {
+        Self { watcher }
+    }
+}
+
+impl PackageDiscovery for WatchingPackageDiscovery {
+    async fn discover_packages(&self) -> Result<DiscoveryResponse, discovery::Error> {
+        tracing::debug!("discovering packages using watcher implementation");
+
+        // this can either not have a value ready, or the sender has been dropped. in
+        // either case we report that the value is unavailable
+        let package_manager = self
+            .watcher
+            .get_package_manager()
+            .and_then(Result::ok)
+            .ok_or(discovery::Error::Unavailable)?;
+        let workspaces = self
+            .watcher
+            .get_package_data()
+            .and_then(Result::ok)
+            .ok_or(discovery::Error::Unavailable)?;
+
+        Ok(DiscoveryResponse {
+            workspaces,
+            package_manager,
+        })
+    }
+}
 
 /// Watches the filesystem for changes to packages and package managers.
pub struct PackageWatcher { @@ -33,22 +72,23 @@ pub struct PackageWatcher { _exit_tx: oneshot::Sender<()>, _handle: tokio::task::JoinHandle<()>, - package_data: Arc>>, - manager_rx: watch::Receiver, + package_data: OptionalWatch>, + + manager_rx: OptionalWatch, } impl PackageWatcher { /// Creates a new package watcher whose current package data can be queried. - pub async fn new( + pub fn new( root: AbsoluteSystemPathBuf, - recv: broadcast::Receiver>, + recv: OptionalWatch>>, backup_discovery: T, ) -> Result { let (exit_tx, exit_rx) = oneshot::channel(); - let subscriber = Subscriber::new(exit_rx, root, recv, backup_discovery).await?; + let subscriber = Subscriber::new(root, recv, backup_discovery)?; let manager_rx = subscriber.manager_receiver(); let package_data = subscriber.package_data(); - let handle = tokio::spawn(subscriber.watch()); + let handle = tokio::spawn(subscriber.watch(exit_rx)); Ok(Self { _exit_tx: exit_tx, _handle: handle, @@ -57,90 +97,219 @@ impl PackageWatcher { }) } - pub async fn get_package_data(&self) -> Vec { - self.package_data - .lock() - .expect("not poisoned") - .clone() - .into_values() - .collect() + /// Get the package data. If the package data is not available, this will + /// block until it is. + pub async fn wait_for_package_data( + &self, + ) -> Result, watch::error::RecvError> { + let mut recv = self.package_data.clone(); + recv.get().await.map(|v| v.values().cloned().collect()) + } + + /// A convenience wrapper around `FutureExt::now_or_never` to let you get + /// the package data if it is immediately available. + pub fn get_package_data(&self) -> Option, watch::error::RecvError>> { + let mut recv = self.package_data.clone(); + let data = if let Some(Ok(inner)) = recv.get().now_or_never() { + Some(Ok(inner.values().cloned().collect())) + } else { + None + }; + data + } + + /// Get the package manager. If the package manager is not available, this + /// will block until it is. + pub async fn wait_for_package_manager( + &self, + ) -> Result { + let mut recv = self.manager_rx.clone(); + recv.get().await.map(|s| s.manager) + } + + /// A convenience wrapper around `FutureExt::now_or_never` to let you get + /// the package manager if it is immediately available. + pub fn get_package_manager(&self) -> Option> { + let mut recv = self.manager_rx.clone(); + // the borrow checker doesn't like returning immediately here so assign to a var + let data = if let Some(Ok(inner)) = recv.get().now_or_never() { + Some(Ok(inner.manager)) + } else { + None + }; + data } - pub async fn get_package_manager(&self) -> PackageManager { - *self.manager_rx.borrow() + pub fn watch(&self) -> OptionalWatch> { + self.package_data.clone() } } /// The underlying task that listens to file system events and updates the /// internal package state. struct Subscriber { - exit_rx: oneshot::Receiver<()>, - filter: WorkspaceGlobs, - recv: broadcast::Receiver>, + /// we need to hold on to this. dropping it will close the downstream + /// data dependencies + _recv_tx: Arc>>>>, + + recv: OptionalWatch>>, + backup_discovery: Arc>, + repo_root: AbsoluteSystemPathBuf, - backup_discovery: T, + root_package_json_path: AbsoluteSystemPathBuf, // package manager data - package_data: Arc>>, - manager_rx: watch::Receiver, - manager_tx: watch::Sender, - - // stored as PathBuf to avoid processing later. 
-    // if package_json changes, we need to re-infer
-    // the package manager
-    package_json_path: std::path::PathBuf,
-    workspace_config_path: std::path::PathBuf,
+    manager_tx: Arc<watch::Sender<Option<PackageManagerState>>>,
+    manager_rx: OptionalWatch<PackageManagerState>,
+    package_data_rx: OptionalWatch<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>,
+    package_data_tx: Arc<watch::Sender<Option<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>>>,
+}
+
+/// A collection of state inferred from a package manager. All this data will
+/// change if the package manager changes.
+#[derive(Clone)]
+struct PackageManagerState {
+    manager: PackageManager,
+    // we need to wrap in Arc to make it send / sync
+    filter: Arc<WorkspaceGlobs>,
+    workspace_config_path: AbsoluteSystemPathBuf,
 }
 
 impl Subscriber {
-    async fn new(
-        exit_rx: oneshot::Receiver<()>,
+    fn new(
         repo_root: AbsoluteSystemPathBuf,
-        recv: broadcast::Receiver<Result<Event, NotifyError>>,
-        discovery: T,
+        mut recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
+        backup_discovery: T,
     ) -> Result<Self, Error> {
-        let initial_discovery = discovery.discover_packages().await?;
+        let (package_data_tx, package_data_rx) = OptionalWatch::new();
+        let package_data_tx = Arc::new(package_data_tx);
+        let (manager_tx, manager_rx) = OptionalWatch::new();
+        let manager_tx = Arc::new(manager_tx);
+
+        // we create a second optional watch here so that we can ensure it is ready and
+        // pass it downstream after the initial discovery
+        let (recv_tx, recv_rx) = OptionalWatch::new();
+        let recv_tx = Arc::new(recv_tx);
+
+        let backup_discovery = Arc::new(AsyncMutex::new(backup_discovery));
+
+        let package_json_path = repo_root.join_component("package.json");
+
+        let _task = tokio::spawn({
+            let package_data_tx = package_data_tx.clone();
+            let manager_tx = manager_tx.clone();
+            let backup_discovery = backup_discovery.clone();
+            let repo_root = repo_root.clone();
+            let package_json_path = package_json_path.clone();
+            let recv_tx = recv_tx.clone();
+            async move {
+                // wait for the watcher, so we can process events that happen during discovery
+                let Ok(recv) = recv.get().await.map(|r| r.resubscribe()) else {
+                    // if we get here, it means that file watching has not started, so we should
+                    // just report that the package watcher is not available
+                    tracing::debug!("file watching shut down, package watcher not available");
+                    return;
+                };
 
-        let (package_json_path, workspace_config_path, filter) =
-            Self::update_package_manager(&initial_discovery.package_manager, &repo_root)?;
+                let initial_discovery = backup_discovery.lock().await.discover_packages().await;
 
-        let (manager_tx, manager_rx) = watch::channel(initial_discovery.package_manager);
+                let Ok(initial_discovery) = initial_discovery else {
+                    // if initial discovery fails, there is nothing we can do. we should just
+                    // report that the package watcher is not available
+                    //
+                    // NOTE: in the future, if we decide to differentiate between 'not ready' and
+                    // unavailable, we MUST update the status here to unavailable or the client will
+                    // hang
+                    return;
+                };
+
+                let Ok((workspace_config_path, filter)) = Self::update_package_manager(
+                    &initial_discovery.package_manager,
+                    &repo_root,
+                    &package_json_path,
+                ) else {
+                    // similar story here, if the package manager cannot be read, we should just
+                    // report that the package watcher is not available
+                    return;
+                };
+
+                // now that the two pieces of data are available, we can send the package
+                // manager and set the packages
+
+                let state = PackageManagerState {
+                    manager: initial_discovery.package_manager,
+                    filter: Arc::new(filter),
+                    workspace_config_path,
+                };
+
+                // if either of these sends fails, it means that there are no more subscribers
+                // and we should just ignore it, since we are likely closing
+                let a = if manager_tx.send(Some(state)).is_err() {
+                    tracing::debug!("no listeners for package manager");
+                    false
+                } else {
+                    true
+                };
+
+                let b = if package_data_tx
+                    .send(Some(
+                        initial_discovery
+                            .workspaces
+                            .into_iter()
+                            .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p))
+                            .collect::<HashMap<_, _>>()
+                            .into(),
+                    ))
+                    .is_err()
+                {
+                    tracing::debug!("no listeners for package data");
+                    false
+                } else {
+                    true
+                };
+
+                // if at least one side still has listeners, hand the receiver down to the
+                // watch task; otherwise nobody is listening and we simply exit
+                if a || b {
+                    _ = recv_tx.send(Some(recv));
+                }
+            }
+        });
 
         Ok(Self {
-            exit_rx,
-            filter,
-            package_data: Arc::new(Mutex::new(
-                initial_discovery
-                    .workspaces
-                    .into_iter()
-                    .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p))
-                    .collect(),
-            )),
-            recv,
+            _recv_tx: recv_tx,
+            recv: recv_rx,
+            backup_discovery,
+            repo_root,
+            root_package_json_path: package_json_path,
+            package_data_rx,
+            package_data_tx,
             manager_rx,
             manager_tx,
-            repo_root,
-            backup_discovery: discovery,
-
-            package_json_path,
-            workspace_config_path,
         })
     }
 
-    async fn watch(mut self) {
+    async fn watch(mut self, exit_rx: oneshot::Receiver<()>) {
         // initialize the contents
         self.rediscover_packages().await;
 
-        // respond to changes
-        loop {
-            tokio::select! {
-                biased;
-                _ = &mut self.exit_rx => {
-                    tracing::info!("exiting package watcher");
-                    return
-                },
-                file_event = self.recv.recv().into_future() => match file_event {
-                    Ok(Ok(event)) => self.handle_file_event(event).await,
+        let process = async move {
+            let Ok(mut recv) = self.recv.get().await.map(|r| r.resubscribe()) else {
+                // if the channel is closed, we should just exit
+                tracing::debug!("file watcher shut down, exiting");
+                return;
+            };
+
+            tracing::debug!("package watcher ready");
+            loop {
+                let file_event = recv.recv().await;
+                match file_event {
+                    Ok(Ok(event)) => match self.handle_file_event(&event).await {
+                        Ok(()) => {}
+                        Err(()) => {
+                            tracing::debug!("package watching is closing, exiting");
+                            return;
+                        }
+                    },
                     // if we get an error, we need to re-discover the packages
                     Ok(Err(_)) => self.rediscover_packages().await,
                     Err(RecvError::Closed) => return,
@@ -148,188 +317,296 @@ impl Subscriber {
                     Err(RecvError::Lagged(count)) => {
                         tracing::warn!("lagged behind {count} processing file watching events");
                         self.rediscover_packages().await;
-                    },
+                    }
                 }
             }
+        };
+
+        // respond to changes
+
+        tokio::select! {
+            biased;
+            _ = exit_rx => {
+                tracing::debug!("exiting package watcher due to signal");
+            },
+            _ = process => {
+                tracing::debug!("exiting package watcher due to process end");
+            }
         }
     }
 
     fn update_package_manager(
         manager: &PackageManager,
         repo_root: &AbsoluteSystemPath,
-    ) -> Result<(PathBuf, PathBuf, WorkspaceGlobs), Error> {
-        let package_json_path = repo_root
-            .join_component("package.json")
-            .as_std_path()
-            .to_owned();
-
-        let workspace_config_path = manager
-            .workspace_configuration_path()
-            .map_or(package_json_path.clone(), |p| {
-                repo_root.join_component(p).as_std_path().to_owned()
-            });
+        package_json_path: &AbsoluteSystemPath,
+    ) -> Result<(AbsoluteSystemPathBuf, WorkspaceGlobs), Error> {
+        let workspace_config_path = manager.workspace_configuration_path().map_or_else(
+            || package_json_path.to_owned(),
+            |p| repo_root.join_component(p),
+        );
 
         let filter = manager.get_workspace_globs(repo_root)?;
 
-        Ok((package_json_path, workspace_config_path, filter))
+        Ok((workspace_config_path, filter))
     }
 
-    pub fn manager_receiver(&self) -> watch::Receiver<PackageManager> {
+    pub fn manager_receiver(&self) -> OptionalWatch<PackageManagerState> {
         self.manager_rx.clone()
     }
 
-    pub fn package_data(&self) -> Arc<Mutex<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>> {
-        self.package_data.clone()
-    }
-
-    fn manager(&self) -> PackageManager {
-        *self.manager_rx.borrow()
+    pub fn package_data(&self) -> OptionalWatch<HashMap<AbsoluteSystemPathBuf, WorkspaceData>> {
+        self.package_data_rx.clone()
     }
 
-    async fn handle_file_event(&mut self, file_event: Event) {
+    /// Returns Err(()) if the package manager channel is closed, indicating
+    /// that the entire watching task should exit.
+    async fn handle_file_event(&mut self, file_event: &Event) -> Result<(), ()> {
         tracing::trace!("file event: {:?} {:?}", file_event.kind, file_event.paths);
 
         if file_event
             .paths
             .iter()
-            .any(|p| self.package_json_path.eq(p))
+            .any(|p| self.root_package_json_path.as_std_path().eq(p))
+        {
+            if let Err(e) = self.handle_root_package_json_change().await {
+                tracing::error!("error discovering package manager: {}", e);
+            }
+        }
+
+        match self.have_workspace_globs_changed(file_event).await {
+            Ok(true) => {
+                self.rediscover_packages().await;
+                Ok(())
+            }
+            Ok(false) => {
+                // this is the last expression in the function, so its result is
+                // returned directly either way
+                self.handle_package_json_change(file_event).await
+            }
+            Err(()) => Err(()),
+        }
    }

    /// Returns Err(()) if the package manager channel is closed, indicating
    /// that the entire watching task should exit.
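+    ///
+    /// Upstream, `Subscriber::watch` treats the propagated `Err(())` as a
+    /// shutdown signal and breaks out of its event loop; a sketch of that
+    /// call site:
+    ///
+    /// ```ignore
+    /// match self.handle_file_event(&event).await {
+    ///     Ok(()) => {}       // keep processing events
+    ///     Err(()) => return, // all receivers dropped; stop the task
+    /// }
+    /// ```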
+ async fn handle_package_json_change(&mut self, file_event: &Event) -> Result<(), ()> { + let Ok(state) = self.manager_rx.get().await.map(|x| x.to_owned()) else { + // the channel is closed, so there is no state to write into, return + return Err(()); + }; + + // here, we can only update if we have a valid package state + + // if a path is not a valid utf8 string, it is not a valid path, so ignore + for path in file_event + .paths + .iter() + .filter_map(|p| p.as_os_str().to_str()) { - // if the package.json changed, we need to re-infer the package manager - // and update the glob list - tracing::debug!("package.json changed"); + let path_file = AbsoluteSystemPathBuf::new(path).expect("watched paths are absolute"); + + // the path to the workspace this file is in is the parent + let path_workspace = path_file + .parent() + .expect("watched paths will not be at the root") + .to_owned(); - let resp = match self.backup_discovery.discover_packages().await { - Ok(pm) => pm, + let is_workspace = match state + .filter + .target_is_workspace(&self.repo_root, &path_workspace) + { + Ok(is_workspace) => is_workspace, Err(e) => { - tracing::error!("error discovering package manager: {}", e); - return; + // this will only error if `repo_root` is not an anchor of `path_workspace`. + // if we hit this case, we can safely ignore it + tracing::debug!("yielded path not in workspace: {:?}", e); + continue; } }; - let new_manager = Self::update_package_manager(&resp.package_manager, &self.repo_root) - .map(|(a, b, c)| (resp, a, b, c)); - - match new_manager { - Ok((new_manager, package_json_path, workspace_config_path, filter)) => { - tracing::debug!( - "new package manager data: {:?}, {:?}, {:?}", - new_manager.package_manager, - package_json_path, - filter - ); - // if this fails, we are closing anyways so ignore - self.manager_tx.send(new_manager.package_manager).ok(); - { - let mut data = self.package_data.lock().unwrap(); - *data = new_manager - .workspaces - .into_iter() - .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) - .collect(); - } - self.package_json_path = package_json_path; - self.workspace_config_path = workspace_config_path; - self.filter = filter; - } - Err(e) => { - // a change in the package json does not necessarily mean - // that the package manager has changed, so continue with - // best effort - tracing::error!("error getting package manager: {}", e); - } + if is_workspace { + tracing::debug!("tracing file in package: {:?}", path_file); + let package_json = path_workspace.join_component("package.json"); + let turbo_json = path_workspace.join_component("turbo.json"); + + let (package_exists, turbo_exists) = join!( + tokio::fs::try_exists(&package_json), + tokio::fs::try_exists(&turbo_json) + ); + + self.package_data_tx + .send_modify(|mut data| match (&mut data, package_exists) { + (Some(data), Ok(true)) => { + data.insert( + path_workspace, + WorkspaceData { + package_json, + turbo_json: turbo_exists + .unwrap_or_default() + .then_some(turbo_json), + }, + ); + } + (Some(data), Ok(false)) => { + data.remove(&path_workspace); + } + (None, Ok(true)) => { + let mut map = HashMap::new(); + map.insert( + path_workspace, + WorkspaceData { + package_json, + turbo_json: turbo_exists + .unwrap_or_default() + .then_some(turbo_json), + }, + ); + *data = Some(map.into()); + } + (None, Ok(false)) => {} // do nothing + (_, Err(_)) => todo!(), + }); } } - // if it is the package manager, update the glob list - let changed = if file_event + Ok(()) + } + + /// A change to the 
workspace config path could mean a change to the package
+    /// glob list. If this happens, we need to re-walk the packages.
+    ///
+    /// Returns Err(()) if the package manager channel is closed, indicating
+    /// that the entire watching task should exit.
+    async fn have_workspace_globs_changed(&mut self, file_event: &Event) -> Result<bool, ()> {
+        // here, we can only update if we have a valid package state
+        let Ok(state) = self.manager_rx.get().await.map(|s| s.to_owned()) else {
+            // we can only fail receiving if the channel is closed,
+            // which indicates that the entire watching task should exit
+            return Err(());
+        };
+
+        if file_event
             .paths
             .iter()
-            .any(|p| self.workspace_config_path.eq(p))
+            .any(|p| state.workspace_config_path.as_std_path().eq(p))
         {
-            let new_filter = self
-                .manager()
+            let new_filter = state
+                .manager
                 .get_workspace_globs(&self.repo_root)
+                .map(Arc::new)
                 // under some saving strategies a file can be totally empty for a moment
                 // during a save. these strategies emit multiple events, so a previous or
                 // subsequent event in the 'cluster' will still trigger a refresh
-                .unwrap_or_else(|_| self.filter.clone());
+                .unwrap_or_else(|_| state.filter.clone());
 
-            let changed = self.filter != new_filter;
+            let changed = state.filter != new_filter;
 
-            self.filter = new_filter;
-            changed
-        } else {
-            false
-        };
+            if changed {
+                self.manager_tx.send_modify(|f| {
+                    if let Some(state) = f {
+                        state.filter = new_filter;
+                    }
+                });
+            }
+
+            Ok(changed)
+        } else {
+            Ok(false)
+        }
+    }
+
+    /// A change to the root package json means we need to re-infer the package
+    /// manager, update the glob list, and re-walk the packages.
+    ///
+    /// todo: we can probably improve the uptime here by splitting the package
+    /// manager out of the package discovery. 
if the package manager has + /// not changed, we probably do not need to re-walk the packages + async fn handle_root_package_json_change(&mut self) -> Result<(), discovery::Error> { + { + // clear all data + self.manager_tx.send(None).ok(); + self.package_data_tx.send(None).ok(); + } + tracing::debug!("root package.json changed, refreshing package manager and globs"); + let resp = self + .backup_discovery + .lock() + .await + .discover_packages() + .await?; + let new_manager = Self::update_package_manager( + &resp.package_manager, + &self.repo_root, + &self.root_package_json_path, + ) + .map(move |(a, b)| (resp, a, b)); + + // if the package.json changed, we need to re-infer the package manager + // and update the glob list + + match new_manager { + Ok((new_manager, workspace_config_path, filter)) => { + tracing::debug!( + "new package manager data: {:?}, {:?}", + new_manager.package_manager, + filter + ); + + let state = PackageManagerState { + manager: new_manager.package_manager, + filter: Arc::new(filter), + workspace_config_path, }; - if is_workspace { - tracing::debug!("tracing file in package: {:?}", path_file); - let package_json = path_workspace.join_component("package.json"); - let turbo_json = path_workspace.join_component("turbo.json"); - - let (package_exists, turbo_exists) = join!( - tokio::fs::try_exists(&package_json), - tokio::fs::try_exists(&turbo_json) - ); - - let mut data = self.package_data.lock().expect("not poisoned"); - if let Ok(true) = package_exists { - data.insert( - path_workspace, - WorkspaceData { - package_json, - turbo_json: turbo_exists.unwrap_or_default().then_some(turbo_json), - }, - ); - } else { - data.remove(&path_workspace); - } + { + // if this fails, we are closing anyways so ignore + self.manager_tx.send(Some(state)).ok(); + self.package_data_tx.send_modify(move |mut d| { + let new_data = new_manager + .workspaces + .into_iter() + .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) + .collect::>(); + if let Some(data) = &mut d { + *data = new_data; + } else { + *d = Some(new_data.into()); + } + }); } } + Err(e) => { + // if we cannot update the package manager, we should just leave + // the package manager as None and make the package data unavailable + tracing::error!("error getting package manager: {}", e); + } } + + Ok(()) } async fn rediscover_packages(&mut self) { tracing::debug!("rediscovering packages"); - if let Ok(data) = self.backup_discovery.discover_packages().await { - let workspace = data - .workspaces - .into_iter() - .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) - .collect(); - let mut data = self.package_data.lock().expect("not poisoned"); - *data = workspace; + + // make sure package data is unavailable while we are updating + // if this fails, we have no subscribers so we can just exit + if self.package_data_tx.send(None).is_err() { + return; + } + + if let Ok(response) = self.backup_discovery.lock().await.discover_packages().await { + self.package_data_tx.send_modify(|d| { + let new_data = response + .workspaces + .into_iter() + .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) + .collect::>(); + if let Some(data) = d { + *data = new_data; + } else { + *d = Some(new_data.into()); + } + }); } else { + // package data stays unavailable tracing::error!("error discovering packages"); } } @@ -348,6 +625,7 @@ mod test { }; use super::Subscriber; + use crate::OptionalWatch; #[derive(Debug)] struct MockDiscovery { @@ -370,6 +648,7 @@ mod test { let tmp = 
tempfile::tempdir().unwrap(); let (tx, rx) = broadcast::channel(10); + let rx = OptionalWatch::once(rx); let (_exit_tx, exit_rx) = tokio::sync::oneshot::channel(); let root: AbsoluteSystemPathBuf = tmp.path().try_into().unwrap(); @@ -410,13 +689,11 @@ mod test { package_data: Arc::new(Mutex::new(package_data)), }; - let subscriber = Subscriber::new(exit_rx, root.clone(), rx, mock_discovery) - .await - .unwrap(); + let subscriber = Subscriber::new(root.clone(), rx, mock_discovery).unwrap(); - let package_data = subscriber.package_data(); + let mut package_data = subscriber.package_data(); - let _handle = tokio::spawn(subscriber.watch()); + let _handle = tokio::spawn(subscriber.watch(exit_rx)); tx.send(Ok(notify::Event { kind: notify::EventKind::Create(notify::event::CreateKind::File), @@ -428,7 +705,8 @@ mod test { assert_eq!( package_data - .lock() + .get() + .await .unwrap() .values() .cloned() @@ -476,7 +754,8 @@ mod test { assert_eq!( package_data - .lock() + .get() + .await .unwrap() .values() .cloned() @@ -494,6 +773,7 @@ mod test { let tmp = tempfile::tempdir().unwrap(); let (tx, rx) = broadcast::channel(10); + let rx = OptionalWatch::once(rx); let (_exit_tx, exit_rx) = tokio::sync::oneshot::channel(); let root = AbsoluteSystemPathBuf::new(tmp.path().to_string_lossy()).unwrap(); @@ -543,19 +823,18 @@ mod test { package_data: package_data_raw.clone(), }; - let subscriber = Subscriber::new(exit_rx, root.clone(), rx, mock_discovery) - .await - .unwrap(); + let subscriber = Subscriber::new(root.clone(), rx, mock_discovery).unwrap(); - let package_data = subscriber.package_data(); + let mut package_data = subscriber.package_data(); - let _handle = tokio::spawn(subscriber.watch()); + let _handle = tokio::spawn(subscriber.watch(exit_rx)); tokio::time::sleep(std::time::Duration::from_millis(100)).await; assert_eq!( package_data - .lock() + .get() + .await .unwrap() .values() .cloned() @@ -616,7 +895,8 @@ mod test { assert_eq!( package_data - .lock() + .get() + .await .unwrap() .values() .cloned() diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs index c5c8190663e8e..3411e79c540c5 100644 --- a/crates/turborepo-lib/src/daemon/server.rs +++ b/crates/turborepo-lib/src/daemon/server.rs @@ -26,7 +26,8 @@ use semver::Version; use thiserror::Error; use tokio::{ select, - sync::{mpsc, oneshot, watch}, + sync::{mpsc, oneshot}, + task::JoinHandle, }; use tonic::transport::{NamedService, Server}; use tower::ServiceBuilder; @@ -35,11 +36,11 @@ use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; use turborepo_filewatch::{ cookies::CookieWriter, globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher}, - package_watcher::PackageWatcher, + package_watcher::{PackageWatcher, WatchingPackageDiscovery}, FileSystemWatcher, WatchError, }; use turborepo_repository::discovery::{ - DiscoveryResponse, LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder, + LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder, }; use super::{ @@ -60,10 +61,14 @@ pub enum CloseReason { SocketOpenError(SocketOpenError), } +/// We may need to pass out references to a subset of these, so +/// we'll make them public Arcs. Eventually we can stabilize on +/// a general API and close this up. 
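+///
+/// A hypothetical consumer sketch (error handling elided) built on the
+/// accessors introduced in this patch:
+///
+/// ```ignore
+/// let fw = FileWatching::new(repo_root, backup_discovery)?;
+/// // the handles are Arcs, so each service can cheaply clone its own copy
+/// let packages = fw.package_watcher.wait_for_package_data().await?;
+/// ```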
+#[derive(Clone)] pub struct FileWatching { - _watcher: FileSystemWatcher, - pub glob_watcher: GlobWatcher, - pub package_watcher: PackageWatcher, + watcher: Arc, + pub glob_watcher: Arc, + pub package_watcher: Arc, } #[derive(Debug, Error)] @@ -91,34 +96,47 @@ impl From for tonic::Status { } } -async fn start_filewatching( - repo_root: AbsoluteSystemPathBuf, - watcher_tx: watch::Sender>>, - backup_discovery: PD, -) -> Result<(), WatchError> { - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).await?; - let cookie_writer = CookieWriter::new(watcher.cookie_dir(), Duration::from_millis(100)); - let glob_watcher = GlobWatcher::new(&repo_root, cookie_writer, watcher.subscribe()); - let package_watcher = - PackageWatcher::new(repo_root.clone(), watcher.subscribe(), backup_discovery) - .await - .map_err(|e| WatchError::Setup(format!("{:?}", e)))?; - // We can ignore failures here, it means the server is shutting down and - // receivers have gone out of scope. - let _ = watcher_tx.send(Some(Arc::new(FileWatching { - _watcher: watcher, - glob_watcher, - package_watcher, - }))); - Ok(()) +impl FileWatching { + /// This function is called in the constructor for the `TurboGrpcService` + /// and should defer ALL heavy computation to the background, making use + /// of `OptionalWatch` to ensure that the server can start up without + /// waiting for the filewatcher to be ready. Using `OptionalWatch`, + /// dependent services can wait for resources they need to become + /// available, and the server can start up without waiting for them. + pub fn new( + repo_root: AbsoluteSystemPathBuf, + backup_discovery: PD, + ) -> Result { + let watcher = Arc::new(FileSystemWatcher::new_with_default_cookie_dir(&repo_root)?); + let recv = watcher.watch(); + + let cookie_watcher = CookieWriter::new( + watcher.cookie_dir(), + Duration::from_millis(100), + recv.clone(), + ); + let glob_watcher = Arc::new(GlobWatcher::new( + repo_root.clone(), + cookie_watcher, + recv.clone(), + )); + let package_watcher = Arc::new( + PackageWatcher::new(repo_root.clone(), recv.clone(), backup_discovery) + .map_err(|e| WatchError::Setup(format!("{:?}", e)))?, + ); + + Ok(FileWatching { + watcher, + glob_watcher, + package_watcher, + }) + } } /// Timeout for every RPC the server handles const REQUEST_TIMEOUT: Duration = Duration::from_millis(100); pub struct TurboGrpcService { - watcher_tx: watch::Sender>>, - watcher_rx: watch::Receiver>>, repo_root: AbsoluteSystemPathBuf, daemon_root: AbsoluteSystemPathBuf, log_file: AbsoluteSystemPathBuf, @@ -145,8 +163,6 @@ where timeout: Duration, external_shutdown: S, ) -> Self { - let (watcher_tx, watcher_rx) = watch::channel(None); - let package_discovery_backup = LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None); @@ -154,8 +170,6 @@ where // so we use a private struct with just the pieces of state needed to handle // RPCs. TurboGrpcService { - watcher_tx, - watcher_rx, repo_root, daemon_root, log_file, @@ -170,7 +184,7 @@ impl TurboGrpcService where S: Future, PDB: PackageDiscoveryBuilder, - PDB::Output: PackageDiscovery + Send + 'static, + PDB::Output: PackageDiscovery + Send + Sync + 'static, { /// If errors are encountered when loading the package discovery, this /// builder will be used as a backup to refresh the state. 
@@ -184,16 +198,12 @@ where log_file: self.log_file, repo_root: self.repo_root, timeout: self.timeout, - watcher_rx: self.watcher_rx, - watcher_tx: self.watcher_tx, package_discovery_backup, } } pub async fn serve(self) -> Result { let Self { - watcher_tx, - watcher_rx, daemon_root, external_shutdown, log_file, @@ -202,6 +212,19 @@ where package_discovery_backup, } = self; + // A channel to trigger the shutdown of the gRPC server. This is handed out + // to components internal to the server process such as root watching, as + // well as available to the gRPC server itself to handle the shutdown RPC. + let (trigger_shutdown, mut shutdown_signal) = mpsc::channel::<()>(1); + + let package_discovery_backup = package_discovery_backup.build()?; + let (service, exit_root_watch, watch_root_handle) = TurboGrpcServiceInner::new( + package_discovery_backup, + repo_root.clone(), + trigger_shutdown, + log_file, + ); + let running = Arc::new(AtomicBool::new(true)); let (_pid_lock, stream) = match listen_socket(&daemon_root, running.clone()).await { Ok((pid_lock, stream)) => (pid_lock, stream), @@ -209,37 +232,6 @@ where }; trace!("acquired connection stream for socket"); - let watcher_repo_root = repo_root.to_owned(); - // A channel to trigger the shutdown of the gRPC server. This is handed out - // to components internal to the server process such as root watching, as - // well as available to the gRPC server itself to handle the shutdown RPC. - let (trigger_shutdown, mut shutdown_signal) = mpsc::channel::<()>(1); - - let backup_discovery = package_discovery_backup.build()?; - - // watch receivers as a group own the filewatcher, which will exit when - // all references are dropped. - let fw_shutdown = trigger_shutdown.clone(); - let fw_handle = tokio::task::spawn(async move { - if let Err(e) = - start_filewatching(watcher_repo_root, watcher_tx, backup_discovery).await - { - error!("filewatching failed to start: {}", e); - let _ = fw_shutdown.send(()).await; - } - info!("filewatching started"); - }); - // exit_root_watch delivers a signal to the root watch loop to exit. - // In the event that the server shuts down via some other mechanism, this - // cleans up root watching task. - let (exit_root_watch, root_watch_exit_signal) = oneshot::channel(); - let watch_root_handle = tokio::task::spawn(watch_root( - watcher_rx.clone(), - repo_root.to_owned(), - trigger_shutdown.clone(), - root_watch_exit_signal, - )); - let bump_timeout = Arc::new(BumpTimeout::new(timeout)); let timeout_fut = bump_timeout.wait(); @@ -256,13 +248,6 @@ where // Run the actual service. It takes ownership of the struct given to it, // so we use a private struct with just the pieces of state needed to handle // RPCs. - let service = TurboGrpcServiceInner { - shutdown: trigger_shutdown, - watcher_rx, - times_saved: Arc::new(Mutex::new(HashMap::new())), - start_time: Instant::now(), - log_file: log_file.to_owned(), - }; let server_fut = { let service = ServiceBuilder::new() .layer(BumpTimeoutLayer::new(bump_timeout.clone())) @@ -278,8 +263,9 @@ where }; // Wait for the server to exit. 
         // This can be triggered by timeout, root watcher, or an RPC
+        tracing::debug!("server started");
         let _ = server_fut.await;
-        info!("gRPC server exited");
+        tracing::debug!("server exited");
 
         // Ensure our timer will exit
         running.store(false, Ordering::SeqCst);
 
         // We expect to have a signal from the grpc server on what triggered the exit
@@ -293,31 +279,77 @@ where
         trace!("root watching exited");
 
-        // Clean up the filewatching handle in the event that we never even got
-        // started with filewatching. Again, we don't care about the error here.
-        let _ = fw_handle.await;
-        trace!("filewatching handle joined");
         Ok(close_reason)
     }
 }
 
-struct TurboGrpcServiceInner {
+struct TurboGrpcServiceInner<PD> {
     shutdown: mpsc::Sender<()>,
-    watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
+    file_watching: FileWatching,
     times_saved: Arc<Mutex<HashMap<String, u64>>>,
     start_time: Instant,
     log_file: AbsoluteSystemPathBuf,
+    package_discovery: PD,
 }
 
-impl TurboGrpcServiceInner {
+// We have a gRPC service that uses watching package discovery, and the
+// watching package hasher also uses watching package discovery, falling
+// back to a local package hasher when needed.
+impl TurboGrpcServiceInner<Arc<WatchingPackageDiscovery>> {
+    pub fn new<PD: PackageDiscovery + Send + Sync + 'static>(
+        package_discovery_backup: PD,
+        repo_root: AbsoluteSystemPathBuf,
+        trigger_shutdown: mpsc::Sender<()>,
+        log_file: AbsoluteSystemPathBuf,
+    ) -> (
+        Self,
+        oneshot::Sender<()>,
+        JoinHandle<Result<(), WatchError>>,
+    ) {
+        let file_watching = FileWatching::new(repo_root.clone(), package_discovery_backup).unwrap();
+
+        tracing::debug!("initializing package discovery");
+        let package_discovery = Arc::new(WatchingPackageDiscovery::new(
+            file_watching.package_watcher.clone(),
+        ));
+
+        // exit_root_watch delivers a signal to the root watch loop to exit.
+        // In the event that the server shuts down via some other mechanism, this
+        // cleans up the root watching task.
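+        // Note that either half of the oneshot pair is enough to stop the
+        // loop: an explicit send tells the root watcher to exit, and dropping
+        // the sender unblocks it just the same, so the task cannot outlive
+        // the server even on an abrupt exit.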
+        let (exit_root_watch, root_watch_exit_signal) = oneshot::channel();
+        let watch_root_handle = tokio::task::spawn(watch_root(
+            file_watching.clone(),
+            repo_root.clone(),
+            trigger_shutdown.clone(),
+            root_watch_exit_signal,
+        ));
+
+        (
+            TurboGrpcServiceInner {
+                package_discovery,
+                shutdown: trigger_shutdown,
+                file_watching,
+                times_saved: Arc::new(Mutex::new(HashMap::new())),
+                start_time: Instant::now(),
+                log_file,
+            },
+            exit_root_watch,
+            watch_root_handle,
+        )
+    }
+}
+
+impl<PD> TurboGrpcServiceInner<PD>
+where
+    PD: PackageDiscovery + Send + Sync + 'static,
+{
     async fn trigger_shutdown(&self) {
         info!("triggering shutdown");
         let _ = self.shutdown.send(()).await;
     }
 
-    async fn wait_for_filewatching(&self) -> Result<Arc<FileWatching>, RpcError> {
-        let rx = self.watcher_rx.clone();
-        wait_for_filewatching(rx, Duration::from_millis(100)).await
-    }
-
     async fn watch_globs(
         &self,
         hash: String,
@@ -326,8 +358,8 @@ impl TurboGrpcServiceInner {
         time_saved: u64,
     ) -> Result<(), RpcError> {
         let glob_set = GlobSet::from_raw(output_globs, output_glob_exclusions)?;
-        let fw = self.wait_for_filewatching().await?;
-        fw.glob_watcher
+        self.file_watching
+            .glob_watcher
             .watch_globs(hash.clone(), glob_set, REQUEST_TIMEOUT)
             .await?;
         {
@@ -346,59 +378,40 @@ impl TurboGrpcServiceInner {
             let times_saved = self.times_saved.lock().expect("times saved lock poisoned");
             times_saved.get(hash.as_str()).copied().unwrap_or_default()
         };
-        let fw = self.wait_for_filewatching().await?;
-        let changed_globs = fw
+        let changed_globs = self
+            .file_watching
             .glob_watcher
             .get_changed_globs(hash, candidates, REQUEST_TIMEOUT)
            .await?;
         Ok((changed_globs, time_saved))
     }
-
-    async fn discover_packages(&self) -> Result<DiscoveryResponse, RpcError> {
-        let fw = self.wait_for_filewatching().await?;
-        Ok(DiscoveryResponse {
-            workspaces: fw.package_watcher.get_package_data().await,
-            package_manager: fw.package_watcher.get_package_manager().await,
-        })
-    }
 }
 
-async fn wait_for_filewatching(
-    mut rx: watch::Receiver<Option<Arc<FileWatching>>>,
-    timeout: Duration,
-) -> Result<Arc<FileWatching>, RpcError> {
-    let fw = tokio::time::timeout(timeout, rx.wait_for(|opt| opt.is_some()))
-        .await
-        .map_err(|_| RpcError::DeadlineExceeded)? // timeout case
-        .map_err(|_| RpcError::NoFileWatching)?; // sender dropped
-
-    return Ok(fw.as_ref().expect("guaranteed some above").clone());
-}
 
 async fn watch_root(
-    filewatching_access: watch::Receiver<Option<Arc<FileWatching>>>,
+    filewatching_access: FileWatching,
     root: AbsoluteSystemPathBuf,
     trigger_shutdown: mpsc::Sender<()>,
     mut exit_signal: oneshot::Receiver<()>,
 ) -> Result<(), WatchError> {
-    let mut recv_events = {
-        let Ok(fw) = wait_for_filewatching(filewatching_access, Duration::from_secs(5)).await
-        else {
-            return Ok(());
-        };
+    let mut recv_events = filewatching_access
+        .watcher
+        .subscribe()
+        .await
+        // We can only encounter an error here if the file watcher is closed (a recv error).
+        .map_err(|_| WatchError::Setup("file watching shut down".to_string()))?;
 
-        fw._watcher.subscribe()
-    };
+    tracing::debug!("watching root: {:?}", root);
 
     loop {
-        // Ignore the outer layer of Result, if the sender has closed, filewatching has
-        // gone away and we can return.
+        // Ignore the outer layer of Result; if the sender has closed, filewatching
+        // has gone away and we can stop watching.
        select! {
-            _ = &mut exit_signal => return Ok(()),
+            _ = &mut exit_signal => break,
             event = recv_events.recv() => {
                 let Ok(event) = event else {
-                    return Ok(());
+                    break;
                 };
+                tracing::debug!("root watcher received event: {:?}", event);
                 let should_trigger_shutdown = match event {
                     // filewatching can throw some weird events, so check that the root is actually gone
                     // before triggering a shutdown
@@ -411,15 +424,21 @@ async fn watch_root(
                     // We don't care if a shutdown has already been triggered,
                     // so we can ignore the error.
                     let _ = trigger_shutdown.send(()).await;
-                    return Ok(());
+                    break;
                 }
             }
         }
     }
+
+    tracing::debug!("no longer watching root");
+
+    Ok(())
 }
 
 #[tonic::async_trait]
-impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
+impl<PD: PackageDiscovery + Send + Sync + 'static> proto::turbod_server::Turbod
+    for TurboGrpcServiceInner<PD>
+{
     async fn hello(
         &self,
         request: tonic::Request<proto::HelloRequest>,
@@ -507,18 +526,30 @@ impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
         &self,
         _request: tonic::Request<proto::DiscoverPackagesRequest>,
     ) -> Result<tonic::Response<proto::DiscoverPackagesResponse>, tonic::Status> {
-        let resp = self.discover_packages().await?;
-        Ok(tonic::Response::new(proto::DiscoverPackagesResponse {
-            package_files: resp
-                .workspaces
-                .into_iter()
-                .map(|d| proto::PackageFiles {
-                    package_json: d.package_json.to_string(),
-                    turbo_json: d.turbo_json.map(|t| t.to_string()),
+        self.package_discovery
+            .discover_packages()
+            .await
+            .map(|packages| {
+                tonic::Response::new(proto::DiscoverPackagesResponse {
+                    package_files: packages
+                        .workspaces
+                        .into_iter()
+                        .map(|d| proto::PackageFiles {
+                            package_json: d.package_json.to_string(),
+                            turbo_json: d.turbo_json.map(|t| t.to_string()),
+                        })
+                        .collect(),
+                    package_manager: proto::PackageManager::from(packages.package_manager).into(),
                 })
-                .collect(),
-            package_manager: proto::PackageManager::from(resp.package_manager).into(),
-        }))
+            })
+            .map_err(|e| match e {
+                turborepo_repository::discovery::Error::Unavailable => {
+                    tonic::Status::unavailable("package discovery unavailable")
+                }
+                turborepo_repository::discovery::Error::Failed(e) => {
+                    tonic::Status::internal(format!("{}", e))
+                }
+            })
     }
 }
 
@@ -545,7 +576,7 @@ fn compare_versions(client: Version, server: Version, constraint: proto::Version
     }
 }
 
-impl NamedService for TurboGrpcServiceInner {
+impl<PD> NamedService for TurboGrpcServiceInner<PD> {
     const NAME: &'static str = "turborepo.Daemon";
 }
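The error mapping above gives daemon clients a usable contract: `Unavailable` means
package discovery has not produced data yet (the watcher is still warming up), while
`Failed` is a genuine internal error. A sketch of a client honoring that contract,
assuming a tonic-generated `proto::turbod_client` module and a hypothetical retry
budget:

    use tonic::{transport::Channel, Code, Status};

    // Poll discover_packages while the daemon reports UNAVAILABLE; any other
    // status is a hard failure and is surfaced to the caller unchanged.
    async fn discover_packages_with_retry(
        client: &mut proto::turbod_client::TurbodClient<Channel>,
    ) -> Result<proto::DiscoverPackagesResponse, Status> {
        for _ in 0..20 {
            match client
                .discover_packages(proto::DiscoverPackagesRequest::default())
                .await
            {
                Ok(resp) => return Ok(resp.into_inner()),
                Err(status) if status.code() == Code::Unavailable => {
                    // discovery has not become available yet; back off briefly
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }
                Err(status) => return Err(status),
            }
        }
        Err(Status::unavailable("package discovery never became ready"))
    }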