diff --git a/crates/turborepo-filewatch/src/cookies.rs b/crates/turborepo-filewatch/src/cookies.rs
index cfb8f8dcff542..625f695a30c31 100644
--- a/crates/turborepo-filewatch/src/cookies.rs
+++ b/crates/turborepo-filewatch/src/cookies.rs
@@ -193,11 +193,6 @@ impl CookieWriter {
         Self::new(&cookie_root, timeout, recv)
     }
 
-    #[cfg(test)]
-    pub(crate) fn cookie_dir(&self) -> &AbsoluteSystemPath {
-        &self.cookie_root
-    }
-
     pub fn new(
         cookie_root: &AbsoluteSystemPath,
         timeout: Duration,
diff --git a/crates/turborepo-filewatch/src/package_watcher.rs b/crates/turborepo-filewatch/src/package_watcher.rs
index 0296c9728315d..d82d2027d30fa 100644
--- a/crates/turborepo-filewatch/src/package_watcher.rs
+++ b/crates/turborepo-filewatch/src/package_watcher.rs
@@ -15,7 +15,10 @@ use tokio::{
 };
 use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
 use turborepo_repository::{
-    discovery::{self, DiscoveryResponse, PackageDiscovery, WorkspaceData},
+    discovery::{
+        self, DiscoveryResponse, LocalPackageDiscoveryBuilder, PackageDiscovery,
+        PackageDiscoveryBuilder, WorkspaceData,
+    },
     package_manager::{self, PackageManager, WorkspaceGlobs},
 };
 
@@ -93,8 +96,6 @@ impl PackageDiscovery for WatchingPackageDiscovery {
 
 #[derive(Debug, Error)]
 enum PackageWatcherError {
-    #[error("failed to resolve package manager {0}")]
-    PackageManager(#[from] package_manager::Error),
     #[error("filewatching not available, so package watching is not available")]
     Filewatching(watch::error::RecvError),
     #[error("filewatching closed, package watching no longer available")]
@@ -114,21 +115,20 @@ pub struct PackageWatcher {
     package_data: CookiedOptionalWatch<HashMap<AbsoluteSystemPathBuf, WorkspaceData>, ()>,
 
     /// The current package manager, if available.
-    package_manager_lazy: CookiedOptionalWatch<PackageManagerState, ()>,
+    package_manager_lazy: CookiedOptionalWatch<PackageManager, ()>,
 }
 
 impl PackageWatcher {
     /// Creates a new package watcher whose current package data can be queried.
     /// `backup_discovery` is used to perform the initial discovery of packages,
     /// to populate the state before we can watch.
-    pub fn new<T: PackageDiscovery + Send + Sync + 'static>(
+    pub fn new(
         root: AbsoluteSystemPathBuf,
         recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
-        backup_discovery: T,
         cookie_writer: CookieWriter,
     ) -> Result<Self, package_manager::Error> {
         let (exit_tx, exit_rx) = oneshot::channel();
-        let subscriber = Subscriber::new(root, backup_discovery, cookie_writer)?;
+        let subscriber = Subscriber::new(root, cookie_writer)?;
         let package_manager_lazy = subscriber.manager_receiver();
         let package_data = subscriber.package_data();
         let handle = tokio::spawn(subscriber.watch(exit_rx, recv));
@@ -165,7 +165,7 @@ impl PackageWatcher {
     /// will block until it is.
     pub async fn wait_for_package_manager(&self) -> Result<PackageManager, CookieError> {
         let mut recv = self.package_manager_lazy.clone();
-        recv.get().await.map(|s| s.manager)
+        recv.get().await.map(|s| s.to_owned())
     }
 
     /// A convenience wrapper around `FutureExt::now_or_never` to let you get
@@ -175,7 +175,7 @@ impl PackageWatcher {
         // the borrow checker doesn't like returning immediately here so assign to a var
         #[allow(clippy::let_and_return)]
         let data = if let Some(Ok(inner)) = recv.get_immediate().await {
-            Some(Ok(inner.manager))
+            Some(Ok(*inner))
         } else {
             None
         };
@@ -189,57 +189,65 @@ impl PackageWatcher {
 
 /// The underlying task that listens to file system events and updates the
 /// internal package state.
-struct Subscriber<T: PackageDiscovery> {
-    backup_discovery: T,
-
+struct Subscriber {
     repo_root: AbsoluteSystemPathBuf,
-    root_package_json_path: AbsoluteSystemPathBuf,
-    // pnpm workspace file is handled specifically, the rest of the package managers
-    // use package.json for workspaces.
-    // We need to invalidate everything on a workspace glob
-    // change because we couple package manager detection with valid workspace globs.
-    pnpm_workspace_path: AbsoluteSystemPathBuf,
+    // This is the list of paths that will trigger rediscovering everything.
+    invalidation_paths: Vec<AbsoluteSystemPathBuf>,
 
     // package manager data
-    package_manager_tx: watch::Sender<Option<PackageManagerState>>,
-    package_manager_lazy: CookiedOptionalWatch<PackageManagerState, ()>,
+    package_manager_tx: watch::Sender<Option<PackageManager>>,
+    package_manager_lazy: CookiedOptionalWatch<PackageManager, ()>,
     package_data_tx: watch::Sender<Option<HashMap<AbsoluteSystemPathBuf, WorkspaceData>>>,
     package_data_lazy: CookiedOptionalWatch<HashMap<AbsoluteSystemPathBuf, WorkspaceData>, ()>,
     cookie_tx: CookieRegister,
 }
 
-/// A collection of state inferred from a package manager. All this data will
-/// change if the package manager changes.
-#[derive(Clone)]
-struct PackageManagerState {
-    manager: PackageManager,
-    // we need to wrap in Arc to make it send / sync
-    filter: Arc<WorkspaceGlobs>,
-    workspace_config_path: AbsoluteSystemPathBuf,
+/// PackageWatcher state. We either don't have a valid package manager,
+/// don't have valid globs, or we have both a package manager and globs
+/// and some maybe-empty set of workspaces.
+#[derive(Debug)]
+enum State {
+    NoPackageManager,
+    InvalidGlobs(PackageManager),
+    ValidWorkspaces {
+        package_manager: PackageManager,
+        filter: WorkspaceGlobs,
+        workspaces: HashMap<AbsoluteSystemPathBuf, WorkspaceData>,
+    },
 }
 
-impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T> {
+// Because our package manager detection is coupled with the workspace globs, we
+// need to recheck all workspaces any time any of these files change. A change
+// in any of these might result in a different package manager being detected,
+// or going from no package manager to some package manager.
+const INVALIDATION_PATHS: &[&str] = &[
+    "package.json",
+    "pnpm-workspace.yaml",
+    "pnpm-lock.yaml",
+    "package-lock.json",
+    "yarn.lock",
+    "bun.lockb",
+];
+
+impl Subscriber {
     /// Creates a new instance of PackageDiscovery. This will start a task that
     /// performs the initial discovery using the `backup_discovery` of your
     /// choice, and then listens to file system events to keep the package
     /// data up to date.
     fn new(
         repo_root: AbsoluteSystemPathBuf,
-        backup_discovery: T,
         writer: CookieWriter,
     ) -> Result<Self, package_manager::Error> {
         let (package_data_tx, cookie_tx, package_data_lazy) = CookiedOptionalWatch::new(writer);
         let (package_manager_tx, package_manager_lazy) = package_data_lazy.new_sibling();
-        // we create a second optional watch here so that we can ensure it is ready and
-        // pass it down stream after the initial discovery, otherwise our package
-        // discovery watcher will consume events before we have our initial state
-        let package_json_path = repo_root.join_component("package.json");
-        let pnpm_workspace_path = repo_root.join_component("pnpm-workspace.yaml");
+        let invalidation_paths = INVALIDATION_PATHS
+            .iter()
+            .map(|p| repo_root.join_component(p))
+            .collect();
         Ok(Self {
-            backup_discovery,
             repo_root,
-            pnpm_workspace_path,
-            root_package_json_path: package_json_path,
+            invalidation_paths,
             package_data_lazy,
             package_data_tx,
             package_manager_lazy,
@@ -258,27 +266,27 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T>
             Err(e) => return PackageWatcherError::Filewatching(e),
         };
 
-        self.rediscover_everything().await;
+        // state represents our current understanding of the underlying filesystem, and
+        // is expected to be mutated in place by handle_file_event. Both
+        // rediscover_everything and handle_file_event are responsible for
+        // broadcasting updates to state.
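+        // An event on one of the invalidation paths replaces state wholesale;
+        // workspace-level events only patch the existing ValidWorkspaces entry.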
+        let mut state = self.rediscover_everything().await;
 
-        tracing::debug!("package watcher ready");
+        tracing::debug!("package watcher ready {:?}", state);
         loop {
             let file_event = recv.recv().await;
             match file_event {
-                Ok(Ok(event)) => {
-                    if let Err(e) = self.handle_file_event(&event).await {
-                        tracing::debug!("package watching is closing, exiting");
-                        return e;
-                    }
-                }
+                Ok(Ok(event)) => self.handle_file_event(&mut state, &event).await,
                 // if we get an error, we need to re-discover the packages
-                Ok(Err(_)) => self.rediscover_everything().await,
+                Ok(Err(_)) => state = self.rediscover_everything().await,
                 Err(e @ RecvError::Closed) => return PackageWatcherError::FilewatchingClosed(e),
                 // if we end up lagging, warn and rediscover packages
                 Err(RecvError::Lagged(count)) => {
                     tracing::warn!("lagged behind {count} processing file watching events");
-                    self.rediscover_everything().await;
+                    state = self.rediscover_everything().await;
                 }
             }
+            tracing::debug!("package watcher state: {:?}", state);
         }
     }
 
@@ -303,7 +311,7 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T>
         }
     }
 
-    pub fn manager_receiver(&self) -> CookiedOptionalWatch<PackageManagerState, ()> {
+    pub fn manager_receiver(&self) -> CookiedOptionalWatch<PackageManager, ()> {
         self.package_manager_lazy.clone()
     }
 
@@ -314,13 +322,14 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T>
     }
 
     fn path_invalidates_everything(&self, path: &Path) -> bool {
-        path.eq(&self.root_package_json_path as &AbsoluteSystemPath)
-            || path.eq(&self.pnpm_workspace_path as &AbsoluteSystemPath)
+        self.invalidation_paths
+            .iter()
+            .any(|invalidation_path| path.eq(invalidation_path as &AbsoluteSystemPath))
     }
 
     /// Returns Err(()) if the package manager channel is closed, indicating
     /// that the entire watching task should exit.
-    async fn handle_file_event(&mut self, file_event: &Event) -> Result<(), PackageWatcherError> {
+    async fn handle_file_event(&mut self, state: &mut State, file_event: &Event) {
         tracing::trace!("file event: {:?} {:?}", file_event.kind, file_event.paths);
 
         if file_event
             .paths
             .iter()
             .any(|path| self.path_invalidates_everything(path))
         {
             // root package.json changed, rediscover everything
-            self.rediscover_everything().await;
+            *state = self.rediscover_everything().await;
         } else {
             tracing::trace!("handling non-root package.json change");
-            let globs_have_changed = self.have_workspace_globs_changed(file_event).await;
-            if globs_have_changed {
-                tracing::trace!("glob change?");
-                self.rediscover_packages().await;
-            } else {
-                tracing::trace!("checking for package.json change");
-                self.handle_package_json_change(file_event).await?;
-            }
+            self.handle_workspace_changes(state, file_event).await;
         }
 
         tracing::trace!("updating the cookies");
 
@@ -353,35 +355,22 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T>
         self.cookie_tx.register(
             &file_event
                 .paths
                 .iter()
                 .map(|p| AbsoluteSystemPath::from_std_path(p).expect("these paths are absolute"))
                 .collect::<Vec<_>>(),
         );
-
-        Ok(())
     }
 
-    /// Returns Err(()) if the package manager channel is closed, indicating
-    /// that the entire watching task should exit.
-    async fn handle_package_json_change(
-        &mut self,
-        file_event: &Event,
-    ) -> Result<(), PackageWatcherError> {
-        // we can only fail receiving if the channel is closed,
-        // which indicated that the entire watching task should exit
-        let state = {
-            let package_manager_state = self
-                .package_manager_lazy
-                .get_immediate_raw("this is called from the file event loop")
-                .await;
-            match package_manager_state {
-                // We don't have a package manager, no sense trying to handle workspace changes.
-                None => return Ok(()),
-                Some(state) => state.map(|s| s.to_owned()).expect(
-                    "this is called from the file event loop, and only when we have a package \
-                     manager",
-                ),
-            }
-        };
+    // checks if the file event contains any changes to package.json files, or
+    // directories that would map to a workspace.
+    async fn handle_workspace_changes(&mut self, state: &mut State, file_event: &Event) {
+        // If we don't have a valid package manager and workspace globs, nothing to be
+        // done here
+        let State::ValidWorkspaces {
+            filter, workspaces, ..
+        } = state
+        else {
+            return;
+        };
 
         // here, we can only update if we have a valid package state
-
+        let mut changed = false;
         // if a path is not a valid utf8 string, it is not a valid path, so ignore
         for path in file_event
             .paths
@@ -395,8 +384,7 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T>
             let path_parent = path_file
                 .parent()
                 .expect("watched paths will not be at the root");
-            if state
-                .filter
+            if filter
                 .target_is_workspace(&self.repo_root, path_parent)
                 .unwrap_or(false)
             {
@@ -406,8 +394,7 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T>
                 // matching workspace globs
                 continue;
             }
-        } else if state
-            .filter
+        } else if filter
             .target_is_workspace(&self.repo_root, &path_file)
             .unwrap_or(false)
         {
@@ -430,92 +417,23 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T>
                 tokio::fs::try_exists(&package_json),
                 tokio::fs::try_exists(&turbo_json)
             );
 
-            self.package_data_tx
-                .send_modify(|mut data| match (&mut data, package_exists) {
-                    // We have initial data, and this workspace exists
-                    (Some(data), true) => {
-                        data.insert(
-                            path_workspace.to_owned(),
-                            WorkspaceData {
-                                package_json,
-                                turbo_json: turbo_exists.unwrap_or_default().then_some(turbo_json),
-                            },
-                        );
-                    }
-                    // we have initial data, and this workspace does not exist
-                    (Some(data), false) => {
-                        data.remove(path_workspace);
-                    }
-                    // this is our first workspace, and it exists
-                    (None, true) => {
-                        let mut map = HashMap::new();
-                        map.insert(
-                            path_workspace.to_owned(),
-                            WorkspaceData {
-                                package_json,
-                                turbo_json: turbo_exists.unwrap_or_default().then_some(turbo_json),
-                            },
-                        );
-                        *data = Some(map);
-                    }
-                    // we have no workspaces, and this one does not exist,
-                    // so there's nothing to do.
-                    (None, false) => {}
-                });
+            changed |= if package_exists {
+                workspaces
+                    .insert(
+                        path_workspace.to_owned(),
+                        WorkspaceData {
+                            package_json,
+                            turbo_json: turbo_exists.unwrap_or_default().then_some(turbo_json),
+                        },
+                    )
+                    .is_none()
+            } else {
+                workspaces.remove(path_workspace).is_some()
+            }
         }
 
-        Ok(())
-    }
-
-    /// A change to the workspace config path could mean a change to the package
-    /// glob list. If this happens, we need to re-walk the packages.
-    ///
-    /// Returns Err(()) if the package manager channel is closed, indicating
-    /// that the entire watching task should exit.
-    async fn have_workspace_globs_changed(&mut self, file_event: &Event) -> bool {
-        // here, we can only update if we have a valid package state
-        // we can only fail receiving if the channel is closed,
-        // which indicated that the entire watching task should exit
-        let package_manager_state = self
-            .package_manager_lazy
-            .get_immediate_raw("this is called from the file event loop")
-            .await;
-        let state = match package_manager_state {
-            // We don't have a package manager, no sense trying to parse globs
-            None => return false,
-            Some(state) => state.map(|s| s.to_owned()).expect(
-                "this is called from the file event loop, and only when we have a package manager",
-            ),
-        };
-
-        if file_event
-            .paths
-            .iter()
-            .any(|p| state.workspace_config_path.as_std_path().eq(p))
-        {
-            let new_filter = state
-                .manager
-                .get_workspace_globs(&self.repo_root)
-                .map(Arc::new)
-                // under some saving strategies a file can be totally empty for a moment
-                // during a save. these strategies emit multiple events and so we can
-                // a previous or subsequent event in the 'cluster' will still trigger
-                .unwrap_or_else(|_| state.filter.clone());
-
-            self.package_manager_tx.send_if_modified(|f| match f {
-                Some(state) if state.filter == new_filter => false,
-                Some(state) => {
-                    tracing::debug!("workspace globs changed: {:?}", new_filter);
-                    state.filter = new_filter;
-                    true
-                }
-                // if we haven't got a valid manager, then it probably means
-                // that we are currently calcuating one, so we should just
-                // ignore this event
-                None => false,
-            })
-        } else {
-            false
+        if changed {
+            self.write_workspace_state(workspaces);
         }
     }
 
@@ -541,147 +459,119 @@ impl<T: PackageDiscovery + Send + Sync + 'static> Subscriber<T>
             });
     }
 
-    async fn rediscover_everything(&mut self) {
+    async fn rediscover_everything(&mut self) -> State {
         // If we're rediscovering the package manager, clear all data
         self.reset_package_manager();
         self.reset_workspaces();
-        let initial_discovery = match self.backup_discovery.discover_packages().await {
+        // If we're rediscovering everything, we need to rediscover the package manager.
+        // It may have changed if a lockfile changed or package.json changed.
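+        // If discovery can't even be constructed (for example, there is no lockfile
+        // or packageManager field to infer a package manager from), report that as
+        // NoPackageManager rather than as an error.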
+        let discovery =
+            match LocalPackageDiscoveryBuilder::new(self.repo_root.clone(), None, None).build() {
+                Ok(discovery) => discovery,
+                Err(_) => return State::NoPackageManager,
+            };
+        let initial_discovery = match discovery.discover_packages().await {
             Ok(discovery) => discovery,
             // If we failed the discovery, that's fine, we've reset the values, leave them as None
             Err(e) => {
                 tracing::debug!("failed to rediscover packages: {}", e);
-                return;
+                return State::NoPackageManager;
             }
         };
 
-        tracing::debug!("rediscovered packages: {:?}", initial_discovery);
-        let workspace_config_path = initial_discovery
-            .package_manager
-            .workspace_configuration_path()
-            .map_or_else(
-                || self.root_package_json_path.to_owned(),
-                |p| self.repo_root.join_component(p),
-            );
+        tracing::debug!("rediscovered packages: {:?}", initial_discovery);
 
         let filter = match initial_discovery
             .package_manager
             .get_workspace_globs(&self.repo_root)
         {
-            Ok(filter) => Arc::new(filter),
+            Ok(filter) => filter,
             Err(e) => {
                 // If the globs are invalid, leave everything set to None
                 tracing::debug!("failed to get workspace globs: {}", e);
-                return;
+                return State::InvalidGlobs(initial_discovery.package_manager);
             }
         };
 
-        let state = PackageManagerState {
-            manager: initial_discovery.package_manager,
+        let workspaces = initial_discovery
+            .workspaces
+            .into_iter()
+            .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p))
+            .collect::<HashMap<_, _>>();
+        let state = State::ValidWorkspaces {
+            package_manager: initial_discovery.package_manager,
             filter,
-            workspace_config_path,
+            workspaces,
         };
-
-        // if either of these fail, it means that there are no more subscribers and we
-        // should just ignore it, since we are likely closing
-        let _ = self.package_manager_tx.send(Some(state));
-        let _ = self.package_data_tx.send(Some(
-            initial_discovery
-                .workspaces
-                .into_iter()
-                .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p))
-                .collect::<HashMap<_, _>>(),
-        ));
+        self.write_state(&state);
+        state
     }
 
-    async fn rediscover_packages(&mut self) {
-        tracing::debug!("rediscovering packages");
+    fn write_state(&self, state: &State) {
+        match state {
+            // no state to write, we've already reset when invalidating
+            State::NoPackageManager => {}
+            State::InvalidGlobs(package_manager) => self.write_package_manager(package_manager),
+            State::ValidWorkspaces {
+                package_manager,
+                workspaces,
+                ..
+            } => {
+                self.write_package_manager(package_manager);
+                self.write_workspace_state(workspaces);
+            }
+        }
+    }
 
-        // make sure package data is unavailable while we are updating
-        self.reset_workspaces();
+    fn write_package_manager(&self, package_manager: &PackageManager) {
+        let _ = self
+            .package_manager_tx
+            .send_if_modified(|existing| match existing {
+                Some(existing) if existing == package_manager => false,
+                Some(_) | None => {
+                    *existing = Some(*package_manager);
+                    true
+                }
+            });
+    }
 
-        let response = match self.backup_discovery.discover_packages().await {
-            Ok(discovery) => discovery,
-            // If we failed the discovery, that's fine, we've reset the values, leave them as None
-            Err(e) => {
-                tracing::debug!("failed to rediscover packages: {}", e);
-                return;
-            }
-        };
-        self.package_data_tx.send_modify(|d| {
-            let new_data = response
-                .workspaces
-                .into_iter()
-                .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p))
-                .collect::<HashMap<_, _>>();
-            let _ = d.insert(new_data);
-        });
+    fn write_workspace_state(&self, workspaces: &HashMap<AbsoluteSystemPathBuf, WorkspaceData>) {
+        let _ = self
+            .package_data_tx
+            .send_if_modified(|existing| match existing {
+                Some(existing) if existing == workspaces => false,
+                Some(_) | None => {
+                    *existing = Some(workspaces.clone());
+                    true
+                }
+            });
+    }
 }
 
 #[cfg(test)]
 mod test {
-    use std::{
-        sync::{Arc, Mutex},
-        time::Duration,
-    };
+    use std::time::Duration;
 
-    use itertools::Itertools;
-    use tokio::{join, sync::broadcast};
     use turbopath::AbsoluteSystemPathBuf;
-    use turborepo_repository::{
-        discovery::{
-            self, DiscoveryResponse, LocalPackageDiscoveryBuilder, PackageDiscoveryBuilder,
-            WorkspaceData,
-        },
-        package_manager::PackageManager,
-    };
-
-    use super::Subscriber;
-    use crate::{
-        cookies::CookieWriter, package_watcher::PackageWatcher, FileSystemWatcher, OptionalWatch,
-    };
-
-    #[derive(Debug)]
-    struct MockDiscovery {
-        pub manager: PackageManager,
-        pub package_data: Arc<Mutex<Vec<WorkspaceData>>>,
-    }
-
-    impl super::PackageDiscovery for MockDiscovery {
-        async fn discover_packages(&self) -> Result<DiscoveryResponse, discovery::Error> {
-            Ok(DiscoveryResponse {
-                package_manager: self.manager,
-                workspaces: self.package_data.lock().unwrap().clone(),
-            })
-        }
+    use turborepo_repository::{discovery::WorkspaceData, package_manager::PackageManager};
 
-        async fn discover_packages_blocking(&self) -> Result<DiscoveryResponse, discovery::Error> {
-            self.discover_packages().await
-        }
-    }
+    use crate::{cookies::CookieWriter, package_watcher::PackageWatcher, FileSystemWatcher};
 
     #[tokio::test]
     #[tracing_test::traced_test]
     async fn subscriber_test() {
         let tmp = tempfile::tempdir().unwrap();
-
-        let (tx, rx) = broadcast::channel(10);
-        let rx = OptionalWatch::once(rx);
-        let (_exit_tx, exit_rx) = tokio::sync::oneshot::channel();
-
-        let root: AbsoluteSystemPathBuf = tmp.path().try_into().unwrap();
-        let manager = PackageManager::Yarn;
+        let repo_root = AbsoluteSystemPathBuf::try_from(tmp.path())
+            .unwrap()
+            .to_realpath()
+            .unwrap();
 
         let package_data = vec![
             WorkspaceData {
-                package_json: root.join_component("package.json"),
+                package_json: repo_root.join_components(&["packages", "foo", "package.json"]),
                 turbo_json: None,
             },
             WorkspaceData {
-                package_json: root.join_components(&["packages", "foo", "package.json"]),
-                turbo_json: None,
-            },
-            WorkspaceData {
-                package_json: root.join_components(&["packages", "bar", "package.json"]),
+                package_json: repo_root.join_components(&["packages", "bar", "package.json"]),
                 turbo_json: None,
             },
         ];
 
@@ -689,67 +579,43 @@ mod test {
         // create folders and files
         for data in &package_data {
             data.package_json.ensure_dir().unwrap();
-            data.package_json.create_with_contents("{}").unwrap();
+            let name = data.package_json.parent().unwrap().file_name().unwrap();
+            data.package_json
+                .create_with_contents(format!("{{\"name\": \"{name}\"}}"))
+                .unwrap();
         }
+        repo_root
+            .join_component("package-lock.json")
+            .create_with_contents("")
+            .unwrap();
 
         // write workspaces to root
-        root.join_component("package.json")
+        repo_root
+            .join_component("package.json")
             .create_with_contents(r#"{"workspaces":["packages/*"]}"#)
             .unwrap();
 
-        let mock_discovery = MockDiscovery {
-            manager,
-            package_data: Arc::new(Mutex::new(package_data)),
-        };
-        let cookie_writer =
-            CookieWriter::new_with_default_cookie_dir(&root, Duration::from_secs(2), rx.clone());
-
-        let subscriber =
-            Subscriber::new(root.clone(), mock_discovery, cookie_writer.clone()).unwrap();
-
-        let mut package_data = subscriber.package_data();
-        let _handle = tokio::spawn(subscriber.watch(exit_rx, rx));
-
-        tx.send(Ok(notify::Event {
-            kind: notify::EventKind::Create(notify::event::CreateKind::File),
-            paths: vec![root.join_component("package.json").as_std_path().to_owned()],
-            ..Default::default()
-        }))
-        .unwrap();
+        let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
+        let recv = watcher.watch();
+        let cookie_writer = CookieWriter::new(
+            watcher.cookie_dir(),
+            Duration::from_millis(100),
+            recv.clone(),
+        );
 
-        let (data, _) = join! {
-            package_data.get(),
-            async {
-                // simulate fs round trip
-                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-                let path = cookie_writer.cookie_dir().join_component("1.cookie").as_std_path().to_owned();
-                tracing::info!("writing cookie at {}", path.to_string_lossy());
-                tx.send(Ok(notify::Event {
-                    kind: notify::EventKind::Create(notify::event::CreateKind::File),
-                    paths: vec![path],
-                    ..Default::default()
-                })).unwrap();
-            }
-        };
+        let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap();
 
+        let mut data = package_watcher.wait_for_package_data().await.unwrap();
+        data.sort_by_key(|workspace| workspace.package_json.clone());
         assert_eq!(
-            data.unwrap()
-                .values()
-                .cloned()
-                .sorted_by_key(|f| f.package_json.clone())
-                .collect::<Vec<_>>(),
+            data,
             vec![
                 WorkspaceData {
-                    package_json: root.join_component("package.json"),
-                    turbo_json: None,
-                },
-                WorkspaceData {
-                    package_json: root.join_components(&["packages", "bar", "package.json",]),
+                    package_json: repo_root.join_components(&["packages", "bar", "package.json",]),
                     turbo_json: None,
                 },
                 WorkspaceData {
-                    package_json: root.join_components(&["packages", "foo", "package.json",]),
+                    package_json: repo_root.join_components(&["packages", "foo", "package.json",]),
                     turbo_json: None,
                 },
             ]
@@ -758,124 +624,51 @@ mod test {
 
         tracing::info!("removing subpackage");
 
         // delete package.json in foo
-        root.join_components(&["packages", "foo", "package.json"])
+        repo_root
+            .join_components(&["packages", "foo", "package.json"])
             .remove_file()
             .unwrap();
 
-        tx.send(Ok(notify::Event {
-            kind: notify::EventKind::Remove(notify::event::RemoveKind::File),
-            paths: vec![root
-                .join_components(&["packages", "foo", "package.json"])
-                .as_std_path()
-                .to_owned()],
-            ..Default::default()
-        }))
-        .unwrap();
-
-        let (data, _) = join! {
-            package_data.get(),
-            async {
-                // simulate fs round trip
-                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-                let path = cookie_writer.cookie_dir().join_component("2.cookie").as_std_path().to_owned();
-                tracing::info!("writing cookie at {}", path.to_string_lossy());
-                tx.send(Ok(notify::Event {
-                    kind: notify::EventKind::Create(notify::event::CreateKind::File),
-                    paths: vec![path],
-                    ..Default::default()
-                })).unwrap();
-            }
-        };
-
+        let mut data = package_watcher.wait_for_package_data().await.unwrap();
+        data.sort_by_key(|workspace| workspace.package_json.clone());
         assert_eq!(
-            data.unwrap()
-                .values()
-                .cloned()
-                .sorted_by_key(|f| f.package_json.clone())
-                .collect::<Vec<_>>(),
-            vec![
-                WorkspaceData {
-                    package_json: root.join_component("package.json"),
-                    turbo_json: None,
-                },
-                WorkspaceData {
-                    package_json: root.join_components(&["packages", "bar", "package.json"]),
-                    turbo_json: None,
-                }
-            ]
+            data,
+            vec![WorkspaceData {
+                package_json: repo_root.join_components(&["packages", "bar", "package.json"]),
+                turbo_json: None,
+            }]
         );
 
         // move package bar
-        root.join_components(&["packages", "bar"])
-            .rename(&root.join_component("bar"))
+        repo_root
+            .join_components(&["packages", "bar"])
+            .rename(&repo_root.join_component("bar"))
             .unwrap();
 
-        tx.send(Ok(notify::Event {
-            kind: notify::EventKind::Modify(notify::event::ModifyKind::Any),
-            paths: vec![root
-                .join_components(&["packages", "bar"])
-                .as_std_path()
-                .to_owned()],
-            ..Default::default()
-        }))
-        .unwrap();
-
-        let (data, _) = join! {
-            package_data.get(),
-            async {
-                // simulate fs round trip
-                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-                let path = cookie_writer.cookie_dir().join_component("3.cookie").as_std_path().to_owned();
-                tracing::info!("writing cookie at {}", path.to_string_lossy());
-                tx.send(Ok(notify::Event {
-                    kind: notify::EventKind::Create(notify::event::CreateKind::File),
-                    paths: vec![path],
-                    ..Default::default()
-                })).unwrap();
-            }
-        };
-
-        assert_eq!(
-            data.unwrap()
-                .values()
-                .cloned()
-                .sorted_by_key(|f| f.package_json.clone())
-                .collect::<Vec<_>>(),
-            vec![WorkspaceData {
-                package_json: root.join_component("package.json"),
-                turbo_json: None,
-            }]
-        );
+        let mut data = package_watcher.wait_for_package_data().await.unwrap();
+        data.sort_by_key(|workspace| workspace.package_json.clone());
+        assert_eq!(data, vec![]);
     }
 
     #[tokio::test]
     #[tracing_test::traced_test]
     async fn subscriber_update_workspaces() {
         let tmp = tempfile::tempdir().unwrap();
-
-        let (tx, rx) = broadcast::channel(10);
-        let rx = OptionalWatch::once(rx);
-        let (_exit_tx, exit_rx) = tokio::sync::oneshot::channel();
-
-        let root = AbsoluteSystemPathBuf::new(tmp.path().to_string_lossy()).unwrap();
-        let manager = PackageManager::Yarn;
+        let repo_root = AbsoluteSystemPathBuf::try_from(tmp.path())
+            .unwrap()
+            .to_realpath()
+            .unwrap();
 
         let package_data = vec![
             WorkspaceData {
-                package_json: root.join_component("package.json"),
-                turbo_json: None,
-            },
-            WorkspaceData {
-                package_json: root
+                package_json: repo_root
                     .join_component("packages")
                     .join_component("foo")
                     .join_component("package.json"),
                 turbo_json: None,
             },
             WorkspaceData {
-                package_json: root
+                package_json: repo_root
                     .join_component("packages2")
                     .join_component("bar")
                     .join_component("package.json"),
                 turbo_json: None,
             },
         ];
 
@@ -885,72 +678,48 @@ mod test {
         // create folders and files
         for data in &package_data {
-            tokio::fs::create_dir_all(data.package_json.parent().unwrap())
-                .await
+            data.package_json.ensure_dir().unwrap();
+            let name =
+                data.package_json.parent().unwrap().file_name().unwrap();
+            data.package_json
+                .create_with_contents(format!("{{\"name\": \"{name}\"}}"))
+                .unwrap();
-            tokio::fs::write(&data.package_json, b"{}").await.unwrap();
         }
+        repo_root
+            .join_component("package-lock.json")
+            .create_with_contents("")
+            .unwrap();
 
         // write workspaces to root
-        tokio::fs::write(
-            root.join_component("package.json"),
-            r#"{"workspaces":["packages/*", "packages2/*"]}"#,
-        )
-        .await
-        .unwrap();
-
-        let package_data_raw = Arc::new(Mutex::new(package_data));
-
-        let mock_discovery = MockDiscovery {
-            manager,
-            package_data: package_data_raw.clone(),
-        };
-
-        let cookie_writer =
-            CookieWriter::new_with_default_cookie_dir(&root, Duration::from_secs(2), rx.clone());
-        let subscriber =
-            Subscriber::new(root.clone(), mock_discovery, cookie_writer.clone()).unwrap();
-
-        let mut package_data = subscriber.package_data();
+        repo_root
+            .join_component("package.json")
+            .create_with_contents(r#"{"workspaces":["packages/*", "packages2/*"]}"#)
+            .unwrap();
 
-        let _handle = tokio::spawn(subscriber.watch(exit_rx, rx));
+        let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
+        let recv = watcher.watch();
+        let cookie_writer = CookieWriter::new(
+            watcher.cookie_dir(),
+            Duration::from_millis(100),
+            recv.clone(),
+        );
 
-        let (data, _) = join! {
-            package_data.get(),
-            async {
-                // simulate fs round trip
-                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap();
 
-                let path = cookie_writer.cookie_dir().join_component("1.cookie").as_std_path().to_owned();
-                tracing::info!("writing cookie at {}", path.to_string_lossy());
-                tx.send(Ok(notify::Event {
-                    kind: notify::EventKind::Create(notify::event::CreateKind::File),
-                    paths: vec![path],
-                    ..Default::default()
-                })).unwrap();
-            }
-        };
+        let mut data = package_watcher.wait_for_package_data().await.unwrap();
+        data.sort_by_key(|workspace| workspace.package_json.clone());
 
         assert_eq!(
-            data.unwrap()
-                .values()
-                .cloned()
-                .sorted_by_key(|f| f.package_json.clone())
-                .collect::<Vec<_>>(),
+            data,
             vec![
                 WorkspaceData {
-                    package_json: root.join_component("package.json"),
-                    turbo_json: None,
-                },
-                WorkspaceData {
-                    package_json: root
+                    package_json: repo_root
                         .join_component("packages")
                         .join_component("foo")
                         .join_component("package.json"),
                     turbo_json: None,
                 },
                 WorkspaceData {
-                    package_json: root
+                    package_json: repo_root
                         .join_component("packages2")
                         .join_component("bar")
                         .join_component("package.json"),
                     turbo_json: None,
                 },
             ]
         );
 
@@ -959,69 +728,50 @@ mod test {
-        // update workspaces
-        tracing::info!("updating workspaces");
-        *package_data_raw.lock().unwrap() = vec![
-            WorkspaceData {
-                package_json: root.join_component("package.json"),
-                turbo_json: None,
-            },
-            WorkspaceData {
-                package_json: root
+        // update workspaces to no longer cover packages2
+        repo_root
+            .join_component("package.json")
+            .create_with_contents(r#"{"workspaces":["packages/*"]}"#)
+            .unwrap();
+
+        let mut data = package_watcher.wait_for_package_data().await.unwrap();
+        data.sort_by_key(|workspace| workspace.package_json.clone());
+
+        assert_eq!(
+            data,
+            vec![WorkspaceData {
+                package_json: repo_root
                     .join_component("packages")
                     .join_component("foo")
                     .join_component("package.json"),
                 turbo_json: None,
-            },
-        ];
-        tokio::fs::write(
-            root.join_component("package.json"),
-            r#"{"workspaces":["packages/*"]}"#,
-        )
-        .await
-        .unwrap();
-
-        tx.send(Ok(notify::Event {
-            kind: notify::EventKind::Modify(notify::event::ModifyKind::Any),
-            paths: vec![root.join_component("package.json").as_std_path().to_owned()],
-            ..Default::default()
-        }))
-        .unwrap();
-
-        let (data, _) = join! {
-            package_data.get(),
-            async {
-                // simulate fs round trip
-                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-                let path = cookie_writer.cookie_dir().join_component("2.cookie").as_std_path().to_owned();
-                tracing::info!("writing cookie at {}", path.to_string_lossy());
-                tx.send(Ok(notify::Event {
-                    kind: notify::EventKind::Create(notify::event::CreateKind::File),
-                    paths: vec![path],
-                    ..Default::default()
-                })).unwrap();
-            }
-        };
+            }]
+        );
 
+        // move the packages2 workspace into package
+        repo_root
+            .join_components(&["packages2", "bar"])
+            .rename(&repo_root.join_components(&["packages", "bar"]))
+            .unwrap();
+        let mut data = package_watcher.wait_for_package_data().await.unwrap();
+        data.sort_by_key(|workspace| workspace.package_json.clone());
         assert_eq!(
-            data.unwrap()
-                .values()
-                .cloned()
-                .sorted_by_key(|f| f.package_json.clone())
-                .collect::<Vec<_>>(),
+            data,
             vec![
                 WorkspaceData {
-                    package_json: root.join_component("package.json"),
+                    package_json: repo_root
+                        .join_component("packages")
+                        .join_component("bar")
+                        .join_component("package.json"),
                     turbo_json: None,
                 },
                 WorkspaceData {
-                    package_json: root
+                    package_json: repo_root
                         .join_component("packages")
                         .join_component("foo")
                         .join_component("package.json"),
                     turbo_json: None,
-                }
+                },
             ]
         );
     }
 
@@ -1055,15 +805,7 @@ mod test {
             recv.clone(),
         );
 
-        let package_watcher = PackageWatcher::new(
-            repo_root.clone(),
-            recv,
-            LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None)
-                .build()
-                .unwrap(),
-            cookie_writer,
-        )
-        .unwrap();
+        let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap();
 
         // TODO: change this to expect either an empty result, or a package manager
         // without globs
@@ -1078,13 +820,7 @@ mod test {
             .create_with_contents(r#"packages: ["foo/*"]"#)
             .unwrap();
 
-        let package_manager = tokio::time::timeout(
-            Duration::from_millis(200),
-            package_watcher.wait_for_package_manager(),
-        )
-        .await
-        .unwrap()
-        .unwrap();
+        let package_manager = package_watcher.wait_for_package_manager().await.unwrap();
         assert_eq!(package_manager, PackageManager::Pnpm);
 
         // Remove workspaces file, verify we get a timeout
@@ -1153,15 +889,7 @@ mod test {
             recv.clone(),
         );
 
-        let package_watcher = PackageWatcher::new(
-            repo_root.clone(),
-            recv,
-            LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None)
-                .build()
-                .unwrap(),
-            cookie_writer,
-        )
-        .unwrap();
+        let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap();
 
         // TODO: change this to expect either an empty result, or a package manager
         // without globs
@@ -1176,13 +904,7 @@ mod test {
             .create_with_contents(r#"{"packageManager": "pnpm@7.0", "workspaces": ["foo/*"]}"#)
             .unwrap();
 
-        let package_manager = tokio::time::timeout(
-            Duration::from_millis(200),
-            package_watcher.wait_for_package_manager(),
-        )
-        .await
-        .unwrap()
-        .unwrap();
+        let package_manager = package_watcher.wait_for_package_manager().await.unwrap();
         assert_eq!(package_manager, PackageManager::Npm);
 
         // Remove workspaces file, verify we get a timeout
@@ -1221,4 +943,72 @@ mod test {
             .unwrap();
         assert_eq!(package_manager, PackageManager::Npm);
     }
+
+    #[tokio::test]
+    #[tracing_test::traced_test]
+    async fn test_change_package_manager() {
+        let tmp = tempfile::tempdir().unwrap();
+        let repo_root =
+            AbsoluteSystemPathBuf::try_from(tmp.path())
+                .unwrap()
+                .to_realpath()
+                .unwrap();
+
+        let workspaces_path = repo_root.join_component("pnpm-workspace.yaml");
+        workspaces_path
+            .create_with_contents(r#"packages: ["foo/*"]"#)
+            .unwrap();
+        // Currently we require valid state to start the daemon
+        let root_package_json_path = repo_root.join_component("package.json");
+        // Start with no workspace glob
+        root_package_json_path
+            .create_with_contents(r#"{"packageManager": "pnpm@7.0"}"#)
+            .unwrap();
+        let pnpm_lock_file = repo_root.join_component("pnpm-lock.yaml");
+        pnpm_lock_file.create_with_contents("").unwrap();
+
+        let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
+        let recv = watcher.watch();
+        let cookie_writer = CookieWriter::new(
+            watcher.cookie_dir(),
+            Duration::from_millis(100),
+            recv.clone(),
+        );
+
+        let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap();
+
+        let package_manager = package_watcher.wait_for_package_manager().await.unwrap();
+        assert_eq!(package_manager, PackageManager::Pnpm);
+
+        pnpm_lock_file.remove_file().unwrap();
+        // No more lock file, verify we're in an invalid state
+        tokio::time::timeout(
+            Duration::from_millis(50),
+            package_watcher.wait_for_package_manager(),
+        )
+        .await
+        .unwrap_err();
+
+        let npm_lock_file = repo_root.join_component("package-lock.json");
+        npm_lock_file.create_with_contents("").unwrap();
+        // now we have an npm lockfile, but we don't have workspaces. Still invalid
+        tokio::time::timeout(
+            Duration::from_millis(50),
+            package_watcher.wait_for_package_manager(),
+        )
+        .await
+        .unwrap_err();
+
+        // update package.json to complete the transition
+        root_package_json_path
+            .create_with_contents(r#"{"packageManager": "npm@7.0", "workspaces": ["foo/*"]}"#)
+            .unwrap();
+        let package_manager = tokio::time::timeout(
+            Duration::from_millis(200),
+            package_watcher.wait_for_package_manager(),
+        )
+        .await
+        .unwrap()
+        .unwrap();
+        assert_eq!(package_manager, PackageManager::Npm);
+    }
 }
diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs
index bc2aa514626df..ce1b5135ed13f 100644
--- a/crates/turborepo-lib/src/daemon/server.rs
+++ b/crates/turborepo-lib/src/daemon/server.rs
@@ -31,10 +31,7 @@ use turborepo_filewatch::{
     package_watcher::{PackageWatcher, WatchingPackageDiscovery},
     FileSystemWatcher, WatchError,
 };
-use turborepo_repository::{
-    discovery::{LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder},
-    package_manager,
-};
+use turborepo_repository::{discovery::PackageDiscovery, package_manager};
 
 use super::{bump_timeout::BumpTimeout, endpoint::SocketOpenError, proto};
 use crate::daemon::{
@@ -95,10 +92,7 @@ impl FileWatching {
     /// waiting for the filewatcher to be ready. Using `OptionalWatch`,
     /// dependent services can wait for resources they need to become
     /// available, and the server can start up without waiting for them.
-    pub fn new<PD: PackageDiscovery + Send + Sync + 'static>(
-        repo_root: AbsoluteSystemPathBuf,
-        backup_discovery: PD,
-    ) -> Result<FileWatching, WatchError> {
+    pub fn new(repo_root: AbsoluteSystemPathBuf) -> Result<FileWatching, WatchError> {
         let watcher = Arc::new(FileSystemWatcher::new_with_default_cookie_dir(&repo_root)?);
         let recv = watcher.watch();
@@ -113,13 +107,8 @@ impl FileWatching {
             recv.clone(),
         ));
         let package_watcher = Arc::new(
-            PackageWatcher::new(
-                repo_root.clone(),
-                recv.clone(),
-                backup_discovery,
-                cookie_writer,
-            )
-            .map_err(|e| WatchError::Setup(format!("{:?}", e)))?,
+            PackageWatcher::new(repo_root.clone(), recv.clone(), cookie_writer)
+                .map_err(|e| WatchError::Setup(format!("{:?}", e)))?,
         );
 
         Ok(FileWatching {
@@ -138,8 +127,6 @@ pub struct TurboGrpcService<S> {
     paths: Paths,
     timeout: Duration,
     external_shutdown: S,
-
-    package_discovery_backup: LocalPackageDiscoveryBuilder,
 }
 
 impl<S> TurboGrpcService<S>
@@ -158,9 +145,6 @@ where
         timeout: Duration,
         external_shutdown: S,
     ) -> Self {
-        let package_discovery_backup =
-            LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None);
-
         // Run the actual service. It takes ownership of the struct given to it,
         // so we use a private struct with just the pieces of state needed to handle
         // RPCs.
@@ -169,7 +153,6 @@ where
             paths,
             timeout,
             external_shutdown,
-            package_discovery_backup,
         }
     }
 }
@@ -184,7 +167,6 @@ where
             paths,
             repo_root,
             timeout,
-            package_discovery_backup,
         } = self;
 
         // A channel to trigger the shutdown of the gRPC server. This is handed out
@@ -192,13 +174,8 @@ where
         // well as available to the gRPC server itself to handle the shutdown RPC.
        let (trigger_shutdown, mut shutdown_signal) = mpsc::channel::<()>(1);
 
-        let package_discovery_backup = package_discovery_backup.build()?;
-        let (service, exit_root_watch, watch_root_handle) = TurboGrpcServiceInner::new(
-            package_discovery_backup,
-            repo_root.clone(),
-            trigger_shutdown,
-            paths.log_file,
-        );
+        let (service, exit_root_watch, watch_root_handle) =
+            TurboGrpcServiceInner::new(repo_root.clone(), trigger_shutdown, paths.log_file);
 
         let running = Arc::new(AtomicBool::new(true));
         let (_pid_lock, stream) =
@@ -272,8 +249,7 @@ struct TurboGrpcServiceInner {
 // watching package hasher also uses watching package discovery as well as
 // falling back to a local package hasher
 impl TurboGrpcServiceInner {
-    pub fn new<PD: PackageDiscovery + Send + Sync + 'static>(
-        package_discovery_backup: PD,
+    pub fn new(
         repo_root: AbsoluteSystemPathBuf,
         trigger_shutdown: mpsc::Sender<()>,
         log_file: AbsoluteSystemPathBuf,
@@ -282,7 +258,7 @@ impl TurboGrpcServiceInner {
         oneshot::Sender<()>,
         JoinHandle<Result<(), WatchError>>,
     ) {
-        let file_watching = FileWatching::new(repo_root.clone(), package_discovery_backup).unwrap();
+        let file_watching = FileWatching::new(repo_root.clone()).unwrap();
 
         tracing::debug!("initing package discovery");
         let package_discovery = Arc::new(WatchingPackageDiscovery::new(