diff --git a/crates/turborepo-filewatch/src/cookie_jar.rs b/crates/turborepo-filewatch/src/cookie_jar.rs deleted file mode 100644 index 4e6873f0f4730..0000000000000 --- a/crates/turborepo-filewatch/src/cookie_jar.rs +++ /dev/null @@ -1,338 +0,0 @@ -use std::{ - collections::HashMap, - fs::OpenOptions, - path::PathBuf, - sync::{atomic::AtomicUsize, Arc, Mutex}, - time::Duration, -}; - -use notify::{Event, EventKind}; -use thiserror::Error; -use tokio::{ - sync::{broadcast, oneshot}, - time::error::Elapsed, -}; -use tracing::{debug, trace}; -use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation}; - -use crate::NotifyError; - -#[derive(Clone, Debug, Error)] -pub enum WatchError { - #[error(transparent)] - RecvError(#[from] broadcast::error::RecvError), - #[error("filewatching encountered errors: {0}")] - NotifyError(#[from] NotifyError), - #[error("filewatching has closed, cannot watch cookies")] - Closed, -} - -#[derive(Debug, Error)] -pub enum CookieError { - #[error(transparent)] - Watch(#[from] WatchError), - #[error("cookie timeout expired")] - Timeout(#[from] Elapsed), - #[error("failed to receiver cookie notification: {0}")] - RecvError(#[from] oneshot::error::RecvError), - #[error("failed to write cookie file at {path}: {io_err}")] - IO { - io_err: std::io::Error, - path: AbsoluteSystemPathBuf, - }, -} - -type CookieResponse = Result<(), WatchError>; - -pub struct CookieJar { - root: AbsoluteSystemPathBuf, - serial: AtomicUsize, - timeout: Duration, - watches: Arc>, - // _exit_ch exists to trigger a close on the receiver when an instance - // of this struct is dropped. The task that is receiving events will exit, - // dropping the other sender for the broadcast channel, causing all receivers - // to be notified of a close. - _exit_ch: tokio::sync::oneshot::Sender<()>, -} - -#[derive(Default)] -struct Watches { - closed: bool, - cookies: HashMap>, -} - -impl CookieJar { - pub fn new( - root: &AbsoluteSystemPath, - timeout: Duration, - file_events: broadcast::Receiver>, - ) -> Self { - let (exit_ch, exit_signal) = tokio::sync::oneshot::channel(); - let watches = Arc::new(Mutex::new(Watches::default())); - tokio::spawn(watch_cookies( - root.to_owned(), - watches.clone(), - file_events, - exit_signal, - )); - Self { - root: root.to_owned(), - serial: AtomicUsize::new(0), - timeout, - watches, - _exit_ch: exit_ch, - } - } - - pub async fn wait_for_cookie(&self) -> Result<(), CookieError> { - let serial = self - .serial - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let cookie_path = self.root.join_component(&format!("{}.cookie", serial)); - let (tx, rx) = oneshot::channel(); - { - let mut watches = self.watches.lock().expect("mutex poisoned"); - if watches.closed { - return Err(CookieError::Watch(WatchError::Closed)); - } - watches - .cookies - .insert(cookie_path.as_std_path().to_owned(), tx); - } - let mut opts = OpenOptions::new(); - opts.truncate(true).create(true).write(true); - { - // dropping the resulting file closes the handle - trace!("writing cookie {}", cookie_path); - _ = cookie_path - .ensure_dir() - .and_then(|_| cookie_path.open_with_options(opts)) - .map_err(|io_err| CookieError::IO { - io_err, - path: cookie_path.clone(), - })?; - } - // ??? -> timeout, recv failure, actual cookie failure - tokio::time::timeout(self.timeout, rx).await???; - Ok(()) - } -} - -async fn watch_cookies( - root: AbsoluteSystemPathBuf, - watches: Arc>, - mut file_events: broadcast::Receiver>, - mut exit_signal: tokio::sync::oneshot::Receiver<()>, -) { - loop { - tokio::select! { - _ = &mut exit_signal => return, - event = file_events.recv() => { - match flatten_event(event) { - Ok(event) => { - if matches!(event.kind, EventKind::Create(_)) { - let mut watches = watches.lock().expect("mutex poisoned"); - for path in event.paths { - let abs_path: &AbsoluteSystemPath = path - .as_path() - .try_into() - .expect("Non-absolute path from filewatching"); - if root.relation_to_path(abs_path) == PathRelation::Parent { - trace!("saw cookie: {}", abs_path); - if let Some(responder) = watches.cookies.remove(&path) { - if responder.send(Ok(())).is_err() { - // Note that cookie waiters will time out if they don't get a - // response, so we don't necessarily - // need to panic here, although we could decide to do that in the - // future. - debug!("failed to notify cookie waiter of cookie success"); - } - } - } - } - } - } - Err(e) => { - // we got an error, notify all waiters that their cookie failed - let is_closing = matches!( - e, - WatchError::RecvError(broadcast::error::RecvError::Closed) - ); - let resp = if is_closing { WatchError::Closed } else { e }; - let mut watches = watches.lock().expect("mutex poisoned"); - for (_, sender) in watches.cookies.drain() { - if sender.send(Err(resp.clone())).is_err() { - // Note that cookie waiters will time out if they don't get a response, so - // we don't necessarily need to panic here, although - // we could decide to do that in the future. - debug!("failed to notify cookie waiter of error: {}", resp); - } - } - if is_closing { - watches.closed = true; - return; - } - } - } - } - } - } -} - -// result flattening is an unstable feature, so add a manual helper to do so. -// This version is for unwrapping events coming from filewatching -fn flatten_event( - event: Result, broadcast::error::RecvError>, -) -> Result { - Ok(event??) -} - -#[cfg(test)] -mod test { - use std::{assert_matches::assert_matches, sync::Arc, time::Duration}; - - use notify::{event::CreateKind, ErrorKind, Event, EventKind}; - use tokio::{sync::broadcast, time}; - use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; - - use crate::{ - cookie_jar::{CookieError, CookieJar, WatchError}, - NotifyError, - }; - - async fn ensure_tracked(cookie_jar: &CookieJar, path: &AbsoluteSystemPath) { - let path = path.as_std_path(); - let mut interval = time::interval(Duration::from_millis(2)); - for _i in 0..50 { - interval.tick().await; - let watches = cookie_jar.watches.lock().expect("mutex poisoned"); - if watches.cookies.contains_key(path) { - return; - } - } - panic!("failed to find path in cookie_jar") - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_wait_for_cookie() { - let tempdir = tempfile::tempdir().unwrap(); - let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) - .unwrap() - .to_realpath() - .unwrap(); - - let (send_file_events, file_events) = broadcast::channel(16); - - let cookie_jar = CookieJar::new(&path, Duration::from_millis(100), file_events); - let cookie_path = path.join_component("0.cookie"); - tokio_scoped::scope(|scope| { - scope.spawn(async { cookie_jar.wait_for_cookie().await.unwrap() }); - - scope.block_on(ensure_tracked(&cookie_jar, &cookie_path)); - - send_file_events - .send(Ok(Event { - kind: EventKind::Create(CreateKind::File), - paths: vec![cookie_path.as_std_path().to_owned()], - ..Default::default() - })) - .unwrap(); - }); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_wait_for_cookie_after_close() { - let tempdir = tempfile::tempdir().unwrap(); - let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) - .unwrap() - .to_realpath() - .unwrap(); - - let (send_file_events, file_events) = broadcast::channel(16); - - let cookie_jar = CookieJar::new(&path, Duration::from_millis(1000), file_events); - tokio_scoped::scope(|scope| { - scope.spawn(async { - let result = cookie_jar.wait_for_cookie().await; - assert_matches!(result, Err(CookieError::Watch(WatchError::Closed))); - }); - // We don't care whether or not we're tracking the cookie yet, either codepath - // should result in the same error - - // Dropping the [last, only] sender closes the channel, which closes - // the loop watching the cookie folder - drop(send_file_events); - }); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_wait_for_cookie_timeout() { - let tempdir = tempfile::tempdir().unwrap(); - let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) - .unwrap() - .to_realpath() - .unwrap(); - - let (_send_file_events, file_events) = broadcast::channel(16); - - let cookie_jar = CookieJar::new(&path, Duration::from_millis(10), file_events); - tokio_scoped::scope(|scope| { - scope.spawn(async { - let result = cookie_jar.wait_for_cookie().await; - assert_matches!(result, Err(CookieError::Timeout(_))); - }); - - // Don't send any events, expect to timeout. - }); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_wait_for_cookie_with_error() { - let tempdir = tempfile::tempdir().unwrap(); - let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) - .unwrap() - .to_realpath() - .unwrap(); - - let (send_file_events, file_events) = broadcast::channel(16); - - let cookie_jar = CookieJar::new(&path, Duration::from_millis(10), file_events); - let cookie_path = path.join_component("0.cookie"); - tokio_scoped::scope(|scope| { - scope.spawn(async { - let result = cookie_jar.wait_for_cookie().await; - assert_matches!(result, Err(CookieError::Watch(WatchError::NotifyError(_)))); - }); - - scope.block_on(ensure_tracked(&cookie_jar, &cookie_path)); - - // send an error, assert that we fail to get our cookie - send_file_events - .send(Err(NotifyError(Arc::new(notify::Error { - kind: ErrorKind::Generic("test error".to_string()), - paths: vec![cookie_path.as_std_path().to_owned()], - })))) - .unwrap(); - }); - - let cookie_path = path.join_component("1.cookie"); - tokio_scoped::scope(|scope| { - scope.spawn(async { - cookie_jar.wait_for_cookie().await.unwrap(); - }); - - scope.block_on(ensure_tracked(&cookie_jar, &cookie_path)); - - // ensure that we can still wait for new cookies even though an error occurred - // previously - send_file_events - .send(Ok(Event { - kind: EventKind::Create(CreateKind::File), - paths: vec![cookie_path.as_std_path().to_owned()], - ..Default::default() - })) - .unwrap(); - }); - } -} diff --git a/crates/turborepo-filewatch/src/cookies.rs b/crates/turborepo-filewatch/src/cookies.rs new file mode 100644 index 0000000000000..41058b5f16bc2 --- /dev/null +++ b/crates/turborepo-filewatch/src/cookies.rs @@ -0,0 +1,406 @@ +use std::{collections::BinaryHeap, fs::OpenOptions, time::Duration}; + +use notify::EventKind; +use thiserror::Error; +use tokio::{ + sync::{mpsc, oneshot}, + time::error::Elapsed, +}; +use tracing::trace; +use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation}; + +#[derive(Debug, Error)] +pub enum CookieError { + #[error("cookie timeout expired")] + Timeout(#[from] Elapsed), + #[error("failed to receiver cookie notification: {0}")] + RecvError(#[from] oneshot::error::RecvError), + #[error("failed to send cookie file request: {0}")] + SendError(#[from] mpsc::error::SendError>>), + #[error("failed to write cookie file at {path}: {io_err}")] + IO { + io_err: std::io::Error, + path: AbsoluteSystemPathBuf, + }, +} + +/// CookieWriter is responsible for assigning filesystem cookies to a request +/// for a downstream, filewatching-backed service. +#[derive(Clone)] +pub struct CookieWriter { + root: AbsoluteSystemPathBuf, + timeout: Duration, + cookie_request_tx: mpsc::Sender>>, + // _exit_ch exists to trigger a close on the receiver when all instances + // of this struct are dropped. The task that is receiving events will exit, + // dropping the other sender for the broadcast channel, causing all receivers + // to be notified of a close. + _exit_ch: mpsc::Sender<()>, +} + +#[derive(Debug)] +pub struct CookiedRequest { + request: T, + serial: usize, +} + +impl PartialEq for CookiedRequest { + fn eq(&self, other: &Self) -> bool { + self.serial == other.serial + } +} + +impl Eq for CookiedRequest {} + +impl PartialOrd for CookiedRequest { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for CookiedRequest { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Lower serials should be sorted higher, since the heap pops the highest values + // first + other.serial.cmp(&self.serial) + } +} + +/// CookieWatcher is used by downstream filewatching-backed services to +/// know when it is safe to handle a particular request. +pub(crate) struct CookieWatcher { + root: AbsoluteSystemPathBuf, + // We don't necessarily get requests in serial-order, but we want to keep them + // in order so we don't have to scan all requests every time we get a new cookie. + pending_requests: BinaryHeap>, + latest: usize, +} + +impl CookieWatcher { + pub(crate) fn new(root: AbsoluteSystemPathBuf) -> Self { + Self { + root, + pending_requests: BinaryHeap::new(), + latest: 0, + } + } + + /// Check if this request can be handled immediately. If so, return it. If + /// not, queue it + pub(crate) fn check_request(&mut self, cookied_request: CookiedRequest) -> Option { + if cookied_request.serial <= self.latest { + // We've already seen the cookie for this request, handle it now + Some(cookied_request.request) + } else { + // We haven't seen the cookie for this request yet, hold onto it + self.pending_requests.push(cookied_request); + None + } + } + + /// If this is a cookie file, pop all requests that are ready to be handled. + /// The returned vector might be empty if this was a cookie file but we + /// don't have any requests that are ready to be handled. None is + /// returned if this is not a cookie file being created. + pub(crate) fn pop_ready_requests( + &mut self, + event_kind: EventKind, + path: &AbsoluteSystemPath, + ) -> Option> { + if !matches!(event_kind, EventKind::Create(_)) { + return None; + } + if let Some(serial) = self.serial_for_path(path) { + self.latest = serial; + let mut ready_requests = Vec::new(); + while let Some(cookied_request) = self.pending_requests.pop() { + if cookied_request.serial <= serial { + ready_requests.push(cookied_request.request); + } else { + self.pending_requests.push(cookied_request); + break; + } + } + Some(ready_requests) + } else { + None + } + } + + fn serial_for_path(&self, path: &AbsoluteSystemPath) -> Option { + if self.root.relation_to_path(path) == PathRelation::Parent { + let filename = path.file_name()?; + filename.strip_suffix(".cookie")?.parse().ok() + } else { + None + } + } +} + +impl CookieWriter { + pub fn new(root: &AbsoluteSystemPath, timeout: Duration) -> Self { + let (exit_ch, exit_signal) = mpsc::channel(16); + let (cookie_requests_tx, cookie_requests_rx) = mpsc::channel(16); + tokio::spawn(watch_cookies( + root.to_owned(), + cookie_requests_rx, + exit_signal, + )); + Self { + root: root.to_owned(), + timeout, + cookie_request_tx: cookie_requests_tx, + _exit_ch: exit_ch, + } + } + + pub(crate) fn root(&self) -> &AbsoluteSystemPath { + &self.root + } + + pub(crate) async fn cookie_request( + &self, + request: T, + ) -> Result, CookieError> { + // we need to write the cookie from a single task so as to serialize them + let (resp_tx, resp_rx) = oneshot::channel(); + self.cookie_request_tx.clone().send(resp_tx).await?; + let serial = tokio::time::timeout(self.timeout, resp_rx).await???; + Ok(CookiedRequest { request, serial }) + } +} + +async fn watch_cookies( + root: AbsoluteSystemPathBuf, + mut cookie_requests: mpsc::Receiver>>, + mut exit_signal: mpsc::Receiver<()>, +) { + let mut serial: usize = 0; + loop { + tokio::select! { + biased; + _ = exit_signal.recv() => return, + req = cookie_requests.recv() => handle_cookie_request(&root, &mut serial, req), + } + } +} + +fn handle_cookie_request( + root: &AbsoluteSystemPath, + serial: &mut usize, + req: Option>>, +) { + if let Some(req) = req { + *serial += 1; + let cookie_path = root.join_component(&format!("{}.cookie", serial)); + let mut opts = OpenOptions::new(); + opts.truncate(true).create(true).write(true); + let result = { + // dropping the resulting file closes the handle + trace!("writing cookie {}", cookie_path); + cookie_path + .ensure_dir() + .and_then(|_| cookie_path.open_with_options(opts)) + .map_err(|io_err| CookieError::IO { + io_err, + path: cookie_path.clone(), + }) + }; + let result = result.map(|_| *serial); + // We don't care if the client has timed out and gone away + let _ = req.send(result); + } +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use notify::{event::CreateKind, Event, EventKind}; + use tokio::{ + sync::{broadcast, mpsc, oneshot}, + task::JoinSet, + }; + use turbopath::AbsoluteSystemPathBuf; + + use super::{CookieWatcher, CookiedRequest}; + use crate::{cookies::CookieWriter, NotifyError}; + + struct TestQuery { + resp: oneshot::Sender<()>, + } + + struct TestService { + file_events: broadcast::Receiver>, + cookie_watcher: CookieWatcher, + reqs_rx: mpsc::Receiver>, + } + + impl TestService { + async fn watch(mut self, mut exit_ch: oneshot::Receiver<()>) { + loop { + tokio::select! { + biased; + _ = &mut exit_ch => return, + Some(req) = self.reqs_rx.recv() => { + if let Some(query) = self.cookie_watcher.check_request(req) { + query.resp.send(()).unwrap(); + } + } + file_event = self.file_events.recv() => { + let event = file_event.unwrap().unwrap(); + for path in event.paths { + let path = AbsoluteSystemPathBuf::try_from(path).unwrap(); + if let Some(queries) = self.cookie_watcher.pop_ready_requests(event.kind, &path) { + for query in queries { + query.resp.send(()).unwrap(); + } + } + } + } + } + } + } + } + + #[derive(Clone)] + struct TestClient { + reqs_tx: mpsc::Sender>, + cookie_writer: CookieWriter, + } + + impl TestClient { + async fn request(&self) { + let (resp_tx, resp_rx) = oneshot::channel(); + let query = TestQuery { resp: resp_tx }; + let req = self.cookie_writer.cookie_request(query).await.unwrap(); + self.reqs_tx.send(req).await.unwrap(); + resp_rx.await.unwrap(); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_service_cookies() { + let tempdir = tempfile::tempdir().unwrap(); + let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) + .unwrap() + .to_realpath() + .unwrap(); + + let (send_file_events, file_events) = broadcast::channel(16); + let (reqs_tx, reqs_rx) = mpsc::channel(16); + let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2)); + let (exit_tx, exit_rx) = oneshot::channel(); + + let service = TestService { + file_events, + cookie_watcher: CookieWatcher::new(path.clone()), + reqs_rx, + }; + let service_handle = tokio::spawn(service.watch(exit_rx)); + + let client = TestClient { + reqs_tx, + cookie_writer, + }; + // race request and file event. Either order should work. + tokio_scoped::scope(|scope| { + scope.spawn(client.request()); + send_file_events + .send(Ok(Event { + kind: EventKind::Create(CreateKind::File), + paths: vec![path.join_component("1.cookie").as_std_path().to_owned()], + ..Default::default() + })) + .unwrap(); + }); + + // explicitly send the cookie first + tokio_scoped::scope(|scope| { + send_file_events + .send(Ok(Event { + kind: EventKind::Create(CreateKind::File), + paths: vec![path.join_component("2.cookie").as_std_path().to_owned()], + ..Default::default() + })) + .unwrap(); + scope.spawn(client.request()); + }); + + // send a cookie with a much higher value + tokio_scoped::scope(|scope| { + send_file_events + .send(Ok(Event { + kind: EventKind::Create(CreateKind::File), + paths: vec![path.join_component("20.cookie").as_std_path().to_owned()], + ..Default::default() + })) + .unwrap(); + scope.spawn(client.request()); + }); + exit_tx.send(()).unwrap(); + service_handle.await.unwrap(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_out_of_order_requests() { + let tempdir = tempfile::tempdir().unwrap(); + let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) + .unwrap() + .to_realpath() + .unwrap(); + + let (send_file_events, file_events) = broadcast::channel(16); + let (reqs_tx, reqs_rx) = mpsc::channel(16); + let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2)); + let (exit_tx, exit_rx) = oneshot::channel(); + + let service = TestService { + file_events, + cookie_watcher: CookieWatcher::new(path.clone()), + reqs_rx, + }; + let service_handle = tokio::spawn(service.watch(exit_rx)); + + let client = TestClient { + reqs_tx, + cookie_writer, + }; + + let mut join_set = JoinSet::new(); + let client_1 = client.clone(); + join_set.spawn(async move { client_1.request().await }); + + let client_2 = client.clone(); + join_set.spawn(async move { client_2.request().await }); + + let client_3 = client.clone(); + join_set.spawn(async move { client_3.request().await }); + + send_file_events + .send(Ok(Event { + kind: EventKind::Create(CreateKind::File), + paths: vec![path.join_component("2.cookie").as_std_path().to_owned()], + ..Default::default() + })) + .unwrap(); + + // Expect 2 rpcs to be ready. We don't know which ones they will be + // but we also don't care. We don't have ordering semantics on the client + // side. + join_set.join_next().await.unwrap().unwrap(); + join_set.join_next().await.unwrap().unwrap(); + + send_file_events + .send(Ok(Event { + kind: EventKind::Create(CreateKind::File), + paths: vec![path.join_component("3.cookie").as_std_path().to_owned()], + ..Default::default() + })) + .unwrap(); + join_set.join_next().await.unwrap().unwrap(); + + exit_tx.send(()).unwrap(); + service_handle.await.unwrap(); + } +} diff --git a/crates/turborepo-filewatch/src/globwatcher.rs b/crates/turborepo-filewatch/src/globwatcher.rs index 498478b6e48ab..8ac2cf7518a4e 100644 --- a/crates/turborepo-filewatch/src/globwatcher.rs +++ b/crates/turborepo-filewatch/src/globwatcher.rs @@ -3,6 +3,7 @@ use std::{ fmt::Display, future::IntoFuture, str::FromStr, + time::Duration, }; use notify::Event; @@ -13,7 +14,7 @@ use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPath}; use wax::{Any, Glob, Program}; use crate::{ - cookie_jar::{CookieError, CookieJar}, + cookies::{CookieError, CookieWatcher, CookieWriter, CookiedRequest}, NotifyError, }; @@ -80,8 +81,12 @@ impl GlobSet { pub enum Error { #[error(transparent)] CookieError(#[from] CookieError), + #[error("failed to send query to globwatcher: {0}")] + SendError(#[from] mpsc::error::SendError>), #[error("globwatcher has closed")] Closed, + #[error("globwatcher request timed out")] + Timeout(#[from] tokio::time::error::Elapsed), } impl From> for Error { @@ -97,13 +102,13 @@ impl From for Error { } pub struct GlobWatcher { - cookie_jar: CookieJar, + cookie_jar: CookieWriter, // _exit_ch exists to trigger a close on the receiver when an instance // of this struct is dropped. The task that is receiving events will exit, // dropping the other sender for the broadcast channel, causing all receivers // to be notified of a close. _exit_ch: oneshot::Sender<()>, - query_ch: mpsc::Sender, + query_ch: mpsc::Sender>, } #[derive(Debug)] @@ -134,19 +139,22 @@ struct GlobTracker { recv: broadcast::Receiver>, - query_recv: mpsc::Receiver, + query_recv: mpsc::Receiver>, + + cookie_watcher: CookieWatcher, } impl GlobWatcher { pub fn new( root: &AbsoluteSystemPath, - cookie_jar: CookieJar, + cookie_jar: CookieWriter, recv: broadcast::Receiver>, ) -> Self { let (exit_ch, exit_signal) = tokio::sync::oneshot::channel(); let (query_ch, query_recv) = mpsc::channel(256); + let cookie_root = cookie_jar.root().to_owned(); tokio::task::spawn( - GlobTracker::new(root.to_owned(), exit_signal, recv, query_recv).watch(), + GlobTracker::new(root.to_owned(), cookie_root, exit_signal, recv, query_recv).watch(), ); Self { cookie_jar, @@ -155,34 +163,38 @@ impl GlobWatcher { } } - pub async fn watch_globs(&self, hash: Hash, globs: GlobSet) -> Result<(), Error> { - self.cookie_jar.wait_for_cookie().await?; + pub async fn watch_globs( + &self, + hash: Hash, + globs: GlobSet, + timeout: Duration, + ) -> Result<(), Error> { let (tx, rx) = oneshot::channel(); - self.query_ch - .send(Query::WatchGlobs { - hash, - glob_set: globs, - resp: tx, - }) - .await?; - rx.await? + let req = Query::WatchGlobs { + hash, + glob_set: globs, + resp: tx, + }; + let cookied_request = self.cookie_jar.cookie_request(req).await?; + self.query_ch.send(cookied_request).await?; + tokio::time::timeout(timeout, rx).await?? } pub async fn get_changed_globs( &self, hash: Hash, candidates: HashSet, + timeout: Duration, ) -> Result, Error> { - self.cookie_jar.wait_for_cookie().await?; let (tx, rx) = oneshot::channel(); - self.query_ch - .send(Query::GetChangedGlobs { - hash, - candidates, - resp: tx, - }) - .await?; - rx.await? + let req = Query::GetChangedGlobs { + hash, + candidates, + resp: tx, + }; + let cookied_request = self.cookie_jar.cookie_request(req).await?; + self.query_ch.send(cookied_request).await?; + tokio::time::timeout(timeout, rx).await?? } } @@ -197,9 +209,10 @@ enum WatchError { impl GlobTracker { fn new( root: AbsoluteSystemPathBuf, + cookie_root: AbsoluteSystemPathBuf, exit_signal: oneshot::Receiver<()>, recv: broadcast::Receiver>, - query_recv: mpsc::Receiver, + query_recv: mpsc::Receiver>, ) -> Self { Self { root, @@ -208,6 +221,13 @@ impl GlobTracker { exit_signal, recv, query_recv, + cookie_watcher: CookieWatcher::new(cookie_root), + } + } + + fn handle_cookied_query(&mut self, cookied_query: CookiedRequest) { + if let Some(request) = self.cookie_watcher.check_request(cookied_query) { + self.handle_query(request); } } @@ -267,6 +287,15 @@ impl GlobTracker { for path in file_event.paths { let path = AbsoluteSystemPathBuf::try_from(path) .expect("filewatching should produce absolute paths"); + if let Some(queries) = self + .cookie_watcher + .pop_ready_requests(file_event.kind, &path) + { + for query in queries { + self.handle_query(query); + } + return; + } let Ok(to_match) = self.root.anchor(path) else { // irrelevant filesystem update return; @@ -281,7 +310,7 @@ impl GlobTracker { loop { tokio::select! { _ = &mut self.exit_signal => return, - Some(query) = self.query_recv.recv().into_future() => self.handle_query(query), + Some(query) = self.query_recv.recv().into_future() => self.handle_cookied_query(query), file_event = self.recv.recv().into_future() => self.handle_file_event(file_event) } } @@ -350,7 +379,7 @@ mod test { use wax::{any, Glob}; use crate::{ - cookie_jar::CookieJar, + cookies::CookieWriter, globwatcher::{GlobSet, GlobWatcher}, FileSystemWatcher, }; @@ -418,6 +447,7 @@ mod test { #[tokio::test] async fn test_track_outputs() { + let timeout = Duration::from_secs(2); let (repo_root, _tmp_dir) = temp_dir(); setup(&repo_root); let cookie_dir = repo_root.join_component(".git"); @@ -425,7 +455,7 @@ mod test { let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) .await .unwrap(); - let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe()); + let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2)); let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); @@ -439,11 +469,14 @@ mod test { let hash = "the-hash".to_string(); - glob_watcher.watch_globs(hash.clone(), globs).await.unwrap(); + glob_watcher + .watch_globs(hash.clone(), globs, timeout) + .await + .unwrap(); let candidates = HashSet::from_iter(raw_includes.iter().map(|s| s.to_string())); let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); assert!(results.is_empty()); @@ -454,7 +487,7 @@ mod test { .create_with_contents("some bytes") .unwrap(); let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); assert!(results.is_empty()); @@ -465,7 +498,7 @@ mod test { .create_with_contents("some bytes") .unwrap(); let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); assert!(results.is_empty()); @@ -476,7 +509,7 @@ mod test { .create_with_contents("some bytes") .unwrap(); let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); let expected = HashSet::from_iter(["my-pkg/dist/**".to_string()]); @@ -488,7 +521,7 @@ mod test { .create_with_contents("some bytes") .unwrap(); let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); let expected = @@ -498,6 +531,7 @@ mod test { #[tokio::test] async fn test_track_multiple_hashes() { + let timeout = Duration::from_secs(2); let (repo_root, _tmp_dir) = temp_dir(); setup(&repo_root); let cookie_dir = repo_root.join_component(".git"); @@ -505,7 +539,7 @@ mod test { let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) .await .unwrap(); - let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe()); + let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2)); let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); @@ -518,11 +552,14 @@ mod test { let hash = "the-hash".to_string(); - glob_watcher.watch_globs(hash.clone(), globs).await.unwrap(); + glob_watcher + .watch_globs(hash.clone(), globs, timeout) + .await + .unwrap(); let candidates = HashSet::from_iter(raw_includes.iter().map(|s| s.to_string())); let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); assert!(results.is_empty()); @@ -535,20 +572,20 @@ mod test { }; let second_hash = "the-second-hash".to_string(); glob_watcher - .watch_globs(second_hash.clone(), second_globs) + .watch_globs(second_hash.clone(), second_globs, timeout) .await .unwrap(); let second_candidates = HashSet::from_iter(second_raw_includes.iter().map(|s| s.to_string())); let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); assert!(results.is_empty()); let results = glob_watcher - .get_changed_globs(second_hash.clone(), second_candidates.clone()) + .get_changed_globs(second_hash.clone(), second_candidates.clone(), timeout) .await .unwrap(); assert!(results.is_empty()); @@ -560,7 +597,7 @@ mod test { .unwrap(); // expect one changed glob for the first hash let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); let expected = HashSet::from_iter(["my-pkg/.next/**".to_string()]); @@ -569,7 +606,7 @@ mod test { // The second hash which excludes the change should still not have any changed // globs let results = glob_watcher - .get_changed_globs(second_hash.clone(), second_candidates.clone()) + .get_changed_globs(second_hash.clone(), second_candidates.clone(), timeout) .await .unwrap(); assert!(results.is_empty()); @@ -580,7 +617,7 @@ mod test { .create_with_contents("hello") .unwrap(); let results = glob_watcher - .get_changed_globs(second_hash.clone(), second_candidates.clone()) + .get_changed_globs(second_hash.clone(), second_candidates.clone(), timeout) .await .unwrap(); assert_eq!(results, second_candidates); @@ -588,6 +625,7 @@ mod test { #[tokio::test] async fn test_watch_single_file() { + let timeout = Duration::from_secs(2); let (repo_root, _tmp_dir) = temp_dir(); setup(&repo_root); let cookie_dir = repo_root.join_component(".git"); @@ -595,7 +633,7 @@ mod test { let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root) .await .unwrap(); - let cookie_jar = CookieJar::new(&cookie_dir, Duration::from_secs(2), watcher.subscribe()); + let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2)); let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); @@ -613,7 +651,10 @@ mod test { let hash = "the-hash".to_string(); - glob_watcher.watch_globs(hash.clone(), globs).await.unwrap(); + glob_watcher + .watch_globs(hash.clone(), globs, timeout) + .await + .unwrap(); // A change to an irrelevant file repo_root @@ -623,7 +664,7 @@ mod test { let candidates = HashSet::from_iter(raw_includes.iter().map(|s| s.to_string())); let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); assert!(results.is_empty()); @@ -632,7 +673,7 @@ mod test { let watched_file = repo_root.join_components(&["my-pkg", ".next", "next-file:build"]); watched_file.create_with_contents("hello").unwrap(); let results = glob_watcher - .get_changed_globs(hash.clone(), candidates.clone()) + .get_changed_globs(hash.clone(), candidates.clone(), timeout) .await .unwrap(); assert_eq!(results, candidates); diff --git a/crates/turborepo-filewatch/src/lib.rs b/crates/turborepo-filewatch/src/lib.rs index b2414da55b4b0..3a376abedcfbf 100644 --- a/crates/turborepo-filewatch/src/lib.rs +++ b/crates/turborepo-filewatch/src/lib.rs @@ -35,7 +35,7 @@ use { walkdir::WalkDir, }; -pub mod cookie_jar; +pub mod cookies; #[cfg(target_os = "macos")] mod fsevent; pub mod globwatcher; diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs index 150ee0861dfe3..e14076eca4ddf 100644 --- a/crates/turborepo-lib/src/daemon/server.rs +++ b/crates/turborepo-lib/src/daemon/server.rs @@ -33,7 +33,7 @@ use tower::ServiceBuilder; use tracing::{error, info, trace, warn}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; use turborepo_filewatch::{ - cookie_jar::CookieJar, + cookies::CookieWriter, globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher}, package_watcher::PackageWatcher, FileSystemWatcher, WatchError, @@ -97,12 +97,8 @@ async fn start_filewatching( backup_discovery: PD, ) -> Result<(), WatchError> { let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).await?; - let cookie_jar = CookieJar::new( - watcher.cookie_dir(), - Duration::from_millis(100), - watcher.subscribe(), - ); - let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe()); + let cookie_writer = CookieWriter::new(watcher.cookie_dir(), Duration::from_millis(100)); + let glob_watcher = GlobWatcher::new(&repo_root, cookie_writer, watcher.subscribe()); let package_watcher = PackageWatcher::new(repo_root.clone(), watcher.subscribe(), backup_discovery) .await @@ -331,7 +327,9 @@ impl TurboGrpcServiceInner { ) -> Result<(), RpcError> { let glob_set = GlobSet::from_raw(output_globs, output_glob_exclusions)?; let fw = self.wait_for_filewatching().await?; - fw.glob_watcher.watch_globs(hash.clone(), glob_set).await?; + fw.glob_watcher + .watch_globs(hash.clone(), glob_set, REQUEST_TIMEOUT) + .await?; { let mut times_saved = self.times_saved.lock().expect("times saved lock poisoned"); times_saved.insert(hash, time_saved); @@ -349,7 +347,10 @@ impl TurboGrpcServiceInner { times_saved.get(hash.as_str()).copied().unwrap_or_default() }; let fw = self.wait_for_filewatching().await?; - let changed_globs = fw.glob_watcher.get_changed_globs(hash, candidates).await?; + let changed_globs = fw + .glob_watcher + .get_changed_globs(hash, candidates, REQUEST_TIMEOUT) + .await?; Ok((changed_globs, time_saved)) } diff --git a/crates/turborepo-paths/src/absolute_system_path.rs b/crates/turborepo-paths/src/absolute_system_path.rs index 107b7826e588d..e9af365fdad16 100644 --- a/crates/turborepo-paths/src/absolute_system_path.rs +++ b/crates/turborepo-paths/src/absolute_system_path.rs @@ -401,6 +401,10 @@ impl AbsoluteSystemPath { self.0.parent().map(Self::new_unchecked) } + pub fn file_name(&self) -> Option<&str> { + self.0.file_name() + } + /// Opens file and sets the `FILE_FLAG_SEQUENTIAL_SCAN` flag on Windows to /// help with performance pub fn open(&self) -> Result {