From 93d74a7c35780a22d37d68f84573f2096586536c Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 28 Mar 2024 16:44:26 -0700 Subject: [PATCH] Working basic trie --- Cargo.lock | 27 + crates/turborepo-filewatch/Cargo.toml | 2 + crates/turborepo-filewatch/src/globwatcher.rs | 18 +- .../turborepo-filewatch/src/hash_watcher.rs | 463 +++++++++++++----- .../src/anchored_system_path.rs | 2 +- 5 files changed, 383 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fae1926791867..7106caaef0688 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2698,6 +2698,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "enum-iterator" version = "0.7.0" @@ -5072,6 +5078,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec 1.13.1", +] + [[package]] name = "nix" version = "0.25.1" @@ -6332,6 +6347,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce082a9940a7ace2ad4a8b7d0b1eac6aa378895f18be598230c5f2284ac05426" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.4.6" @@ -11635,7 +11660,9 @@ dependencies = [ "git2 0.16.1", "itertools 0.10.5", "libc", + "nibble_vec", "notify", + "radix_trie", "tempfile", "thiserror", "tokio", diff --git a/crates/turborepo-filewatch/Cargo.toml b/crates/turborepo-filewatch/Cargo.toml index c4e17bae85ae2..5e5ca6a650c6b 100644 --- a/crates/turborepo-filewatch/Cargo.toml +++ b/crates/turborepo-filewatch/Cargo.toml @@ -14,7 +14,9 @@ anyhow = { workspace = true } dashmap = { workspace = true } futures = { version = "0.3.26" } itertools = { workspace = true } +nibble_vec = "0.1.0" notify = { workspace = true } +radix_trie = "0.2.1" thiserror = "1.0.38" tokio = { workspace = true, features = ["full", "time"] } tracing = "0.1.37" diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index 6466ae493174e..b82148dfb150f 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -112,9 +112,25 @@ impl GlobSet { Ok(Self { include, exclude, - exclude_raw: BTreeSet::from_iter(raw_excludes.into_iter()), + exclude_raw: BTreeSet::from_iter(raw_excludes), }) } + + pub fn from_raw_unfiltered(raw: Vec) -> Result { + let (includes, excludes): (Vec<_>, Vec<_>) = { + let mut includes = vec![]; + let mut excludes = vec![]; + for pattern in raw { + if let Some(exclude) = pattern.strip_prefix("!") { + excludes.push(exclude.to_string()); + } else { + includes.push(pattern); + } + } + (includes, excludes) + }; + Self::from_raw(includes, excludes) + } } #[derive(Debug, Error)] diff --git a/crates/turborepo-filewatch/src/hash_watcher.rs b/crates/turborepo-filewatch/src/hash_watcher.rs index 6f1cfaaefb5f1..f657582cda516 100644 --- a/crates/turborepo-filewatch/src/hash_watcher.rs +++ b/crates/turborepo-filewatch/src/hash_watcher.rs @@ -4,33 +4,23 @@ use std::{ }; use 
notify::Event; +use radix_trie::{Trie, TrieCommon}; use thiserror::Error; use tokio::{ select, - sync::{ - broadcast, mpsc, oneshot, - watch::{self, error::RecvError}, - }, -}; -use tracing::debug; -use turbopath::{ - AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf, PathRelation, - RelativeUnixPathBuf, + sync::{broadcast, mpsc, oneshot, watch}, }; +use tracing::{debug, trace}; +use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf}; use turborepo_repository::discovery::DiscoveryResponse; use turborepo_scm::{package_deps::GitHashes, Error as SCMError, SCM}; -use crate::{ - cookies::{CookieWatcher, CookiedRequest}, - globwatcher::GlobSet, - package_watcher::DiscoveryData, - NotifyError, OptionalWatch, -}; +use crate::{globwatcher::GlobSet, package_watcher::DiscoveryData, NotifyError, OptionalWatch}; -struct HashWatcher { +pub struct HashWatcher { _exit_tx: oneshot::Sender<()>, _handle: tokio::task::JoinHandle<()>, - query_tx_lazy: OptionalWatch<mpsc::Sender<Query>>, + query_tx: mpsc::Sender<Query>, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -45,6 +35,8 @@ pub enum Error { HashingError(String), #[error("file hashing is not available: {0}")] Unavailable(String), + #[error("package not found: {} {:?}", .0.package_path, .0.inputs)] + UnknownPackage(HashSpec), } // Communication errors that all funnel to Unavailable @@ -68,44 +60,45 @@ impl<T> From<mpsc::error::SendError<T>> for Error { } impl HashWatcher { - fn new( + pub fn new( repo_root: AbsoluteSystemPathBuf, package_discovery: watch::Receiver<Option<DiscoveryData>>, file_events: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>, - scm: &SCM, + scm: SCM, ) -> Self { let (exit_tx, exit_rx) = oneshot::channel(); - //let (query_tx, query_rx) = mpsc::channel(16); - let (query_tx_state, query_tx_lazy) = OptionalWatch::new(); - let process = - HashWatchProcess::new(repo_root, package_discovery, scm.clone(), query_tx_state); + let (query_tx, query_rx) = mpsc::channel(16); + let process = HashWatchProcess::new(repo_root, package_discovery, scm, query_rx); let handle = tokio::spawn(process.watch(exit_rx, file_events)); Self { _exit_tx: exit_tx, _handle: handle, - query_tx_lazy, + query_tx, } } - async fn get_hash_blocking(&self, hash_spec: HashSpec) -> Result<GitHashes, Error> { + // Note that this does not wait for any sort of ready signal. The watching + // process won't respond until filewatching is ready, but there is no + // guarantee that package data or file hashing will be done before + // responding. Both package discovery and file hashing can fail depending on the + // state of the filesystem, so clients will need to be robust to receiving + // errors. + pub async fn get_file_hashes(&self, hash_spec: HashSpec) -> Result<GitHashes, Error> { let (tx, rx) = oneshot::channel(); - let query_tx = self.query_tx_lazy.clone().get().await?.clone(); - query_tx.send(Query::GetHash(hash_spec, tx)).await?; - let resp = rx.await?; - resp.map_err(|e| Error::HashingError(e)) + self.query_tx.send(Query::GetHash(hash_spec, tx)).await?; + rx.await?
} } struct HashWatchProcess { repo_root: AbsoluteSystemPathBuf, package_discovery: watch::Receiver<Option<DiscoveryData>>, - query_tx_state: watch::Sender<Option<mpsc::Sender<Query>>>, + query_rx: mpsc::Receiver<Query>, scm: SCM, } enum Query { - GetHash(HashSpec, oneshot::Sender<Result<GitHashes, String>>), - //CookiedGetHash(CookiedRequest<(HashSpec, oneshot::Sender<Result<GitHashes, String>>)>) + GetHash(HashSpec, oneshot::Sender<Result<GitHashes, Error>>), } // Version is a type that exists to stamp an asynchronous hash computation with @@ -129,9 +122,107 @@ impl Version { enum HashState { Hashes(GitHashes), - Pending(Version, Vec<oneshot::Sender<Result<GitHashes, String>>>), + Pending(Version, Vec<oneshot::Sender<Result<GitHashes, Error>>>), Unavailable(String), } +// We use a radix_trie to store hashes so that we can quickly match a file path +// to a package without having to iterate over the set of all packages. We +// expect file changes to be the highest volume of events that this service +// handles, so we want to ensure we're efficient in deciding if a given change +// is relevant or not. +// +// Our Trie keys off of a String because of the orphan rule. Keys are required +// to be TrieKey, but this crate doesn't own TrieKey or AnchoredSystemPathBuf. +// We *could* implement TrieKey for AnchoredSystemPathBuf and avoid the String +// conversion, if we decide we want to add the radix_trie dependency to +// turbopath. +struct FileHashes(Trie<String, HashMap<Option<GlobSet>, HashState>>); + +impl FileHashes { + fn new() -> Self { + Self(Trie::new()) + } + + fn drop_matching<F>(&mut self, mut f: F, reason: &str) + where + F: FnMut(&AnchoredSystemPath) -> bool, + { + let mut new_trie = Trie::new(); + std::mem::swap(&mut self.0, &mut new_trie); + // rename for clarity now that we've swapped + let mut previous = new_trie; + + // radix_trie doesn't have an into_iter() implementation, so we have a slightly + // inefficient method for removing matching values. Fortunately, we only + // need to do this when the package layout changes. It's O(n) in the + // number of packages, on top of the trie internals. + let keys = previous.keys().map(|k| k.to_owned()).collect::<Vec<_>>(); + for key in keys { + let previous_value = previous + .remove(&key) + .expect("this key was pulled from previous"); + let path_key = + AnchoredSystemPath::new(&key).expect("keys are valid AnchoredSystemPaths"); + if !f(path_key) { + // keep it, we didn't match the key. + self.0.insert(key, previous_value); + } else { + for state in previous_value.into_values() { + if let HashState::Pending(_, txs) = state { + for tx in txs { + let _ = tx.send(Err(Error::Unavailable(reason.to_string()))); + } + } + } + } + } + } + + fn get_package_path(&self, file_path: &AnchoredSystemPath) -> Option<&AnchoredSystemPath> { + self.0 + .get_ancestor(file_path.as_str()) + .and_then(|subtrie| subtrie.key()) + .map(|package_path| { + AnchoredSystemPath::new(package_path).expect("keys are valid AnchoredSystemPaths") + }) + } + + fn drain(&mut self, reason: &str) { + // funnel through drop_matching even though we could just swap with a new trie. + // We want to ensure we respond to any pending queries.
+ self.drop_matching(|_| true, reason); + } + + fn contains_key(&self, key: &HashSpec) -> bool { + self.0 + .get(key.package_path.as_str()) + .and_then(|states| states.get(&key.inputs)) + .is_some() + } + + fn insert(&mut self, key: HashSpec, value: HashState) { + if let Some(states) = self.0.get_mut(key.package_path.as_str()) { + match states.entry(key.inputs) { + Entry::Occupied(mut entry) => { + entry.insert(value); + } + Entry::Vacant(entry) => { + entry.insert(value); + } + } + } else { + let mut states = HashMap::new(); + states.insert(key.inputs, value); + self.0.insert(key.package_path.as_str().to_owned(), states); + } + } + + fn get_mut(&mut self, key: &HashSpec) -> Option<&mut HashState> { + self.0 + .get_mut(key.package_path.as_str()) + .and_then(|states| states.get_mut(&key.inputs)) + } +} struct HashUpdate { spec: HashSpec, @@ -144,13 +235,13 @@ impl HashWatchProcess { repo_root: AbsoluteSystemPathBuf, package_discovery: watch::Receiver<Option<DiscoveryData>>, scm: SCM, - query_tx_state: watch::Sender<Option<mpsc::Sender<Query>>>, + query_rx: mpsc::Receiver<Query>, ) -> Self { Self { repo_root, package_discovery, scm, - query_tx_state, + query_rx, } } @@ -167,18 +258,15 @@ impl HashWatchProcess { return; } }; - let (query_tx, mut query_rx) = mpsc::channel(16); let (hash_update_tx, mut hash_update_rx) = mpsc::channel::<HashUpdate>(16); - let mut hashes: HashMap<HashSpec, HashState> = HashMap::new(); + let mut hashes = FileHashes::new(); - // We need to wait for the first non-null (error is ok) update from the package - // watcher before signalling ourselves as ready. let package_data = self.package_discovery.borrow().to_owned(); - let mut ready = package_data.is_some(); self.handle_package_data_update(package_data, &mut hashes, &hash_update_tx); - if ready { - let _ = self.query_tx_state.send(Some(query_tx.clone())); - } + // We've gotten the ready signal from filewatching, and *some* state from + // package discovery, but there is no guarantee that package discovery + // is ready. This means that initial queries may be returned with errors + // until we've completed package discovery and then hashing. loop { select!
{ biased; _ = &mut exit_rx => { debug!("hash watcher exiting"); return; }, _ = self.package_discovery.changed() => { let package_data = self.package_discovery.borrow().to_owned(); - // If we weren't already ready, and this update is non-null, we are now ready - ready = !ready && package_data.is_some(); self.handle_package_data_update(package_data, &mut hashes, &hash_update_tx); - if ready { - let _ = self.query_tx_state.send(Some(query_tx.clone())); - } }, file_event = file_events_recv.recv() => { match file_event { Ok(Ok(event)) => { - self.handle_file_event(event, &mut hashes); + self.handle_file_event(event, &mut hashes, &hash_update_tx); }, Ok(Err(e)) => { debug!("file watcher error: {:?}", e); @@ -220,7 +303,7 @@ impl HashWatchProcess { unreachable!("hash update channel closed, but we have a live reference to it"); } }, - query = query_rx.recv() => { + query = self.query_rx.recv() => { if let Some(query) = query { self.handle_query(query, &mut hashes); } @@ -229,7 +312,7 @@ impl HashWatchProcess { } } - fn handle_query(&self, query: Query, hashes: &mut HashMap<HashSpec, HashState>) { + fn handle_query(&self, query: Query, hashes: &mut FileHashes) { match query { Query::GetHash(spec, tx) => { if let Some(state) = hashes.get_mut(&spec) { @@ -241,17 +324,17 @@ impl HashWatchProcess { txs.push(tx); } HashState::Unavailable(e) => { - let _ = tx.send(Err(e.clone())); + let _ = tx.send(Err(Error::HashingError(e.clone()))); } } } else { - let _ = tx.send(Err(format!("package not found: {}", spec.package_path))); + let _ = tx.send(Err(Error::UnknownPackage(spec))); } } } } - fn handle_hash_update(&self, update: HashUpdate, hashes: &mut HashMap<HashSpec, HashState>) { + fn handle_hash_update(&self, update: HashUpdate, hashes: &mut FileHashes) { let HashUpdate { spec, version, @@ -260,8 +343,9 @@ impl HashWatchProcess { // If we have a pending hash computation, update the state. If we don't, ignore // this update if let Some(state) = hashes.get_mut(&spec) { - // If we have a pending hash computation, update the state. If we don't, ignore - // this update + // We need mutable access to 'state' to update it, as well as being able to + // extract the pending state, so we need two separate if statements + // to pull the value apart. if let HashState::Pending(existing_version, pending_queries) = state { if *existing_version == version { match result { @@ -277,7 +361,7 @@ impl HashWatchProcess { let error = e.to_string(); for pending_query in pending_queries.drain(..)
{ // We don't care if the client has gone away - let _ = pending_query.send(Err(error.clone())); + let _ = pending_query.send(Err(Error::HashingError(error.clone()))); } *state = HashState::Unavailable(error); } @@ -304,10 +388,7 @@ impl HashWatchProcess { let result = scm.get_package_file_hashes( &repo_root, &spec.package_path, - inputs - .as_ref() - .map(|inputs| inputs.as_slice()) - .unwrap_or_default(), + inputs.as_deref().unwrap_or_default(), telemetry, ); //let result = self.hash_package(&spec_copy); let _ = tx.send(HashUpdate { spec, version: version_copy, result, }); }); version } - fn handle_file_event(&self, event: Event, hashes: &mut HashMap<HashSpec, HashState>) { - let mut changed_packages: HashSet<HashSpec> = HashSet::new(); - 'change_path: for path in event.paths { + fn handle_file_event( + &self, + event: Event, + hashes: &mut FileHashes, + hash_update_tx: &mpsc::Sender<HashUpdate>, + ) { + let mut changed_packages: HashSet<AnchoredSystemPathBuf> = HashSet::new(); + for path in event.paths { let path = AbsoluteSystemPathBuf::try_from(path).expect("event path is a valid path"); let repo_relative_change_path = self .repo_root .anchor(&path) .expect("event path is in the repository"); - // TODO: better data structure to make this more efficient - for hash_spec in changed_packages.iter() { - if hash_spec - .package_path - .relation_to_path(&repo_relative_change_path) - == PathRelation::Parent - { - // We've already seen a change in a parent package, no need to check this one - continue 'change_path; - } - } - for hash_spec in hashes.keys() { - if hash_spec - .package_path - .relation_to_path(&repo_relative_change_path) - == PathRelation::Parent - { - changed_packages.insert(hash_spec.clone()); - } + // If this change is not relevant to a package, ignore it + trace!("file change at {:?}", repo_relative_change_path); + if let Some(package_path) = hashes.get_package_path(&repo_relative_change_path) { + // We have a file change in a package, and we haven't seen this package yet. + // Queue it for rehashing. + // TODO: further qualification. Which sets of inputs? Is this file .gitignored? + // We are somewhat saved here by deferring to the SCM to do the hashing. A + // change to a gitignored file will trigger a re-hash, but won't + // actually affect what the hash is. + trace!("package changed: {:?}", package_path); + changed_packages.insert(package_path.to_owned()); + } else { + trace!("Ignoring change to {repo_relative_change_path}"); } } - // let package_path = self.repo_root.anchor(&path).expect("event path is - // in the repository"); let spec = HashSpec { - // package_path, - // inputs: None, - // }; - // if let Some(state) = hashes.get_mut(&spec) { - // match state { - // HashState::Pending(count) => { - // *count += 1; - // }, - // HashState::Hash(_) => { - // *state = HashState::Pending(1); - // }, - // } - // } else { - // hashes.insert(spec, HashState::Pending(1)); - // } + // TODO: handle different sets of inputs + for package_path in changed_packages { + let spec = HashSpec { + package_path, + inputs: None, + }; + let version = self.queue_package_hash(&spec, hash_update_tx); + hashes.insert(spec, HashState::Pending(version, vec![])); + } } fn handle_package_data_update( &self, package_data: Option<Result<DiscoveryResponse, String>>, - hashes: &mut HashMap<HashSpec, HashState>, + hashes: &mut FileHashes, hash_update_tx: &mpsc::Sender<HashUpdate>, ) { debug!("handling package data {:?}", package_data); @@ -389,7 +461,11 @@ impl HashWatchProcess { })); // We have new package data.
Drop any packages we don't need anymore, add any // new ones - hashes.retain(|spec, _| package_paths.contains(&spec.package_path)); + //hashes.retain(|spec, _| package_paths.contains(&spec.package_path)); + hashes.drop_matching( + |package_path| !package_paths.contains(package_path), + "package was removed", + ); for package_path in package_paths { let spec = HashSpec { package_path, @@ -404,13 +480,7 @@ impl HashWatchProcess { } None | Some(Err(_)) => { // package data invalidated, flush everything - for (_, state) in hashes.drain() { - if let HashState::Pending(_, txs) = state { - for tx in txs { - let _ = tx.send(Err("package discovery is unavailable".to_string())); - } - } - } + hashes.drain("package discovery is unavailable"); } } } @@ -418,11 +488,14 @@ mod tests { #[cfg(test)] mod tests { - use std::time::{Duration, Instant}; + use std::{ + assert_matches::assert_matches, + time::{Duration, Instant}, + }; use git2::Repository; use tempfile::{tempdir, TempDir}; - use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPathBuf}; + use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPathBuf}; use turborepo_scm::{package_deps::GitHashes, SCM}; use crate::{ @@ -529,10 +602,28 @@ mod tests { (tmp, repo, repo_root) } + fn create_fixture_branch(repo: &Repository, repo_root: &AbsoluteSystemPath) { + // create a branch that deletes bar-file and adds baz-file to the bar package + let bar_dir = repo_root.join_components(&["packages", "bar"]); + bar_dir.join_component("bar-file").remove().unwrap(); + bar_dir + .join_component("baz-file") + .create_with_contents("baz file contents") + .unwrap(); + let current_commit = repo + .head() + .ok() + .map(|r| r.peel_to_commit().unwrap()) + .unwrap(); + repo.branch("test-branch", &current_commit, false).unwrap(); + repo.set_head("refs/heads/test-branch").unwrap(); + commit_all(&repo); + } + #[tokio::test] #[tracing_test::traced_test] async fn test_basic_file_changes() { - let (_tmp, repo, repo_root) = setup_fixture(); + let (_tmp, _repo, repo_root) = setup_fixture(); let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); @@ -548,23 +639,25 @@ mod tests { let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); let package_discovery = package_watcher.watch_discovery(); let hash_watcher = - HashWatcher::new(repo_root.clone(), package_discovery, watcher.watch(), &scm); + HashWatcher::new(repo_root.clone(), package_discovery, watcher.watch(), scm); let foo_path = repo_root.join_components(&["packages", "foo"]); - - let foo_hash = hash_watcher - .get_hash_blocking(HashSpec { + // We need to give filewatching time to do the initial scan, + // but this should resolve in short order to the expected value.
+ retry_get_hash( + &hash_watcher, + HashSpec { package_path: repo_root.anchor(&foo_path).unwrap(), inputs: None, - }) - .await - .unwrap(); - let expected = make_expected(vec![ - ("foo-file", "9317666a2e7b729b740c706ab79724952c97bde4"), - ("package.json", "395351bdd7167f351af3396d3225ebe97a7a4d13"), - (".gitignore", "89f9ac04aac6c8ee66e158853e7d0439b3ec782d"), - ]); - assert_eq!(foo_hash, expected); + }, + Duration::from_secs(2), + make_expected(vec![ + ("foo-file", "9317666a2e7b729b740c706ab79724952c97bde4"), + ("package.json", "395351bdd7167f351af3396d3225ebe97a7a4d13"), + (".gitignore", "89f9ac04aac6c8ee66e158853e7d0439b3ec782d"), + ]), + ) + .await; // update foo-file let foo_file_path = repo_root.join_components(&["packages", "foo", "foo-file"]); @@ -579,7 +672,35 @@ mod tests { }, Duration::from_secs(2), make_expected(vec![ - ("foo-file", "new-hash"), + ("foo-file", "5f6796bbd23dcdc9d30d07a2d8a4817c34b7f1e7"), + ("package.json", "395351bdd7167f351af3396d3225ebe97a7a4d13"), + (".gitignore", "89f9ac04aac6c8ee66e158853e7d0439b3ec782d"), + ]), + ) + .await; + + // update files in dist/ and out/ and foo-file + // verify we don't get hashes for the gitignored files + repo_root + .join_components(&["packages", "foo", "out", "some-file"]) + .create_with_contents("an ignored file") + .unwrap(); + repo_root + .join_components(&["packages", "foo", "dist", "some-other-file"]) + .create_with_contents("an ignored file") + .unwrap(); + foo_file_path + .create_with_contents("even more foo-file contents") + .unwrap(); + retry_get_hash( + &hash_watcher, + HashSpec { + package_path: repo_root.anchor(&foo_path).unwrap(), + inputs: None, + }, + Duration::from_secs(2), + make_expected(vec![ + ("foo-file", "0cb73634538618658f092cd7a3a373c243513a6a"), ("package.json", "395351bdd7167f351af3396d3225ebe97a7a4d13"), (".gitignore", "89f9ac04aac6c8ee66e158853e7d0439b3ec782d"), ]), @@ -587,6 +708,94 @@ mod tests { .await; } + #[tokio::test] + #[tracing_test::traced_test] + async fn test_switch_branch() { + let (_tmp, repo, repo_root) = setup_fixture(); + + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + + let recv = watcher.watch(); + let cookie_writer = CookieWriter::new( + watcher.cookie_dir(), + Duration::from_millis(100), + recv.clone(), + ); + + let scm = SCM::new(&repo_root); + assert!(!scm.is_manual()); + let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); + let package_discovery = package_watcher.watch_discovery(); + let hash_watcher = + HashWatcher::new(repo_root.clone(), package_discovery, watcher.watch(), scm); + + let bar_path = repo_root.join_components(&["packages", "bar"]); + + // We need to give filewatching time to do the initial scan, + // but this should resolve in short order to the expected value. 
+ retry_get_hash( + &hash_watcher, + HashSpec { + package_path: repo_root.anchor(&bar_path).unwrap(), + inputs: None, + }, + Duration::from_secs(2), + make_expected(vec![ + ("bar-file", "b9bdb1e4875f7397b3f68c104bc249de0ecd3f8e"), + ("package.json", "b39117e03f0dbe217b957f58a2ad78b993055088"), + ]), + ) + .await; + + create_fixture_branch(&repo, &repo_root); + + retry_get_hash( + &hash_watcher, + HashSpec { + package_path: repo_root.anchor(&bar_path).unwrap(), + inputs: None, + }, + Duration::from_secs(2), + make_expected(vec![ + ("baz-file", "a5395ccf1b8966f3ea805aff0851eac13acb3540"), + ("package.json", "b39117e03f0dbe217b957f58a2ad78b993055088"), + ]), + ) + .await; + } + + #[tokio::test] + #[tracing_test::traced_test] + async fn test_non_existent_package() { + let (_tmp, _repo, repo_root) = setup_fixture(); + + let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap(); + + let recv = watcher.watch(); + let cookie_writer = CookieWriter::new( + watcher.cookie_dir(), + Duration::from_millis(100), + recv.clone(), + ); + + let scm = SCM::new(&repo_root); + assert!(!scm.is_manual()); + let package_watcher = PackageWatcher::new(repo_root.clone(), recv, cookie_writer).unwrap(); + let package_discovery = package_watcher.watch_discovery(); + let hash_watcher = + HashWatcher::new(repo_root.clone(), package_discovery, watcher.watch(), scm); + + let non_existent_path = repo_root.join_components(&["packages", "non-existent"]); + let relative_non_existent_path = repo_root.anchor(&non_existent_path).unwrap(); + let result = hash_watcher + .get_file_hashes(HashSpec { + package_path: relative_non_existent_path.clone(), + inputs: None, + }) + .await; + assert_matches!(result, Err(crate::hash_watcher::Error::UnknownPackage(unknown_spec)) if unknown_spec.package_path == relative_non_existent_path); + } + // we don't have a signal for when hashing is complete after having made a file // change set a long timeout, but retry several times to try to hit the // success case quickly @@ -600,7 +809,7 @@ mod tests { let mut error = None; let mut last_value = None; while Instant::now() < deadline { - match hash_watcher.get_hash_blocking(spec.clone()).await { + match hash_watcher.get_file_hashes(spec.clone()).await { Ok(hashes) => { if hashes == expected { return; diff --git a/crates/turborepo-paths/src/anchored_system_path.rs b/crates/turborepo-paths/src/anchored_system_path.rs index 0eb38b4ad7258..620cd416b3bbb 100644 --- a/crates/turborepo-paths/src/anchored_system_path.rs +++ b/crates/turborepo-paths/src/anchored_system_path.rs @@ -6,7 +6,7 @@ use serde::Serialize; use crate::{AnchoredSystemPathBuf, PathError, PathRelation, RelativeUnixPathBuf}; -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Hash)] #[serde(transparent)] pub struct AnchoredSystemPath(Utf8Path);