From 68923a7215e694d64c27242d646dee11933d4a66 Mon Sep 17 00:00:00 2001 From: Rain Date: Sat, 26 Aug 2023 17:27:36 -0700 Subject: [PATCH 1/6] sync: improve docs for watch channels I found the watch docs as written to be somewhat confusing. * It wasn't clear to me whether values are marked seen or not at creation/subscribe time. * The example also confused me a bit, suggesting a while loop when a do-while loop is generally more correct. * I noticed a potential race with `borrow` that is no longer an issue with `borrow_and_update`. Update the documentation to try and make it clearer. --- tokio/src/sync/mod.rs | 2 +- tokio/src/sync/watch.rs | 80 +++++++++++++++++++++++++++++++++-------- 2 files changed, 67 insertions(+), 15 deletions(-) diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 70fd9b9e32d..8af2536311d 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -383,7 +383,7 @@ //! sleep.set(time::sleep_until(op_start + conf.timeout)); //! } //! _ = rx.changed() => { -//! conf = rx.borrow().clone(); +//! conf = rx.borrow_and_update().clone(); //! //! // The configuration has been updated. Update the //! // `sleep` using the new `timeout` value. diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index ad64539895f..14949ada717 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -10,24 +10,63 @@ //! //! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer //! and consumer halves of the channel. The channel is created with an initial -//! value. The **latest** value stored in the channel is accessed with -//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new -//! value to be sent by the [`Sender`] half. +//! value. +//! +//! Each [`Receiver`] independently tracks the last value *seen* by its caller. +//! +//! To access the **latest** value stored in the channel and mark it as *seen*, +//! use [`Receiver::borrow_and_update()`]. +//! +//! To access the latest value but **not** mark it as seen, use +//! [`Receiver::borrow()`]. +//! +//! ## Change notifications +//! +//! The [`Receiver`] half provides an asynchronous [`changed`] method. This +//! method is ready when a new, *unseen* value is sent via the [`Sender`] half. +//! +//! * The [`changed`] method returns `Ok(())` on receiving a new value, or +//! `Err(_)` if the [`Sender`] has been closed. +//! * On completion, the [`changed`] method marks the new value as *seen*. If +//! [`Receiver::changed()`] is called again, it will not be ready unless a +//! subsequent value is sent. +//! * At creation, the initial value is considered *seen*. In other words, +//! [`Receiver::changed()`] will not be ready until a subsequent value is sent +//! via the [`Sender`] half. +//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`]. +//! The current value at the time the [`Receiver`] is created is considered +//! *seen*. [`Receiver::changed()`] will only be ready after subsequent values +//! are sent. //! //! # Examples //! +//! In a loop with [`Receiver::changed()`], [`Receiver::borrow_and_update()`] is +//! recommended over [`Receiver::borrow()`]. This avoids a potential race where +//! a new value is sent between [`changed`] being ready and the value being +//! read. If [`Receiver::borrow()`] is used, the loop may run twice with the +//! same value. +//! +//! The following example prints `hello! world! `. +//! //! ``` //! use tokio::sync::watch; +//! use tokio::time::{Duration, sleep}; //! //! # async fn dox() -> Result<(), Box> { //! let (tx, mut rx) = watch::channel("hello"); //! //! tokio::spawn(async move { -//! while rx.changed().await.is_ok() { -//! println!("received = {:?}", *rx.borrow()); +//! // Use the equivalent of a "do-while" loop so the initial value is +//! // processed. +//! loop { +//! println!("{}! ", *rx.borrow_and_update()); +//! if rx.changed().await.is_err() { +//! break; +//! } //! } //! }); //! +//! sleep(Duration::from_millis(100)).await; //! tx.send("world")?; //! # Ok(()) //! # } @@ -39,8 +78,8 @@ //! when all [`Receiver`] handles have been dropped. This indicates that there //! is no further interest in the values being produced and work can be stopped. //! -//! The value in the channel will not be dropped until the sender and all receivers -//! have been dropped. +//! The value in the channel will not be dropped until the sender and all +//! receivers have been dropped. //! //! # Thread safety //! @@ -50,11 +89,15 @@ //! //! [`Sender`]: crate::sync::watch::Sender //! [`Receiver`]: crate::sync::watch::Receiver +//! [`changed`]: crate::sync::watch::Receiver::changed //! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed //! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow +//! [`Receiver::borrow_and_update()`]: +//! crate::sync::watch::Receiver::borrow_and_update //! [`channel`]: crate::sync::watch::channel //! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed //! [`Sender::closed`]: crate::sync::watch::Sender::closed +//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe use crate::sync::notify::Notify; @@ -374,19 +417,28 @@ mod state { /// /// # Examples /// +/// The following example prints `hello! world! `. +/// /// ``` /// use tokio::sync::watch; +/// use tokio::time::{Duration, sleep}; /// /// # async fn dox() -> Result<(), Box> { -/// let (tx, mut rx) = watch::channel("hello"); +/// let (tx, mut rx) = watch::channel("hello"); /// -/// tokio::spawn(async move { -/// while rx.changed().await.is_ok() { -/// println!("received = {:?}", *rx.borrow()); +/// tokio::spawn(async move { +/// // Use the equivalent of a "do-while" loop so the initial value is +/// // processed. +/// loop { +/// println!("{}! ", *rx.borrow_and_update()); +/// if rx.changed().await.is_err() { +/// break; /// } -/// }); +/// } +/// }); /// -/// tx.send("world")?; +/// sleep(Duration::from_millis(100)).await; +/// tx.send("world")?; /// # Ok(()) /// # } /// ``` @@ -595,7 +647,7 @@ impl Receiver { /// }); /// /// assert!(rx.changed().await.is_ok()); - /// assert_eq!(*rx.borrow(), "goodbye"); + /// assert_eq!(*rx.borrow_and_update(), "goodbye"); /// /// // The `tx` handle has been dropped /// assert!(rx.changed().await.is_err()); From 6d7ec40006c27c3378144dd2df3c6eb326c260f5 Mon Sep 17 00:00:00 2001 From: Rain Date: Sun, 27 Aug 2023 12:28:49 -0700 Subject: [PATCH 2/6] address review comment --- tokio/src/sync/watch.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 14949ada717..0ae7137f644 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -28,14 +28,14 @@ //! * The [`changed`] method returns `Ok(())` on receiving a new value, or //! `Err(_)` if the [`Sender`] has been closed. //! * On completion, the [`changed`] method marks the new value as *seen*. If -//! [`Receiver::changed()`] is called again, it will not be ready unless a -//! subsequent value is sent. +//! [`Receiver::changed()`] is called again, it will not return immediately +//! unless a subsequent value is sent. //! * At creation, the initial value is considered *seen*. In other words, -//! [`Receiver::changed()`] will not be ready until a subsequent value is sent -//! via the [`Sender`] half. +//! [`Receiver::changed()`] will not return immediately until a subsequent +//! value is sent via the [`Sender`] half. //! * New [`Receiver`] instances can be created with [`Sender::subscribe()`]. //! The current value at the time the [`Receiver`] is created is considered -//! *seen*. [`Receiver::changed()`] will only be ready after subsequent values +//! *seen*. [`Receiver::changed()`] will only return after subsequent values //! are sent. //! //! # Examples From 0b7f413b5f3d8c11e9e800dc0f9fddd6379a78bd Mon Sep 17 00:00:00 2001 From: Rain Date: Sun, 27 Aug 2023 12:54:01 -0700 Subject: [PATCH 3/6] also make it clearer that borrow doesn't change its state --- tokio/src/sync/watch.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 0ae7137f644..958e4737776 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -17,8 +17,9 @@ //! To access the **latest** value stored in the channel and mark it as *seen*, //! use [`Receiver::borrow_and_update()`]. //! -//! To access the latest value but **not** mark it as seen, use -//! [`Receiver::borrow()`]. +//! To access the latest value **without** changing its state to *seen*, use +//! [`Receiver::borrow()`]. (If the value has already been marked *seen*, +//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].) //! //! ## Change notifications //! @@ -31,12 +32,12 @@ //! [`Receiver::changed()`] is called again, it will not return immediately //! unless a subsequent value is sent. //! * At creation, the initial value is considered *seen*. In other words, -//! [`Receiver::changed()`] will not return immediately until a subsequent -//! value is sent via the [`Sender`] half. +//! [`Receiver::changed()`] will not return until a subsequent value is sent +//! via the [`Sender`] half. //! * New [`Receiver`] instances can be created with [`Sender::subscribe()`]. //! The current value at the time the [`Receiver`] is created is considered -//! *seen*. [`Receiver::changed()`] will only return after subsequent values -//! are sent. +//! *seen*. [`Receiver::changed()`] will not return until a subsequent value +//! is sent. //! //! # Examples //! From 70910603c10003645261e717196c88ded623b0c8 Mon Sep 17 00:00:00 2001 From: Rain Date: Mon, 28 Aug 2023 10:57:54 -0700 Subject: [PATCH 4/6] address review comment --- tokio/src/sync/watch.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 958e4737776..f532f8dc534 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -26,18 +26,18 @@ //! The [`Receiver`] half provides an asynchronous [`changed`] method. This //! method is ready when a new, *unseen* value is sent via the [`Sender`] half. //! -//! * The [`changed`] method returns `Ok(())` on receiving a new value, or -//! `Err(_)` if the [`Sender`] has been closed. -//! * On completion, the [`changed`] method marks the new value as *seen*. If -//! [`Receiver::changed()`] is called again, it will not return immediately -//! unless a subsequent value is sent. +//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or +//! `Err(_)` if the [`Sender`] has been dropped. +//! * If the latest value is *unseen* when calling [`changed`], then [`changed`] +//! will return immediately. If the latest message is *seen*, then it will +//! sleep until either a new message is sent via the [`Sender`] half, or the +//! [`Sender`] is dropped. +//! * On completion, the [`changed`] method marks the new value as *seen*. //! * At creation, the initial value is considered *seen*. In other words, -//! [`Receiver::changed()`] will not return until a subsequent value is sent -//! via the [`Sender`] half. +//! [`Receiver::changed()`] will not return until a subsequent value is sent. //! * New [`Receiver`] instances can be created with [`Sender::subscribe()`]. //! The current value at the time the [`Receiver`] is created is considered -//! *seen*. [`Receiver::changed()`] will not return until a subsequent value -//! is sent. +//! *seen*. //! //! # Examples //! From cc0433d719319536770d524869e86e8c7113da87 Mon Sep 17 00:00:00 2001 From: Rain Date: Mon, 28 Aug 2023 12:40:02 -0700 Subject: [PATCH 5/6] more review comments --- tokio/src/sync/watch.rs | 43 ++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index f532f8dc534..b5e9e05e260 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -14,10 +14,10 @@ //! //! Each [`Receiver`] independently tracks the last value *seen* by its caller. //! -//! To access the **latest** value stored in the channel and mark it as *seen*, -//! use [`Receiver::borrow_and_update()`]. +//! To access the **current** value stored in the channel and mark it as *seen* +//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`]. //! -//! To access the latest value **without** changing its state to *seen*, use +//! To access the current value **without** marking it as *seen*, use //! [`Receiver::borrow()`]. (If the value has already been marked *seen*, //! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].) //! @@ -27,11 +27,11 @@ //! method is ready when a new, *unseen* value is sent via the [`Sender`] half. //! //! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or -//! `Err(_)` if the [`Sender`] has been dropped. -//! * If the latest value is *unseen* when calling [`changed`], then [`changed`] -//! will return immediately. If the latest message is *seen*, then it will -//! sleep until either a new message is sent via the [`Sender`] half, or the -//! [`Sender`] is dropped. +//! `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped. +//! * If the current value is *unseen* when calling [`changed`], then +//! [`changed`] will return immediately. If the current value is *seen*, then +//! it will sleep until either a new message is sent via the [`Sender`] half, +//! or the [`Sender`] is dropped. //! * On completion, the [`changed`] method marks the new value as *seen*. //! * At creation, the initial value is considered *seen*. In other words, //! [`Receiver::changed()`] will not return until a subsequent value is sent. @@ -39,13 +39,21 @@ //! The current value at the time the [`Receiver`] is created is considered //! *seen*. //! -//! # Examples +//! ## `borrow_and_update` versus `borrow` +//! +//! If the receiver intends to await notifications from [`changed`] in a loop, +//! [`Receiver::borrow_and_update()`] should be preferred over +//! [`Receiver::borrow()`]. This avoids a potential race where a new value is +//! sent between [`changed`] being ready and the value being read. (If +//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.) //! -//! In a loop with [`Receiver::changed()`], [`Receiver::borrow_and_update()`] is -//! recommended over [`Receiver::borrow()`]. This avoids a potential race where -//! a new value is sent between [`changed`] being ready and the value being -//! read. If [`Receiver::borrow()`] is used, the loop may run twice with the -//! same value. +//! If the receiver is only interested in the current value, and does not intend +//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more +//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self` +//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut +//! self`. +//! +//! # Examples //! //! The following example prints `hello! world! `. //! @@ -58,7 +66,7 @@ //! //! tokio::spawn(async move { //! // Use the equivalent of a "do-while" loop so the initial value is -//! // processed. +//! // processed before awaiting the `changed()` future. //! loop { //! println!("{}! ", *rx.borrow_and_update()); //! if rx.changed().await.is_err() { @@ -429,7 +437,7 @@ mod state { /// /// tokio::spawn(async move { /// // Use the equivalent of a "do-while" loop so the initial value is -/// // processed. +/// // processed before awaiting the `changed()` future. /// loop { /// println!("{}! ", *rx.borrow_and_update()); /// if rx.changed().await.is_err() { @@ -625,6 +633,9 @@ impl Receiver { /// /// This method returns an error if and only if the [`Sender`] is dropped. /// + /// For more information, see + /// [*Change notifications*](self#change-notifications) in the module-level documentation. + /// /// # Cancel safety /// /// This method is cancel safe. If you use it as the event in a From 4caf956665363a938254a044a3ebd6086ae31200 Mon Sep 17 00:00:00 2001 From: Rain Date: Mon, 28 Aug 2023 13:11:43 -0700 Subject: [PATCH 6/6] add links to borrow_and_update-versus-borrow --- tokio/src/sync/watch.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index b5e9e05e260..5a46670eeeb 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -21,6 +21,9 @@ //! [`Receiver::borrow()`]. (If the value has already been marked *seen*, //! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].) //! +//! For more information on when to use these methods, see +//! [here](#borrow_and_update-versus-borrow). +//! //! ## Change notifications //! //! The [`Receiver`] half provides an asynchronous [`changed`] method. This @@ -514,7 +517,11 @@ impl Receiver { /// ``` /// /// + /// For more information on when to use this method versus + /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow). + /// /// [`changed`]: Receiver::changed + /// [`borrow_and_update`]: Receiver::borrow_and_update /// /// # Examples /// @@ -566,7 +573,11 @@ impl Receiver { /// ``` /// /// + /// For more information on when to use this method versus [`borrow`], see + /// [here](self#borrow_and_update-versus-borrow). + /// /// [`changed`]: Receiver::changed + /// [`borrow`]: Receiver::borrow pub fn borrow_and_update(&mut self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap();