Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve daemon startup times #7322

Merged
merged 14 commits into from Feb 15, 2024
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 53 additions & 15 deletions crates/turborepo-filewatch/src/cookies.rs
Expand Up @@ -3,12 +3,14 @@ use std::{collections::BinaryHeap, fs::OpenOptions, time::Duration};
use notify::EventKind;
use thiserror::Error;
use tokio::{
sync::{mpsc, oneshot},
sync::{broadcast, mpsc, oneshot},
time::error::Elapsed,
};
use tracing::trace;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};

use crate::{NotifyError, OptionalWatch};

#[derive(Debug, Error)]
pub enum CookieError {
#[error("cookie timeout expired")]
Expand All @@ -22,6 +24,8 @@ pub enum CookieError {
io_err: std::io::Error,
path: AbsoluteSystemPathBuf,
},
#[error("cookie queue is not available")]
Unavailable,
}

/// CookieWriter is responsible for assigning filesystem cookies to a request
Expand All @@ -30,7 +34,8 @@ pub enum CookieError {
pub struct CookieWriter {
root: AbsoluteSystemPathBuf,
timeout: Duration,
cookie_request_tx: mpsc::Sender<oneshot::Sender<Result<usize, CookieError>>>,
cookie_request_sender_lazy:
OptionalWatch<mpsc::Sender<oneshot::Sender<Result<usize, CookieError>>>>,
// _exit_ch exists to trigger a close on the receiver when all instances
// of this struct are dropped. The task that is receiving events will exit,
// dropping the other sender for the broadcast channel, causing all receivers
Expand Down Expand Up @@ -138,18 +143,40 @@ impl<T> CookieWatcher<T> {
}

impl CookieWriter {
pub fn new(root: &AbsoluteSystemPath, timeout: Duration) -> Self {
pub fn new(
root: &AbsoluteSystemPath,
timeout: Duration,
mut recv: OptionalWatch<broadcast::Receiver<Result<notify::Event, NotifyError>>>,
) -> Self {
let (cookie_request_sender_tx, cookie_request_sender_lazy) = OptionalWatch::new();
let (exit_ch, exit_signal) = mpsc::channel(16);
let (cookie_requests_tx, cookie_requests_rx) = mpsc::channel(16);
tokio::spawn(watch_cookies(
root.to_owned(),
cookie_requests_rx,
exit_signal,
));
tokio::spawn({
let root = root.to_owned();
async move {
if recv.get().await.is_err() {
// here we need to wait for confirmation that the watching end is ready
// before we start sending requests. this has the side effect of not
// enabling the cookie writing mechanism until the watcher is ready
return;
}

let (cookie_requests_tx, cookie_requests_rx) = mpsc::channel(16);

if cookie_request_sender_tx
.send(Some(cookie_requests_tx))
.is_err()
{
// the receiver has already been dropped
tracing::debug!("nobody listening for cookie requests, exiting");
return;
};
watch_cookies(root.to_owned(), cookie_requests_rx, exit_signal).await;
}
});
Self {
root: root.to_owned(),
timeout,
cookie_request_tx: cookie_requests_tx,
cookie_request_sender_lazy,
_exit_ch: exit_ch,
}
}
Expand All @@ -163,9 +190,18 @@ impl CookieWriter {
request: T,
) -> Result<CookiedRequest<T>, CookieError> {
// we need to write the cookie from a single task so as to serialize them
tokio::time::timeout(self.timeout, self.cookie_request_inner(request)).await?
}

async fn cookie_request_inner<T>(&self, request: T) -> Result<CookiedRequest<T>, CookieError> {
let (resp_tx, resp_rx) = oneshot::channel();
self.cookie_request_tx.clone().send(resp_tx).await?;
let serial = tokio::time::timeout(self.timeout, resp_rx).await???;
let mut cookie_request_tx = self.cookie_request_sender_lazy.clone();
let Ok(cookie_request_tx) = cookie_request_tx.get().await.map(|s| s.to_owned()) else {
arlyon marked this conversation as resolved.
Show resolved Hide resolved
// the cookie queue is not ready and will never be ready
return Err(CookieError::Unavailable);
};
cookie_request_tx.send(resp_tx).await?;
let serial = resp_rx.await??;
Ok(CookiedRequest { request, serial })
}
}
Expand Down Expand Up @@ -224,7 +260,7 @@ mod test {
use turbopath::AbsoluteSystemPathBuf;

use super::{CookieWatcher, CookiedRequest};
use crate::{cookies::CookieWriter, NotifyError};
use crate::{cookies::CookieWriter, NotifyError, OptionalWatch};

struct TestQuery {
resp: oneshot::Sender<()>,
Expand Down Expand Up @@ -288,8 +324,9 @@ mod test {
.unwrap();

let (send_file_events, file_events) = broadcast::channel(16);
let recv = OptionalWatch::once(file_events.resubscribe());
let (reqs_tx, reqs_rx) = mpsc::channel(16);
let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2));
let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2), recv);
let (exit_tx, exit_rx) = oneshot::channel();

let service = TestService {
Expand Down Expand Up @@ -351,8 +388,9 @@ mod test {
.unwrap();

let (send_file_events, file_events) = broadcast::channel(16);
let recv = OptionalWatch::once(file_events.resubscribe());
let (reqs_tx, reqs_rx) = mpsc::channel(16);
let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2));
let cookie_writer = CookieWriter::new(&path, Duration::from_secs(2), recv);
let (exit_tx, exit_rx) = oneshot::channel();

let service = TestService {
Expand Down
93 changes: 63 additions & 30 deletions crates/turborepo-filewatch/src/globwatcher.rs
Expand Up @@ -10,12 +10,12 @@ use notify::Event;
use thiserror::Error;
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::{debug, warn};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, RelativeUnixPath};
use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPath};
use wax::{Any, Glob, Program};

use crate::{
cookies::{CookieError, CookieWatcher, CookieWriter, CookiedRequest},
NotifyError,
NotifyError, OptionalWatch,
};

type Hash = String;
Expand Down Expand Up @@ -101,6 +101,8 @@ pub enum Error {
Closed,
#[error("globwatcher request timed out")]
Timeout(#[from] tokio::time::error::Elapsed),
#[error("glob watching is unavailable")]
Unavailable,
}

impl From<mpsc::error::SendError<Query>> for Error {
Expand All @@ -122,7 +124,7 @@ pub struct GlobWatcher {
// dropping the other sender for the broadcast channel, causing all receivers
// to be notified of a close.
_exit_ch: oneshot::Sender<()>,
query_ch: mpsc::Sender<CookiedRequest<Query>>,
query_ch_lazy: OptionalWatch<mpsc::Sender<CookiedRequest<Query>>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -160,23 +162,43 @@ struct GlobTracker {

impl GlobWatcher {
pub fn new(
root: &AbsoluteSystemPath,
root: AbsoluteSystemPathBuf,
cookie_jar: CookieWriter,
recv: broadcast::Receiver<Result<Event, NotifyError>>,
mut recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
) -> Self {
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel();
let (query_ch, query_recv) = mpsc::channel(256);
let (query_ch_tx, query_ch_lazy) = OptionalWatch::new();
let cookie_root = cookie_jar.root().to_owned();
tokio::task::spawn(
GlobTracker::new(root.to_owned(), cookie_root, exit_signal, recv, query_recv).watch(),
);
tokio::task::spawn(async move {
let Ok(recv) = recv.get().await.map(|r| r.resubscribe()) else {
arlyon marked this conversation as resolved.
Show resolved Hide resolved
// if this fails, it means that the filewatcher is not available
// so starting the glob tracker is pointless
return;
};

// if the receiver is closed, it means the glob watcher is closed and we
// probably don't want to start the glob tracker
let (query_ch, query_recv) = mpsc::channel(128);
if query_ch_tx.send(Some(query_ch)).is_err() {
tracing::debug!("no queryers for glob watcher, exiting");
return;
}

GlobTracker::new(root, cookie_root, exit_signal, recv, query_recv)
.watch()
.await
});
Self {
cookie_jar,
_exit_ch: exit_ch,
query_ch,
query_ch_lazy,
}
}

/// Watch a set of globs for a given hash.
///
/// This function will return `Error::Unavailable` if the globwatcher is not
/// yet available.
pub async fn watch_globs(
&self,
hash: Hash,
Expand All @@ -189,11 +211,14 @@ impl GlobWatcher {
glob_set: globs,
resp: tx,
};
let cookied_request = self.cookie_jar.cookie_request(req).await?;
self.query_ch.send(cookied_request).await?;
self.send_request(req).await?;
tokio::time::timeout(timeout, rx).await??
}

/// Get the globs that have changed for a given hash.
///
/// This function will return `Error::Unavailable` if the globwatcher is not
/// yet available.
pub async fn get_changed_globs(
&self,
hash: Hash,
Expand All @@ -206,10 +231,22 @@ impl GlobWatcher {
candidates,
resp: tx,
};
let cookied_request = self.cookie_jar.cookie_request(req).await?;
self.query_ch.send(cookied_request).await?;
self.send_request(req).await?;
tokio::time::timeout(timeout, rx).await??
}

async fn send_request(&self, req: Query) -> Result<(), Error> {
let cookied_request = self.cookie_jar.cookie_request(req).await?;
let mut query_ch = self.query_ch_lazy.clone();
let query_ch = query_ch
.get_immediate()
.ok_or(Error::Unavailable)?
.map(|ch| ch.clone())
.map_err(|_| Error::Unavailable)?;

query_ch.send(cookied_request).await?;
Ok(())
}
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -468,12 +505,10 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2));

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);

let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"];
let raw_excludes = ["my-pkg/.next/cache/**"];
Expand Down Expand Up @@ -553,12 +588,11 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2));
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);

let raw_includes = &["my-pkg/dist/**", "my-pkg/.next/**"];
let raw_excludes: [&str; 0] = [];
Expand Down Expand Up @@ -649,12 +683,11 @@ mod test {
setup(&repo_root);
let cookie_dir = repo_root.join_component(".git");

let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root)
.await
.unwrap();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2));
let watcher = FileSystemWatcher::new_with_default_cookie_dir(&repo_root).unwrap();
let recv = watcher.watch();
let cookie_jar = CookieWriter::new(&cookie_dir, Duration::from_secs(2), recv.clone());

let glob_watcher = GlobWatcher::new(&repo_root, cookie_jar, watcher.subscribe());
let glob_watcher = GlobWatcher::new(repo_root.clone(), cookie_jar, recv);

// On windows, we expect different sanitization before the
// globs are passed in, due to alternative data streams in files.
Expand Down