From 2cce26831022c52dac7271d9d10bc1ecdc7e65a4 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Fri, 9 Feb 2024 10:58:34 -0800 Subject: [PATCH] fix(Turborepo): Pass validated globs to daemon for watching (#7327) ### Description - Change type signature on daemon client to require `ValidatedGlob` instances be passed - Add some debug logging to `GlobTracker` about what globs we are tracking and what file paths invalidate a glob ### Testing Instructions Existing test suite Fixes #7131 Closes TURBO-2301 --------- Co-authored-by: Greg Soltis --- crates/turborepo-filewatch/src/globwatcher.rs | 26 ++++++++++++++++--- crates/turborepo-lib/src/daemon/client.rs | 13 +++++----- crates/turborepo-lib/src/run/cache.rs | 24 ++++++++++------- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index 8ac2cf7518a4e..0b7f2c3f95c86 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -9,7 +9,7 @@ use std::{ use notify::Event; use thiserror::Error; use tokio::sync::{broadcast, mpsc, oneshot}; -use tracing::warn; +use tracing::{debug, warn}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPath}; use wax::{Any, Glob, Program}; @@ -20,10 +20,19 @@ use crate::{ type Hash = String; -#[derive(Debug)] pub struct GlobSet { include: HashMap>, exclude: Any<'static>, + exclude_raw: Vec, +} + +impl std::fmt::Debug for GlobSet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GlobSet") + .field("include", &self.include.keys()) + .field("exclude", &self.exclude_raw) + .finish() + } } #[derive(Debug, Error)] @@ -61,6 +70,7 @@ impl GlobSet { }) .collect::, GlobError>>()?; let excludes = raw_excludes + .clone() .iter() .map(|raw_glob| { let glob = compile_glob(raw_glob)?; @@ -73,7 +83,11 @@ impl GlobSet { raw_glob: format!("{{{}}}", raw_excludes.join(",")), })? .to_owned(); - Ok(Self { include, exclude }) + Ok(Self { + include, + exclude, + exclude_raw: raw_excludes, + }) } } @@ -238,6 +252,7 @@ impl GlobTracker { glob_set, resp, } => { + debug!("watching globs {:?} for hash {}", glob_set, hash); // Assume cookie handling has happened external to this component. // Other tasks _could_ write to the // same output directories, however we are relying on task @@ -353,6 +368,7 @@ impl GlobTracker { return true; } // We didn't match an exclusion, we can remove this glob + debug!("file change at {} invalidated glob {}", path, glob_str); glob_set.include.remove(glob_str); // We removed the last include, we can stop tracking this hash @@ -465,6 +481,7 @@ mod test { let globs = GlobSet { include: make_includes(raw_includes), exclude, + exclude_raw: raw_excludes.iter().map(|s| s.to_string()).collect(), }; let hash = "the-hash".to_string(); @@ -548,6 +565,7 @@ mod test { let globs = GlobSet { include: make_includes(raw_includes), exclude: any(raw_excludes).unwrap(), + exclude_raw: raw_excludes.iter().map(|s| s.to_string()).collect(), }; let hash = "the-hash".to_string(); @@ -569,6 +587,7 @@ mod test { let second_globs = GlobSet { include: make_includes(second_raw_includes), exclude: any(second_raw_excludes).unwrap(), + exclude_raw: second_raw_excludes.iter().map(|s| s.to_string()).collect(), }; let second_hash = "the-second-hash".to_string(); glob_watcher @@ -647,6 +666,7 @@ mod test { let globs = GlobSet { include: make_includes(raw_includes), exclude: any(raw_excludes).unwrap(), + exclude_raw: raw_excludes.iter().map(|s| s.to_string()).collect(), }; let hash = "the-hash".to_string(); diff --git a/crates/turborepo-lib/src/daemon/client.rs b/crates/turborepo-lib/src/daemon/client.rs index a3f1b2bfbfb36..689f9d6a6e198 100644 --- a/crates/turborepo-lib/src/daemon/client.rs +++ b/crates/turborepo-lib/src/daemon/client.rs @@ -1,5 +1,6 @@ use std::io; +use globwalk::ValidatedGlob; use thiserror::Error; use tonic::{Code, Status}; use tracing::info; @@ -71,11 +72,11 @@ impl DaemonClient { pub async fn get_changed_outputs( &mut self, hash: String, - output_globs: Vec, + output_globs: &[ValidatedGlob], ) -> Result, DaemonError> { let output_globs = output_globs .iter() - .map(|raw_glob| format_repo_relative_glob(raw_glob)) + .map(|validated_glob| validated_glob.as_str().to_string()) .collect(); Ok(self .client @@ -88,17 +89,17 @@ impl DaemonClient { pub async fn notify_outputs_written( &mut self, hash: String, - output_globs: Vec, - output_exclusion_globs: Vec, + output_globs: &[ValidatedGlob], + output_exclusion_globs: &[ValidatedGlob], time_saved: u64, ) -> Result<(), DaemonError> { let output_globs = output_globs .iter() - .map(|raw_glob| format_repo_relative_glob(raw_glob)) + .map(|validated_glob| validated_glob.as_str().to_string()) .collect(); let output_exclusion_globs = output_exclusion_globs .iter() - .map(|raw_glob| format_repo_relative_glob(raw_glob)) + .map(|validated_glob| validated_glob.as_str().to_string()) .collect(); self.client .notify_outputs_written(proto::NotifyOutputsWrittenRequest { diff --git a/crates/turborepo-lib/src/run/cache.rs b/crates/turborepo-lib/src/run/cache.rs index c713abe2fa9c9..f73c8d7e5e134 100644 --- a/crates/turborepo-lib/src/run/cache.rs +++ b/crates/turborepo-lib/src/run/cache.rs @@ -204,12 +204,11 @@ impl TaskCache { return Ok(None); } + let validated_inclusions = self.repo_relative_globs.validated_inclusions()?; + let changed_output_count = if let Some(daemon_client) = &mut self.daemon_client { match daemon_client - .get_changed_outputs( - self.hash.to_string(), - self.repo_relative_globs.inclusions.clone(), - ) + .get_changed_outputs(self.hash.to_string(), &validated_inclusions) .await { Ok(changed_output_globs) => changed_output_globs.len(), @@ -256,11 +255,14 @@ impl TaskCache { self.expanded_outputs = restored_files; if let Some(daemon_client) = &mut self.daemon_client { + // Do we want to error the process if we can't parse the globs? We probably + // won't have even gotten this far if this fails... + let validated_exclusions = self.repo_relative_globs.validated_exclusions()?; if let Err(err) = daemon_client .notify_outputs_written( self.hash.clone(), - self.repo_relative_globs.inclusions.clone(), - self.repo_relative_globs.exclusions.clone(), + &validated_inclusions, + &validated_exclusions, cache_hit_metadata.time_saved, ) .await @@ -323,10 +325,12 @@ impl TaskCache { debug!("caching outputs: outputs: {:?}", &self.repo_relative_globs); + let validated_inclusions = self.repo_relative_globs.validated_inclusions()?; + let validated_exclusions = self.repo_relative_globs.validated_exclusions()?; let files_to_be_cached = globwalk::globwalk( &self.run_cache.repo_root, - &self.repo_relative_globs.validated_inclusions()?, - &self.repo_relative_globs.validated_exclusions()?, + &validated_inclusions, + &validated_exclusions, globwalk::WalkType::All, )?; @@ -351,8 +355,8 @@ impl TaskCache { let notify_result = daemon_client .notify_outputs_written( self.hash.to_string(), - self.repo_relative_globs.inclusions.clone(), - self.repo_relative_globs.exclusions.clone(), + &validated_inclusions, + &validated_exclusions, duration.as_millis() as u64, ) .await