diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 7287dd9faaf..7ccd8f77048 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -786,6 +786,93 @@ impl Receiver { } } + /// Wait for a changed value that satisfies the provided filter condition + /// + /// Each value will be received and fed to `filter_fn` at most once. Values + /// might be skipped if they are sent faster than the receiver can process them. + /// + /// This method works as [`wait_for()`](Self::wait_for) but it will only + /// consider _changed_ values. It could be called repeatedly in a loop while + /// preserving the _at-most-once_ delivery guarantee. The current value + /// could be included even if it has already been seen by calling + /// [`mark_changed()`](Self::mark_changed) beforehand. + /// + /// ``` + /// use tokio::sync::watch; + /// use tokio::time::timeout; + /// + /// const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1); + /// + /// #[tokio::main] + /// async fn main() { + /// let tx = watch::Sender::new("hello"); + /// + /// let mut rx = tx.subscribe(); + /// + /// // After the subscribing the current value is not marked as changed. + /// assert!(!rx.borrow().has_changed()); + /// assert!(timeout(TIMEOUT, rx.wait_for_changed(|_| true)).await.is_err()); + /// + /// // Mark the current value as changed. + /// rx.mark_changed(); + /// assert_eq!("hello", *rx.wait_for_changed(|val| *val == "hello").await.unwrap()); + /// + /// // Values that don't satisfy the filter are skipped. + /// tx.send("hello again").unwrap(); + /// assert!(timeout(TIMEOUT, rx.wait_for_changed(|val| *val == "hello")).await.is_err()); + /// + /// // Send a final value and close the channel. + /// tx.send("goodbye").unwrap(); + /// drop(tx); + /// + /// // The final value is received if not seen before, even if the channel is closed. + /// assert!(rx.has_changed().is_err()); + /// assert_eq!("goodbye", *rx.borrow()); + /// assert_eq!("goodbye", *rx.wait_for(|val| *val == "goodbye").await.unwrap()); + /// + /// // All subsequent invocations return immediately with an error. + /// assert!(rx.wait_for_changed(|_| true).await.is_err()); + /// assert!(rx.wait_for_changed(|_| false).await.is_err()); + /// } + /// ``` + pub async fn wait_for_changed( + &mut self, + mut filter_fn: impl FnMut(&T) -> bool, + ) -> Result, error::RecvError> { + loop { + // Wait for the value to change. + changed_impl(&self.shared, &mut self.version).await?; + + let inner = self.shared.value.read().unwrap(); + + // The version has already been updated by `changed_impl`. But it + // needs to be updated again after acquiring the read-lock to ensure + // that the current value matches the version. Otherwise the same + // value could be returned twice, violating the at-most-once guarantee. + let new_version = self.shared.state.load().version(); + self.version = new_version; + + // Filter the value and catch a possible panic inside the provided closure. + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| filter_fn(&inner))); + match result { + Ok(true) => { + return Ok(Ref { + inner, + has_changed: true, // Always changed + }); + } + Ok(false) => (), // Continue waiting + Err(panicked) => { + // Drop the lock to avoid poisoning it. + drop(inner); + // Forward the panic to the caller. + panic::resume_unwind(panicked); + // Unreachable + } + }; + } + } + /// Returns `true` if receivers belong to the same channel. /// /// # Examples