Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: adds Notify for basic task notification #2210

Merged
merged 16 commits into from
Feb 26, 2020
Merged

sync: adds Notify for basic task notification #2210

merged 16 commits into from
Feb 26, 2020

Conversation

carllerche
Copy link
Member

Notify provides a synchronization primitive similar to thread park /
unpark, except for tasks. Roughly, it is like a semaphore that starts with
no permits and notify_one adds a permit if, and only if, there currently are
no permits.

This effectively provides a basic way to get a task to wait until a signal is
received.

This PR also is a first step towards bringing the techniques used
by futures-intrusive
into Tokio. This introduces an intrusive linked list guarded by a mutex. Wakers are
stored on the (async) stack thanks to Pin. The details of the linked list are pulled
mostly from #2153, but with unused code removed, some slight API tweaks made,
and additional tests in the form of fuzzing added.

Because of the similarities between Notify and Semaphore, a question would be
whether Notify should exist at all. I am leaning towards having both Notify and SemaphoreasSemaphoredoes not include the necessary APIs to implementNotify. Specifically, there is no add_permit_if_nonefunction. Eventually,Notifycould be refactored to useSemaphore` under the hood.

`Notify` provides a synchronization primitive similar to thread park /
unpark, except for tasks.
@carllerche
Copy link
Member Author

carllerche commented Feb 3, 2020

As an example (included in api docs), this is how one could implement an MPSC channel using a VecDeque and Notify:

use tokio::sync::Notify;
use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    values: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, value: T) {
        self.values.lock().unwrap()
            .push_back(value);

        // Notify the consumer a value is available
        self.notify.notify_one();
    }

    pub async fn recv(&self) -> T {
        loop {
            // Drain values
            if let Some(value) = self.values.lock().unwrap().pop_front() {
                return value;
            }

            // Wait for values to be available
            self.notify.recv().await;
        }
    }
}

(code untested)

@carllerche
Copy link
Member Author

Another open question is what should the "receive" function be called. Right now, I have Notify::recv, but maybe it should be Notify::acquire to mirror Semaphore or maybe it should be something completely different.

Copy link
Member

@hawkw hawkw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of code here, and I'd like to spend longer looking at it, but I've done a cursory review and left a few comments.

tokio/src/macros/pin.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Show resolved Hide resolved
tokio/src/sync/notify.rs Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Show resolved Hide resolved
tokio/src/util/linked_list.rs Outdated Show resolved Hide resolved
tokio/src/util/linked_list.rs Outdated Show resolved Hide resolved
@hawkw
Copy link
Member

hawkw commented Feb 3, 2020

Another open question is what should the "receive" function be called. Right now, I have Notify::recv, but maybe it should be Notify::acquire to mirror Semaphore or maybe it should be something completely different.

How about Notify::wait? IMHO this best describes what the operation is actually trying to do: you are waiting to be notified, not receiving any actual data. That way, you have Notify::wait and Notify::notify_one, which kinda mirrors std::sync::Barrier.

On the other hand, then you have notify.wait().await; which seems kinda repetitive...

tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Show resolved Hide resolved
tokio/src/sync/notify.rs Show resolved Hide resolved
@hawkw hawkw mentioned this pull request Feb 21, 2020
Copy link
Member

@hawkw hawkw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm! i gave the documentation a quick proof-reading and had some minor edits to suggest.

tokio/src/sync/notify.rs Show resolved Hide resolved
tokio/src/macros/pin.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Show resolved Hide resolved
tokio/src/sync/notify.rs Outdated Show resolved Hide resolved
tokio/src/sync/notify.rs Show resolved Hide resolved
@carllerche carllerche merged commit 8b7ea0f into master Feb 26, 2020
@carllerche carllerche deleted the notify branch February 26, 2020 22:13
hawkw added a commit that referenced this pull request Mar 23, 2020
## Motivation

Many of Tokio's synchronization primitives (`RwLock`, `Mutex`,
`Semaphore`, and the bounded MPSC channel) are based on the internal
semaphore implementation, called `semaphore_ll`. This semaphore type
provides a lower-level internal API for the semaphore implementation
than the public `Semaphore` type, and supports "batch" operations, where
waiters may acquire more than one permit at a time, and batches of
permits may be released back to the semaphore.

Currently, `semaphore_ll` uses an atomic singly-linked list for the
waiter queue. The linked list implementation is specific to the
semaphore. This implementation therefore requires a heap allocation for
every waiter in the queue. These allocations are owned by the semaphore,
rather than by the task awaiting permits from the semaphore. Critically,
they are only _deallocated_ when permits are released back to the
semaphore, at which point it dequeues as many waiters from the front of
the queue as can be satisfied with the released permits. If a task
attempts to acquire permits from the semaphore and is cancelled (such as
by timing out), their waiter nodes remain in the list until they are
dequeued while releasing permits. In cases where large numbers of tasks
are cancelled while waiting for permits, this results in extremely high
memory use for the semaphore (see #2237).

## Solution

@Matthias247 has proposed that Tokio adopt the approach used in his
`futures-intrusive` crate: using an _intrusive_ linked list to store the
wakers of tasks waiting on a synchronization primitive. In an intrusive
list, each list node is stored as part of the entry that node
represents, rather than in a heap allocation that owns the entry.
Because futures must be pinned in order to be polled, the necessary
invariant of such a list --- that entries may not move while in the list
--- may be upheld by making the waiter node `!Unpin`. In this approach,
the waiter node can be stored inline in the future, rather than
requiring  separate heap allocation, and cancelled futures may remove
their nodes from the list.

This branch adds a new semaphore implementation that uses the intrusive
list added to Tokio in #2210. The implementation is essentially a hybrid
of the old `semaphore_ll` and the semaphore used in `futures-intrusive`:
while a `Mutex` around the wait list is necessary, since the intrusive
list is not thread-safe, the permit state is stored outside of the mutex
and updated atomically. 

The mutex is acquired only when accessing the wait list — if a task 
can acquire sufficient permits without waiting, it does not need to
acquire the lock. When releasing permits, we iterate over the wait
list from the end of the queue until we run out of permits to release,
and split off all the nodes that received enough permits to wake up
into a separate list. Then, we can drain the new list and notify those
wakers *after* releasing the lock. Because the split operation only
modifies the pointers on the head node of the split-off list and the
new tail node of the old list, it is O(1) and does not require an
allocation to return a variable length number of waiters to notify.


Because of the intrusive list invariants, the API provided by the new
`batch_semaphore` is somewhat different than that of `semaphore_ll`. In
particular, the `Permit` type has been removed. This type was primarily
intended allow the reuse of a wait list node allocated on the heap.
Since the intrusive list means we can avoid heap-allocating waiters,
this is no longer necessary. Instead, acquiring permits is done by
polling an `Acquire` future returned by the `Semaphore` type. The use of
a future here ensures that the waiter node is always pinned while
waiting to acquire permits, and that a reference to the semaphore is
available to remove the waiter if the future is cancelled.
Unfortunately, the current implementation of the bounded MPSC requires a
`poll_acquire` operation, and has methods that call it while outside of
a pinned context. Therefore, I've left the old `semaphore_ll`
implementation in place to be used by the bounded MPSC, and updated the
`Mutex`, `RwLock`, and `Semaphore` APIs to use the new implementation.
Hopefully, a subsequent change can update the bounded MPSC to use the
new semaphore as well.

Fixes #2237

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants