diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 70fd9b9e32d..8af2536311d 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -383,7 +383,7 @@ //! sleep.set(time::sleep_until(op_start + conf.timeout)); //! } //! _ = rx.changed() => { -//! conf = rx.borrow().clone(); +//! conf = rx.borrow_and_update().clone(); //! //! // The configuration has been updated. Update the //! // `sleep` using the new `timeout` value. diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index ad64539895f..5a46670eeeb 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -10,24 +10,75 @@ //! //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer //! and consumer halves of the channel. The channel is created with an initial -//! value. The **latest** value stored in the channel is accessed with -//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new -//! value to be sent by the [`Sender`] half. +//! value. +//! +//! Each [`Receiver`] independently tracks the last value *seen* by its caller. +//! +//! To access the **current** value stored in the channel and mark it as *seen* +//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`]. +//! +//! To access the current value **without** marking it as *seen*, use +//! [`Receiver::borrow()`]. (If the value has already been marked *seen*, +//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].) +//! +//! For more information on when to use these methods, see +//! [here](#borrow_and_update-versus-borrow). +//! +//! ## Change notifications +//! +//! The [`Receiver`] half provides an asynchronous [`changed`] method. This +//! method is ready when a new, *unseen* value is sent via the [`Sender`] half. +//! +//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or +//! `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped. +//! * If the current value is *unseen* when calling [`changed`], then +//! [`changed`] will return immediately. If the current value is *seen*, then +//! it will sleep until either a new message is sent via the [`Sender`] half, +//! or the [`Sender`] is dropped. +//! * On completion, the [`changed`] method marks the new value as *seen*. +//! * At creation, the initial value is considered *seen*. In other words, +//! [`Receiver::changed()`] will not return until a subsequent value is sent. +//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`]. +//! The current value at the time the [`Receiver`] is created is considered +//! *seen*. +//! +//! ## `borrow_and_update` versus `borrow` +//! +//! If the receiver intends to await notifications from [`changed`] in a loop, +//! [`Receiver::borrow_and_update()`] should be preferred over +//! [`Receiver::borrow()`]. This avoids a potential race where a new value is +//! sent between [`changed`] being ready and the value being read. (If +//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.) +//! +//! If the receiver is only interested in the current value, and does not intend +//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more +//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self` +//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut +//! self`. //! //! # Examples //! +//! The following example prints `hello! world! `. +//! //! ``` //! use tokio::sync::watch; +//! use tokio::time::{Duration, sleep}; //! //! # async fn dox() -> Result<(), Box> { //! let (tx, mut rx) = watch::channel("hello"); //! //! tokio::spawn(async move { -//! while rx.changed().await.is_ok() { -//! println!("received = {:?}", *rx.borrow()); +//! // Use the equivalent of a "do-while" loop so the initial value is +//! // processed before awaiting the `changed()` future. +//! loop { +//! println!("{}! ", *rx.borrow_and_update()); +//! if rx.changed().await.is_err() { +//! break; +//! } //! } //! }); //! +//! sleep(Duration::from_millis(100)).await; //! tx.send("world")?; //! # Ok(()) //! # } @@ -39,8 +90,8 @@ //! when all [`Receiver`] handles have been dropped. This indicates that there //! is no further interest in the values being produced and work can be stopped. //! -//! The value in the channel will not be dropped until the sender and all receivers -//! have been dropped. +//! The value in the channel will not be dropped until the sender and all +//! receivers have been dropped. //! //! # Thread safety //! @@ -50,11 +101,15 @@ //! //! [`Sender`]: crate::sync::watch::Sender //! [`Receiver`]: crate::sync::watch::Receiver +//! [`changed`]: crate::sync::watch::Receiver::changed //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow +//! [`Receiver::borrow_and_update()`]: +//! crate::sync::watch::Receiver::borrow_and_update //! [`channel`]: crate::sync::watch::channel //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed //! [`Sender::closed`]: crate::sync::watch::Sender::closed +//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe use crate::sync::notify::Notify; @@ -374,19 +429,28 @@ mod state { /// /// # Examples /// +/// The following example prints `hello! world! `. +/// /// ``` /// use tokio::sync::watch; +/// use tokio::time::{Duration, sleep}; /// /// # async fn dox() -> Result<(), Box> { -/// let (tx, mut rx) = watch::channel("hello"); +/// let (tx, mut rx) = watch::channel("hello"); /// -/// tokio::spawn(async move { -/// while rx.changed().await.is_ok() { -/// println!("received = {:?}", *rx.borrow()); +/// tokio::spawn(async move { +/// // Use the equivalent of a "do-while" loop so the initial value is +/// // processed before awaiting the `changed()` future. +/// loop { +/// println!("{}! ", *rx.borrow_and_update()); +/// if rx.changed().await.is_err() { +/// break; /// } -/// }); +/// } +/// }); /// -/// tx.send("world")?; +/// sleep(Duration::from_millis(100)).await; +/// tx.send("world")?; /// # Ok(()) /// # } /// ``` @@ -453,7 +517,11 @@ impl Receiver { /// ``` /// /// + /// For more information on when to use this method versus + /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow). + /// /// [`changed`]: Receiver::changed + /// [`borrow_and_update`]: Receiver::borrow_and_update /// /// # Examples /// @@ -505,7 +573,11 @@ impl Receiver { /// ``` /// /// + /// For more information on when to use this method versus [`borrow`], see + /// [here](self#borrow_and_update-versus-borrow). + /// /// [`changed`]: Receiver::changed + /// [`borrow`]: Receiver::borrow pub fn borrow_and_update(&mut self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap(); @@ -572,6 +644,9 @@ impl Receiver { /// /// This method returns an error if and only if the [`Sender`] is dropped. /// + /// For more information, see + /// [*Change notifications*](self#change-notifications) in the module-level documentation. + /// /// # Cancel safety /// /// This method is cancel safe. If you use it as the event in a @@ -595,7 +670,7 @@ impl Receiver { /// }); /// /// assert!(rx.changed().await.is_ok()); - /// assert_eq!(*rx.borrow(), "goodbye"); + /// assert_eq!(*rx.borrow_and_update(), "goodbye"); /// /// // The `tx` handle has been dropped /// assert!(rx.changed().await.is_err());