Skip to content

Commit

Permalink
add cookie wrapper around OptionalWatch to synchronize updates
Browse files Browse the repository at this point in the history
  • Loading branch information
arlyon committed Feb 13, 2024
1 parent 1c92e7a commit 342056a
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 32 deletions.
205 changes: 195 additions & 10 deletions crates/turborepo-filewatch/src/cookies.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::{collections::BinaryHeap, fs::OpenOptions, time::Duration};

use futures::FutureExt;
use notify::EventKind;
use thiserror::Error;
use tokio::{
sync::{broadcast, mpsc, oneshot},
sync::{broadcast, mpsc, oneshot, watch},
time::error::Elapsed,
};
use tracing::trace;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};

use crate::{NotifyError, OptionalWatch};
use crate::{optional_watch::SomeRef, NotifyError, OptionalWatch};

#[derive(Debug, Error)]
pub enum CookieError {
Expand Down Expand Up @@ -114,7 +115,7 @@ impl<T> CookieWatcher<T> {
if !matches!(event_kind, EventKind::Create(_)) {
return None;
}
if let Some(serial) = self.serial_for_path(path) {
if let Some(serial) = serial_for_path(&self.root, path) {
self.latest = serial;
let mut ready_requests = Vec::new();
while let Some(cookied_request) = self.pending_requests.pop() {
Expand All @@ -130,14 +131,14 @@ impl<T> CookieWatcher<T> {
None
}
}
}

fn serial_for_path(&self, path: &AbsoluteSystemPath) -> Option<usize> {
if self.root.relation_to_path(path) == PathRelation::Parent {
let filename = path.file_name()?;
filename.strip_suffix(".cookie")?.parse().ok()
} else {
None
}
fn serial_for_path(root: &AbsoluteSystemPath, path: &AbsoluteSystemPath) -> Option<usize> {
if root.relation_to_path(path) == PathRelation::Parent {
let filename = path.file_name()?;
filename.strip_suffix(".cookie")?.parse().ok()
} else {
None
}
}

Expand Down Expand Up @@ -247,6 +248,190 @@ fn handle_cookie_request(
}
}

/// a lightweight wrapper around OptionalWatch that embeds cookie ids into the
/// get call. Ideal for queries that require a round trip to the filesystem to
/// be considered 'current'.
pub struct CookiedOptionalWatch<T, P: CookieReady> {
value: watch::Receiver<Option<T>>,
cookie_index: watch::Receiver<usize>,
cookie_writer: CookieWriter,
parent: P,
}

impl<T, P: CookieReady + Clone> Clone for CookiedOptionalWatch<T, P> {
fn clone(&self) -> Self {
Self {
value: self.value.clone(),
cookie_index: self.cookie_index.clone(),
cookie_writer: self.cookie_writer.clone(),
parent: self.parent.clone(),
}
}
}

/// A trait for types to signify that a required FS rountrip has been completed.
///
/// This is implemented in the base case for `()`, the parent of all
/// CookieWatches.
pub trait CookieReady {
fn ready(
&mut self,
id: usize,
) -> impl std::future::Future<Output = Result<(), watch::error::RecvError>>;
}

impl CookieReady for () {
async fn ready(&mut self, _id: usize) -> Result<(), watch::error::RecvError> {
Ok(())
}
}

impl<T, U: CookieReady> CookieReady for CookiedOptionalWatch<T, U> {
async fn ready(&mut self, id: usize) -> Result<(), watch::error::RecvError> {
tracing::debug!("waiting for cookie {}", id);
self.parent.ready(id).await?;

// failing here means the
self.cookie_index.wait_for(|v| v >= &id).await.map(|_| ())
}
}

impl<T> CookiedOptionalWatch<T, ()> {
pub fn new(
update: CookieWriter,
) -> (
watch::Sender<Option<T>>,
CookieRegister,
CookiedOptionalWatch<T, ()>,
) {
let (tx, rx) = watch::channel(None);
let (cookie_tx, cookie_rx) = watch::channel(0);
tracing::debug!("starting cookied optional watch in {}", update.root());
(
tx,
CookieRegister(cookie_tx, update.root().to_owned()),
CookiedOptionalWatch {
value: rx,
cookie_index: cookie_rx,
cookie_writer: update,
parent: (),
},
)
}
}

impl<T, U: CookieReady + Clone> CookiedOptionalWatch<T, U> {
/// Create a new sibling cookie watcher that inherits the same fs source as
/// this one.
pub fn new_sibling<T2>(&self) -> (watch::Sender<Option<T2>>, CookiedOptionalWatch<T2, U>) {
let (tx, rx) = watch::channel(None);
(
tx,
CookiedOptionalWatch {
value: rx,
cookie_index: self.cookie_index.clone(),
cookie_writer: self.cookie_writer.clone(),
parent: self.parent.clone(),
},
)
}

/// Create a new child cookie watcher that inherits the same fs source as
/// this one, but also has its own cookie source. This allows you to
/// synchronize two independent cookie streams.
pub fn new_child<T2>(
&self,
) -> (
watch::Sender<Option<T2>>,
CookieRegister,
CookiedOptionalWatch<T2, Self>,
) {
let (tx, rx) = watch::channel(None);
let (cookie_tx, cookie_rx) = watch::channel(0);

(
tx,
CookieRegister(cookie_tx, self.cookie_writer.root().to_owned()),
CookiedOptionalWatch {
value: rx,
cookie_index: cookie_rx,
cookie_writer: self.cookie_writer.clone(),
parent: self.clone(),
},
)
}

/// Get the current value, or wait until one becomes available.
#[tracing::instrument(skip(self))]
pub async fn get(&mut self) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
let next_id = self.cookie_writer.cookie_request(()).await.unwrap().serial;
self.ready(next_id).await?;
tracing::debug!("waiting for data");
self.get_inner().await
}

/// Get the next, changed, available value.
///
/// This requires three conditions to be met:
/// - the value to change
/// - for a cookie to be seen
/// - for the value to be available
#[tracing::instrument(skip(self))]
pub async fn get_change(&mut self) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.value.changed().await?;
self.get_inner().await
}

/// Get the raw value, without issuing a cookie request.
///
/// Please do not use this data from a user-facing query. It should only
/// really be used for internal state management. Equivalent to
/// `OptionalWatch::get`
///
/// For an example as to why we need this, sometimes file event processing
/// needs to access data but issuing a cookie request would deadlock.
///
/// `_reason` is purely for documentation purposes and is not used.
pub async fn get_raw(
&mut self,
_reason: &str,
) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.get_inner().await
}

/// Get the current value, if it is available.
///
/// Unlike `OptionalWatch::get_immediate`, this method will block until the
/// cookie has been seen, at which point it will call `now_or_never` on the
/// value watch.
#[tracing::instrument(skip(self))]
pub async fn get_immediate(
&mut self,
) -> Option<Result<SomeRef<'_, T>, watch::error::RecvError>> {
let next_id = self.cookie_writer.cookie_request(()).await.unwrap().serial;
self.cookie_index.wait_for(|v| v >= &next_id).await.ok()?;
self.get_inner().now_or_never()
}

async fn get_inner(&mut self) -> Result<SomeRef<'_, T>, watch::error::RecvError> {
self.value.wait_for(|f| f.is_some()).await?;
Ok(SomeRef(self.value.borrow()))
}
}

pub struct CookieRegister(watch::Sender<usize>, AbsoluteSystemPathBuf);
impl CookieRegister {
pub fn register(&self, paths: &[&AbsoluteSystemPath]) {
tracing::trace!("registering cookie for {:?}", paths);
for path in paths {
if let Some(serial) = serial_for_path(&self.1, path) {
tracing::trace!("updating cookie to {}", serial);
let _ = self.0.send(serial);
}
}
}
}

#[cfg(test)]
mod test {
use std::time::Duration;
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-filewatch/src/optional_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<T> OptionalWatch<T> {
}
}

pub struct SomeRef<'a, T>(Ref<'a, Option<T>>);
pub struct SomeRef<'a, T>(pub(crate) Ref<'a, Option<T>>);

impl<'a, T> std::ops::Deref for SomeRef<'a, T> {
type Target = T;
Expand Down

0 comments on commit 342056a

Please sign in to comment.