Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No good way to reliably notify all consumers about a change #3757

Closed
arthurprs opened this issue May 6, 2021 · 5 comments
Closed

No good way to reliably notify all consumers about a change #3757

arthurprs opened this issue May 6, 2021 · 5 comments
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request. M-sync Module: tokio/sync

Comments

@arthurprs
Copy link
Contributor

arthurprs commented May 6, 2021

Is your feature request related to a problem? Please describe.

Consider an use-case where a producer wants to notify consumers about a change, even if they were doing something else at the time of the notification.

At first Notify and Notify::notify_waiters seem like a fit, but it's not. Because notify_waiters doesn't make the permit available for waiters that are not waiting at the time of the notification.

C: subscribe to S by cloning the Notify
P: notifier.notify_waiters()
C: notifier.notified().await <<< waits even if the permit was provided above

Option 1

One way to get the expected behavior is to use a broadcast channel with capacity of 1.

C: subscribe to S by calling sender.subscribe()
P: sender.send(())
C: receiver.recv().await <<< wakes up correctly, with either Err(Lagged(_)) or Ok(())

This works but involves a a couple locks inside the Broadcast channel unnecessarily.

Option 2

A Watch works too but it's unergonomic to keep both Sender and Receivers around, as there's no equivalent to subscribe()

Describe the solution you'd like

Option 1

Add Notifier::notify_all which will notify all existing waiters, not only the ones awaiting.

Option 2

Consider exposing Watch::subscribe which already exists internally. With it this use case can be modeled ergonomically/performantly with a Sender::subscribe() + Receiver::changed().await

pub(crate) fn subscribe(&self) -> Receiver<T> {

Describe alternatives you've considered

See description above, which mentions 2 options, each with its set of drawbacks.

Additional context

I did find #3066 seems to describe a similar need but was closed with a partial(?) solution in the form of Notifier::notify_awaiters which only takes into account current waiters.

@arthurprs arthurprs added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels May 6, 2021
@arthurprs arthurprs changed the title Consider exposing Watch::subscribe No good way to notify all consumers about a change May 6, 2021
@arthurprs arthurprs changed the title No good way to notify all consumers about a change No good way to reliably notify all consumers about a change May 6, 2021
@Darksonn Darksonn added the M-sync Module: tokio/sync label May 6, 2021
@Darksonn
Copy link
Contributor

Darksonn commented May 6, 2021

Add Notifier::notify_all which will notify all existing waiters, not only the ones awaiting.

This is not possible because unlike the channels, the Notify utility does not know how many waiters there are. It is shared by putting it in an Arc, not by cloning the Notify itself.

@Wesmania
Copy link

Wesmania commented May 12, 2021

Would changing signature of notified to fn(&self) -> impl Future<Output = ()> + '_ help? This way, Notified struct could insert itself into a waiter queue immediately and could be used like so:

struct Event {
    occured: Arc<Mutex<bool>>,
    notify: Notify,
}

impl Event {
    fn set(&self) {
        *self.occured.lock().unwrap() = true;
        self.notify.notify_waiters();
    }

    fn wait(&self) -> impl Future<Output = ()> {
        let guard = self.occured.lock().unwrap();
        let wait = self.notify.notified();
        let occured = *guard;
        async move {
            if occured {
                return;
            } else {
                wait.await;
            }
        }
    }
}

@arthurprs
Copy link
Contributor Author

Something like that works, and is very similar to Watch with no value. But occurred needs to be a counter (that is also tracked in Event) so each Event instance can track if it's up to date or not.

@Wesmania
Copy link

Actually I'm wrong, notify already works like that since Notified is constructed as part of the future. Sorry for the confusion.

@Darksonn
Copy link
Contributor

The subscribe method has been exposed on the watch channel, so I'm closing this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request. M-sync Module: tokio/sync
Projects
None yet
Development

No branches or pull requests

3 participants