Skip to content

Commit

Permalink
feat(Turborepo): add a debouncer to file hash watching
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Soltis authored and Greg Soltis committed Apr 4, 2024
1 parent 0979cb4 commit f77c042
Showing 1 changed file with 167 additions and 27 deletions.
194 changes: 167 additions & 27 deletions crates/turborepo-filewatch/src/hash_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::Arc,
sync::{Arc, Mutex},
time::Duration,
};

use notify::Event;
use radix_trie::{Trie, TrieCommon};
use thiserror::Error;
use tokio::{
select,
sync::{broadcast, mpsc, oneshot, watch},
sync::{self, broadcast, mpsc, oneshot, watch},
time::Instant,
};
use tracing::{debug, trace};
use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf};
Expand Down Expand Up @@ -120,9 +122,94 @@ impl Version {
}
}

/// Debounces package re-hashing while file-change events are still arriving:
/// each event "bumps" the debouncer, and the pending hash computation only
/// starts once `timeout` elapses with no further bumps.
struct HashDebouncer {
    // Wakes the `debounce()` task whenever `serial` is bumped.
    bump: sync::Notify,
    // Monotonic bump counter. `Some(n)` while the debounce window is open;
    // set to `None` by `debounce()` once it times out, after which `bump()`
    // rejects further activity.
    serial: Mutex<Option<usize>>,
    // Quiet period that must elapse, bump-free, before `debounce()` resolves.
    timeout: Duration,
}

const DEFAULT_DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(10);

impl Default for HashDebouncer {
    /// Creates a debouncer with the default quiet period
    /// (`DEFAULT_DEBOUNCE_TIMEOUT`, 10ms).
    fn default() -> Self {
        Self::new(DEFAULT_DEBOUNCE_TIMEOUT)
    }
}

impl HashDebouncer {
    /// Creates a debouncer that resolves after `timeout` passes with no bumps.
    fn new(timeout: Duration) -> Self {
        let bump = sync::Notify::new();
        // Start the counter at 0. `Some` means the window is open and bumps
        // are still accepted.
        let serial = Mutex::new(Some(0));
        Self {
            bump,
            serial,
            timeout,
        }
    }

    /// Registers new activity, extending the debounce window.
    ///
    /// Returns `true` if the bump was accepted, or `false` if `debounce()`
    /// has already timed out and closed the window (`serial` is `None`),
    /// in which case the caller must start a fresh computation.
    fn bump(&self) -> bool {
        let mut serial = self.serial.lock().expect("lock is valid");
        match *serial {
            // Window already closed; this bump is rejected.
            None => false,
            Some(previous) => {
                *serial = Some(previous + 1);
                // Wake the `debounce()` task so it can push out its deadline.
                self.bump.notify_one();
                true
            }
        }
    }

    /// Waits until `timeout` elapses with no intervening `bump()` calls, then
    /// closes the window so all subsequent bumps return `false`.
    ///
    /// Intended to be awaited exactly once per debouncer: the `expect`s below
    /// rely on this task being the only writer that sets `serial` to `None`.
    async fn debounce(&self) {
        // Snapshot the serial we have most recently observed.
        let mut serial = {
            self.serial
                .lock()
                .expect("lock is valid")
                .expect("only this thread sets the value to None")
        };
        let mut deadline = Instant::now() + self.timeout;
        loop {
            let timeout = tokio::time::sleep_until(deadline);
            select! {
                _ = self.bump.notified() => {
                    debug!("debouncer notified");
                    // reset timeout
                    let current_serial = self.serial.lock().expect("lock is valid").expect("only this thread sets the value to None");
                    if current_serial == serial {
                        // we timed out between the serial update and the notification.
                        // ignore this notification, we've already bumped the timer
                        continue;
                    } else {
                        serial = current_serial;
                        deadline = Instant::now() + self.timeout;
                    }
                }
                _ = timeout => {
                    // check if serial is still valid. It's possible a bump came in before the timeout,
                    // but we haven't been notified yet.
                    let mut current_serial_opt = self.serial.lock().expect("lock is valid");
                    let current_serial = current_serial_opt.expect("only this thread sets the value to None");
                    if current_serial == serial {
                        // if the serial is what we last observed, and the timer expired, we timed out.
                        // we're done. Mark that we won't accept any more bumps and return
                        *current_serial_opt = None;
                        return;
                    } else {
                        // A bump landed before the deadline fired; adopt its
                        // serial and extend the window.
                        serial = current_serial;
                        deadline = Instant::now() + self.timeout;
                    }
                }
            }
        }
    }
}

/// Lifecycle of the hash entry for a single package + inputs spec.
enum HashState {
    // Hashing completed successfully; holds the resulting file hashes.
    Hashes(GitHashes),
    // Hashing is queued/in flight. The `Version` identifies this particular
    // computation (stale results are discarded), the debouncer delays its
    // start while file events keep arriving, and the senders are callers
    // waiting to be answered once the result lands.
    Pending(
        Version,
        Arc<HashDebouncer>,
        Vec<oneshot::Sender<Result<GitHashes, Error>>>,
    ),
    // Hashing could not be performed; the reason string is sent back to
    // queriers as `Error::Unavailable`.
    Unavailable(String),
}
// We use a radix_trie to store hashes so that we can quickly match a file path
Expand Down Expand Up @@ -168,7 +255,7 @@ impl FileHashes {
self.0.insert(key, previous_value);
} else {
for state in previous_value.into_values() {
if let HashState::Pending(_, txs) = state {
if let HashState::Pending(_, _, txs) = state {
for tx in txs {
let _ = tx.send(Err(Error::Unavailable(reason.to_string())));
}
Expand Down Expand Up @@ -348,7 +435,7 @@ impl HashWatchProcess {
HashState::Hashes(hashes) => {
tx.send(Ok(hashes.clone())).unwrap();
}
HashState::Pending(_, txs) => {
HashState::Pending(_, _, txs) => {
txs.push(tx);
}
HashState::Unavailable(e) => {
Expand All @@ -374,7 +461,7 @@ impl HashWatchProcess {
// We need mutable access to 'state' to update it, as well as being able to
// extract the pending state, so we need two separate if statements
// to pull the value apart.
if let HashState::Pending(existing_version, pending_queries) = state {
if let HashState::Pending(existing_version, _, pending_queries) = state {
if *existing_version == version {
match result {
Ok(hashes) => {
Expand Down Expand Up @@ -403,30 +490,35 @@ impl HashWatchProcess {
&self,
spec: &HashSpec,
hash_update_tx: &mpsc::Sender<HashUpdate>,
) -> Version {
) -> (Version, Arc<HashDebouncer>) {
let version = Version::new();
let version_copy = version.clone();
let tx = hash_update_tx.clone();
let spec = spec.clone();
let repo_root = self.repo_root.clone();
let scm = self.scm.clone();
// Package hashing involves blocking IO calls, so run on a blocking thread.
tokio::task::spawn_blocking(move || {
let telemetry = None;
let inputs = spec.inputs.as_ref().map(|globs| globs.as_inputs());
let result = scm.get_package_file_hashes(
&repo_root,
&spec.package_path,
inputs.as_deref().unwrap_or_default(),
telemetry,
);
let _ = tx.blocking_send(HashUpdate {
spec,
version: version_copy,
result,
let debouncer = Arc::new(HashDebouncer::default());
let debouncer_copy = debouncer.clone();
tokio::task::spawn(async move {
debouncer_copy.debounce().await;
// Package hashing involves blocking IO calls, so run on a blocking thread.
tokio::task::spawn_blocking(move || {
let telemetry = None;
let inputs = spec.inputs.as_ref().map(|globs| globs.as_inputs());
let result = scm.get_package_file_hashes(
&repo_root,
&spec.package_path,
inputs.as_deref().unwrap_or_default(),
telemetry,
);
let _ = tx.blocking_send(HashUpdate {
spec,
version: version_copy,
result,
});
});
});
version
(version, debouncer)
}

fn handle_file_event(
Expand Down Expand Up @@ -463,8 +555,33 @@ impl HashWatchProcess {
package_path,
inputs: None,
};
let version = self.queue_package_hash(&spec, hash_update_tx);
hashes.insert(spec, HashState::Pending(version, vec![]));
match hashes.get_mut(&spec) {
// Technically this shouldn't happen, the package_paths are sourced from keys in
// hashes.
None => {
let (version, debouncer) = self.queue_package_hash(&spec, hash_update_tx);
hashes.insert(spec, HashState::Pending(version, debouncer, vec![]));
}
Some(entry) => {
if let HashState::Pending(_, debouncer, txs) = entry {
if !debouncer.bump() {
// we failed to bump the debouncer, the hash must already be in
// progress. Drop this calculation and start
// a new one
let (version, debouncer) =
self.queue_package_hash(&spec, hash_update_tx);
let mut swap_target = vec![];
std::mem::swap(txs, &mut swap_target);
*entry = HashState::Pending(version, debouncer, swap_target);
}
} else {
// it's not a pending hash calculation, overwrite the entry with a new
// pending calculation
let (version, debouncer) = self.queue_package_hash(&spec, hash_update_tx);
*entry = HashState::Pending(version, debouncer, vec![]);
}
}
}
}
}

Expand Down Expand Up @@ -500,8 +617,8 @@ impl HashWatchProcess {
inputs: None,
};
if !hashes.contains_key(&spec) {
let version = self.queue_package_hash(&spec, hash_update_tx);
hashes.insert(spec, HashState::Pending(version, vec![]));
let (version, debouncer) = self.queue_package_hash(&spec, hash_update_tx);
hashes.insert(spec, HashState::Pending(version, debouncer, vec![]));
}
}
tracing::debug!("received package discovery data: {:?}", data);
Expand All @@ -518,6 +635,7 @@ impl HashWatchProcess {
mod tests {
use std::{
assert_matches::assert_matches,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -528,7 +646,7 @@ mod tests {

use crate::{
cookies::CookieWriter,
hash_watcher::{HashSpec, HashWatcher},
hash_watcher::{HashDebouncer, HashSpec, HashWatcher},
package_watcher::PackageWatcher,
FileSystemWatcher,
};
Expand Down Expand Up @@ -863,4 +981,26 @@ mod tests {
}
map
}

#[tokio::test]
async fn test_debouncer() {
    // A short window so the test stays fast.
    let debouncer = Arc::new(HashDebouncer::new(Duration::from_millis(10)));
    let background = {
        let debouncer = debouncer.clone();
        tokio::task::spawn(async move {
            debouncer.debounce().await;
        })
    };
    // Bump every 2ms, well inside the 10ms window: each bump should be
    // accepted and push the deadline out past the original timeout.
    for _ in 0..10 {
        tokio::time::sleep(Duration::from_millis(2)).await;
        assert!(debouncer.bump());
    }
    let after_last_bump = Instant::now();
    background.await.unwrap();
    let waited = after_last_bump.elapsed();
    // Allow slack for scheduling jitter, but the debounce must not have
    // resolved immediately after the final bump.
    assert!(waited > Duration::from_millis(5));
    // Once the window has closed, further bumps are rejected.
    assert!(!debouncer.bump());
}
}

0 comments on commit f77c042

Please sign in to comment.