Skip to content

Commit

Permalink
wire up package hash watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
arlyon committed Jan 31, 2024
1 parent 2041865 commit d02a644
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 10 deletions.
1 change: 1 addition & 0 deletions crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub mod cookie_jar;
#[cfg(target_os = "macos")]
mod fsevent;
pub mod globwatcher;
pub mod package_hash_watcher;
pub mod package_watcher;

#[cfg(not(target_os = "macos"))]
Expand Down
15 changes: 15 additions & 0 deletions crates/turborepo-filewatch/src/package_hash_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use futures::{stream, Stream};

pub struct PackageHashWatcher;

pub struct HashUpdate {
pub package: String,
pub task: String,
pub hash: String,
}

impl PackageHashWatcher {
pub fn subscribe(&self) -> impl Stream<Item = HashUpdate> {
stream::empty()
}
}
17 changes: 12 additions & 5 deletions crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
use turborepo_filewatch::{
cookie_jar::CookieJar,
globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher},
package_hash_watcher::PackageHashWatcher,
package_watcher::PackageWatcher,
FileSystemWatcher, WatchError,
};
Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct FileWatching {
_watcher: FileSystemWatcher,
pub glob_watcher: GlobWatcher,
pub package_watcher: PackageWatcher,
pub package_hash_watcher: PackageHashWatcher,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -121,6 +123,7 @@ async fn start_filewatching<PD: PackageDiscovery + Send + 'static>(
_watcher: watcher,
glob_watcher,
package_watcher,
package_hash_watcher: PackageHashWatcher,
})));
Ok(())
}
Expand Down Expand Up @@ -272,11 +275,15 @@ where
};

let package_discovery = Arc::new(AsyncMutex::new(package_discovery));
let package_hashes = AsyncMutex::new(WatchingPackageHasher::new(
package_discovery.clone(),
None::<LocalPackageHashes>,
Duration::from_secs(60 * 5),
));
let package_hashes = AsyncMutex::new(
WatchingPackageHasher::new(
package_discovery.clone(),
None::<LocalPackageHashes>,
Duration::from_secs(60 * 5),
watcher_rx.clone(),
)
.await,
);

// Run the actual service. It takes ownership of the struct given to it,
// so we use a private struct with just the pieces of state needed to handle
Expand Down
68 changes: 63 additions & 5 deletions crates/turborepo-lib/src/run/package_hashes/watch.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use tokio::sync::Mutex;
use futures::StreamExt;
use tokio::{
sync::{watch, watch::error::RecvError, Mutex},
time::error::Elapsed,
};
use turborepo_repository::discovery::PackageDiscovery;
use turborepo_telemetry::events::generic::GenericEventBuilder;

use crate::{
daemon::FileWatching,
run::{package_hashes::PackageHasher, task_id::TaskId, Error},
task_hash::PackageInputsHashes,
};
Expand All @@ -18,18 +23,71 @@ pub struct WatchingPackageHasher<PD, PH> {
/// are logged
fallback: PH,
interval: Duration,
map: Mutex<HashMap<TaskId<'static>, String>>,
map: Arc<Mutex<HashMap<TaskId<'static>, String>>>,

watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
}

impl<PD, PH> WatchingPackageHasher<PD, PH> {
pub fn new(discovery: Arc<Mutex<PD>>, fallback: PH, interval: Duration) -> Self {
#[derive(thiserror::Error, Debug)]
enum WaitError {
#[error(transparent)]
Elapsed(#[from] Elapsed),
#[error(transparent)]
Unavailable(#[from] RecvError),
}

impl<PD, PH: PackageHasher> WatchingPackageHasher<PD, PH> {
pub async fn new(
discovery: Arc<Mutex<PD>>,
mut fallback: PH,
interval: Duration,
watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
) -> Self {
let map = Arc::new(Mutex::new(
fallback
.calculate_hashes(Default::default())
.await
.unwrap()
.hashes,
));

/// listen to updates from the file watcher and update the map

Check warning on line 54 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, self-hosted, linux, x64, metal)

unused doc comment

Check warning on line 54 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

unused doc comment

Check warning on line 54 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

unused doc comment

Check warning on line 54 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (ubuntu-latest)

unused doc comment

Check warning on line 54 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (macos-latest)

unused doc comment

Check warning on line 54 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (windows-latest)

unused doc comment
let subscriber = tokio::task::spawn({

Check warning on line 55 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, self-hosted, linux, x64, metal)

unused variable: `subscriber`

Check warning on line 55 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

unused variable: `subscriber`

Check warning on line 55 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (ubuntu-latest)

unused variable: `subscriber`

Check warning on line 55 in crates/turborepo-lib/src/run/package_hashes/watch.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (macos-latest)

unused variable: `subscriber`
let watcher_rx = watcher_rx.clone();
let map = map.clone();
async move {
let watch = Self::wait_for_filewatching(watcher_rx.clone())
.await
.unwrap();
let mut stream = watch.package_hash_watcher.subscribe();
while let Some(update) = stream.next().await {
let mut map = map.lock().await;
map.insert(
TaskId::new(&update.package, &update.task).into_owned(),
update.hash,
);
}
}
});

Self {
interval,
package_discovery: discovery,
fallback,
map: Default::default(),
map,
watcher_rx,
}
}

async fn wait_for_filewatching(
watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
) -> Result<Arc<FileWatching>, WaitError> {
let mut rx = watcher_rx.clone();
let fw = tokio::time::timeout(Duration::from_secs(1), rx.wait_for(|opt| opt.is_some()))
.await??;

return Ok(fw.as_ref().expect("guaranteed some above").clone());
}
}

impl<PD: PackageDiscovery + Send, PH: PackageHasher + Send> PackageHasher
Expand Down

0 comments on commit d02a644

Please sign in to comment.