Make possible to update version of watch to version in shared data #3666

Closed
pluyckx opened this issue Mar 30, 2021 · 8 comments · Fixed by #3813
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request. M-sync Module: tokio/sync

Comments

@pluyckx

pluyckx commented Mar 30, 2021

Version
├── tokio v1.4.0
│ └── tokio-macros v1.1.0 (proc-macro)
├── tokio-tungstenite v0.14.0
│ ├── tokio v1.4.0 (*)
│ ├── tokio-native-tls v0.3.0
│ │ └── tokio v1.4.0 (*)

Platform
Linux {username}-Linux 5.10.23-1-MANJARO #1 SMP PREEMPT Thu Mar 11 18:47:18 UTC 2021 x86_64 GNU/Linux

Description
It seems that when you clone a watch receiver, the clone does not use the latest version when changed() is called on it. So if you clone from a receiver that has not called changed() itself, the clone's call to changed() may return immediately instead of blocking.

A bit more information: I have an object that holds a watch Receiver, and users of this object can get a clone of that Receiver. However, once new values have been sent to the watch, calling changed() on a clone does not behave as I expected. I would expect the clone to start from the latest version in the shared data, but it starts from the version stored in the original receiver. The following simplified example shows the unexpected behaviour. The second assertion fails, while I expected it to pass.

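use std::time::Duration;

use tokio::sync::watch;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    println!("Start test");
    let (tx, rx) = watch::channel(0);

    tokio::spawn(async move {
        tx.send(1).unwrap();
        sleep(Duration::from_secs(1)).await;
        tx.send(2).unwrap();
    });

    // The clone copies rx's version, which predates send(1), so changed()
    // completes once the value 1 has been sent.
    let mut new_rx = rx.clone();
    new_rx.changed().await.unwrap();
    assert_eq!(1, *rx.borrow());

    // rx itself never called changed(), so this clone carries the same old
    // version: changed() returns immediately and the value is still 1.
    let mut new_rx = rx.clone();
    new_rx.changed().await.unwrap();
    assert_eq!(2, *rx.borrow()); // this assertion fails

    println!("test finished");
}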

This is solved by changing the clone code from:

impl<T> Clone for Receiver<T> {
    fn clone(&self) -> Self {
        let version = self.version;
        let shared = self.shared.clone();

        Self::from_shared(version, shared)
    }
}

to:

impl<T> Clone for Receiver<T> {
    fn clone(&self) -> Self {
        let version = self.shared.version.load(SeqCst);
        let shared = self.shared.clone();

        Self::from_shared(version, shared)
    }
}

Maybe clone is not the right place for this, but I would expect such a function to exist, or at least a function that updates the receiver's version to the version in the shared data object. Then you could clone the watch receiver, first update its version to the latest one, and then send a request. That way you are sure that changed() will block until a response is received.
I cannot use the subscribe function on the sender object because the sender has been moved to another task.

@pluyckx pluyckx added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Mar 30, 2021
@Darksonn Darksonn added the M-sync Module: tokio/sync label Mar 31, 2021
@Darksonn
Contributor

This seems like the correct behavior for clone. However, we could certainly add a method that updates the version counter. In fact, I think it is already possible this way:

recv.changed().now_or_never();

This uses FutureExt::now_or_never from the futures crate.
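To illustrate why a single non-blocking poll is enough to bring a receiver up to date, here is a minimal std-only sketch. It is a toy model, not tokio's implementation: `Changed`, `poll_once` and `noop_waker` are hypothetical names, and `poll_once` only imitates what `now_or_never` does (poll the future once, return `Some` if it is ready).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Toy stand-in for the future returned by `changed()`; not tokio's type.
struct Changed<'a> {
    seen: &'a mut usize,     // version stored in the receiver
    latest: &'a AtomicUsize, // version stored in the shared state
}

impl Future for Changed<'_> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // allowed because `Changed` is `Unpin`
        let latest = this.latest.load(SeqCst);
        if latest != *this.seen {
            *this.seen = latest; // mark the newest version as seen
            Poll::Ready(())
        } else {
            // A real implementation would register the waker here.
            Poll::Pending
        }
    }
}

/// Builds a waker that does nothing when woken.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Polls the future exactly once; `Some` only if it is ready right away.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}
```

In this model, the first `poll_once` on a stale receiver completes immediately and updates the stored version as a side effect, so a second poll stays pending until the shared version moves again. On a real receiver the equivalent call is the `recv.changed().now_or_never()` line quoted above.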

@pluyckx
Author

pluyckx commented Mar 31, 2021

Thank you for your answer. I will try this this evening.

I was just about to say that maybe this isn't a bug and that the behavior of clone is correct (it just makes a deep copy of the object as is). This is more of a feature request. Once I have tested now_or_never, I will update this thread with my results and then close it. If it does not work, I will create a feature request.

@Darksonn
Contributor

I mean, it is a reasonable feature request even if now_or_never works. The now_or_never method is not exactly easily discoverable.

@pluyckx
Author

pluyckx commented Mar 31, 2021

Ok, is it possible to change the label from bug to feature or should I create a new feature request?

@Darksonn Darksonn added C-feature-request Category: A feature request. and removed C-bug Category: This is a bug. labels Mar 31, 2021
@Darksonn
Contributor

I have changed it. It would be good to rename the title as well, which you can do yourself.

@pluyckx pluyckx changed the title from "Unexpected behavior of watch" to "Make possible to update version of watch to version in shared data" Mar 31, 2021
@pluyckx
Author

pluyckx commented Mar 31, 2021

Currently, I see two ways to implement this:

  1. Add a function similar to from_shared, say from_watch, that clones the shared data and creates a new Receiver with the version taken from the shared data.
  2. Add a function that just updates the receiver's version to the one in the shared data.

It is possible to just implement one, or to implement both solutions. Not sure what you prefer. If you want I am willing to implement this and open a pull request.
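The two options can be compared on a simplified model of the channel. This is only a sketch under stated assumptions: `Shared`, `Receiver`, `channel` and `try_changed` are toy stand-ins rather than tokio's types, `try_changed` is a non-blocking substitute for `changed()`, and `from_watch` and `sync_version` are hypothetical names for options 1 and 2.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, RwLock};

/// Toy stand-in for the channel's shared state.
struct Shared<T> {
    value: RwLock<T>,
    version: AtomicUsize,
}

impl<T> Shared<T> {
    /// Stores a new value and bumps the shared version.
    fn send(&self, value: T) {
        *self.value.write().unwrap() = value;
        self.version.fetch_add(1, SeqCst);
    }
}

/// Toy stand-in for the receiver: a handle plus the last version it saw.
struct Receiver<T> {
    shared: Arc<Shared<T>>,
    version: usize,
}

fn channel<T>(initial: T) -> (Arc<Shared<T>>, Receiver<T>) {
    let shared = Arc::new(Shared {
        value: RwLock::new(initial),
        version: AtomicUsize::new(0),
    });
    let rx = Receiver { shared: Arc::clone(&shared), version: 0 };
    (shared, rx)
}

impl<T> Clone for Receiver<T> {
    /// Mirrors the behaviour reported above: the clone inherits the
    /// version of the receiver it was cloned from.
    fn clone(&self) -> Self {
        Receiver { shared: Arc::clone(&self.shared), version: self.version }
    }
}

impl<T> Receiver<T> {
    /// Non-blocking substitute for `changed()`: reports whether an unseen
    /// version exists and marks it as seen.
    fn try_changed(&mut self) -> bool {
        let latest = self.shared.version.load(SeqCst);
        let changed = latest != self.version;
        self.version = latest;
        changed
    }

    /// Option 1 (hypothetical): a new receiver that starts at the
    /// version currently stored in the shared data.
    fn from_watch(other: &Receiver<T>) -> Self {
        let shared = Arc::clone(&other.shared);
        let version = shared.version.load(SeqCst);
        Receiver { shared, version }
    }

    /// Option 2 (hypothetical): bring an existing receiver up to date.
    fn sync_version(&mut self) {
        self.version = self.shared.version.load(SeqCst);
    }
}
```

With this model, a plain `clone()` taken after a send reports a change right away, whereas a receiver built with `from_watch`, or a clone followed by `sync_version()`, reports nothing until the next send. Option 2 composes with `clone()`, so it covers the use case of option 1 as well.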

@Darksonn
Contributor

I prefer your second option.

@wabain
Contributor

wabain commented Apr 30, 2021

I was also bitten by this in pretty much the same scenario as the original report—there's a long-lived Receiver instance that isn't polled but is used to create tasks which then .borrow() and clone the current value before starting to await .changed() notifications within a select loop. Without a .changed().now_or_never() prior to entering the loop the task gets a spurious change notification after starting.

The proposed sync_version() certainly would be a more discoverable way of doing this, but it seems to me there's still some inherent raciness in this flow that I didn't expect before looking at the implementation. I did anticipate the possibility that the receiver misses some updates, for instance if the watch value is updated a second time between when the receiver's waker is notified and when the task runs. What I didn't expect is that it's also possible to get a spurious changed() notification, even outside of this cold start scenario. If the value is updated between when the receiver polls for updates and when rx.borrow() is called, the next call to rx.changed() will resolve immediately even though the receiver already accessed the latest version.

The documentation seems a little inconsistent on this. The docs for Receiver::changed() say it returns "when a new value has been sent by the Sender since the last time changed() was called" which corresponds to the implemented behavior. But the module-level docs say:

The channel is created with an initial value. The latest value stored in the channel is accessed with Receiver::borrow(). Awaiting Receiver::changed() waits for a new value to sent by the Sender half.

The emphasis on latest is maybe supposed to get at this race, but I'd assumed that was just intending to refer to the scenario where a receiver could be notified of fewer distinct versions than were sent. From the module docs alone, with the only obvious antecedent for "new value" in the text being the value retrieved using borrow(), I'd expect that after the receiver observes a value via borrow() a subsequent call to changed() would resolve only after a distinct version was received, with any additional wakeups dealt with as an internal implementation detail. I find it hard to imagine a scenario where someone would intentionally rely on changed() resolving due to an update which was already observed by the receiver.

One way to avoid this race would be to keep a second shared copy of the version inside the RwLock along with the value. Then when borrow() is called the receiver's version could be checked in case it had updated since the atomic load in changed(), preventing a subsequent call to changed() from resolving for a version which was already observed. There would be up to a usize of extra memory used in the shared state depending on alignment and an extra check inside borrow(), which is already documented to incur synchronization overhead. I think this would prevent needing something like sync_version(), at least if the receiving task accesses the initial value before calling changed().
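A rough sketch of that idea on a toy model, to make the race and the proposed fix concrete. Everything here is an assumption for illustration: the types are not tokio's, `try_changed` is a non-blocking substitute for `changed()`, `borrow_plain` models the current `borrow()`, and `borrow_marking_seen` is a hypothetical name for the modified one. The model takes `&mut self` and returns a clone of the value to keep the code short; a real change would have to deal with `borrow(&self)` returning a guard.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, RwLock};

/// Toy shared state: the value is stored together with the version it
/// was written at, under the same lock.
struct Shared<T> {
    slot: RwLock<(T, usize)>,
    version: AtomicUsize,
}

impl<T> Shared<T> {
    fn send(&self, value: T) {
        let mut slot = self.slot.write().unwrap();
        slot.0 = value;
        slot.1 += 1;
        // Published while the write lock is held, so a reader holding the
        // lock never sees a slot version newer than the atomic.
        self.version.store(slot.1, SeqCst);
    }
}

struct Receiver<T> {
    shared: Arc<Shared<T>>,
    version: usize,
}

fn channel<T>(initial: T) -> (Arc<Shared<T>>, Receiver<T>) {
    let shared = Arc::new(Shared {
        slot: RwLock::new((initial, 0)),
        version: AtomicUsize::new(0),
    });
    let rx = Receiver { shared: Arc::clone(&shared), version: 0 };
    (shared, rx)
}

impl<T: Clone> Receiver<T> {
    /// Non-blocking substitute for `changed()`.
    fn try_changed(&mut self) -> bool {
        let latest = self.shared.version.load(SeqCst);
        let changed = latest != self.version;
        self.version = latest;
        changed
    }

    /// Models the current `borrow()`: reads the value and leaves the
    /// receiver's version untouched.
    fn borrow_plain(&self) -> T {
        self.shared.slot.read().unwrap().0.clone()
    }

    /// Models the proposal: reading the value also records the version
    /// it was written at, so it is not reported as a change later.
    fn borrow_marking_seen(&mut self) -> T {
        let slot = self.shared.slot.read().unwrap();
        self.version = slot.1;
        slot.0.clone()
    }
}
```

In this model, a send that lands between `try_changed()` and `borrow_plain()` makes the next `try_changed()` return true for a value the receiver has already read, which is the spurious notification described above. Reading through `borrow_marking_seen()` instead leaves the next `try_changed()` false until another send happens.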

If modifying borrow() to work that way is too much of a compatibility risk, it would be great if there could be a separate method—say Receiver::read or Receiver::refresh or something—that provides access to the latest value like borrow() but ensures changed() will not resolve again for that version. As it stands I think the only way to reliably detect spurious change notifications in general is to replicate the version number check and have the sending task include a version number in the payload that the receiver can verify.
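The workaround in the last sentence, carrying a version number in the payload, could look roughly like this. It is a sketch with hypothetical names (`Versioned`, `Stamper`, `Dedup`); the channel itself is left out, and the payload type would be whatever is sent through the watch.

```rust
/// Payload wrapper: the sender attaches a sequence number to every value.
#[derive(Clone, Debug, PartialEq)]
struct Versioned<T> {
    seq: u64,
    value: T,
}

/// Sender-side helper that hands out increasing sequence numbers.
struct Stamper {
    next_seq: u64,
}

impl Stamper {
    fn new() -> Self {
        Stamper { next_seq: 0 }
    }

    fn stamp<T>(&mut self, value: T) -> Versioned<T> {
        let seq = self.next_seq;
        self.next_seq += 1;
        Versioned { seq, value }
    }
}

/// Receiver-side helper that remembers the last sequence number it saw.
struct Dedup {
    last_seen: Option<u64>,
}

impl Dedup {
    fn new() -> Self {
        Dedup { last_seen: None }
    }

    /// Returns the value only if its sequence number differs from the one
    /// seen last, which filters out a wakeup for an already observed value.
    fn accept<'a, T>(&mut self, msg: &'a Versioned<T>) -> Option<&'a T> {
        if self.last_seen == Some(msg.seq) {
            return None;
        }
        self.last_seen = Some(msg.seq);
        Some(&msg.value)
    }
}
```

The receiving task would pass every borrowed payload through `accept`, including the initial value it reads before the loop, and skip the iteration when it gets `None`.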

3 participants