Skip to content

Commit

Permalink
feat(Turborepo): Renaming, additional cookiewriter constructor (#7553)
Browse files Browse the repository at this point in the history
### Description

- Extracts the cookie-related changes from #7183 to try to slim down
that PR a bit

### Testing Instructions

Existing test suite

Closes TURBO-2480

Co-authored-by: Greg Soltis <Greg Soltis>
  • Loading branch information
gsoltis committed Feb 29, 2024
1 parent 27b6884 commit 17a148f
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions crates/turborepo-filewatch/src/cookies.rs
Expand Up @@ -68,7 +68,8 @@ pub enum CookieError {
/// for a downstream, filewatching-backed service.
#[derive(Clone)]
pub struct CookieWriter {
root: AbsoluteSystemPathBuf,
// Where we put the cookie files, usually `<repo_root>/.turbo/cookies`
cookie_root: AbsoluteSystemPathBuf,
timeout: Duration,
cookie_request_sender_lazy:
OptionalWatch<mpsc::Sender<oneshot::Sender<Result<usize, CookieError>>>>,
Expand Down Expand Up @@ -112,17 +113,18 @@ impl<T> Ord for CookiedRequest<T> {
/// CookieWatcher is used by downstream filewatching-backed services to
/// know when it is safe to handle a particular request.
pub(crate) struct CookieWatcher<T> {
root: AbsoluteSystemPathBuf,
// Where we expect to find the cookie files, usually `<repo_root>/.turbo/cookies`
cookie_root: AbsoluteSystemPathBuf,
// We don't necessarily get requests in serial-order, but we want to keep them
// in order so we don't have to scan all requests every time we get a new cookie.
pending_requests: BinaryHeap<CookiedRequest<T>>,
latest: usize,
}

impl<T> CookieWatcher<T> {
pub(crate) fn new(root: AbsoluteSystemPathBuf) -> Self {
pub(crate) fn new(cookie_root: AbsoluteSystemPathBuf) -> Self {
Self {
root,
cookie_root,
pending_requests: BinaryHeap::new(),
latest: 0,
}
Expand Down Expand Up @@ -153,7 +155,7 @@ impl<T> CookieWatcher<T> {
if !matches!(event_kind, EventKind::Create(_)) {
return None;
}
if let Some(serial) = serial_for_path(&self.root, path) {
if let Some(serial) = serial_for_path(&self.cookie_root, path) {
self.latest = serial;
let mut ready_requests = Vec::new();
while let Some(cookied_request) = self.pending_requests.pop() {
Expand Down Expand Up @@ -181,15 +183,24 @@ fn serial_for_path(root: &AbsoluteSystemPath, path: &AbsoluteSystemPath) -> Opti
}

impl CookieWriter {
pub fn new_with_default_cookie_dir(
repo_root: &AbsoluteSystemPath,
timeout: Duration,
recv: OptionalWatch<broadcast::Receiver<Result<notify::Event, NotifyError>>>,
) -> Self {
let cookie_root = repo_root.join_components(&[".turbo", "cookies"]);
Self::new(&cookie_root, timeout, recv)
}

pub fn new(
root: &AbsoluteSystemPath,
cookie_root: &AbsoluteSystemPath,
timeout: Duration,
mut recv: OptionalWatch<broadcast::Receiver<Result<notify::Event, NotifyError>>>,
) -> Self {
let (cookie_request_sender_tx, cookie_request_sender_lazy) = OptionalWatch::new();
let (exit_ch, exit_signal) = mpsc::channel(16);
tokio::spawn({
let root = root.to_owned();
let root = cookie_root.to_owned();
async move {
if recv.get().await.is_err() {
// here we need to wait for confirmation that the watching end is ready
Expand All @@ -213,15 +224,15 @@ impl CookieWriter {
}
});
Self {
root: root.to_owned(),
cookie_root: cookie_root.to_owned(),
timeout,
cookie_request_sender_lazy,
_exit_ch: exit_ch,
}
}

pub(crate) fn root(&self) -> &AbsoluteSystemPath {
&self.root
&self.cookie_root
}

/// Sends a request to make a cookie file to the
Expand Down Expand Up @@ -421,7 +432,7 @@ impl<T, U: CookieReady + Clone> CookiedOptionalWatch<T, U> {
.await?
.serial;
self.ready(next_id).await;
tracing::debug!("waiting for data");
tracing::debug!("got cookie, waiting for data");
Ok(self.get_inner().await?)
}

Expand Down

0 comments on commit 17a148f

Please sign in to comment.