diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 3f6134b438b..7852b0cb1bf 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -247,7 +247,7 @@ impl Receiver { /// [`changed`]: Receiver::changed pub fn borrow_and_update(&mut self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap(); - self.version = self.shared.version.load(SeqCst); + self.version = self.shared.version.load(SeqCst) & !CLOSED; Ref { inner } } diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 9dcb0c530fa..a2a276d8beb 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -169,3 +169,20 @@ fn poll_close() { assert!(tx.send("two").is_err()); } + +#[test] +fn borrow_and_update() { + let (tx, mut rx) = watch::channel("one"); + + tx.send("two").unwrap(); + assert_ready!(spawn(rx.changed()).poll()).unwrap(); + assert_pending!(spawn(rx.changed()).poll()); + + tx.send("three").unwrap(); + assert_eq!(*rx.borrow_and_update(), "three"); + assert_pending!(spawn(rx.changed()).poll()); + + drop(tx); + assert_eq!(*rx.borrow_and_update(), "three"); + assert_ready!(spawn(rx.changed()).poll()).unwrap_err(); +}