Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: implement watch::Receiver::wait_for method #5611

Merged
merged 33 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e7f8366
sync: implement watch::Receiver::wait_for method
debadree25 Apr 9, 2023
32de794
fixup! lint
debadree25 Apr 9, 2023
79a7f1a
fixup! doc
debadree25 Apr 10, 2023
899c11a
fixup! lint again
debadree25 Apr 10, 2023
e0e5157
fixup! add a test
debadree25 Apr 10, 2023
778017c
Merge remote-tracking branch 'origin/master' into ft/implement-wait_for
debadree25 Apr 11, 2023
de91fc4
fixup! update the test for multiple threads
debadree25 Apr 11, 2023
f8e0810
fixup! lint
debadree25 Apr 11, 2023
15b0ee5
fixup! use send_modify and fix race condition
debadree25 Apr 11, 2023
539a17b
fixup! simplify the closure
debadree25 Apr 11, 2023
3e8f268
fixup! lint
debadree25 Apr 11, 2023
6424dd1
fixup! add description of the function
debadree25 Apr 11, 2023
add1152
fixup! update the description
debadree25 Apr 11, 2023
46196fb
fixup! return type same as changed()
debadree25 Apr 11, 2023
1287301
fixup! lint
debadree25 Apr 11, 2023
d76b150
Merge branch 'master' into ft/implement-wait_for
debadree25 Apr 12, 2023
a8793fa
Merge branch 'master' into ft/implement-wait_for
debadree25 Apr 17, 2023
ba8db72
fixup! update docs
debadree25 Apr 17, 2023
d7d7adc
fixup! add a guarantee to see the last value
debadree25 Apr 17, 2023
16f5277
Merge branch 'master' into ft/implement-wait_for
debadree25 Apr 21, 2023
2bece23
fixup! use Ref
debadree25 Apr 21, 2023
3f5f3e0
fixup! finish update to Ref
debadree25 Apr 21, 2023
b76576e
fixup! clippy fix
debadree25 Apr 21, 2023
59e7074
fixup! clippy again
debadree25 Apr 21, 2023
ca2d2bf
fixup! include version matching
debadree25 Apr 22, 2023
09652f8
fixup! use has_changed instead
debadree25 Apr 22, 2023
e4472d5
inline borrow_and_update into wait_for
Darksonn Apr 23, 2023
0e5f4fe
Merge branch 'master' into ft/implement-wait_for
debadree25 Apr 23, 2023
fd15eea
fixup! lint and clippy
debadree25 Apr 23, 2023
39f1cbe
fixup! lint again
debadree25 Apr 23, 2023
1f38f3f
Update tokio/src/sync/tests/loom_watch.rs
debadree25 Apr 24, 2023
3d1077f
fixup! add another test
debadree25 Apr 24, 2023
9157bbc
fixup! lint
debadree25 Apr 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions tokio/src/sync/tests/loom_watch.rs
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::sync::watch;

use loom::future::block_on;
use loom::thread;
use std::sync::Arc;
use std::time::Duration;

#[test]
fn smoke() {
Expand Down Expand Up @@ -34,3 +36,32 @@ fn smoke() {
th.join().unwrap();
})
}

#[test]
fn wait_for_test() {
loom::model(move || {
let (tx, mut rx) = watch::channel(false);

let tx_arc = Arc::new(tx);
let tx1 = tx_arc.clone();
let tx2 = tx_arc.clone();

let th1 = thread::spawn(move || {
for _ in 0..10 {
tx1.send(false).unwrap();
std::thread::sleep(Duration::from_millis(10));
debadree25 marked this conversation as resolved.
Show resolved Hide resolved
}
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
});

let th2 = thread::spawn(move || {
std::thread::sleep(Duration::from_millis(10));
tx2.send(true).unwrap();
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
});

let result = block_on(rx.wait_for(|x| *x));
assert_eq!(result.unwrap(), true);

th1.join().unwrap();
th2.join().unwrap();
});
}
49 changes: 49 additions & 0 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,55 @@ impl<T> Receiver<T> {
}
}

/// This function is similar to [`Receiver::changed()`], but it takes a closure that is
/// called with a reference to the new value. If the closure returns `true`,
/// then the function returns immediately. Otherwise, it waits for a new
/// value and calls the closure again.
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
///
/// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
///
/// async fn main() {
/// let (tx, _rx) = watch::channel("hello");
///
/// tx.send("goodbye").unwrap();
///
/// // here we subscribe to a second receiver
/// // now in case of using `changed` we would have
/// // to first check the current value and then wait
/// // for changes or else `changed` would hang.
/// let mut rx2 = tx.subscribe();
///
/// // in place of changed we have use `wait_for`
/// // which would automatically check the current value
/// // and wait for changes until the closure returns true.
/// assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok());
/// assert_eq!(*rx2.borrow(), "goodbye");
/// }
/// ```

pub async fn wait_for(
debadree25 marked this conversation as resolved.
Show resolved Hide resolved
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
mut f: impl FnMut(&T) -> bool,
) -> Result<bool, error::RecvError> {
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
loop {
if f(&self.borrow_and_update()) {
return Ok(true);
}

let changed = self.changed().await;
if changed.is_err() {
return Ok(false);
}
}
}

/// Returns `true` if receivers belong to the same channel.
///
/// # Examples
Expand Down