Merged
1 change: 1 addition & 0 deletions docs/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- [Contributing](contributing.md)
- [Senders, receivers, and operations](sender-receiver.md)
- [Writing your own sender and operation](your-own-sender.md)
- [Writing low-level data structures](low-level-data-structures.md)
- [IO services](io-service.md)
- [API reference](headers.md)
- [async/algorithm.hpp](headers/algorithm.md)
Expand Down
54 changes: 54 additions & 0 deletions docs/src/low-level-data-structures.md
@@ -0,0 +1,54 @@
# Writing low-level data structures

Writing low-level data structures with correct cancellation support is difficult
and error-prone. The `cancellation_resolver` helper can simplify this task.

Low-level data structures (such as wait queues) typically have a `struct node` (or similar)
that is linked into the data structure during an async operation.

```cpp
struct node {
// Called when the async operation completes successfully (no cancellation).
virtual void complete() = 0;

// ...
};

template<typename Receiver>
struct operation : node {
void complete() override { /* ... */ }
};
```

To correctly add cancellation support to such a data structure, the following recipe can be used:
1. Add a `bool try_cancel(node *nd)` function to the data structure that tries to remove
`nd` from the data structure. If this succeeds, `complete()` will never be called.
`try_cancel()` will fail (and return `false`) if the operation represented by `nd` is already being
completed concurrently (or if it was completed in the past).
The data structure's own synchronization mechanisms (e.g., mutexes) can be used to determine this outcome.
For example, this can be done by adding a member to the node that distinguishes between the states
"not submitted yet", "linked to the data structure" and "completion pending" (where the latter
means that a call to `complete()` is either imminent or has already happened).
2. Add a `cancellation_resolver` to `operation` that calls `try_cancel()` on its cancellation path
and `async::execution::set_value()` on its resumption path.
Implement the `complete()` override in `operation` by calling the `cancellation_resolver`'s
`complete()` method:
```cpp
void complete() override {
cr_.complete();
}
```
3. In the operation's `start()` function, use the following pattern:
```cpp
void start() {
bool fast_path = false;
{
// Lock the data structure, determine whether a fast path is applicable, etc.
// ...
}

if (fast_path)
return async::execution::set_value(/* ... */);
cr_.listen(ct_); // If fast path is not taken, register cancellation_resolver.
}
```
124 changes: 124 additions & 0 deletions include/async/cancellation.hpp
Expand Up @@ -30,6 +30,9 @@ struct cancellation_event {
template<typename F>
friend struct cancellation_observer;

template<typename TryCancel, typename Resume>
friend struct cancellation_resolver;

cancellation_event()
: _was_requested{false} { };

Expand Down Expand Up @@ -70,6 +73,9 @@ struct cancellation_token {
template<typename F>
friend struct cancellation_observer;

template<typename TryCancel, typename Resume>
friend struct cancellation_resolver;

cancellation_token()
: _event{nullptr} { }

Expand Down Expand Up @@ -200,6 +206,123 @@ struct cancellation_observer final : private abstract_cancellation_callback {
F _functor;
};

// Helper class to implement cancellation for low-level data structures.
// Both TryCancel and Resume are function objects that take a cancellation_resolver *.
// Cancellation always involves a potential race between the cancellation code path
// and the regular completion code path. This class helps to resolve this race.
//
// Usage:
// * Initialization: users do their internal bookkeeping (e.g., by adding nodes to their
// data structures or similar) and then call listen() with a cancellation token.
// * Regular completion: on regular completion, users call complete().
// This tries to unregister the cancellation_resolver from the cancellation event.
// If this succeeds, tryCancel() will never be called.
// If it fails, tryCancel() either was already called or is being called concurrently.
// * On cancellation, the tryCancel() callback is called.
// If tryCancel() returns true, complete() will never be called.
// If it returns false, complete() either was already called or is being called concurrently.
// * resume() is called once all of the following hold:
// - listen() is done.
// - Either complete() is done or it will never be called.
// - Either tryCancel() is done or it will never be called.
template<typename TryCancel, typename Resume>
struct cancellation_resolver final : private abstract_cancellation_callback {
cancellation_resolver(TryCancel tryCancel = TryCancel{}, Resume resume = Resume{})
: tryCancel_{std::move(tryCancel)}, resume_{std::move(resume)} { }

cancellation_resolver(const cancellation_resolver &) = delete;

// TODO: we could do some sanity checking of the state in the destructor.
~cancellation_resolver() = default;

cancellation_resolver &operator= (const cancellation_resolver &) = delete;

void listen(cancellation_token ct) {
event_ = ct._event;
if (!event_) {
transition_(done_listen | done_cancellation_path);
return;
}

// Try to register a callback for cancellation.
// This will succeed unless the cancellation event is already triggered.
bool registered = false;
{
frg::unique_lock guard{event_->_mutex};
if(!event_->_was_requested) {
event_->_cbs.push_back(this);
registered = true;
}
}
if (registered) {
transition_(done_listen);
return;
}

// If we get here, the cancellation event was already triggered.
// Do the equivalent of call(), except that we also set the done_listen bit.
if (tryCancel_(this)) {
transition_(done_listen | done_completion_path | done_cancellation_path);
return;
}
transition_(done_listen | done_cancellation_path);
}

void complete() {
transition_(done_completion_path);
}

private:
void call() override {
if (tryCancel_(this)) {
transition_(done_completion_path | done_cancellation_path);
return;
}
transition_(done_cancellation_path);
}

void transition_(unsigned int new_bits) {
assert(!(new_bits & ~(done_listen | done_completion_path | done_cancellation_path)));

auto old_st = state_.fetch_or(new_bits, std::memory_order_acq_rel);
assert(!(old_st & new_bits));

// Both resume() and cancellation event unregistration only happen once both
// listen() and the completion code path are done.
auto st = old_st | new_bits;
if (!(st & done_listen) || !(st & done_completion_path))
return;

if (!(st & done_cancellation_path)) {
// Try to unregister from the cancellation event once both listen() and complete() were called.
// Note that we enter this code path at most once since done_cancellation_path is the only missing
// bit in state_ and the next call to transition_() will necessarily set it.
assert(event_);

frg::unique_lock guard{event_->_mutex};
if (event_->_was_requested)
return;
auto it = event_->_cbs.iterator_to(this);
event_->_cbs.erase(it);
}

// Call resume() when all code paths are done. This can only happen once.
resume_(this);
}

// Set in state_ when listen() is done.
static constexpr unsigned int done_listen = 1u << 0;
// Set in state_ when complete() is done or when we know that it will never be called.
static constexpr unsigned int done_completion_path = 1u << 1;
// Set in state_ when tryCancel() is done or when we know that it will never be called.
static constexpr unsigned int done_cancellation_path = 1u << 2;

cancellation_event *event_{nullptr};
std::atomic<unsigned int> state_{0};
[[no_unique_address]] TryCancel tryCancel_;
[[no_unique_address]] Resume resume_;
};
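The ordering guarantee documented above — `resume()` fires exactly once, only after both `listen()` and the completion path are done — can be illustrated with a standalone model of the `state_` bit protocol. This is a simplified sketch, not the class above; `resolver_model` is a hypothetical name, and it only covers the case where the cancellation path is already resolved (so no event unregistration is needed):

```cpp
#include <atomic>
#include <cassert>

// Simplified model of transition_()'s bit protocol (illustrative only).
struct resolver_model {
    static constexpr unsigned done_listen = 1u << 0;
    static constexpr unsigned done_completion_path = 1u << 1;
    static constexpr unsigned done_cancellation_path = 1u << 2;

    void transition(unsigned new_bits) {
        auto old_st = state.fetch_or(new_bits, std::memory_order_acq_rel);
        assert(!(old_st & new_bits)); // each bit is set at most once
        auto st = old_st | new_bits;

        // resume() only runs once listen() and the completion path are done.
        if (!(st & done_listen) || !(st & done_completion_path))
            return;

        // The real class would unregister from the cancellation event here if
        // done_cancellation_path were still missing; this model assumes it is set.
        assert(st & done_cancellation_path);
        ++resume_calls; // stands in for resume_(this)
    }

    std::atomic<unsigned> state{0};
    int resume_calls = 0;
};
```

Whichever call completes the `{done_listen, done_completion_path}` pair triggers the resume exactly once, regardless of the order in which the bits arrive.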

inline void cancellation_event::cancel() {
frg::intrusive_list<
abstract_cancellation_callback,
Expand Down Expand Up @@ -236,6 +359,7 @@ using detail::cancellation_event;
using detail::cancellation_token;
using detail::cancellation_callback;
using detail::cancellation_observer;
using detail::cancellation_resolver;

namespace {

Expand Down
63 changes: 34 additions & 29 deletions include/async/post-ack.hpp
Expand Up @@ -3,7 +3,7 @@
#include <algorithm>

#include <async/cancellation.hpp>
#include <frg/functional.hpp>
#include <frg/container_of.hpp>
#include <frg/list.hpp>

namespace async {
Expand Down Expand Up @@ -54,6 +54,18 @@ struct post_ack_mechanism {
~poll_node() = default;
};

bool try_cancel(poll_node *pnd) {
frg::unique_lock lock(mutex_);

if(!pnd->pending) {
assert(!pnd->nd);
pnd->pending = true;
poll_queue_.erase(poll_queue_.iterator_to(pnd));
return true;
}
return false;
}

public:
template<typename R>
struct [[nodiscard]] post_operation final : private node {
Expand Down Expand Up @@ -310,6 +322,7 @@ struct post_ack_agent {

auto seq = agnt_->poll_seq_;

bool fast_path = false;
{
frg::unique_lock lock(agnt_->mech_->mutex_);
assert(!nd);
Expand All @@ -323,52 +336,44 @@ struct post_ack_agent {
assert(it != agnt_->mech_->queue_.end());
pending = true;
nd = *it;
}else if(!cobs_.try_set(ct_)) {
// Fast path: cancellation.
pending = true;
fast_path = true;
}else{
// Slow path.
agnt_->mech_->poll_queue_.push_back(this);
return;
}
}

if(nd)
if(fast_path) {
++agnt_->poll_seq_;
execution::set_value(receiver_, post_ack_handle<T>{agnt_->mech_, nd});
return execution::set_value(receiver_, post_ack_handle<T>{agnt_->mech_, nd});
}
cr_.listen(ct_);
}

private:
void complete() override {
if(cobs_.try_reset()) {
++agnt_->poll_seq_;
execution::set_value(receiver_, post_ack_handle<T>{agnt_->mech_, nd});
struct try_cancel_fn {
bool operator()(auto *cr) {
auto self = frg::container_of(cr, &poll_operation::cr_);
return self->agnt_->mech_->try_cancel(self);
}
}

void complete_cancel() {
{
frg::unique_lock lock(agnt_->mech_->mutex_);

if(!pending) {
assert(!nd);
pending = true;
agnt_->mech_->poll_queue_.erase(agnt_->mech_->poll_queue_.iterator_to(this));
}
};
struct resume_fn {
void operator()(auto *cr) {
auto self = frg::container_of(cr, &poll_operation::cr_);
if(self->nd)
++self->agnt_->poll_seq_;
execution::set_value(self->receiver_, post_ack_handle<T>{self->agnt_->mech_, self->nd});
}
};

if(nd) {
++agnt_->poll_seq_;
execution::set_value(receiver_, post_ack_handle<T>{agnt_->mech_, nd});
}else{
execution::set_value(receiver_, post_ack_handle<T>{});
}
void complete() override {
cr_.complete();
}

post_ack_agent *agnt_;
cancellation_token ct_;
R receiver_;
cancellation_observer<frg::bound_mem_fn<&poll_operation::complete_cancel>> cobs_{this};
cancellation_resolver<try_cancel_fn, resume_fn> cr_;
};

struct [[nodiscard]] poll_sender {
Expand Down