diff --git a/Cargo.lock b/Cargo.lock index c0864d7f39745..7fd1dd5350e3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -372,6 +372,12 @@ dependencies = [ "async-std", ] +[[package]] +name = "async-once-cell" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9338790e78aa95a416786ec8389546c4b6a1dfc3dc36071ed9518a9413a542eb" + [[package]] name = "async-process" version = "1.6.0" @@ -11770,6 +11776,7 @@ name = "turborepo-repository" version = "0.1.0" dependencies = [ "anyhow", + "async-once-cell", "globwalk", "itertools 0.10.5", "lazy-regex", diff --git a/crates/turborepo-filewatch/src/cookies.rs b/crates/turborepo-filewatch/src/cookies.rs index 41058b5f16bc2..f012675335592 100644 --- a/crates/turborepo-filewatch/src/cookies.rs +++ b/crates/turborepo-filewatch/src/cookies.rs @@ -3,12 +3,14 @@ use std::{collections::BinaryHeap, fs::OpenOptions, time::Duration}; use notify::EventKind; use thiserror::Error; use tokio::{ - sync::{mpsc, oneshot}, + sync::{broadcast, mpsc, oneshot}, time::error::Elapsed, }; use tracing::trace; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation}; +use crate::{NotifyError, OptionalWatch}; + #[derive(Debug, Error)] pub enum CookieError { #[error("cookie timeout expired")] @@ -22,6 +24,8 @@ pub enum CookieError { io_err: std::io::Error, path: AbsoluteSystemPathBuf, }, + #[error("cookie queue is not available")] + Unavailable, } /// CookieWriter is responsible for assigning filesystem cookies to a request @@ -30,7 +34,8 @@ pub enum CookieError { pub struct CookieWriter { root: AbsoluteSystemPathBuf, timeout: Duration, - cookie_request_tx: mpsc::Sender>>, + cookie_request_sender_lazy: + OptionalWatch>>>, // _exit_ch exists to trigger a close on the receiver when all instances // of this struct are dropped. The task that is receiving events will exit, // dropping the other sender for the broadcast channel, causing all receivers @@ -138,18 +143,40 @@ impl CookieWatcher { } impl CookieWriter { - pub fn new(root: &AbsoluteSystemPath, timeout: Duration) -> Self { + pub fn new( + root: &AbsoluteSystemPath, + timeout: Duration, + mut recv: OptionalWatch>>, + ) -> Self { + let (cookie_request_sender_tx, cookie_request_sender_lazy) = OptionalWatch::new(); let (exit_ch, exit_signal) = mpsc::channel(16); - let (cookie_requests_tx, cookie_requests_rx) = mpsc::channel(16); - tokio::spawn(watch_cookies( - root.to_owned(), - cookie_requests_rx, - exit_signal, - )); + tokio::spawn({ + let root = root.to_owned(); + async move { + if recv.get().await.is_err() { + // here we need to wait for confirmation that the watching end is ready + // before we start sending requests. this has the side effect of not + // enabling the cookie writing mechanism until the watcher is ready + return; + } + + let (cookie_requests_tx, cookie_requests_rx) = mpsc::channel(16); + + if cookie_request_sender_tx + .send(Some(cookie_requests_tx)) + .is_err() + { + // the receiver has already been dropped + tracing::debug!("nobody listening for cookie requests, exiting"); + return; + }; + watch_cookies(root.to_owned(), cookie_requests_rx, exit_signal).await; + } + }); Self { root: root.to_owned(), timeout, - cookie_request_tx: cookie_requests_tx, + cookie_request_sender_lazy, _exit_ch: exit_ch, } } @@ -163,9 +190,18 @@ impl CookieWriter { request: T, ) -> Result, CookieError> { // we need to write the cookie from a single task so as to serialize them + tokio::time::timeout(self.timeout, self.cookie_request_inner(request)).await? 
+ } + + async fn cookie_request_inner(&self, request: T) -> Result, CookieError> { let (resp_tx, resp_rx) = oneshot::channel(); - self.cookie_request_tx.clone().send(resp_tx).await?; - let serial = tokio::time::timeout(self.timeout, resp_rx).await???; + let mut cookie_request_tx = self.cookie_request_sender_lazy.clone(); + let Ok(cookie_request_tx) = cookie_request_tx.get().await.map(|s| s.to_owned()) else { + // the cookie queue is not ready and will never be ready + return Err(CookieError::Unavailable); + }; + cookie_request_tx.send(resp_tx).await?; + let serial = resp_rx.await??; Ok(CookiedRequest { request, serial }) } } @@ -224,7 +260,7 @@ mod test { use turbopath::AbsoluteSystemPathBuf; use super::{CookieWatcher, CookiedRequest}; - use crate::{cookies::CookieWriter, NotifyError}; + use crate::{cookies::CookieWriter, NotifyError, OptionalWatch}; struct TestQuery { resp: oneshot::Sender<()>, @@ -288,8 +324,9 @@ mod test { .unwrap(); let (send_file_events, file_events) = broadcast::channel(16); + let recv = OptionalWatch::once(file_events.resubscribe()); let (reqs_tx, reqs_rx) = mpsc::channel(16); - let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2)); + let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2), recv); let (exit_tx, exit_rx) = oneshot::channel(); let service = TestService { @@ -351,8 +388,9 @@ mod test { .unwrap(); let (send_file_events, file_events) = broadcast::channel(16); + let recv = OptionalWatch::once(file_events.resubscribe()); let (reqs_tx, reqs_rx) = mpsc::channel(16); - let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2)); + let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2), recv); let (exit_tx, exit_rx) = oneshot::channel(); let service = TestService { diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index 0b7f2c3f95c86..0fbd14b24591c 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -10,12 +10,12 @@ use notify::Event; use thiserror::Error; use tokio::sync::{broadcast, mpsc, oneshot}; use tracing::{debug, warn}; -use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPath}; +use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPath}; use wax::{Any, Glob, Program}; use crate::{ cookies::{CookieError, CookieWatcher, CookieWriter, CookiedRequest}, - NotifyError, + NotifyError, OptionalWatch, }; type Hash = String; @@ -101,6 +101,8 @@ pub enum Error { Closed, #[error("globwatcher request timed out")] Timeout(#[from] tokio::time::error::Elapsed), + #[error("glob watching is unavailable")] + Unavailable, } impl From> for Error { @@ -122,7 +124,7 @@ pub struct GlobWatcher { // dropping the other sender for the broadcast channel, causing all receivers // to be notified of a close. 
_exit_ch: oneshot::Sender<()>, - query_ch: mpsc::Sender>, + query_ch_lazy: OptionalWatch>>, } #[derive(Debug)] @@ -160,23 +162,43 @@ struct GlobTracker { impl GlobWatcher { pub fn new( - root: &AbsoluteSystemPath, + root: AbsoluteSystemPathBuf, cookie_jar: CookieWriter, - recv: broadcast::Receiver>, + mut recv: OptionalWatch>>, ) -> Self { let (exit_ch, exit_signal) = tokio::sync::oneshot::channel(); - let (query_ch, query_recv) = mpsc::channel(256); + let (query_ch_tx, query_ch_lazy) = OptionalWatch::new(); let cookie_root = cookie_jar.root().to_owned(); - tokio::task::spawn( - GlobTracker::new(root.to_owned(), cookie_root, exit_signal, recv, query_recv).watch(), - ); + tokio::task::spawn(async move { + let Ok(recv) = recv.get().await.map(|r| r.resubscribe()) else { + // if this fails, it means that the filewatcher is not available + // so starting the glob tracker is pointless + return; + }; + + // if the receiver is closed, it means the glob watcher is closed and we + // probably don't want to start the glob tracker + let (query_ch, query_recv) = mpsc::channel(128); + if query_ch_tx.send(Some(query_ch)).is_err() { + tracing::debug!("no queryers for glob watcher, exiting"); + return; + } + + GlobTracker::new(root, cookie_root, exit_signal, recv, query_recv) + .watch() + .await + }); Self { cookie_jar, _exit_ch: exit_ch, - query_ch, + query_ch_lazy, } } + /// Watch a set of globs for a given hash. + /// + /// This function will return `Error::Unavailable` if the globwatcher is not + /// yet available. pub async fn watch_globs( &self, hash: Hash, @@ -189,11 +211,14 @@ impl GlobWatcher { glob_set: globs, resp: tx, }; - let cookied_request = self.cookie_jar.cookie_request(req).await?; - self.query_ch.send(cookied_request).await?; + self.send_request(req).await?; tokio::time::timeout(timeout, rx).await?? } + /// Get the globs that have changed for a given hash. + /// + /// This function will return `Error::Unavailable` if the globwatcher is not + /// yet available. pub async fn get_changed_globs( &self, hash: Hash, @@ -206,10 +231,22 @@ impl GlobWatcher { candidates, resp: tx, }; - let cookied_request = self.cookie_jar.cookie_request(req).await?; - self.query_ch.send(cookied_request).await?; + self.send_request(req).await?; tokio::time::timeout(timeout, rx).await?? } + + async fn send_request(&self, req: Query) -> Result<(), Error> { + let cookied_request = self.cookie_jar.cookie_request(req).await?; + let mut query_ch = self.query_ch_lazy.clone(); + let query_ch = query_ch + .get_immediate() + .ok_or(Error::Unavailable)? 
+ .map(|ch| ch.clone()) + .map_err(|_| Error::Unavailable)?; + + query_ch.send(cookied_request).await?; + Ok(()) + } } #[derive(Debug, Error)] @@ -468,12 +505,10 @@ mod test { setup(&repo_root); let cookie_dir = repo_root.join_component(".git"); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2)); - - let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let recv = watcher.watch(); + let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone()); + let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv); let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"]; let raw_excludes = ["my-pkg/.next/cache/**"]; @@ -553,12 +588,11 @@ mod test { setup(&repo_root); let cookie_dir = repo_root.join_component(".git"); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2)); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let recv = watcher.watch(); + let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone()); - let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv); let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"]; let raw_excludes: [&str; 0] = []; @@ -649,12 +683,11 @@ mod test { setup(&repo_root); let cookie_dir = repo_root.join_component(".git"); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2)); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let recv = watcher.watch(); + let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone()); - let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv); // On windows, we expect different sanitization before the // globs are passed in, due to alternative data streams in files. diff --git a/crates/turborepo-filewatch/src/lib.rs b/crates/turborepo-filewatch/src/lib.rs index 3a376abedcfbf..211bb52498f57 100644 --- a/crates/turborepo-filewatch/src/lib.rs +++ b/crates/turborepo-filewatch/src/lib.rs @@ -21,7 +21,7 @@ use notify::event::EventKind; use notify::{Config, RecommendedWatcher}; use notify::{Event, EventHandler, RecursiveMode, Watcher}; use thiserror::Error; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, watch::error::RecvError}; use tracing::{debug, warn}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation}; #[cfg(feature = "manual_recursive_watch")] @@ -39,8 +39,11 @@ pub mod cookies; #[cfg(target_os = "macos")] mod fsevent; pub mod globwatcher; +mod optional_watch; pub mod package_watcher; +pub use optional_watch::OptionalWatch; + #[cfg(not(target_os = "macos"))] type Backend = RecommendedWatcher; #[cfg(target_os = "macos")] @@ -79,7 +82,7 @@ impl Display for NotifyError { } pub struct FileSystemWatcher { - sender: broadcast::Sender>, + receiver: OptionalWatch>>, // _exit_ch exists to trigger a close on the receiver when an instance // of this struct is dropped. 
The task that is receiving events will exit, // dropping the other sender for the broadcast channel, causing all receivers @@ -89,53 +92,90 @@ pub struct FileSystemWatcher { } impl FileSystemWatcher { - pub async fn new_with_default_cookie_dir( - root: &AbsoluteSystemPath, - ) -> Result { + pub fn new_with_default_cookie_dir(root: &AbsoluteSystemPath) -> Result { // We already store logs in .turbo and recommend it be gitignore'd. // Watchman uses .git, but we can't guarantee that git is present _or_ // that the turbo root is the same as the git root. - Self::new(root, &root.join_components(&[".turbo", "cookies"])).await + Self::new(root, root.join_components(&[".turbo", "cookies"])) } - pub async fn new( + pub fn new( root: &AbsoluteSystemPath, - cookie_dir: &AbsoluteSystemPath, + cookie_dir: AbsoluteSystemPathBuf, ) -> Result { - if root.relation_to_path(cookie_dir) != PathRelation::Parent { + tracing::debug!("initing file-system watcher"); + + if root.relation_to_path(&cookie_dir) != PathRelation::Parent { return Err(WatchError::Setup(format!( "Invalid cookie directory: {} does not contain {}", root, cookie_dir ))); } - setup_cookie_dir(cookie_dir)?; - let (sender, _) = broadcast::channel(1024); + + let (file_events_receiver_tx, file_events_receiver_lazy) = OptionalWatch::new(); let (send_file_events, mut recv_file_events) = mpsc::channel(1024); - let watch_root = root.to_owned(); - let broadcast_sender = sender.clone(); - debug!("starting filewatcher"); - let watcher = run_watcher(&watch_root, send_file_events)?; let (exit_ch, exit_signal) = tokio::sync::oneshot::channel(); - // Ensure we are ready to receive new events, not events for existing state - debug!("waiting for initial filesystem cookie"); - wait_for_cookie(cookie_dir, &mut recv_file_events).await?; - tokio::task::spawn(watch_events( - watcher, - watch_root, - recv_file_events, - exit_signal, - broadcast_sender, - )); - debug!("filewatching ready"); + + tokio::task::spawn({ + let cookie_dir = cookie_dir.clone(); + let watch_root = root.to_owned(); + async move { + // this task never yields, so run it in the blocking threadpool + let watch_root_task = watch_root.clone(); + let cookie_dir_task = cookie_dir.clone(); + let task = tokio::task::spawn_blocking(move || { + setup_cookie_dir(&cookie_dir_task)?; + run_watcher(&watch_root_task, send_file_events) + }); + + let Ok(Ok(watcher)) = task.await else { + // if the watcher fails, just return. we don't set the event sender, and other + // services will never start + return; + }; + + // Ensure we are ready to receive new events, not events for existing state + debug!("waiting for initial filesystem cookie"); + if let Err(e) = wait_for_cookie(&cookie_dir, &mut recv_file_events).await { + // if we can't get a cookie here, we should not make the file + // watching available to downstream services + warn!("failed to wait for initial filesystem cookie: {}", e); + return; + } + debug!("filewatching ready"); + + let (sender, receiver) = broadcast::channel(1024); + + if file_events_receiver_tx.send(Some(receiver)).is_err() { + // if this fails, it means that nobody is listening (and + // nobody ever will) likely because the + // watcher has been dropped. We can just exit early. 
+ tracing::debug!("no downstream listeners, exiting"); + return; + } + + watch_events(watcher, watch_root, recv_file_events, exit_signal, sender).await; + } + }); + Ok(Self { - sender, + receiver: file_events_receiver_lazy, _exit_ch: exit_ch, - cookie_dir: cookie_dir.to_owned(), + cookie_dir, }) } - pub fn subscribe(&self) -> broadcast::Receiver> { - self.sender.subscribe() + /// A convenience method around the sender watcher that waits for file + /// watching to be ready and then returns a handle to the file stream. + pub async fn subscribe( + &self, + ) -> Result>, RecvError> { + let mut receiver = self.receiver.clone(); + receiver.get().await.map(|r| r.resubscribe()) + } + + pub fn watch(&self) -> OptionalWatch>> { + self.receiver.clone() } pub fn cookie_dir(&self) -> &AbsoluteSystemPath { @@ -146,6 +186,8 @@ impl FileSystemWatcher { fn setup_cookie_dir(cookie_dir: &AbsoluteSystemPath) -> Result<(), WatchError> { // We need to ensure that the cookie directory is cleared out first so // that we can start over with cookies. + tracing::debug!("setting up the cookie dir"); + if cookie_dir.exists() { cookie_dir.remove_dir_all().map_err(|e| { WatchError::Setup(format!("failed to clear cookie dir {}: {}", cookie_dir, e)) @@ -482,10 +524,8 @@ mod test { let sibling_path = parent_path.join_component("sibling"); sibling_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; let foo_path = child_path.join_component("foo"); @@ -542,10 +582,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; @@ -586,10 +624,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; repo_root.remove_dir_all().unwrap(); @@ -617,10 +653,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; let new_parent = repo_root.join_component("new_parent"); @@ -651,10 +685,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = 
FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; let new_repo_root = repo_root.parent().unwrap().join_component("new_repo_root"); @@ -684,10 +716,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; // Create symlink during file watching @@ -727,10 +757,8 @@ mod test { let symlink_path = repo_root.join_component("symlink"); symlink_path.symlink_to_dir(child_path.as_str()).unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; // Delete symlink during file watching @@ -768,10 +796,8 @@ mod test { let symlink_path = repo_root.join_component("symlink"); symlink_path.symlink_to_dir(child_path.as_str()).unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; // Delete symlink during file watching @@ -814,10 +840,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; let repo_parent = repo_root.parent().unwrap(); @@ -854,10 +878,8 @@ mod test { let child_path = parent_path.join_component("child"); child_path.create_dir_all().unwrap(); - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - let mut recv = watcher.subscribe(); + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + let mut recv = watcher.subscribe().await.unwrap(); expect_watching(&mut recv, &[&repo_root, &parent_path, &child_path]).await; let repo_parent = repo_root.parent().unwrap(); @@ -877,10 +899,8 @@ mod test { let mut recv = { // create and immediately drop the watcher, which should trigger the exit // channel - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) - .await - .unwrap(); - watcher.subscribe() + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + watcher.subscribe().await.unwrap() }; // There may be spurious events, but we should expect a close in short order diff --git a/crates/turborepo-filewatch/src/optional_watch.rs b/crates/turborepo-filewatch/src/optional_watch.rs new file mode 100644 index 0000000000000..f448bc902ce05 --- /dev/null +++ 
b/crates/turborepo-filewatch/src/optional_watch.rs
@@ -0,0 +1,75 @@
+use futures::FutureExt;
+use tokio::sync::watch::{self, error::RecvError, Ref};
+
+#[derive(Debug)]
+pub struct OptionalWatch<T>(watch::Receiver<Option<T>>);
+
+impl<T> Clone for OptionalWatch<T> {
+    fn clone(&self) -> Self {
+        Self(self.0.clone())
+    }
+}
+
+/// A handy wrapper around types that are watched and may be None.
+/// `SomeRef` is a reference type that is known to be `Some`.
+impl<T> OptionalWatch<T> {
+    /// Create a new `OptionalWatch` with no initial value.
+    ///
+    /// Keep in mind that when the sender is dropped, downstream
+    /// dependencies that are currently waiting will get a `RecvError`.
+    pub fn new() -> (watch::Sender<Option<T>>, OptionalWatch<T>) {
+        let (tx, rx) = watch::channel(None);
+        (tx, OptionalWatch(rx))
+    }
+
+    /// Create a new `OptionalWatch` with an initial, unchanging value.
+    #[cfg(test)]
+    pub fn once(init: T) -> Self {
+        let (_tx, rx) = watch::channel(Some(init));
+        OptionalWatch(rx)
+    }
+
+    /// Wait for the value to be available and then return it.
+    ///
+    /// If you receive a `RecvError`, the sender has been dropped, meaning you
+    /// will not receive any more updates. For efficiency's sake, you should
+    /// exit the task and drop any senders to other dependencies so that the
+    /// exit can propagate up the chain.
+    pub async fn get(&mut self) -> Result<SomeRef<'_, T>, RecvError> {
+        let recv = self.0.wait_for(|f| f.is_some()).await?;
+        Ok(SomeRef(recv))
+    }
+
+    /// Get the current value, if it is available.
+    pub fn get_immediate(&mut self) -> Option<Result<SomeRef<'_, T>, RecvError>> {
+        self.get().now_or_never()
+    }
+}
+
+pub struct SomeRef<'a, T>(Ref<'a, Option<T>>);
+
+impl<'a, T> std::ops::Deref for SomeRef<'a, T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        self.0.as_ref().expect("checked")
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use futures::FutureExt;
+
+    /// Futures have a method that allows you to fetch the value of a future
+    /// if it is immediately available. This is useful, for example, for
+    /// allowing consumers to poll a future and get the value if it is
+    /// available, but otherwise just continue on, rather than wait.
+    #[tokio::test]
+    pub async fn now_or_never_works() {
+        let (tx, mut rx) = super::OptionalWatch::new();
+
+        tx.send(Some(42)).unwrap();
+
+        assert_eq!(*rx.get().now_or_never().unwrap().unwrap(), 42);
+    }
+}
diff --git a/crates/turborepo-filewatch/src/package_watcher.rs b/crates/turborepo-filewatch/src/package_watcher.rs
index 97fa1da0b1f20..13f5e2324af68 100644
--- a/crates/turborepo-filewatch/src/package_watcher.rs
+++ b/crates/turborepo-filewatch/src/package_watcher.rs
@@ -1,13 +1,9 @@
 //! This module hosts the `PackageWatcher` type, which is used to watch the
 //! filesystem for changes to packages.
-use std::{
-    collections::HashMap,
-    future::IntoFuture,
-    path::PathBuf,
-    sync::{Arc, Mutex},
-};
+use std::{collections::HashMap, sync::Arc};
+use futures::FutureExt;
 use notify::Event;
 use tokio::{
     join,
     sync::{
         broadcast::{self, error::RecvError},
         mpsc, oneshot, watch,
     },
 };
 use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
 use turborepo_repository::{
-    discovery::{PackageDiscovery, WorkspaceData},
+    discovery::{self, DiscoveryResponse, PackageDiscovery, WorkspaceData},
     package_manager::{self, Error, PackageManager, WorkspaceGlobs},
 };
-use crate::NotifyError;
+use crate::{optional_watch::OptionalWatch, NotifyError};
+
+/// A package discovery strategy that watches the file system for changes.
+/// Basic idea:
+/// - Set up a watcher on file changes on the relevant workspace file for the
+///   package manager
+/// - When the workspace globs change, re-discover the workspace
+/// - When a package.json changes, re-discover the workspace
+/// - Keep an in-memory cache of the workspace
+pub struct WatchingPackageDiscovery {
+    /// file watching may not be ready yet so we store a watcher
+    /// through which we can get the file watching stack
+    watcher: Arc<PackageWatcher>,
+}
+
+impl WatchingPackageDiscovery {
+    pub fn new(watcher: Arc<PackageWatcher>) -> Self {
+        Self { watcher }
+    }
+}
+
+impl PackageDiscovery for WatchingPackageDiscovery {
+    async fn discover_packages(&self) -> Result<DiscoveryResponse, discovery::Error> {
+        tracing::debug!("discovering packages using watcher implementation");
+
+        // this can either not have a value ready, or the sender has been dropped. in
+        // either case rust will report that the value is unavailable
+        let package_manager = self
+            .watcher
+            .get_package_manager()
+            .and_then(Result::ok)
+            .ok_or(discovery::Error::Unavailable)?;
+        let workspaces = self
+            .watcher
+            .get_package_data()
+            .and_then(Result::ok)
+            .ok_or(discovery::Error::Unavailable)?;
+
+        Ok(DiscoveryResponse {
+            workspaces,
+            package_manager,
+        })
+    }
+}
 /// Watches the filesystem for changes to packages and package managers.
 pub struct PackageWatcher {
@@ -33,114 +72,260 @@ pub struct PackageWatcher {
     _exit_tx: oneshot::Sender<()>,
     _handle: tokio::task::JoinHandle<()>,
-    package_data: Arc<Mutex<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>>,
-    manager_rx: watch::Receiver<PackageManager>,
+    /// The current package data, if available.
+    package_data: OptionalWatch<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>,
+
+    /// The current package manager, if available.
+    package_manager_lazy: OptionalWatch<PackageManagerState>,
 }
 impl PackageWatcher {
     /// Creates a new package watcher whose current package data can be queried.
-    pub async fn new<T: PackageDiscovery + Send + 'static>(
+    /// `backup_discovery` is used to perform the initial discovery of packages,
+    /// to populate the state before we can watch.
+    pub fn new<T: PackageDiscovery + Send + Sync + 'static>(
         root: AbsoluteSystemPathBuf,
-        recv: broadcast::Receiver<Result<Event, NotifyError>>,
+        recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
         backup_discovery: T,
     ) -> Result<Self, Error> {
         let (exit_tx, exit_rx) = oneshot::channel();
-        let subscriber = Subscriber::new(exit_rx, root, recv, backup_discovery).await?;
-        let manager_rx = subscriber.manager_receiver();
+        let subscriber = Subscriber::new(root, recv, backup_discovery)?;
+        let package_manager_lazy = subscriber.manager_receiver();
         let package_data = subscriber.package_data();
-        let handle = tokio::spawn(subscriber.watch());
+        let handle = tokio::spawn(subscriber.watch(exit_rx));
         Ok(Self {
             _exit_tx: exit_tx,
             _handle: handle,
             package_data,
-            manager_rx,
+            package_manager_lazy,
         })
     }
-    pub async fn get_package_data(&self) -> Vec<WorkspaceData> {
-        self.package_data
-            .lock()
-            .expect("not poisoned")
-            .clone()
-            .into_values()
-            .collect()
+    /// Get the package data. If the package data is not available, this will
+    /// block until it is.
+    pub async fn wait_for_package_data(
+        &self,
+    ) -> Result<Vec<WorkspaceData>, watch::error::RecvError> {
+        let mut recv = self.package_data.clone();
+        recv.get().await.map(|v| v.values().cloned().collect())
+    }
+
+    /// A convenience wrapper around `FutureExt::now_or_never` to let you get
+    /// the package data if it is immediately available.
+    pub fn get_package_data(&self) -> Option<Result<Vec<WorkspaceData>, watch::error::RecvError>> {
+        let mut recv = self.package_data.clone();
+        let data = if let Some(Ok(inner)) = recv.get().now_or_never() {
+            Some(Ok(inner.values().cloned().collect()))
+        } else {
+            None
+        };
+        data
+    }
+
+    /// Get the package manager. If the package manager is not available, this
+    /// will block until it is.
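    ///
    /// For illustration, a sketch of how a caller might use this pair of
    /// accessors (hypothetical consumer code; `watcher` is assumed to be a
    /// `PackageWatcher`):
    ///
    /// ```ignore
    /// // block until discovery has produced a package manager
    /// let manager = watcher.wait_for_package_manager().await?;
    ///
    /// // or poll without blocking: `None` just means "not ready yet"
    /// match watcher.get_package_manager() {
    ///     Some(Ok(manager)) => tracing::debug!("detected {:?}", manager),
    ///     Some(Err(_)) => tracing::debug!("watcher shut down"),
    ///     None => tracing::debug!("discovery still in progress"),
    /// }
    /// ```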
+    pub async fn wait_for_package_manager(
+        &self,
+    ) -> Result<PackageManager, watch::error::RecvError> {
+        let mut recv = self.package_manager_lazy.clone();
+        recv.get().await.map(|s| s.manager)
+    }
+
+    /// A convenience wrapper around `FutureExt::now_or_never` to let you get
+    /// the package manager if it is immediately available.
+    pub fn get_package_manager(&self) -> Option<Result<PackageManager, watch::error::RecvError>> {
+        let mut recv = self.package_manager_lazy.clone();
+        // the borrow checker doesn't like returning immediately here so assign to a var
+        let data = if let Some(Ok(inner)) = recv.get().now_or_never() {
+            Some(Ok(inner.manager))
+        } else {
+            None
+        };
+        data
     }
-    pub async fn get_package_manager(&self) -> PackageManager {
-        *self.manager_rx.borrow()
+    pub fn watch(&self) -> OptionalWatch<HashMap<AbsoluteSystemPathBuf, WorkspaceData>> {
+        self.package_data.clone()
     }
 }
 /// The underlying task that listens to file system events and updates the
 /// internal package state.
 struct Subscriber<T: PackageDiscovery> {
-    exit_rx: oneshot::Receiver<()>,
-    filter: WorkspaceGlobs,
-    recv: broadcast::Receiver<Result<Event, NotifyError>>,
+    /// we need to hold on to this. dropping it will close the downstream
+    /// data dependencies
+    #[allow(clippy::type_complexity)]
+    #[allow(dead_code)]
+    file_event_receiver_tx:
+        Arc<watch::Sender<Option<broadcast::Receiver<Result<Event, NotifyError>>>>>,
+
+    file_event_receiver_lazy: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
+    backup_discovery: Arc<T>,
+
     repo_root: AbsoluteSystemPathBuf,
-    backup_discovery: T,
+    root_package_json_path: AbsoluteSystemPathBuf,

     // package manager data
-    package_data: Arc<Mutex<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>>,
-    manager_rx: watch::Receiver<PackageManager>,
-    manager_tx: watch::Sender<PackageManager>,
-
-    // stored as PathBuf to avoid processing later.
-    // if package_json changes, we need to re-infer
-    // the package manager
-    package_json_path: std::path::PathBuf,
-    workspace_config_path: std::path::PathBuf,
+    package_manager_tx: Arc<watch::Sender<Option<PackageManagerState>>>,
+    package_manager_lazy: OptionalWatch<PackageManagerState>,
+    package_data_tx: Arc<watch::Sender<Option<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>>>,
+    package_data_lazy: OptionalWatch<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>,
+}
+
+/// A collection of state inferred from a package manager. All this data will
+/// change if the package manager changes.
+#[derive(Clone)]
+struct PackageManagerState {
+    manager: PackageManager,
+    // we need to wrap in Arc to make it send / sync
+    filter: Arc<WorkspaceGlobs>,
+    workspace_config_path: AbsoluteSystemPathBuf,
 }

-impl<T: PackageDiscovery> Subscriber<T> {
-    async fn new(
-        exit_rx: oneshot::Receiver<()>,
+impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T> {
+    /// Creates a new instance of PackageDiscovery. This will start a task that
+    /// performs the initial discovery using the `backup_discovery` of your
+    /// choice, and then listens to file system events to keep the package
+    /// data up to date.
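    ///
    /// Roughly, the spawned task proceeds in this order (a sketch of the body
    /// below, not additional behavior):
    ///
    /// ```ignore
    /// let recv = recv.get().await?;                  // 1. wait for file watching
    /// let initial = backup_discovery.discover_packages().await?; // 2. initial walk
    /// package_manager_tx.send(Some(state))?;         // 3. publish manager state...
    /// package_data_tx.send(Some(package_map))?;      //    ...and package data
    /// recv_tx.send(Some(recv))?;                     // 4. only now hand out events,
    ///                                                //    so none are consumed early
    /// ```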
+ fn new( repo_root: AbsoluteSystemPathBuf, - recv: broadcast::Receiver>, - mut discovery: T, + mut recv: OptionalWatch>>, + backup_discovery: T, ) -> Result { - let initial_discovery = discovery.discover_packages().await?; + let (package_data_tx, package_data_lazy) = OptionalWatch::new(); + let package_data_tx = Arc::new(package_data_tx); + let (package_manager_tx, package_manager_lazy) = OptionalWatch::new(); + let package_manager_tx = Arc::new(package_manager_tx); + + // we create a second optional watch here so that we can ensure it is ready and + // pass it down stream after the initial discovery, otherwise our package + // discovery watcher will consume events before we have our initial state + let (file_event_receiver_tx, file_event_receiver_lazy) = OptionalWatch::new(); + let file_event_receiver_tx = Arc::new(file_event_receiver_tx); + + let backup_discovery = Arc::new(backup_discovery); + + let package_json_path = repo_root.join_component("package.json"); + + let _task = tokio::spawn({ + let package_data_tx = package_data_tx.clone(); + let manager_tx = package_manager_tx.clone(); + let backup_discovery = backup_discovery.clone(); + let repo_root = repo_root.clone(); + let package_json_path = package_json_path.clone(); + let recv_tx = file_event_receiver_tx.clone(); + async move { + // wait for the watcher, so we can process events that happen during discovery + let Ok(recv) = recv.get().await.map(|r| r.resubscribe()) else { + // if we get here, it means that file watching has not started, so we should + // just report that the package watcher is not available + tracing::debug!("file watching shut down, package watcher not available"); + return; + }; + + let initial_discovery = backup_discovery.discover_packages().await; + + let Ok(initial_discovery) = initial_discovery else { + // if initial discovery fails, there is nothing we can do. 
we should just report + // that the package watcher is not available + // + // NOTE: in the future, if we decide to differentiate between 'not ready' and + // unavailable, we MUST update the status here to unavailable or the client will + // hang + return; + }; - let (package_json_path, workspace_config_path, filter) = - Self::update_package_manager(&initial_discovery.package_manager, &repo_root)?; + let Ok((workspace_config_path, filter)) = Self::update_package_manager( + &initial_discovery.package_manager, + &repo_root, + &package_json_path, + ) else { + // similar story here, if the package manager cannot be read, we should just + // report that the package watcher is not available + return; + }; - let (manager_tx, manager_rx) = watch::channel(initial_discovery.package_manager); + // now that the two pieces of data are available, we can send the package + // manager and set the packages + + let state = PackageManagerState { + manager: initial_discovery.package_manager, + filter: Arc::new(filter), + workspace_config_path, + }; + + // if either of these fail, it means that there are no more subscribers and we + // should just ignore it, since we are likely closing + let manager_listeners = if manager_tx.send(Some(state)).is_err() { + tracing::debug!("no listeners for package manager"); + false + } else { + true + }; + + let package_data_listeners = if package_data_tx + .send(Some( + initial_discovery + .workspaces + .into_iter() + .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) + .collect::>(), + )) + .is_err() + { + tracing::debug!("no listeners for package data"); + false + } else { + true + }; + + // if we have no listeners for either, we should just exit + if manager_listeners || package_data_listeners { + _ = recv_tx.send(Some(recv)); + } + } + }); Ok(Self { - exit_rx, - filter, - package_data: Arc::new(Mutex::new( - initial_discovery - .workspaces - .into_iter() - .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) - .collect(), - )), - recv, - manager_rx, - manager_tx, + file_event_receiver_tx, + file_event_receiver_lazy, + backup_discovery, repo_root, - backup_discovery: discovery, - - package_json_path, - workspace_config_path, + root_package_json_path: package_json_path, + package_data_lazy, + package_data_tx, + package_manager_lazy, + package_manager_tx, }) } - async fn watch(mut self) { + async fn watch(mut self, exit_rx: oneshot::Receiver<()>) { // initialize the contents self.rediscover_packages().await; - // respond to changes - loop { - tokio::select! 
{ - biased; - _ = &mut self.exit_rx => { - tracing::info!("exiting package watcher"); - return - }, - file_event = self.recv.recv().into_future() => match file_event { - Ok(Ok(event)) => self.handle_file_event(event).await, + let process = async move { + let Ok(mut recv) = self + .file_event_receiver_lazy + .get() + .await + .map(|r| r.resubscribe()) + else { + // if the channel is closed, we should just exit + tracing::debug!("file watcher shut down, exiting"); + return; + }; + + tracing::debug!("package watcher ready"); + loop { + let file_event = recv.recv().await; + match file_event { + Ok(Ok(event)) => match self.handle_file_event(&event).await { + Ok(()) => {} + Err(()) => { + tracing::debug!("package watching is closing, exiting"); + return; + } + }, // if we get an error, we need to re-discover the packages Ok(Err(_)) => self.rediscover_packages().await, Err(RecvError::Closed) => return, @@ -148,188 +333,293 @@ impl Subscriber { Err(RecvError::Lagged(count)) => { tracing::warn!("lagged behind {count} processing file watching events"); self.rediscover_packages().await; - }, + } } } + }; + + // respond to changes + + tokio::select! { + biased; + _ = exit_rx => { + tracing::debug!("exiting package watcher due to signal"); + }, + _ = process => { + tracing::debug!("exiting package watcher due to process end"); + } } } fn update_package_manager( manager: &PackageManager, repo_root: &AbsoluteSystemPath, - ) -> Result<(PathBuf, PathBuf, WorkspaceGlobs), Error> { - let package_json_path = repo_root - .join_component("package.json") - .as_std_path() - .to_owned(); - - let workspace_config_path = manager - .workspace_configuration_path() - .map_or(package_json_path.clone(), |p| { - repo_root.join_component(p).as_std_path().to_owned() - }); + package_json_path: &AbsoluteSystemPath, + ) -> Result<(AbsoluteSystemPathBuf, WorkspaceGlobs), Error> { + let workspace_config_path = manager.workspace_configuration_path().map_or_else( + || package_json_path.to_owned(), + |p| repo_root.join_component(p), + ); let filter = manager.get_workspace_globs(repo_root)?; - Ok((package_json_path, workspace_config_path, filter)) + Ok((workspace_config_path, filter)) } - pub fn manager_receiver(&self) -> watch::Receiver { - self.manager_rx.clone() + pub fn manager_receiver(&self) -> OptionalWatch { + self.package_manager_lazy.clone() } - pub fn package_data(&self) -> Arc>> { - self.package_data.clone() - } - - fn manager(&self) -> PackageManager { - *self.manager_rx.borrow() + pub fn package_data(&self) -> OptionalWatch> { + self.package_data_lazy.clone() } - async fn handle_file_event(&mut self, file_event: Event) { + /// Returns Err(()) if the package manager channel is closed, indicating + /// that the entire watching task should exit. 
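    ///
    /// The watch loop above consumes this result as follows (an illustrative
    /// sketch mirroring the `match` in `watch`, not new behavior):
    ///
    /// ```ignore
    /// match self.handle_file_event(&event).await {
    ///     Ok(()) => {}       // keep processing events
    ///     Err(()) => return, // all receivers dropped; shut the task down
    /// }
    /// ```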
+ async fn handle_file_event(&mut self, file_event: &Event) -> Result<(), ()> { tracing::trace!("file event: {:?} {:?}", file_event.kind, file_event.paths); if file_event .paths .iter() - .any(|p| self.package_json_path.eq(p)) + .any(|p| self.root_package_json_path.as_std_path().eq(p)) + { + if let Err(e) = self.handle_root_package_json_change().await { + tracing::error!("error discovering package manager: {}", e); + } + } + + match self.have_workspace_globs_changed(file_event).await { + Ok(true) => { + // + self.rediscover_packages().await; + Ok(()) + } + Ok(false) => { + // it is the end of the function so we are going to return regardless + self.handle_package_json_change(file_event).await + } + Err(()) => Err(()), + } + } + + /// Returns Err(()) if the package manager channel is closed, indicating + /// that the entire watching task should exit. + async fn handle_package_json_change(&mut self, file_event: &Event) -> Result<(), ()> { + let Ok(state) = self.package_manager_lazy.get().await.map(|x| x.to_owned()) else { + // the channel is closed, so there is no state to write into, return + return Err(()); + }; + + // here, we can only update if we have a valid package state + + // if a path is not a valid utf8 string, it is not a valid path, so ignore + for path in file_event + .paths + .iter() + .filter_map(|p| p.as_os_str().to_str()) { - // if the package.json changed, we need to re-infer the package manager - // and update the glob list - tracing::debug!("package.json changed"); + let path_file = AbsoluteSystemPathBuf::new(path).expect("watched paths are absolute"); - let resp = match self.backup_discovery.discover_packages().await { - Ok(pm) => pm, + // the path to the workspace this file is in is the parent + let path_workspace = path_file + .parent() + .expect("watched paths will not be at the root") + .to_owned(); + + let is_workspace = match state + .filter + .target_is_workspace(&self.repo_root, &path_workspace) + { + Ok(is_workspace) => is_workspace, Err(e) => { - tracing::error!("error discovering package manager: {}", e); - return; + // this will only error if `repo_root` is not an anchor of `path_workspace`. 
+ // if we hit this case, we can safely ignore it + tracing::debug!("yielded path not in workspace: {:?}", e); + continue; } }; - let new_manager = Self::update_package_manager(&resp.package_manager, &self.repo_root) - .map(|(a, b, c)| (resp, a, b, c)); - - match new_manager { - Ok((new_manager, package_json_path, workspace_config_path, filter)) => { - tracing::debug!( - "new package manager data: {:?}, {:?}, {:?}", - new_manager.package_manager, - package_json_path, - filter - ); - // if this fails, we are closing anyways so ignore - self.manager_tx.send(new_manager.package_manager).ok(); - { - let mut data = self.package_data.lock().unwrap(); - *data = new_manager - .workspaces - .into_iter() - .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) - .collect(); - } - self.package_json_path = package_json_path; - self.workspace_config_path = workspace_config_path; - self.filter = filter; - } - Err(e) => { - // a change in the package json does not necessarily mean - // that the package manager has changed, so continue with - // best effort - tracing::error!("error getting package manager: {}", e); - } + if is_workspace { + tracing::debug!("tracing file in package: {:?}", path_file); + let package_json = path_workspace.join_component("package.json"); + let turbo_json = path_workspace.join_component("turbo.json"); + + let (package_exists, turbo_exists) = join!( + tokio::fs::try_exists(&package_json), + tokio::fs::try_exists(&turbo_json) + ); + + self.package_data_tx + .send_modify(|mut data| match (&mut data, package_exists) { + (Some(data), Ok(true)) => { + data.insert( + path_workspace, + WorkspaceData { + package_json, + turbo_json: turbo_exists + .unwrap_or_default() + .then_some(turbo_json), + }, + ); + } + (Some(data), Ok(false)) => { + data.remove(&path_workspace); + } + (None, Ok(true)) => { + let mut map = HashMap::new(); + map.insert( + path_workspace, + WorkspaceData { + package_json, + turbo_json: turbo_exists + .unwrap_or_default() + .then_some(turbo_json), + }, + ); + *data = Some(map); + } + (None, Ok(false)) => {} // do nothing + (_, Err(_)) => todo!(), + }); } } - // if it is the package manager, update the glob list - let changed = if file_event + Ok(()) + } + + /// A change to the workspace config path could mean a change to the package + /// glob list. If this happens, we need to re-walk the packages. + /// + /// Returns Err(()) if the package manager channel is closed, indicating + /// that the entire watching task should exit. + async fn have_workspace_globs_changed(&mut self, file_event: &Event) -> Result { + // here, we can only update if we have a valid package state + let Ok(state) = self.package_manager_lazy.get().await.map(|s| s.to_owned()) else { + // we can only fail receiving if the channel is closed, + // which indicated that the entire watching task should exit + return Err(()); + }; + + if file_event .paths .iter() - .any(|p| self.workspace_config_path.eq(p)) + .any(|p| state.workspace_config_path.as_std_path().eq(p)) { - let new_filter = self - .manager() + let new_filter = state + .manager .get_workspace_globs(&self.repo_root) + .map(Arc::new) // under some saving strategies a file can be totally empty for a moment // during a save. 
these strategies emit multiple events, so
                // a previous or subsequent event in the 'cluster' will still trigger
-                .unwrap_or_else(|_| self.filter.clone());
-
-            let changed = self.filter != new_filter;
-            self.filter = new_filter;
-            changed
+                .unwrap_or_else(|_| state.filter.clone());
+
+            Ok(self.package_manager_tx.send_if_modified(|f| match f {
+                Some(state) if state.filter == new_filter => false,
+                Some(state) => {
+                    tracing::debug!("workspace globs changed: {:?}", new_filter);
+                    state.filter = new_filter;
+                    true
+                }
+                // if we haven't got a valid manager, then it probably means
+                // that we are currently calculating one, so we should just
+                // ignore this event
+                None => false,
+            }))
         } else {
-            false
-        };
+            Ok(false)
+        }
     }

-        if changed {
-            // if the glob list has changed, do a recursive walk and replace
-            self.rediscover_packages().await;
-        } else {
-            // if a path is not a valid utf8 string, it is not a valid path, so ignore
-            for path in file_event
-                .paths
-                .iter()
-                .filter_map(|p| p.as_os_str().to_str())
-            {
-                let path_file =
-                    AbsoluteSystemPathBuf::new(path).expect("watched paths are absolute");
-
-                // the path to the workspace this file is in is the parent
-                let path_workspace = path_file
-                    .parent()
-                    .expect("watched paths will not be at the root")
-                    .to_owned();
-
-                let is_workspace = match self
-                    .filter
-                    .target_is_workspace(&self.repo_root, &path_workspace)
-                {
-                    Ok(is_workspace) => is_workspace,
-                    Err(e) => {
-                        // this will only error if `repo_root` is not an anchor of `path_workspace`.
-                        // if we hit this case, we can safely ignore it
-                        tracing::debug!("yielded path not in workspace: {:?}", e);
-                        continue;
-                    }
+    /// A change to the root package json means we need to re-infer the package
+    /// manager, update the glob list, and re-walk the packages.
+    ///
+    /// todo: we can probably improve the uptime here by splitting the package
if the package manager has + /// not changed, we probably do not need to re-walk the packages + async fn handle_root_package_json_change(&mut self) -> Result<(), discovery::Error> { + { + // clear all data + self.package_manager_tx.send(None).ok(); + self.package_data_tx.send(None).ok(); + } + tracing::debug!("root package.json changed, refreshing package manager and globs"); + let resp = self.backup_discovery.discover_packages().await?; + let new_manager = Self::update_package_manager( + &resp.package_manager, + &self.repo_root, + &self.root_package_json_path, + ) + .map(move |(a, b)| (resp, a, b)); + + // if the package.json changed, we need to re-infer the package manager + // and update the glob list + + match new_manager { + Ok((new_manager, workspace_config_path, filter)) => { + tracing::debug!( + "new package manager data: {:?}, {:?}", + new_manager.package_manager, + filter + ); + + let state = PackageManagerState { + manager: new_manager.package_manager, + filter: Arc::new(filter), + workspace_config_path, }; - if is_workspace { - tracing::debug!("tracing file in package: {:?}", path_file); - let package_json = path_workspace.join_component("package.json"); - let turbo_json = path_workspace.join_component("turbo.json"); - - let (package_exists, turbo_exists) = join!( - tokio::fs::try_exists(&package_json), - tokio::fs::try_exists(&turbo_json) - ); - - let mut data = self.package_data.lock().expect("not poisoned"); - if let Ok(true) = package_exists { - data.insert( - path_workspace, - WorkspaceData { - package_json, - turbo_json: turbo_exists.unwrap_or_default().then_some(turbo_json), - }, - ); - } else { - data.remove(&path_workspace); - } + { + // if this fails, we are closing anyways so ignore + self.package_manager_tx.send(Some(state)).ok(); + self.package_data_tx.send_modify(move |mut d| { + let new_data = new_manager + .workspaces + .into_iter() + .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) + .collect::>(); + if let Some(data) = &mut d { + *data = new_data; + } else { + *d = Some(new_data); + } + }); } } + Err(e) => { + // if we cannot update the package manager, we should just leave + // the package manager as None and make the package data unavailable + tracing::error!("error getting package manager: {}", e); + } } + + Ok(()) } async fn rediscover_packages(&mut self) { tracing::debug!("rediscovering packages"); - if let Ok(data) = self.backup_discovery.discover_packages().await { - let workspace = data - .workspaces - .into_iter() - .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) - .collect(); - let mut data = self.package_data.lock().expect("not poisoned"); - *data = workspace; + + // make sure package data is unavailable while we are updating + // if this fails, we have no subscribers so we can just exit + if self.package_data_tx.send(None).is_err() { + return; + } + + if let Ok(response) = self.backup_discovery.discover_packages().await { + self.package_data_tx.send_modify(|d| { + let new_data = response + .workspaces + .into_iter() + .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) + .collect::>(); + if let Some(data) = d { + *data = new_data; + } else { + *d = Some(new_data); + } + }); } else { + // package data stays unavailable tracing::error!("error discovering packages"); } } @@ -348,6 +638,7 @@ mod test { }; use super::Subscriber; + use crate::OptionalWatch; #[derive(Debug)] struct MockDiscovery { @@ -356,7 +647,7 @@ mod test { } impl super::PackageDiscovery for MockDiscovery { - async fn 
discover_packages(&mut self) -> Result { + async fn discover_packages(&self) -> Result { Ok(DiscoveryResponse { package_manager: self.manager, workspaces: self.package_data.lock().unwrap().clone(), @@ -370,6 +661,7 @@ mod test { let tmp = tempfile::tempdir().unwrap(); let (tx, rx) = broadcast::channel(10); + let rx = OptionalWatch::once(rx); let (_exit_tx, exit_rx) = tokio::sync::oneshot::channel(); let root: AbsoluteSystemPathBuf = tmp.path().try_into().unwrap(); @@ -410,13 +702,11 @@ mod test { package_data: Arc::new(Mutex::new(package_data)), }; - let subscriber = Subscriber::new(exit_rx, root.clone(), rx, mock_discovery) - .await - .unwrap(); + let subscriber = Subscriber::new(root.clone(), rx, mock_discovery).unwrap(); - let package_data = subscriber.package_data(); + let mut package_data = subscriber.package_data(); - let _handle = tokio::spawn(subscriber.watch()); + let _handle = tokio::spawn(subscriber.watch(exit_rx)); tx.send(Ok(notify::Event { kind: notify::EventKind::Create(notify::event::CreateKind::File), @@ -428,7 +718,8 @@ mod test { assert_eq!( package_data - .lock() + .get() + .await .unwrap() .values() .cloned() @@ -476,7 +767,8 @@ mod test { assert_eq!( package_data - .lock() + .get() + .await .unwrap() .values() .cloned() @@ -494,6 +786,7 @@ mod test { let tmp = tempfile::tempdir().unwrap(); let (tx, rx) = broadcast::channel(10); + let rx = OptionalWatch::once(rx); let (_exit_tx, exit_rx) = tokio::sync::oneshot::channel(); let root = AbsoluteSystemPathBuf::new(tmp.path().to_string_lossy()).unwrap(); @@ -543,19 +836,18 @@ mod test { package_data: package_data_raw.clone(), }; - let subscriber = Subscriber::new(exit_rx, root.clone(), rx, mock_discovery) - .await - .unwrap(); + let subscriber = Subscriber::new(root.clone(), rx, mock_discovery).unwrap(); - let package_data = subscriber.package_data(); + let mut package_data = subscriber.package_data(); - let _handle = tokio::spawn(subscriber.watch()); + let _handle = tokio::spawn(subscriber.watch(exit_rx)); tokio::time::sleep(std::time::Duration::from_millis(100)).await; assert_eq!( package_data - .lock() + .get() + .await .unwrap() .values() .cloned() @@ -616,7 +908,8 @@ mod test { assert_eq!( package_data - .lock() + .get() + .await .unwrap() .values() .cloned() diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs index d293af88405d6..583585fb1daad 100644 --- a/crates/turborepo-lib/src/daemon/server.rs +++ b/crates/turborepo-lib/src/daemon/server.rs @@ -26,7 +26,8 @@ use semver::Version; use thiserror::Error; use tokio::{ select, - sync::{mpsc, oneshot, watch}, + sync::{mpsc, oneshot}, + task::JoinHandle, }; use tonic::transport::{NamedService, Server}; use tower::ServiceBuilder; @@ -35,15 +36,15 @@ use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; use turborepo_filewatch::{ cookies::CookieWriter, globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher}, - package_watcher::PackageWatcher, + package_watcher::{PackageWatcher, WatchingPackageDiscovery}, FileSystemWatcher, WatchError, }; use turborepo_repository::discovery::{ - DiscoveryResponse, LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder, + LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder, }; -use super::{bump_timeout::BumpTimeout, endpoint::SocketOpenError, proto, Paths}; -use crate::daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket}; +use super::{bump_timeout::BumpTimeout, endpoint::SocketOpenError, proto}; +use 
crate::daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket, Paths}; #[derive(Debug)] #[allow(dead_code)] @@ -56,10 +57,14 @@ pub enum CloseReason { SocketOpenError(SocketOpenError), } +/// We may need to pass out references to a subset of these, so +/// we'll make them public Arcs. Eventually we can stabilize on +/// a general API and close this up. +#[derive(Clone)] pub struct FileWatching { - _watcher: FileSystemWatcher, - pub glob_watcher: GlobWatcher, - pub package_watcher: PackageWatcher, + watcher: Arc, + pub glob_watcher: Arc, + pub package_watcher: Arc, } #[derive(Debug, Error)] @@ -87,34 +92,47 @@ impl From for tonic::Status { } } -async fn start_filewatching( - repo_root: AbsoluteSystemPathBuf, - watcher_tx: watch::Sender>>, - backup_discovery: PD, -) -> Result<(), WatchError> { - let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).await?; - let cookie_writer = CookieWriter::new(watcher.cookie_dir(), Duration::from_millis(100)); - let glob_watcher = GlobWatcher::new(&repo_root, cookie_writer, watcher.subscribe()); - let package_watcher = - PackageWatcher::new(repo_root.clone(), watcher.subscribe(), backup_discovery) - .await - .map_err(|e| WatchError::Setup(format!("{:?}", e)))?; - // We can ignore failures here, it means the server is shutting down and - // receivers have gone out of scope. - let _ = watcher_tx.send(Some(Arc::new(FileWatching { - _watcher: watcher, - glob_watcher, - package_watcher, - }))); - Ok(()) +impl FileWatching { + /// This function is called in the constructor for the `TurboGrpcService` + /// and should defer ALL heavy computation to the background, making use + /// of `OptionalWatch` to ensure that the server can start up without + /// waiting for the filewatcher to be ready. Using `OptionalWatch`, + /// dependent services can wait for resources they need to become + /// available, and the server can start up without waiting for them. + pub fn new( + repo_root: AbsoluteSystemPathBuf, + backup_discovery: PD, + ) -> Result { + let watcher = Arc::new(FileSystemWatcher::new_with_default_cookie_dir(&repo_root)?); + let recv = watcher.watch(); + + let cookie_watcher = CookieWriter::new( + watcher.cookie_dir(), + Duration::from_millis(100), + recv.clone(), + ); + let glob_watcher = Arc::new(GlobWatcher::new( + repo_root.clone(), + cookie_watcher, + recv.clone(), + )); + let package_watcher = Arc::new( + PackageWatcher::new(repo_root.clone(), recv.clone(), backup_discovery) + .map_err(|e| WatchError::Setup(format!("{:?}", e)))?, + ); + + Ok(FileWatching { + watcher, + glob_watcher, + package_watcher, + }) + } } /// Timeout for every RPC the server handles const REQUEST_TIMEOUT: Duration = Duration::from_millis(100); pub struct TurboGrpcService { - watcher_tx: watch::Sender>>, - watcher_rx: watch::Receiver>>, repo_root: AbsoluteSystemPathBuf, paths: Paths, timeout: Duration, @@ -139,8 +157,6 @@ where timeout: Duration, external_shutdown: S, ) -> Self { - let (watcher_tx, watcher_rx) = watch::channel(None); - let package_discovery_backup = LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None); @@ -148,8 +164,6 @@ where // so we use a private struct with just the pieces of state needed to handle // RPCs. 
@@ -139,8 +157,6 @@ where
         timeout: Duration,
         external_shutdown: S,
     ) -> Self {
-        let (watcher_tx, watcher_rx) = watch::channel(None);
-
         let package_discovery_backup =
             LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None);
 
@@ -148,8 +164,6 @@
         // so we use a private struct with just the pieces of state needed to handle
         // RPCs.
         TurboGrpcService {
-            watcher_tx,
-            watcher_rx,
             repo_root,
             paths,
             timeout,
@@ -163,7 +177,7 @@ impl<S, PDB> TurboGrpcService<S, PDB>
 where
     S: Future<Output = CloseReason>,
     PDB: PackageDiscoveryBuilder,
-    PDB::Output: PackageDiscovery + Send + 'static,
+    PDB::Output: PackageDiscovery + Send + Sync + 'static,
 {
     /// If errors are encountered when loading the package discovery, this
     /// builder will be used as a backup to refresh the state.
@@ -176,16 +190,12 @@ where
             paths: self.paths,
             repo_root: self.repo_root,
             timeout: self.timeout,
-            watcher_rx: self.watcher_rx,
-            watcher_tx: self.watcher_tx,
             package_discovery_backup,
         }
     }
 
     pub async fn serve(self) -> Result<CloseReason, PDB::Error> {
         let Self {
-            watcher_tx,
-            watcher_rx,
             external_shutdown,
             paths,
             repo_root,
             timeout,
             package_discovery_backup,
         } = self;
 
+        // A channel to trigger the shutdown of the gRPC server. This is handed out
+        // to components internal to the server process such as root watching, as
+        // well as available to the gRPC server itself to handle the shutdown RPC.
+        let (trigger_shutdown, mut shutdown_signal) = mpsc::channel::<()>(1);
+
+        let package_discovery_backup = package_discovery_backup.build()?;
+        let (service, exit_root_watch, watch_root_handle) = TurboGrpcServiceInner::new(
+            package_discovery_backup,
+            repo_root.clone(),
+            trigger_shutdown,
+            paths.log_file,
+        );
+
         let running = Arc::new(AtomicBool::new(true));
         let (_pid_lock, stream) =
             match listen_socket(&paths.pid_file, &paths.sock_file, running.clone()).await {
@@ -201,37 +224,6 @@ where
         };
         trace!("acquired connection stream for socket");
 
-        let watcher_repo_root = repo_root.to_owned();
-        // A channel to trigger the shutdown of the gRPC server. This is handed out
-        // to components internal to the server process such as root watching, as
-        // well as available to the gRPC server itself to handle the shutdown RPC.
-        let (trigger_shutdown, mut shutdown_signal) = mpsc::channel::<()>(1);
-
-        let backup_discovery = package_discovery_backup.build()?;
-
-        // watch receivers as a group own the filewatcher, which will exit when
-        // all references are dropped.
-        let fw_shutdown = trigger_shutdown.clone();
-        let fw_handle = tokio::task::spawn(async move {
-            if let Err(e) =
-                start_filewatching(watcher_repo_root, watcher_tx, backup_discovery).await
-            {
-                error!("filewatching failed to start: {}", e);
-                let _ = fw_shutdown.send(()).await;
-            }
-            info!("filewatching started");
-        });
-        // exit_root_watch delivers a signal to the root watch loop to exit.
-        // In the event that the server shuts down via some other mechanism, this
-        // cleans up root watching task.
-        let (exit_root_watch, root_watch_exit_signal) = oneshot::channel();
-        let watch_root_handle = tokio::task::spawn(watch_root(
-            watcher_rx.clone(),
-            repo_root.to_owned(),
-            trigger_shutdown.clone(),
-            root_watch_exit_signal,
-        ));
-
         let bump_timeout = Arc::new(BumpTimeout::new(timeout));
         let timeout_fut = bump_timeout.wait();
 
@@ -245,16 +237,6 @@ where
             };
         };
 
-        // Run the actual service. It takes ownership of the struct given to it,
-        // so we use a private struct with just the pieces of state needed to handle
-        // RPCs.
-        let service = TurboGrpcServiceInner {
-            shutdown: trigger_shutdown,
-            watcher_rx,
-            times_saved: Arc::new(Mutex::new(HashMap::new())),
-            start_time: Instant::now(),
-            log_file: paths.log_file.clone(),
-        };
         let server_fut = {
             let service = ServiceBuilder::new()
                 .layer(BumpTimeoutLayer::new(bump_timeout.clone()))
@@ -270,8 +252,9 @@ where
         };
         // Wait for the server to exit.
         // This can be triggered by timeout, root watcher, or an RPC
+        tracing::debug!("server started");
         let _ = server_fut.await;
-        info!("gRPC server exited");
+        tracing::debug!("server exited");
         // Ensure our timer will exit
         running.store(false, Ordering::SeqCst);
         // We expect to have a signal from the grpc server on what triggered the exit
@@ -283,33 +266,75 @@ where
         let _ = exit_root_watch.send(());
         let _ = watch_root_handle.await;
         trace!("root watching exited");
-        // Clean up the filewatching handle in the event that we never even got
-        // started with filewatching. Again, we don't care about the error here.
-        let _ = fw_handle.await;
-        trace!("filewatching handle joined");
         Ok(close_reason)
     }
 }
 
-struct TurboGrpcServiceInner {
+struct TurboGrpcServiceInner<PD> {
     shutdown: mpsc::Sender<()>,
-    watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
+    file_watching: FileWatching,
     times_saved: Arc<Mutex<HashMap<String, u64>>>,
     start_time: Instant,
     log_file: AbsoluteSystemPathBuf,
+    package_discovery: PD,
+}
+
+// we have a grpc service that uses watching package discovery, and where the
+// watching package hasher also uses watching package discovery as well as
+// falling back to a local package hasher
+impl TurboGrpcServiceInner<Arc<WatchingPackageDiscovery>> {
+    pub fn new<PD: PackageDiscovery + Send + Sync + 'static>(
+        package_discovery_backup: PD,
+        repo_root: AbsoluteSystemPathBuf,
+        trigger_shutdown: mpsc::Sender<()>,
+        log_file: AbsoluteSystemPathBuf,
+    ) -> (
+        Self,
+        oneshot::Sender<()>,
+        JoinHandle<Result<(), WatchError>>,
+    ) {
+        let file_watching = FileWatching::new(repo_root.clone(), package_discovery_backup).unwrap();
+
+        tracing::debug!("initing package discovery");
+        let package_discovery = Arc::new(WatchingPackageDiscovery::new(
+            file_watching.package_watcher.clone(),
+        ));
+
+        // exit_root_watch delivers a signal to the root watch loop to exit.
+        // In the event that the server shuts down via some other mechanism, this
+        // cleans up root watching task.
+        let (exit_root_watch, root_watch_exit_signal) = oneshot::channel();
+        let watch_root_handle = tokio::task::spawn(watch_root(
+            file_watching.clone(),
+            repo_root.clone(),
+            trigger_shutdown.clone(),
+            root_watch_exit_signal,
+        ));
+
+        (
+            TurboGrpcServiceInner {
+                package_discovery,
+                shutdown: trigger_shutdown,
+                file_watching,
+                times_saved: Arc::new(Mutex::new(HashMap::new())),
+                start_time: Instant::now(),
+                log_file,
+            },
+            exit_root_watch,
+            watch_root_handle,
+        )
+    }
 }
 
-impl TurboGrpcServiceInner {
+impl<PD> TurboGrpcServiceInner<PD>
+where
+    PD: PackageDiscovery + Send + Sync + 'static,
+{
     async fn trigger_shutdown(&self) {
         info!("triggering shutdown");
         let _ = self.shutdown.send(()).await;
     }
 
-    async fn wait_for_filewatching(&self) -> Result<Arc<FileWatching>, RpcError> {
-        let rx = self.watcher_rx.clone();
-        wait_for_filewatching(rx, Duration::from_millis(100)).await
-    }
-
     async fn watch_globs(
         &self,
         hash: String,
@@ -318,8 +343,8 @@ impl TurboGrpcServiceInner {
         time_saved: u64,
     ) -> Result<(), RpcError> {
         let glob_set = GlobSet::from_raw(output_globs, output_glob_exclusions)?;
-        let fw = self.wait_for_filewatching().await?;
-        fw.glob_watcher
+        self.file_watching
+            .glob_watcher
             .watch_globs(hash.clone(), glob_set, REQUEST_TIMEOUT)
             .await?;
         {
@@ -338,59 +363,40 @@ impl TurboGrpcServiceInner {
             let times_saved = self.times_saved.lock().expect("times saved lock poisoned");
             times_saved.get(hash.as_str()).copied().unwrap_or_default()
         };
-        let fw = self.wait_for_filewatching().await?;
-        let changed_globs = fw
+        let changed_globs = self
+            .file_watching
             .glob_watcher
             .get_changed_globs(hash, candidates, REQUEST_TIMEOUT)
             .await?;
         Ok((changed_globs, time_saved))
     }
-
-    async fn discover_packages(&self) -> Result<DiscoveryResponse, RpcError> {
-        let fw = self.wait_for_filewatching().await?;
-        Ok(DiscoveryResponse {
-            workspaces: fw.package_watcher.get_package_data().await,
-            package_manager: fw.package_watcher.get_package_manager().await,
-        })
-    }
-}
-
-async fn wait_for_filewatching(
-    mut rx: watch::Receiver<Option<Arc<FileWatching>>>,
-    timeout: Duration,
-) -> Result<Arc<FileWatching>, RpcError> {
-    let fw = tokio::time::timeout(timeout, rx.wait_for(|opt| opt.is_some()))
-        .await
-        .map_err(|_| RpcError::DeadlineExceeded)? // timeout case
-        .map_err(|_| RpcError::NoFileWatching)?; // sender dropped
-
-    return Ok(fw.as_ref().expect("guaranteed some above").clone());
 }
 
 async fn watch_root(
-    filewatching_access: watch::Receiver<Option<Arc<FileWatching>>>,
+    filewatching_access: FileWatching,
     root: AbsoluteSystemPathBuf,
     trigger_shutdown: mpsc::Sender<()>,
     mut exit_signal: oneshot::Receiver<()>,
 ) -> Result<(), WatchError> {
-    let mut recv_events = {
-        let Ok(fw) = wait_for_filewatching(filewatching_access, Duration::from_secs(5)).await
-        else {
-            return Ok(());
-        };
+    let mut recv_events = filewatching_access
+        .watcher
+        .subscribe()
+        .await
+        // we can only encounter an error here if the file watcher is closed (a recv error)
+        .map_err(|_| WatchError::Setup("file watching shut down".to_string()))?;
 
-        fw._watcher.subscribe()
-    };
+    tracing::debug!("watching root: {:?}", root);
 
     loop {
         // Ignore the outer layer of Result, if the sender has closed, filewatching has
        // gone away and we can return.
        select!
 {
-            _ = &mut exit_signal => return Ok(()),
+            _ = &mut exit_signal => break,
             event = recv_events.recv() => {
                 let Ok(event) = event else {
-                    return Ok(());
+                    break;
                 };
+                tracing::debug!("root watcher received event: {:?}", event);
                 let should_trigger_shutdown = match event {
                     // filewatching can throw some weird events, so check that the root is actually gone
                     // before triggering a shutdown
@@ -403,15 +409,21 @@ async fn watch_root(
                     // We don't care if a shutdown has already been triggered,
                     // so we can ignore the error.
                     let _ = trigger_shutdown.send(()).await;
-                    return Ok(());
+                    break;
                 }
             }
         }
     }
+
+    tracing::debug!("no longer watching root");
+
+    Ok(())
 }
 
 #[tonic::async_trait]
-impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
+impl<PD: PackageDiscovery + Send + Sync + 'static> proto::turbod_server::Turbod
+    for TurboGrpcServiceInner<PD>
+{
     async fn hello(
         &self,
         request: tonic::Request<proto::HelloRequest>,
@@ -499,18 +511,30 @@ impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
         &self,
         _request: tonic::Request<proto::DiscoverPackagesRequest>,
     ) -> Result<tonic::Response<proto::DiscoverPackagesResponse>, tonic::Status> {
-        let resp = self.discover_packages().await?;
-        Ok(tonic::Response::new(proto::DiscoverPackagesResponse {
-            package_files: resp
-                .workspaces
-                .into_iter()
-                .map(|d| proto::PackageFiles {
-                    package_json: d.package_json.to_string(),
-                    turbo_json: d.turbo_json.map(|t| t.to_string()),
+        self.package_discovery
+            .discover_packages()
+            .await
+            .map(|packages| {
+                tonic::Response::new(proto::DiscoverPackagesResponse {
+                    package_files: packages
+                        .workspaces
+                        .into_iter()
+                        .map(|d| proto::PackageFiles {
+                            package_json: d.package_json.to_string(),
+                            turbo_json: d.turbo_json.map(|t| t.to_string()),
+                        })
+                        .collect(),
+                    package_manager: proto::PackageManager::from(packages.package_manager).into(),
                 })
-                .collect(),
-            package_manager: proto::PackageManager::from(resp.package_manager).into(),
-        }))
+            })
+            .map_err(|e| match e {
+                turborepo_repository::discovery::Error::Unavailable => {
+                    tonic::Status::unavailable("package discovery unavailable")
+                }
+                turborepo_repository::discovery::Error::Failed(e) => {
+                    tonic::Status::internal(format!("{}", e))
+                }
+            })
     }
 }
 
@@ -537,7 +561,7 @@ fn compare_versions(client: Version, server: Version, constraint: proto::Version
     }
 }
 
-impl NamedService for TurboGrpcServiceInner {
+impl<PD> NamedService for TurboGrpcServiceInner<PD> {
     const NAME: &'static str = "turborepo.Daemon";
 }
 
@@ -591,7 +615,7 @@ mod test {
     struct MockDiscovery;
     impl PackageDiscovery for MockDiscovery {
         async fn discover_packages(
-            &mut self,
+            &self,
         ) -> Result<
             turborepo_repository::discovery::DiscoveryResponse,
             turborepo_repository::discovery::Error,
diff --git a/crates/turborepo-lib/src/engine/builder.rs b/crates/turborepo-lib/src/engine/builder.rs
index 6cb708783e968..73f20353d8f67 100644
--- a/crates/turborepo-lib/src/engine/builder.rs
+++ b/crates/turborepo-lib/src/engine/builder.rs
@@ -473,7 +473,7 @@ mod test {
     struct MockDiscovery;
     impl PackageDiscovery for MockDiscovery {
         async fn discover_packages(
-            &mut self,
+            &self,
         ) -> Result<
             turborepo_repository::discovery::DiscoveryResponse,
             turborepo_repository::discovery::Error,
diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs
index 0b5ceabc20f01..b5819373dab58 100644
--- a/crates/turborepo-lib/src/engine/mod.rs
+++ b/crates/turborepo-lib/src/engine/mod.rs
@@ -294,7 +294,7 @@ mod test {
 
     impl<'a> PackageDiscovery for DummyDiscovery<'a> {
         async fn discover_packages(
-            &mut self,
+            &self,
         ) -> Result<
             turborepo_repository::discovery::DiscoveryResponse,
             turborepo_repository::discovery::Error,
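`WatchingPackageDiscovery`, imported near the top of the server.rs diff, is defined in turborepo-filewatch and does not appear in this diff. Based on how `TurboGrpcServiceInner::new` constructs it and on the removed `discover_packages` helper above (which read `get_package_data` and `get_package_manager` off the watcher), it is plausibly a thin adapter from the package watcher's live state to the `PackageDiscovery` trait. A hedged sketch, with the field name and body assumed:

```rust
use std::sync::Arc;

use turborepo_repository::discovery::{DiscoveryResponse, Error, PackageDiscovery};

use crate::package_watcher::PackageWatcher;

pub struct WatchingPackageDiscovery {
    watcher: Arc<PackageWatcher>,
}

impl WatchingPackageDiscovery {
    pub fn new(watcher: Arc<PackageWatcher>) -> Self {
        Self { watcher }
    }
}

impl PackageDiscovery for WatchingPackageDiscovery {
    async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
        tracing::debug!("discovering packages using watcher data");
        // Answer from the watcher's always-up-to-date view instead of
        // walking the filesystem, as the removed server helper did.
        Ok(DiscoveryResponse {
            workspaces: self.watcher.get_package_data().await,
            package_manager: self.watcher.get_package_manager().await,
        })
    }
}
```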
diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs
index 23ba4f310720c..051bf6794e551 100644
--- a/crates/turborepo-lib/src/run/mod.rs
+++ b/crates/turborepo-lib/src/run/mod.rs
@@ -304,7 +304,7 @@ impl Run {
             )
         };
         let fallback_discovery = FallbackPackageDiscovery::new(
-            daemon.as_mut().map(DaemonPackageDiscovery::new),
+            daemon.clone().map(DaemonPackageDiscovery::new),
             fallback,
             duration,
         );
diff --git a/crates/turborepo-lib/src/run/package_discovery/mod.rs b/crates/turborepo-lib/src/run/package_discovery/mod.rs
index cc9983edebbbd..2adbe0a59edf1 100644
--- a/crates/turborepo-lib/src/run/package_discovery/mod.rs
+++ b/crates/turborepo-lib/src/run/package_discovery/mod.rs
@@ -4,22 +4,24 @@ use turborepo_repository::discovery::{DiscoveryResponse, Error, PackageDiscovery
 use crate::daemon::{proto::PackageManager, DaemonClient};
 
 #[derive(Debug)]
-pub struct DaemonPackageDiscovery<'a, C: Clone> {
-    daemon: &'a mut DaemonClient<C>,
+pub struct DaemonPackageDiscovery<C> {
+    daemon: DaemonClient<C>,
 }
 
-impl<'a, C: Clone> DaemonPackageDiscovery<'a, C> {
-    pub fn new(daemon: &'a mut DaemonClient<C>) -> Self {
+impl<C> DaemonPackageDiscovery<C> {
+    pub fn new(daemon: DaemonClient<C>) -> Self {
         Self { daemon }
     }
 }
 
-impl<'a, C: Clone + Send> PackageDiscovery for DaemonPackageDiscovery<'a, C> {
-    async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
+impl<C: Clone + Send + Sync> PackageDiscovery for DaemonPackageDiscovery<C> {
+    async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
         tracing::debug!("discovering packages using daemon");
 
-        let response = self
-            .daemon
+        // clone here so we can make concurrent requests
+        let mut daemon = self.daemon.clone();
+
+        let response = daemon
             .discover_packages()
             .await
             .map_err(|e| Error::Failed(Box::new(e)))?;
diff --git a/crates/turborepo-lib/src/run/scope/filter.rs b/crates/turborepo-lib/src/run/scope/filter.rs
index d7777fb2b552e..3c31fe5cb335f 100644
--- a/crates/turborepo-lib/src/run/scope/filter.rs
+++ b/crates/turborepo-lib/src/run/scope/filter.rs
@@ -621,7 +621,7 @@ mod test {
     struct MockDiscovery;
     impl PackageDiscovery for MockDiscovery {
         async fn discover_packages(
-            &mut self,
+            &self,
         ) -> Result<
             turborepo_repository::discovery::DiscoveryResponse,
             turborepo_repository::discovery::Error,
diff --git a/crates/turborepo-lsp/src/lib.rs b/crates/turborepo-lsp/src/lib.rs
index a5aa77a0b7a89..bc54652b1173a 100644
--- a/crates/turborepo-lsp/src/lib.rs
+++ b/crates/turborepo-lsp/src/lib.rs
@@ -580,7 +580,7 @@ impl Backend {
     }
 
     pub async fn package_discovery(&self) -> Result<DiscoveryResponse, Error> {
-        let mut daemon = {
+        let daemon = {
             let mut daemon = self.daemon.clone();
             let daemon = daemon.wait_for(|d| d.is_some()).await;
             let daemon = daemon.as_ref().expect("only fails if self is dropped");
@@ -590,7 +590,7 @@ impl Backend {
                 .clone()
         };
 
-        DaemonPackageDiscovery::new(&mut daemon)
+        DaemonPackageDiscovery::new(daemon)
             .discover_packages()
             .await
     }
diff --git a/crates/turborepo-repository/Cargo.toml b/crates/turborepo-repository/Cargo.toml
index a56c6bc801b6e..f9b43dbede73f 100644
--- a/crates/turborepo-repository/Cargo.toml
+++ b/crates/turborepo-repository/Cargo.toml
@@ -9,6 +9,7 @@ workspace = true
 
 [dependencies]
 anyhow = { workspace = true }
+async-once-cell = "0.5.3"
 globwalk = { version = "0.1.0", path = "../turborepo-globwalk" }
 itertools = { workspace = true }
 lazy-regex = "2.5.0"
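With the borrow removed, `DaemonPackageDiscovery` owns its `DaemonClient` and clones it per request, so two discoveries can be in flight at once. A sketch of the resulting call pattern (the `discover_twice` helper is hypothetical; the `C` bounds mirror the impl above):

```rust
use turborepo_repository::discovery::{Error, PackageDiscovery};

use crate::run::package_discovery::DaemonPackageDiscovery;

// Hypothetical helper: issue two discovery requests concurrently against a
// single, immutably borrowed DaemonPackageDiscovery.
async fn discover_twice<C: Clone + Send + Sync>(
    discovery: &DaemonPackageDiscovery<C>,
) -> Result<(), Error> {
    // Each call clones the owned DaemonClient internally, so neither borrow
    // needs to be exclusive and both requests can run at the same time.
    let (a, b) = tokio::join!(
        discovery.discover_packages(),
        discovery.discover_packages()
    );
    a?;
    b?;
    Ok(())
}
```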
diff --git a/crates/turborepo-repository/src/discovery.rs b/crates/turborepo-repository/src/discovery.rs
index c5d60d7d638b9..3f4cb5ab43f76 100644
--- a/crates/turborepo-repository/src/discovery.rs
+++ b/crates/turborepo-repository/src/discovery.rs
@@ -9,6 +9,8 @@
 //! these strategies will implement some sort of monad-style composition so that
 //! we can track areas of run that are performing sub-optimally.
 
+use std::sync::Arc;
+
 use tokio_stream::{iter, StreamExt};
 use turbopath::AbsoluteSystemPathBuf;
 
@@ -41,7 +43,7 @@ pub enum Error {
 pub trait PackageDiscovery {
     // desugar to assert that the future is Send
     fn discover_packages(
-        &mut self,
+        &self,
     ) -> impl std::future::Future<Output = Result<DiscoveryResponse, Error>> + Send;
 }
 
@@ -57,8 +59,8 @@ pub trait PackageDiscoveryBuilder {
     fn build(self) -> Result<Self::Output, Self::Error>;
 }
 
-impl<T: PackageDiscovery + Send> PackageDiscovery for Option<T> {
-    async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
+impl<T: PackageDiscovery + Send + Sync> PackageDiscovery for Option<T> {
+    async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
         tracing::debug!("discovering packages using optional strategy");
 
         match self {
@@ -71,6 +73,12 @@ impl<T: PackageDiscovery + Send> PackageDiscovery for Option<T> {
     }
 }
 
+impl<T: PackageDiscovery + Send + Sync> PackageDiscovery for Arc<T> {
+    async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
+        self.as_ref().discover_packages().await
+    }
+}
+
 pub struct LocalPackageDiscovery {
     repo_root: AbsoluteSystemPathBuf,
     package_manager: PackageManager,
@@ -125,7 +133,7 @@ impl PackageDiscoveryBuilder for LocalPackageDiscoveryBuilder {
 }
 
 impl PackageDiscovery for LocalPackageDiscovery {
-    async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
+    async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
         tracing::debug!("discovering packages using local strategy");
 
         let package_paths = match self.package_manager.get_package_jsons(&self.repo_root) {
@@ -192,10 +200,10 @@ impl<T> PackageDiscoveryBuilder for T
     }
 }
 
-impl<P: PackageDiscovery + Send, F: PackageDiscovery + Send> PackageDiscovery
+impl<P: PackageDiscovery + Send + Sync, F: PackageDiscovery + Send + Sync> PackageDiscovery
     for FallbackPackageDiscovery<P, F>
 {
-    async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
+    async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
         tracing::debug!("discovering packages using fallback strategy");
 
         tracing::debug!("attempting primary strategy");
@@ -220,36 +228,37 @@ impl<P: PackageDiscovery + Send, F: PackageDiscovery + Send> PackageDiscovery
 
 pub struct CachingPackageDiscovery<P: PackageDiscovery> {
     primary: P,
-    data: Option<DiscoveryResponse>,
+    data: async_once_cell::OnceCell<DiscoveryResponse>,
 }
 
 impl<P: PackageDiscovery> CachingPackageDiscovery<P> {
     pub fn new(primary: P) -> Self {
         Self {
             primary,
-            data: None,
+            data: Default::default(),
         }
     }
 }
 
-impl<P: PackageDiscovery + Send> PackageDiscovery for CachingPackageDiscovery<P> {
-    async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
+impl<P: PackageDiscovery + Send + Sync> PackageDiscovery for CachingPackageDiscovery<P> {
+    async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
         tracing::debug!("discovering packages using caching strategy");
 
-        match self.data.clone() {
-            Some(data) => Ok(data),
-            None => {
-                tracing::debug!("no cached data, running primary strategy");
-                let data = self.primary.discover_packages().await?;
-                self.data = Some(data.clone());
-                Ok(data)
-            }
-        }
+        self.data
+            .get_or_try_init(async {
+                tracing::debug!("discovering packages using primary strategy");
+                self.primary.discover_packages().await
+            })
+            .await
+            .map(ToOwned::to_owned)
     }
 }
 
 #[cfg(test)]
 mod fallback_tests {
-    use std::time::Duration;
+    use std::{
+        sync::atomic::{AtomicUsize, Ordering},
+        time::Duration,
+    };
 
     use tokio::runtime::Runtime;
 
@@ -257,20 +266,20 @@ mod fallback_tests {
     struct MockDiscovery {
         should_fail: bool,
-        calls: usize,
+        calls: AtomicUsize,
     }
 
     impl MockDiscovery {
         fn new(should_fail: bool) -> Self {
             Self {
                 should_fail,
-                calls: 0,
+                calls: Default::default(),
             }
         }
     }
 
     impl PackageDiscovery for MockDiscovery {
-        async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
+        async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
             if self.should_fail {
                 Err(Error::Failed(Box::new(std::io::Error::new(
                     std::io::ErrorKind::Other,
@@ -278,7 +287,7 @@ mod fallback_tests {
                 ))))
             } else {
                 tokio::time::sleep(Duration::from_millis(100)).await;
-                self.calls += 1;
+                self.calls.fetch_add(1, Ordering::SeqCst);
                 // Simulate successful package discovery
                 Ok(DiscoveryResponse {
                     package_manager: PackageManager::Npm,
@@ -305,8 +314,8 @@ mod fallback_tests {
             assert!(result.is_ok());
 
             // Assert that the fallback was used
-            assert_eq!(discovery.primary.calls, 0);
-            assert_eq!(discovery.fallback.calls, 1);
+            assert_eq!(*discovery.primary.calls.get_mut(), 0);
+            assert_eq!(*discovery.fallback.calls.get_mut(), 1);
         });
     }
 
@@ -327,25 +336,27 @@ mod fallback_tests {
             assert!(result.is_ok());
 
             // Assert that the fallback was used
-            assert_eq!(discovery.primary.calls, 0);
-            assert_eq!(discovery.fallback.calls, 1);
+            assert_eq!(*discovery.primary.calls.get_mut(), 0);
+            assert_eq!(*discovery.fallback.calls.get_mut(), 1);
         });
     }
 }
 
 #[cfg(test)]
 mod caching_tests {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
     use tokio::runtime::Runtime;
 
     use super::*;
 
     struct MockPackageDiscovery {
-        call_count: usize,
+        call_count: AtomicUsize,
     }
 
     impl PackageDiscovery for MockPackageDiscovery {
-        async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
-            self.call_count += 1;
+        async fn discover_packages(&self) -> Result<DiscoveryResponse, Error> {
+            self.call_count.fetch_add(1, Ordering::SeqCst);
             // Simulate successful package discovery
             Ok(DiscoveryResponse {
                 package_manager: PackageManager::Npm,
@@ -358,16 +369,18 @@ mod caching_tests {
     fn test_caching_package_discovery() {
         let rt = Runtime::new().unwrap();
         rt.block_on(async {
-            let primary = MockPackageDiscovery { call_count: 0 };
+            let primary = MockPackageDiscovery {
+                call_count: Default::default(),
+            };
             let mut discovery = CachingPackageDiscovery::new(primary);
 
             // First call should use primary discovery
             let _first_result = discovery.discover_packages().await.unwrap();
-            assert_eq!(discovery.primary.call_count, 1);
+            assert_eq!(*discovery.primary.call_count.get_mut(), 1);
 
             // Second call should use cached data and not increase call count
             let _second_result = discovery.discover_packages().await.unwrap();
-            assert_eq!(discovery.primary.call_count, 1);
+            assert_eq!(*discovery.primary.call_count.get_mut(), 1);
         });
     }
 }
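For reference, the `async_once_cell::OnceCell` API used by the new `CachingPackageDiscovery` takes a future rather than a closure, and concurrent callers coalesce onto a single initialization; `.map(ToOwned::to_owned)` then clones out of the cell because the trait returns an owned `DiscoveryResponse`. A standalone sketch of the semantics (the `expensive_discovery` function and the tokio runtime are illustrative assumptions, not part of the crate):

```rust
use async_once_cell::OnceCell;

// Stand-in for the primary discovery strategy.
async fn expensive_discovery() -> Result<String, std::io::Error> {
    Ok("npm".to_string())
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let cell: OnceCell<String> = OnceCell::new();

    // The first call runs the future and stores the result...
    let first = cell.get_or_try_init(expensive_discovery()).await?;
    // ...later calls return the cached value without rerunning it.
    let second = cell.get_or_try_init(expensive_discovery()).await?;
    assert!(std::ptr::eq(first, second));
    Ok(())
}
```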
diff --git a/crates/turborepo-repository/src/package_graph/builder.rs b/crates/turborepo-repository/src/package_graph/builder.rs
index 6a81fe48d5f85..fe76bdda8048a 100644
--- a/crates/turborepo-repository/src/package_graph/builder.rs
+++ b/crates/turborepo-repository/src/package_graph/builder.rs
@@ -120,7 +120,7 @@ impl<'a, P> PackageGraphBuilder<'a, P> {
 impl<'a, T> PackageGraphBuilder<'a, T>
 where
     T: PackageDiscoveryBuilder,
-    T::Output: Send,
+    T::Output: Send + Sync,
     T::Error: Into<Error>,
 {
     /// Build the `PackageGraph`.
@@ -324,7 +324,7 @@ impl<'a, T: PackageDiscovery> BuildState<'a, ResolvedPackageManager, T> {
             workspace_graph,
             node_lookup,
             lockfile,
-            mut package_discovery,
+            package_discovery,
             ..
         } = self;
 
@@ -766,7 +766,7 @@ mod test {
     struct MockDiscovery;
     impl PackageDiscovery for MockDiscovery {
         async fn discover_packages(
-            &mut self,
+            &self,
         ) -> Result<crate::discovery::DiscoveryResponse, crate::discovery::Error> {
             Ok(crate::discovery::DiscoveryResponse {
                 package_manager: crate::package_manager::PackageManager::Npm,
diff --git a/crates/turborepo-repository/src/package_graph/mod.rs b/crates/turborepo-repository/src/package_graph/mod.rs
index bdfb30aff72ab..db2169e48dab4 100644
--- a/crates/turborepo-repository/src/package_graph/mod.rs
+++ b/crates/turborepo-repository/src/package_graph/mod.rs
@@ -455,7 +455,7 @@ mod test {
     struct MockDiscovery;
     impl PackageDiscovery for MockDiscovery {
         async fn discover_packages(
-            &mut self,
+            &self,
         ) -> Result<crate::discovery::DiscoveryResponse, crate::discovery::Error> {
             Ok(crate::discovery::DiscoveryResponse {
                 package_manager: PackageManager::Npm,
diff --git a/crates/turborepo-repository/src/package_json.rs b/crates/turborepo-repository/src/package_json.rs
index d71b8e51af8f1..6e4e92476ed30 100644
--- a/crates/turborepo-repository/src/package_json.rs
+++ b/crates/turborepo-repository/src/package_json.rs
@@ -55,6 +55,7 @@ pub enum Error {
 impl PackageJson {
     pub fn load(path: &AbsoluteSystemPath) -> Result<PackageJson, Error> {
+        tracing::debug!("loading package.json from {}", path);
         let contents = path.read_to_string()?;
         Self::from_str(&contents)
     }
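Taken together, the `&self` signature and the `Arc<T>` blanket impl added in discovery.rs are what let one discovery strategy be shared across tasks without locking. A sketch of that usage (the `discover_from_two_tasks` helper is hypothetical; the bounds follow the trait's `Send` future requirement):

```rust
use std::sync::Arc;

use turborepo_repository::discovery::PackageDiscovery;

// Hypothetical helper: share one strategy between two tasks. With a
// CachingPackageDiscovery inside the Arc, the primary strategy runs at most
// once no matter which task gets there first.
async fn discover_from_two_tasks<T>(discovery: Arc<T>)
where
    T: PackageDiscovery + Send + Sync + 'static,
{
    let a = tokio::spawn({
        let discovery = discovery.clone();
        async move { discovery.discover_packages().await }
    });
    let b = tokio::spawn(async move { discovery.discover_packages().await });
    // Join both tasks; each received the same underlying strategy.
    let _ = (a.await, b.await);
}
```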