Navigation Menu

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watch::Sender: Modify and send value conditionally #4591

Merged
merged 6 commits into from May 3, 2022
88 changes: 79 additions & 9 deletions tokio/src/sync/watch.rs
Expand Up @@ -574,20 +574,88 @@ impl<T> Sender<T> {
pub fn send_modify<F>(&self, func: F)
where
F: FnOnce(&mut T),
{
self.send_if_modified(|value| {
func(value);
true
});
}

/// Modifies watched value, notifying all receivers if modified.
///
/// This can useful for modifying the watched value, without
/// having to allocate a new instance. Additionally, this
/// method permits sending values even when there are no receivers.
///
/// The closure that modifies the value must return `true` if the
/// value has actually been modified. It should only return `false`
/// if the value is guaranteed to be unnmodified despite the mutable
/// borrow. Receivers are only notified if the value has been modified.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although not modifying it when you return false is the intended use-case, I still think we should document what happens if you modify the value and then return false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are at it: Should we remove the redundancy in the docs and refer to the more versatile send_if_modified() in the docs for send_modify()? The docs would then basically reflect the forwarding of the implementation.

Copy link
Contributor

@Darksonn Darksonn Apr 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any improvements to the docs you can come up with would be good! Let me know when you think the docs are ready.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have revised and aligned the docs of both functions, highlighting their differences and similarities. Since I am not a native speaker someone should verify.

Removing the redundancies would be difficult. Copying the shared parts should be ok to keep the descriptions readable.

///
/// Returns the result of the closure, i.e. `true` if the value has
/// been modified and `false` otherwise.
///
/// # Panics
///
/// This function panics if calling `func` results in a panic.
/// No receivers are notified if panic occurred, but if the closure has modified
/// the value, that change is still visible to future calls to `borrow`.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// struct State {
/// counter: usize,
/// }
/// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
/// let inc_counter_if_odd = |state: &mut State| {
/// if state.counter % 2 == 1 {
/// state.counter += 1;
/// return true;
/// }
/// false
/// };
///
/// assert_eq!(state_rx.borrow().counter, 1);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(state_tx.send_if_modified(inc_counter_if_odd));
/// assert!(state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
/// assert!(!state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
/// ```
pub fn send_if_modified<F>(&self, modify: F) -> bool
where
F: FnOnce(&mut T) -> bool,
{
{
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();

// Update the value and catch possible panic inside func.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
func(&mut lock);
}));
// If the func panicked return the panic to the caller.
if let Err(error) = result {
// Drop the lock to avoid poisoning it.
drop(lock);
panic::resume_unwind(error);
}
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
match result {
Ok(modified) => {
if !modified {
// Abort, i.e. don't notify receivers if unmodified
return false;
}
// Continue if modified
}
Err(panicked) => {
// Drop the lock to avoid poisoning it.
drop(lock);
// Forward the panic to the caller.
panic::resume_unwind(panicked);
// Unreachable
}
};

self.shared.state.increment_version();

Expand All @@ -600,6 +668,8 @@ impl<T> Sender<T> {
}

self.shared.notify_rx.notify_waiters();

true
}

/// Sends a new value via the channel, notifying all receivers and returning
Expand Down