Skip to content

Commit

Permalink
fix(Turborepo): Pass validated globs to daemon for watching (#7327)
Browse files Browse the repository at this point in the history
### Description

- Change type signature on daemon client to require `ValidatedGlob`
instances be passed
- Add some debug logging to `GlobTracker` about what globs we are
tracking and what file paths invalidate a glob

### Testing Instructions

Existing test suite

Fixes #7131 

Closes TURBO-2301

---------

Co-authored-by: Greg Soltis <Greg Soltis>
  • Loading branch information
Greg Soltis authored Feb 9, 2024
1 parent d9639a4 commit 2cce268
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 19 deletions.
26 changes: 23 additions & 3 deletions crates/turborepo-filewatch/src/globwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use notify::Event;
use thiserror::Error;
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::warn;
use tracing::{debug, warn};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPath};
use wax::{Any, Glob, Program};

Expand All @@ -20,10 +20,19 @@ use crate::{

type Hash = String;

#[derive(Debug)]
pub struct GlobSet {
include: HashMap<String, wax::Glob<'static>>,
exclude: Any<'static>,
exclude_raw: Vec<String>,
}

impl std::fmt::Debug for GlobSet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GlobSet")
.field("include", &self.include.keys())
.field("exclude", &self.exclude_raw)
.finish()
}
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -61,6 +70,7 @@ impl GlobSet {
})
.collect::<Result<HashMap<_, _>, GlobError>>()?;
let excludes = raw_excludes
.clone()
.iter()
.map(|raw_glob| {
let glob = compile_glob(raw_glob)?;
Expand All @@ -73,7 +83,11 @@ impl GlobSet {
raw_glob: format!("{{{}}}", raw_excludes.join(",")),
})?
.to_owned();
Ok(Self { include, exclude })
Ok(Self {
include,
exclude,
exclude_raw: raw_excludes,
})
}
}

Expand Down Expand Up @@ -238,6 +252,7 @@ impl GlobTracker {
glob_set,
resp,
} => {
debug!("watching globs {:?} for hash {}", glob_set, hash);
// Assume cookie handling has happened external to this component.
// Other tasks _could_ write to the
// same output directories, however we are relying on task
Expand Down Expand Up @@ -353,6 +368,7 @@ impl GlobTracker {
return true;
}
// We didn't match an exclusion, we can remove this glob
debug!("file change at {} invalidated glob {}", path, glob_str);
glob_set.include.remove(glob_str);

// We removed the last include, we can stop tracking this hash
Expand Down Expand Up @@ -465,6 +481,7 @@ mod test {
let globs = GlobSet {
include: make_includes(raw_includes),
exclude,
exclude_raw: raw_excludes.iter().map(|s| s.to_string()).collect(),
};

let hash = "the-hash".to_string();
Expand Down Expand Up @@ -548,6 +565,7 @@ mod test {
let globs = GlobSet {
include: make_includes(raw_includes),
exclude: any(raw_excludes).unwrap(),
exclude_raw: raw_excludes.iter().map(|s| s.to_string()).collect(),
};

let hash = "the-hash".to_string();
Expand All @@ -569,6 +587,7 @@ mod test {
let second_globs = GlobSet {
include: make_includes(second_raw_includes),
exclude: any(second_raw_excludes).unwrap(),
exclude_raw: second_raw_excludes.iter().map(|s| s.to_string()).collect(),
};
let second_hash = "the-second-hash".to_string();
glob_watcher
Expand Down Expand Up @@ -647,6 +666,7 @@ mod test {
let globs = GlobSet {
include: make_includes(raw_includes),
exclude: any(raw_excludes).unwrap(),
exclude_raw: raw_excludes.iter().map(|s| s.to_string()).collect(),
};

let hash = "the-hash".to_string();
Expand Down
13 changes: 7 additions & 6 deletions crates/turborepo-lib/src/daemon/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::io;

use globwalk::ValidatedGlob;
use thiserror::Error;
use tonic::{Code, Status};
use tracing::info;
Expand Down Expand Up @@ -71,11 +72,11 @@ impl<T> DaemonClient<T> {
pub async fn get_changed_outputs(
&mut self,
hash: String,
output_globs: Vec<String>,
output_globs: &[ValidatedGlob],
) -> Result<Vec<String>, DaemonError> {
let output_globs = output_globs
.iter()
.map(|raw_glob| format_repo_relative_glob(raw_glob))
.map(|validated_glob| validated_glob.as_str().to_string())
.collect();
Ok(self
.client
Expand All @@ -88,17 +89,17 @@ impl<T> DaemonClient<T> {
pub async fn notify_outputs_written(
&mut self,
hash: String,
output_globs: Vec<String>,
output_exclusion_globs: Vec<String>,
output_globs: &[ValidatedGlob],
output_exclusion_globs: &[ValidatedGlob],
time_saved: u64,
) -> Result<(), DaemonError> {
let output_globs = output_globs
.iter()
.map(|raw_glob| format_repo_relative_glob(raw_glob))
.map(|validated_glob| validated_glob.as_str().to_string())
.collect();
let output_exclusion_globs = output_exclusion_globs
.iter()
.map(|raw_glob| format_repo_relative_glob(raw_glob))
.map(|validated_glob| validated_glob.as_str().to_string())
.collect();
self.client
.notify_outputs_written(proto::NotifyOutputsWrittenRequest {
Expand Down
24 changes: 14 additions & 10 deletions crates/turborepo-lib/src/run/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,11 @@ impl TaskCache {
return Ok(None);
}

let validated_inclusions = self.repo_relative_globs.validated_inclusions()?;

let changed_output_count = if let Some(daemon_client) = &mut self.daemon_client {
match daemon_client
.get_changed_outputs(
self.hash.to_string(),
self.repo_relative_globs.inclusions.clone(),
)
.get_changed_outputs(self.hash.to_string(), &validated_inclusions)
.await
{
Ok(changed_output_globs) => changed_output_globs.len(),
Expand Down Expand Up @@ -256,11 +255,14 @@ impl TaskCache {
self.expanded_outputs = restored_files;

if let Some(daemon_client) = &mut self.daemon_client {
// Do we want to error the process if we can't parse the globs? We probably
// won't have even gotten this far if this fails...
let validated_exclusions = self.repo_relative_globs.validated_exclusions()?;
if let Err(err) = daemon_client
.notify_outputs_written(
self.hash.clone(),
self.repo_relative_globs.inclusions.clone(),
self.repo_relative_globs.exclusions.clone(),
&validated_inclusions,
&validated_exclusions,
cache_hit_metadata.time_saved,
)
.await
Expand Down Expand Up @@ -323,10 +325,12 @@ impl TaskCache {

debug!("caching outputs: outputs: {:?}", &self.repo_relative_globs);

let validated_inclusions = self.repo_relative_globs.validated_inclusions()?;
let validated_exclusions = self.repo_relative_globs.validated_exclusions()?;
let files_to_be_cached = globwalk::globwalk(
&self.run_cache.repo_root,
&self.repo_relative_globs.validated_inclusions()?,
&self.repo_relative_globs.validated_exclusions()?,
&validated_inclusions,
&validated_exclusions,
globwalk::WalkType::All,
)?;

Expand All @@ -351,8 +355,8 @@ impl TaskCache {
let notify_result = daemon_client
.notify_outputs_written(
self.hash.to_string(),
self.repo_relative_globs.inclusions.clone(),
self.repo_relative_globs.exclusions.clone(),
&validated_inclusions,
&validated_exclusions,
duration.as_millis() as u64,
)
.await
Expand Down

0 comments on commit 2cce268

Please sign in to comment.