Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Turborepo): Renaming, additional cookiewriter constructor #7553

Merged
merged 1 commit into from Feb 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 21 additions & 10 deletions crates/turborepo-filewatch/src/cookies.rs
Expand Up @@ -68,7 +68,8 @@ pub enum CookieError {
/// for a downstream, filewatching-backed service.
#[derive(Clone)]
pub struct CookieWriter {
root: AbsoluteSystemPathBuf,
// Where we put the cookie files, usually `<repo_root>/.turbo/cookies`
cookie_root: AbsoluteSystemPathBuf,
timeout: Duration,
cookie_request_sender_lazy:
OptionalWatch<mpsc::Sender<oneshot::Sender<Result<usize, CookieError>>>>,
Expand Down Expand Up @@ -112,17 +113,18 @@ impl<T> Ord for CookiedRequest<T> {
/// CookieWatcher is used by downstream filewatching-backed services to
/// know when it is safe to handle a particular request.
pub(crate) struct CookieWatcher<T> {
root: AbsoluteSystemPathBuf,
// Where we expect to find the cookie files, usually `<repo_root>/.turbo/cookies`
cookie_root: AbsoluteSystemPathBuf,
// We don't necessarily get requests in serial-order, but we want to keep them
// in order so we don't have to scan all requests every time we get a new cookie.
pending_requests: BinaryHeap<CookiedRequest<T>>,
latest: usize,
}

impl<T> CookieWatcher<T> {
pub(crate) fn new(root: AbsoluteSystemPathBuf) -> Self {
pub(crate) fn new(cookie_root: AbsoluteSystemPathBuf) -> Self {
Self {
root,
cookie_root,
pending_requests: BinaryHeap::new(),
latest: 0,
}
Expand Down Expand Up @@ -153,7 +155,7 @@ impl<T> CookieWatcher<T> {
if !matches!(event_kind, EventKind::Create(_)) {
return None;
}
if let Some(serial) = serial_for_path(&self.root, path) {
if let Some(serial) = serial_for_path(&self.cookie_root, path) {
self.latest = serial;
let mut ready_requests = Vec::new();
while let Some(cookied_request) = self.pending_requests.pop() {
Expand Down Expand Up @@ -181,15 +183,24 @@ fn serial_for_path(root: &AbsoluteSystemPath, path: &AbsoluteSystemPath) -> Opti
}

impl CookieWriter {
pub fn new_with_default_cookie_dir(
repo_root: &AbsoluteSystemPath,
timeout: Duration,
recv: OptionalWatch<broadcast::Receiver<Result<notify::Event, NotifyError>>>,
) -> Self {
let cookie_root = repo_root.join_components(&[".turbo", "cookies"]);
Self::new(&cookie_root, timeout, recv)
}

pub fn new(
root: &AbsoluteSystemPath,
cookie_root: &AbsoluteSystemPath,
timeout: Duration,
mut recv: OptionalWatch<broadcast::Receiver<Result<notify::Event, NotifyError>>>,
) -> Self {
let (cookie_request_sender_tx, cookie_request_sender_lazy) = OptionalWatch::new();
let (exit_ch, exit_signal) = mpsc::channel(16);
tokio::spawn({
let root = root.to_owned();
let root = cookie_root.to_owned();
async move {
if recv.get().await.is_err() {
// here we need to wait for confirmation that the watching end is ready
Expand All @@ -213,15 +224,15 @@ impl CookieWriter {
}
});
Self {
root: root.to_owned(),
cookie_root: cookie_root.to_owned(),
timeout,
cookie_request_sender_lazy,
_exit_ch: exit_ch,
}
}

pub(crate) fn root(&self) -> &AbsoluteSystemPath {
&self.root
&self.cookie_root
}

/// Sends a request to make a cookie file to the
Expand Down Expand Up @@ -421,7 +432,7 @@ impl<T, U: CookieReady + Clone> CookiedOptionalWatch<T, U> {
.await?
.serial;
self.ready(next_id).await;
tracing::debug!("waiting for data");
tracing::debug!("got cookie, waiting for data");
Ok(self.get_inner().await?)
}

Expand Down