diff --git a/crates/turborepo-filewatch/src/package_watcher.rs b/crates/turborepo-filewatch/src/package_watcher.rs index 7d0596fd9279a..5096fc68d3855 100644 --- a/crates/turborepo-filewatch/src/package_watcher.rs +++ b/crates/turborepo-filewatch/src/package_watcher.rs @@ -1,7 +1,7 @@ //! This module hosts the `PackageWatcher` type, which is used to watch the //! filesystem for changes to packages. -use std::{collections::HashMap, path::Path, sync::Arc}; +use std::{collections::HashMap, path::Path}; use futures::FutureExt; use notify::Event; @@ -16,92 +16,39 @@ use tokio::{ use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; use turborepo_repository::{ discovery::{ - self, DiscoveryResponse, LocalPackageDiscoveryBuilder, PackageDiscovery, - PackageDiscoveryBuilder, WorkspaceData, + DiscoveryResponse, LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder, + WorkspaceData, }, package_manager::{self, PackageManager, WorkspaceGlobs}, }; use crate::{ - cookies::{CookieError, CookieRegister, CookieWriter, CookiedOptionalWatch}, + cookies::{CookieRegister, CookieWriter, CookiedOptionalWatch}, optional_watch::OptionalWatch, NotifyError, }; -/// A package discovery strategy that watches the file system for changes. Basic -/// idea: -/// - Set up a watcher on file changes on the relevant workspace file for the -/// package manager -/// - When the workspace globs change, re-discover the workspace -/// - When a package.json changes, re-discover the workspace -/// - Keep an in-memory cache of the workspace -pub struct WatchingPackageDiscovery { - /// file watching may not be ready yet so we store a watcher - /// through which we can get the file watching stack - watcher: Arc, -} - -impl WatchingPackageDiscovery { - pub fn new(watcher: Arc) -> Self { - Self { watcher } - } -} - -impl PackageDiscovery for WatchingPackageDiscovery { - async fn discover_packages(&self) -> Result { - tracing::debug!("discovering packages using watcher implementation"); - - // this can either not have a value ready, or the sender has been dropped. in - // either case rust report that the value is unavailable - let package_manager = self - .watcher - .get_package_manager() - .await - .and_then(Result::ok) - .ok_or(discovery::Error::Unavailable)?; - let workspaces = self - .watcher - .get_package_data() - .await - .and_then(Result::ok) - .ok_or(discovery::Error::Unavailable)?; - - Ok(DiscoveryResponse { - workspaces, - package_manager, - }) - } - - // if the event that either of the dependencies will never resolve, - // this will still return unavailable - async fn discover_packages_blocking(&self) -> Result { - let package_manager = self - .watcher - .wait_for_package_manager() - .await - .map_err(|_| discovery::Error::Unavailable)?; - - let workspaces = self - .watcher - .wait_for_package_data() - .await - .map_err(|_| discovery::Error::Unavailable)?; - - Ok(DiscoveryResponse { - workspaces, - package_manager, - }) - } -} - #[derive(Debug, Error)] -enum PackageWatcherError { +enum PackageWatcherProcessError { #[error("filewatching not available, so package watching is not available")] Filewatching(watch::error::RecvError), #[error("filewatching closed, package watching no longer available")] FilewatchingClosed(broadcast::error::RecvError), } +#[derive(Debug, Error)] +pub enum PackageWatchError { + #[error("package layout is in an invalid state {0}")] + InvalidState(String), + #[error("package layout is not available")] + Unavailable, +} + +// If we're in an invalid state, this will be an Err with a description of the +// reason. Typically we don't care though, as the user could be in the middle of +// making a change. +type DiscoveryData = Result; + /// Watches the filesystem for changes to packages and package managers. pub struct PackageWatcher { // _exit_ch exists to trigger a close on the receiver when an instance @@ -110,12 +57,7 @@ pub struct PackageWatcher { // to be notified of a close. _exit_tx: oneshot::Sender<()>, _handle: tokio::task::JoinHandle<()>, - - /// The current package data, if available. - package_data: CookiedOptionalWatch, ()>, - - /// The current package manager, if available. - package_manager_lazy: CookiedOptionalWatch, + package_discovery_lazy: CookiedOptionalWatch, } impl PackageWatcher { @@ -129,61 +71,41 @@ impl PackageWatcher { ) -> Result { let (exit_tx, exit_rx) = oneshot::channel(); let subscriber = Subscriber::new(root, cookie_writer)?; - let package_manager_lazy = subscriber.manager_receiver(); - let package_data = subscriber.package_data(); + let package_discovery_lazy = subscriber.package_discovery(); let handle = tokio::spawn(subscriber.watch(exit_rx, recv)); Ok(Self { _exit_tx: exit_tx, _handle: handle, - package_data, - package_manager_lazy, + package_discovery_lazy, }) } - /// Get the package data. If the package data is not available, this will - /// block until it is. - pub async fn wait_for_package_data(&self) -> Result, CookieError> { - let mut recv = self.package_data.clone(); - recv.get().await.map(|v| v.values().cloned().collect()) - } - - /// A convenience wrapper around `FutureExt::now_or_never` to let you get - /// the package data if it is immediately available. - pub async fn get_package_data( - &self, - ) -> Option, watch::error::RecvError>> { - let mut recv = self.package_data.clone(); - let data = if let Some(Ok(inner)) = recv.get_immediate().await { - Some(Ok(inner.values().cloned().collect())) - } else { - None - }; - data - } - - /// Get the package manager. If the package manager is not available, this - /// will block until it is. - pub async fn wait_for_package_manager(&self) -> Result { - let mut recv = self.package_manager_lazy.clone(); - recv.get().await.map(|s| s.to_owned()) - } + pub async fn discover_packages(&self) -> Option> { + tracing::debug!("discovering packages using watcher implementation"); - /// A convenience wrapper around `FutureExt::now_or_never` to let you get - /// the package manager if it is immediately available. - pub async fn get_package_manager(&self) -> Option> { - let mut recv = self.package_manager_lazy.clone(); - // the borrow checker doesn't like returning immediately here so assign to a var - #[allow(clippy::let_and_return)] - let data = if let Some(Ok(inner)) = recv.get_immediate().await { - Some(Ok(*inner)) - } else { - None - }; - data + // this can either not have a value ready, or the sender has been dropped. in + // either case just report that the value is unavailable + let mut recv = self.package_discovery_lazy.clone(); + recv.get_immediate().await.map(|resp| { + resp.map_err(|_| PackageWatchError::Unavailable) + .and_then(|resp| match resp.to_owned() { + Ok(resp) => Ok(resp), + Err(error_reason) => Err(PackageWatchError::InvalidState(error_reason)), + }) + }) } - pub fn watch(&self) -> CookiedOptionalWatch, ()> { - self.package_data.clone() + // if the event that either of the dependencies will never resolve, + // this will still return unavailable + pub async fn discover_packages_blocking(&self) -> Result { + let mut recv = self.package_discovery_lazy.clone(); + recv.get() + .await + .map_err(|_| PackageWatchError::Unavailable) + .and_then(|resp| match resp.to_owned() { + Ok(resp) => Ok(resp), + Err(error_reason) => Err(PackageWatchError::InvalidState(error_reason)), + }) } } @@ -194,11 +116,8 @@ struct Subscriber { // This is the list of paths that will trigger rediscovering everything. invalidation_paths: Vec, - // package manager data - package_manager_tx: watch::Sender>, - package_manager_lazy: CookiedOptionalWatch, - package_data_tx: watch::Sender>>, - package_data_lazy: CookiedOptionalWatch, ()>, + package_discovery_tx: watch::Sender>, + package_discovery_lazy: CookiedOptionalWatch, cookie_tx: CookieRegister, } @@ -207,8 +126,8 @@ struct Subscriber { /// and some maybe-empty set of workspaces. #[derive(Debug)] enum State { - NoPackageManager, - InvalidGlobs(PackageManager), + NoPackageManager(Box), + InvalidGlobs(Box), ValidWorkspaces { package_manager: PackageManager, filter: WorkspaceGlobs, @@ -238,9 +157,8 @@ impl Subscriber { repo_root: AbsoluteSystemPathBuf, writer: CookieWriter, ) -> Result { - let (package_data_tx, cookie_tx, package_data_lazy) = CookiedOptionalWatch::new(writer); - let (package_manager_tx, package_manager_lazy) = package_data_lazy.new_sibling(); - + let (package_discovery_tx, cookie_tx, package_discovery_lazy) = + CookiedOptionalWatch::new(writer); let invalidation_paths = INVALIDATION_PATHS .iter() .map(|p| repo_root.join_component(p)) @@ -248,10 +166,8 @@ impl Subscriber { Ok(Self { repo_root, invalidation_paths, - package_data_lazy, - package_data_tx, - package_manager_lazy, - package_manager_tx, + package_discovery_tx, + package_discovery_lazy, cookie_tx, }) } @@ -259,18 +175,18 @@ impl Subscriber { async fn watch_process( mut self, mut recv: OptionalWatch>>, - ) -> PackageWatcherError { + ) -> PackageWatcherProcessError { tracing::debug!("starting package watcher"); let mut recv = match recv.get().await { Ok(r) => r.resubscribe(), - Err(e) => return PackageWatcherError::Filewatching(e), + Err(e) => return PackageWatcherProcessError::Filewatching(e), }; // state represents our current understanding of the underlying filesystem, and // is expected to be mutated in place by handle_file_event. Both // rediscover_everything and handle_file_event are responsible for // broadcasting updates to state. - let mut state = self.rediscover_everything().await; + let mut state = self.rediscover_and_write_state().await; tracing::debug!("package watcher ready {:?}", state); loop { @@ -278,12 +194,14 @@ impl Subscriber { match file_event { Ok(Ok(event)) => self.handle_file_event(&mut state, &event).await, // if we get an error, we need to re-discover the packages - Ok(Err(_)) => state = self.rediscover_everything().await, - Err(e @ RecvError::Closed) => return PackageWatcherError::FilewatchingClosed(e), + Ok(Err(_)) => state = self.rediscover_and_write_state().await, + Err(e @ RecvError::Closed) => { + return PackageWatcherProcessError::FilewatchingClosed(e) + } // if we end up lagging, warn and rediscover packages Err(RecvError::Lagged(count)) => { tracing::warn!("lagged behind {count} processing file watching events"); - state = self.rediscover_everything().await; + state = self.rediscover_and_write_state().await; } } tracing::debug!("package watcher state: {:?}", state); @@ -311,14 +229,8 @@ impl Subscriber { } } - pub fn manager_receiver(&self) -> CookiedOptionalWatch { - self.package_manager_lazy.clone() - } - - pub fn package_data( - &self, - ) -> CookiedOptionalWatch, ()> { - self.package_data_lazy.clone() + fn package_discovery(&self) -> CookiedOptionalWatch { + self.package_discovery_lazy.clone() } fn path_invalidates_everything(&self, path: &Path) -> bool { @@ -327,8 +239,6 @@ impl Subscriber { .any(|invalidation_path| path.eq(invalidation_path as &AbsoluteSystemPath)) } - /// Returns Err(()) if the package manager channel is closed, indicating - /// that the entire watching task should exit. async fn handle_file_event(&mut self, state: &mut State, file_event: &Event) { tracing::trace!("file event: {:?} {:?}", file_event.kind, file_event.paths); @@ -338,7 +248,7 @@ impl Subscriber { .any(|path| self.path_invalidates_everything(path)) { // root package.json changed, rediscover everything - *state = self.rediscover_everything().await; + *state = self.rediscover_and_write_state().await; } else { tracing::trace!("handling non-root package.json change"); self.handle_workspace_changes(state, file_event).await; @@ -433,14 +343,14 @@ impl Subscriber { } if changed { - self.write_workspace_state(workspaces); + self.write_state(state); } } - fn reset_package_manager(&self) { - self.package_manager_tx.send_if_modified(|package_manager| { - if package_manager.is_some() { - *package_manager = None; + fn reset_discovery_data(&self) { + self.package_discovery_tx.send_if_modified(|existing| { + if existing.is_some() { + *existing = None; true } else { false @@ -448,34 +358,28 @@ impl Subscriber { }); } - fn reset_workspaces(&self) { - self.package_data_tx.send_if_modified(|package_data| { - if package_data.is_some() { - *package_data = None; - true - } else { - false - } - }); + async fn rediscover_and_write_state(&mut self) -> State { + // If we're rediscovering the package manager, clear all data + self.reset_discovery_data(); + let state = self.rediscover().await; + self.write_state(&state); + state } - async fn rediscover_everything(&mut self) -> State { - // If we're rediscovering the package manager, clear all data - self.reset_package_manager(); - self.reset_workspaces(); + async fn rediscover(&self) -> State { // If we're rediscovering everything, we need to rediscover the package manager. // It may have changed if a lockfile changed or package.json changed. let discovery = match LocalPackageDiscoveryBuilder::new(self.repo_root.clone(), None, None).build() { Ok(discovery) => discovery, - Err(_) => return State::NoPackageManager, + Err(e) => return State::NoPackageManager(Box::new(e)), }; let initial_discovery = match discovery.discover_packages().await { Ok(discovery) => discovery, // If we failed the discovery, that's fine, we've reset the values, leave them as None Err(e) => { tracing::debug!("failed to rediscover packages: {}", e); - return State::NoPackageManager; + return State::NoPackageManager(Box::new(e)); } }; @@ -488,7 +392,7 @@ impl Subscriber { Err(e) => { // If the globs are invalid, leave everything set to None tracing::debug!("failed to get workspace globs: {}", e); - return State::InvalidGlobs(initial_discovery.package_manager); + return State::InvalidGlobs(Box::new(e)); } }; @@ -497,54 +401,42 @@ impl Subscriber { .into_iter() .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p)) .collect::>(); - let state = State::ValidWorkspaces { + State::ValidWorkspaces { package_manager: initial_discovery.package_manager, filter, workspaces, - }; - self.write_state(&state); - state + } } fn write_state(&self, state: &State) { match state { - State::NoPackageManager => {} /* no state to write, we've already reset when */ - // invalidating - State::InvalidGlobs(package_manager) => self.write_package_manager(package_manager), + State::NoPackageManager(e) | State::InvalidGlobs(e) => { + self.package_discovery_tx.send_if_modified(|existing| { + let error_msg = e.to_string(); + match existing { + Some(Err(existing_error)) if *existing_error == error_msg => false, + Some(_) | None => { + *existing = Some(Err(error_msg)); + true + } + } + }); + } State::ValidWorkspaces { package_manager, workspaces, .. } => { - self.write_package_manager(package_manager); - self.write_workspace_state(workspaces); + let resp = DiscoveryResponse { + package_manager: *package_manager, + workspaces: workspaces.values().cloned().collect(), + }; + // Note that we could implement PartialEq for DiscoveryResponse, but we + // would need to sort the workspace data. + let _ = self.package_discovery_tx.send(Some(Ok(resp))); } } } - - fn write_package_manager(&self, package_manager: &PackageManager) { - let _ = self - .package_manager_tx - .send_if_modified(|existing| match existing { - Some(existing) if existing == package_manager => false, - Some(_) | None => { - *existing = Some(*package_manager); - true - } - }); - } - - fn write_workspace_state(&self, workspaces: &HashMap) { - let _ = self - .package_data_tx - .send_if_modified(|existing| match existing { - Some(existing) if existing == workspaces => false, - Some(_) | None => { - *existing = Some(workspaces.clone()); - true - } - }); - } } #[cfg(test)] @@ -605,18 +497,16 @@ mod test { let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); - let mut data = tokio::time::timeout( - Duration::from_millis(100), - package_watcher.get_package_data(), - ) - .await - .unwrap() // timeout - .unwrap() // Option data exists - .unwrap(); // Result around package data - - data.sort_by_key(|workspace| workspace.package_json.clone()); + let mut data = package_watcher + .discover_packages() + .await + .unwrap() // Option data exists + .unwrap(); // Result around package data + + data.workspaces + .sort_by_key(|workspace| workspace.package_json.clone()); assert_eq!( - data, + data.workspaces, vec![ WorkspaceData { package_json: repo_root.join_components(&["packages", "bar", "package.json",]), @@ -637,17 +527,15 @@ mod test { .remove_file() .unwrap(); - let mut data = tokio::time::timeout( - Duration::from_millis(50), - package_watcher.get_package_data(), - ) - .await - .unwrap() // timeout - .unwrap() // Option data exists - .unwrap(); // Result around package data - data.sort_by_key(|workspace| workspace.package_json.clone()); + let mut data = package_watcher + .discover_packages() + .await + .unwrap() // Option data exists + .unwrap(); // Result around package data + data.workspaces + .sort_by_key(|workspace| workspace.package_json.clone()); assert_eq!( - data, + data.workspaces, vec![WorkspaceData { package_json: repo_root.join_components(&["packages", "bar", "package.json"]), turbo_json: None, @@ -660,16 +548,14 @@ mod test { .rename(&repo_root.join_component("bar")) .unwrap(); - let mut data = tokio::time::timeout( - Duration::from_millis(50), - package_watcher.get_package_data(), - ) - .await - .unwrap() // timeout - .unwrap() // Option data exists - .unwrap(); // Result around package data - data.sort_by_key(|workspace| workspace.package_json.clone()); - assert_eq!(data, vec![]); + let mut data = package_watcher + .discover_packages() + .await + .unwrap() // Option data exists + .unwrap(); // Result around package data + data.workspaces + .sort_by_key(|workspace| workspace.package_json.clone()); + assert_eq!(data.workspaces, vec![]); } #[tokio::test] @@ -727,19 +613,17 @@ mod test { let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); - let mut data = tokio::time::timeout( - Duration::from_millis(100), - package_watcher.get_package_data(), - ) - .await - .unwrap() // timeout - .unwrap() // Option data exists - .unwrap(); // Result around package data + let mut data = package_watcher + .discover_packages() + .await + .unwrap() // Option data exists + .unwrap(); // Result around package data - data.sort_by_key(|workspace| workspace.package_json.clone()); + data.workspaces + .sort_by_key(|workspace| workspace.package_json.clone()); assert_eq!( - data, + data.workspaces, vec![ WorkspaceData { package_json: repo_root @@ -764,19 +648,17 @@ mod test { .create_with_contents(r#"{"workspaces":["packages/*"]}"#) .unwrap(); - let mut data = tokio::time::timeout( - Duration::from_millis(100), - package_watcher.get_package_data(), - ) - .await - .unwrap() // timeout - .unwrap() // Option data exists - .unwrap(); // Result around package data + let mut data = package_watcher + .discover_packages() + .await + .unwrap() // Option data exists + .unwrap(); // Result around package data - data.sort_by_key(|workspace| workspace.package_json.clone()); + data.workspaces + .sort_by_key(|workspace| workspace.package_json.clone()); assert_eq!( - data, + data.workspaces, vec![WorkspaceData { package_json: repo_root .join_component("packages") @@ -791,18 +673,16 @@ mod test { .join_components(&["packages2", "bar"]) .rename(&repo_root.join_components(&["packages", "bar"])) .unwrap(); - let mut data = tokio::time::timeout( - Duration::from_millis(100), - package_watcher.get_package_data(), - ) - .await - .unwrap() // timeout - .unwrap() // Option data exists - .unwrap(); // Result around package data - - data.sort_by_key(|workspace| workspace.package_json.clone()); + let mut data = package_watcher + .discover_packages() + .await + .unwrap() // Option data exists + .unwrap(); // Result around package data + + data.workspaces + .sort_by_key(|workspace| workspace.package_json.clone()); assert_eq!( - data, + data.workspaces, vec![ WorkspaceData { package_json: repo_root @@ -853,64 +733,47 @@ mod test { let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); - // TODO: change this to expect either an empty result, or a package manager - // without globs - tokio::time::timeout( - Duration::from_millis(50), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap_err(); + let packages = package_watcher.discover_packages().await.unwrap(); // option around result + assert!(packages.is_err()); workspaces_path .create_with_contents(r#"packages: ["foo/*"]"#) .unwrap(); - let package_manager = tokio::time::timeout( - Duration::from_millis(200), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap() - .unwrap(); - assert_eq!(package_manager, PackageManager::Pnpm); + let resp = package_watcher + .discover_packages() + .await + .unwrap() // option around result + .unwrap(); // result around discovery response + assert_eq!(resp.package_manager, PackageManager::Pnpm); - // Remove workspaces file, verify we get a timeout + // Remove workspaces file, verify we get an error workspaces_path.remove_file().unwrap(); - // TODO: this should eventually be an empty result - tokio::time::timeout( - Duration::from_millis(50), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap_err(); + package_watcher + .discover_packages() + .await + .unwrap() + .unwrap_err(); - // Create an invalid workspace glob + // // Create an invalid workspace glob workspaces_path .create_with_contents(r#"packages: ["foo/***"]"#) .unwrap(); - // TODO: this should eventually be an empty result - tokio::time::timeout( - Duration::from_millis(50), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap_err(); + // we should still get an error since we don't have a valid glob + package_watcher + .discover_packages() + .await + .unwrap() + .unwrap_err(); // Set it back to valid, ensure we recover workspaces_path .create_with_contents(r#"packages: ["foo/*"]"#) .unwrap(); - let package_manager = tokio::time::timeout( - Duration::from_millis(200), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap() - .unwrap(); - assert_eq!(package_manager, PackageManager::Pnpm); + let resp = package_watcher.discover_packages().await.unwrap().unwrap(); + assert_eq!(resp.package_manager, PackageManager::Pnpm); } #[tokio::test] @@ -942,64 +805,43 @@ mod test { ); let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); - - // TODO: change this to expect either an empty result, or a package manager - // without globs - tokio::time::timeout( - Duration::from_millis(50), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap_err(); + // expect an error, we don't have a workspaces glob + package_watcher + .discover_packages_blocking() + .await + .unwrap_err(); root_package_json_path .create_with_contents(r#"{"packageManager": "pnpm@7.0", "workspaces": ["foo/*"]}"#) .unwrap(); - let package_manager = tokio::time::timeout( - Duration::from_millis(200), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap() - .unwrap(); - assert_eq!(package_manager, PackageManager::Npm); + let resp = package_watcher.discover_packages_blocking().await.unwrap(); + assert_eq!(resp.package_manager, PackageManager::Npm); - // Remove workspaces file, verify we get a timeout + // Remove workspaces file, verify we get an error root_package_json_path.remove_file().unwrap(); - // TODO: this should eventually be an empty result - tokio::time::timeout( - Duration::from_millis(50), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap_err(); + package_watcher + .discover_packages_blocking() + .await + .unwrap_err(); // Create an invalid workspace glob root_package_json_path .create_with_contents(r#"{"packageManager": "pnpm@7.0", "workspaces": ["foo/***"]}"#) .unwrap(); - // TODO: this should eventually be an empty result - tokio::time::timeout( - Duration::from_millis(50), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap_err(); + // We expect an error due to invalid workspace glob + package_watcher + .discover_packages_blocking() + .await + .unwrap_err(); // Set it back to valid, ensure we recover root_package_json_path .create_with_contents(r#"{"packageManager": "pnpm@7.0", "workspaces": ["foo/*"]}"#) .unwrap(); - let package_manager = tokio::time::timeout( - Duration::from_millis(200), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap() - .unwrap(); - assert_eq!(package_manager, PackageManager::Npm); + let resp = package_watcher.discover_packages_blocking().await.unwrap(); + assert_eq!(resp.package_manager, PackageManager::Npm); } #[tokio::test] @@ -1034,45 +876,29 @@ mod test { let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); - let package_manager = tokio::time::timeout( - Duration::from_millis(200), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap() - .unwrap(); - assert_eq!(package_manager, PackageManager::Pnpm); + let resp = package_watcher.discover_packages_blocking().await.unwrap(); + assert_eq!(resp.package_manager, PackageManager::Pnpm); pnpm_lock_file.remove_file().unwrap(); // No more lock file, verify we're in an invalid state - tokio::time::timeout( - Duration::from_millis(50), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap_err(); + package_watcher + .discover_packages_blocking() + .await + .unwrap_err(); let npm_lock_file = repo_root.join_component("package-lock.json"); npm_lock_file.create_with_contents("").unwrap(); // now we have an npm lockfile, but we don't have workspaces. Still invalid - tokio::time::timeout( - Duration::from_millis(50), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap_err(); + package_watcher + .discover_packages_blocking() + .await + .unwrap_err(); // update package.json to complete the transition root_package_json_path .create_with_contents(r#"{"packageManager": "npm@7.0", "workspaces": ["foo/*"]}"#) .unwrap(); - let package_manager = tokio::time::timeout( - Duration::from_millis(200), - package_watcher.wait_for_package_manager(), - ) - .await - .unwrap() - .unwrap(); - assert_eq!(package_manager, PackageManager::Npm); + let resp = package_watcher.discover_packages_blocking().await.unwrap(); + assert_eq!(resp.package_manager, PackageManager::Npm); } } diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs index ce1b5135ed13f..87c8bcd77a335 100644 --- a/crates/turborepo-lib/src/daemon/server.rs +++ b/crates/turborepo-lib/src/daemon/server.rs @@ -28,10 +28,10 @@ use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; use turborepo_filewatch::{ cookies::CookieWriter, globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher}, - package_watcher::{PackageWatcher, WatchingPackageDiscovery}, + package_watcher::{PackageWatchError, PackageWatcher}, FileSystemWatcher, WatchError, }; -use turborepo_repository::{discovery::PackageDiscovery, package_manager}; +use turborepo_repository::package_manager; use super::{bump_timeout::BumpTimeout, endpoint::SocketOpenError, proto}; use crate::daemon::{ @@ -242,7 +242,7 @@ struct TurboGrpcServiceInner { times_saved: Arc>>, start_time: Instant, log_file: AbsoluteSystemPathBuf, - package_discovery: Arc, + package_watcher: Arc, } // we have a grpc service that uses watching package discovery, and where the @@ -261,9 +261,8 @@ impl TurboGrpcServiceInner { let file_watching = FileWatching::new(repo_root.clone()).unwrap(); tracing::debug!("initing package discovery"); - let package_discovery = Arc::new(WatchingPackageDiscovery::new( - file_watching.package_watcher.clone(), - )); + // Note that we're cloning the Arc, not the package watcher itself + let package_watcher = Arc::clone(&file_watching.package_watcher); // exit_root_watch delivers a signal to the root watch loop to exit. // In the event that the server shuts down via some other mechanism, this @@ -278,7 +277,7 @@ impl TurboGrpcServiceInner { ( TurboGrpcServiceInner { - package_discovery, + package_watcher, shutdown: trigger_shutdown, file_watching, times_saved: Arc::new(Mutex::new(HashMap::new())), @@ -469,60 +468,50 @@ impl proto::turbod_server::Turbod for TurboGrpcServiceInner { &self, _request: tonic::Request, ) -> Result, tonic::Status> { - self.package_discovery - .discover_packages() - .await - .map(|packages| { - tonic::Response::new(proto::DiscoverPackagesResponse { - package_files: packages - .workspaces - .into_iter() - .map(|d| proto::PackageFiles { - package_json: d.package_json.to_string(), - turbo_json: d.turbo_json.map(|t| t.to_string()), - }) - .collect(), - package_manager: proto::PackageManager::from(packages.package_manager).into(), - }) - }) - .map_err(|e| match e { - turborepo_repository::discovery::Error::Unavailable => { - tonic::Status::unavailable("package discovery unavailable") - } - turborepo_repository::discovery::Error::Failed(e) => { - tonic::Status::internal(format!("{}", e)) - } - }) + match self.package_watcher.discover_packages().await { + Some(Ok(packages)) => Ok(tonic::Response::new(proto::DiscoverPackagesResponse { + package_files: packages + .workspaces + .into_iter() + .map(|d| proto::PackageFiles { + package_json: d.package_json.to_string(), + turbo_json: d.turbo_json.map(|t| t.to_string()), + }) + .collect(), + package_manager: proto::PackageManager::from(packages.package_manager).into(), + })), + None | Some(Err(PackageWatchError::Unavailable)) => { + Err(tonic::Status::unavailable("package discovery unavailable")) + } + Some(Err(PackageWatchError::InvalidState(reason))) => { + Err(tonic::Status::failed_precondition(reason)) + } + } } async fn discover_packages_blocking( &self, _request: tonic::Request, ) -> Result, tonic::Status> { - self.package_discovery - .discover_packages_blocking() - .await - .map(|packages| { - tonic::Response::new(proto::DiscoverPackagesResponse { - package_files: packages - .workspaces - .into_iter() - .map(|d| proto::PackageFiles { - package_json: d.package_json.to_string(), - turbo_json: d.turbo_json.map(|t| t.to_string()), - }) - .collect(), - package_manager: proto::PackageManager::from(packages.package_manager).into(), - }) - }) - .map_err(|e| match e { - turborepo_repository::discovery::Error::Unavailable => { - tonic::Status::unavailable("package discovery unavailable") - } - turborepo_repository::discovery::Error::Failed(e) => { - tonic::Status::internal(format!("{}", e)) - } - }) + match self.package_watcher.discover_packages_blocking().await { + Ok(packages) => Ok(tonic::Response::new(proto::DiscoverPackagesResponse { + package_files: packages + .workspaces + .into_iter() + .map(|d| proto::PackageFiles { + package_json: d.package_json.to_string(), + turbo_json: d.turbo_json.map(|t| t.to_string()), + }) + .collect(), + package_manager: proto::PackageManager::from(packages.package_manager).into(), + })), + Err(PackageWatchError::Unavailable) => { + Err(tonic::Status::unavailable("package discovery unavailable")) + } + Err(PackageWatchError::InvalidState(reason)) => { + Err(tonic::Status::failed_precondition(reason)) + } + } } }