diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 07b41b1..4237e50 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -4,6 +4,7 @@ - [Contributing](contributing.md) - [Senders, receivers, and operations](sender-receiver.md) - [Writing your own sender and operation](your-own-sender.md) + - [Writing low-level data structures](low-level-data-structures.md) - [IO services](io-service.md) - [API reference](headers.md) - [async/algorithm.hpp](headers/algorithm.md) diff --git a/docs/src/low-level-data-structures.md b/docs/src/low-level-data-structures.md new file mode 100644 index 0000000..b580e2d --- /dev/null +++ b/docs/src/low-level-data-structures.md @@ -0,0 +1,54 @@ +# Writing low-level data structures + +Writing low-level data structures with correct cancellation support can be difficult +and prone to errors. The `cancellation_resolver` data structure can help to simplify this issue. + +Low level data structures (such as wait queues) will typically have a `struct node` (or similar) +that is linked to the data structure during an async operation. + +```cpp +struct node { + // Called when the async operation completes successfully (no cancellation). + virtual void complete() = 0; + + // ... +}; + +template +struct operation : node { + void complete() override { /* ... */ } +}; +``` + +To correctly add cancellation support to such a data structure, the following recipe can be used: +1. Add a `bool try_cancel(node *nd)` function to the data structure that tries to remove + `nd` from the data structure. If this succeeds, `complete()` will never be called. + `try_cancel()` will fail (and return `false`) if the operation represented by `nd` is already being + completed concurrently (or if it was completed in the past). + The data structure's own synchronization mechanisms (e.g., mutexes) can be used to determine this outcome. 
+ For example, this can be done by adding a member to the node that distinguishes between the states + "not submitted yet", "linked to the data structure" and "completion pending" (with the latter + corresponding to the scenario that a call to `complete()` is either imminent or has already been performed). +2. Add a `cancellation_resolver` to `operation` that calls `try_cancel()` on its cancellation path + and `async::execution::set_value()` on its resumption path. + Implement the `complete()` override in `operation` by calling the `cancellation_resolver`'s + `complete()` method. + ```cpp + void complete() { + cr_.complete(); + } + ``` +3. In the operation's `start()` function, follow the following recipe: + ```cpp + void start() { + bool fast_path = false; + { + // Lock the data structure, determine if a fast path is applicable etc. + // ... + } + + if (fast_path) + return async::execution::set_value(/* ... */); + cr_.listen(ct_); // If fast path is not taken, register cancellation_resolver. + } + ``` diff --git a/include/async/cancellation.hpp b/include/async/cancellation.hpp index 5fcc79e..f2b1306 100644 --- a/include/async/cancellation.hpp +++ b/include/async/cancellation.hpp @@ -30,6 +30,9 @@ struct cancellation_event { template friend struct cancellation_observer; + template + friend struct cancellation_resolver; + cancellation_event() : _was_requested{false} { }; @@ -70,6 +73,9 @@ struct cancellation_token { template friend struct cancellation_observer; + template + friend struct cancellation_resolver; + cancellation_token() : _event{nullptr} { } @@ -200,6 +206,123 @@ struct cancellation_observer final : private abstract_cancellation_callback { F _functor; }; +// Helper class to implement cancellation for low-level data structures. +// Both TryCancel and Resume are function objects that take a cancellation_resolver *. +// Cancellation always involves a potential race between the cancellation code path +// and the regular completion code path. 
This class helps to resolve this race. +// +// Usage: +// * Initialization: users do their internal book keeping (e.g., by adding nodes to their +// data structures or similar) and then call listen() with a cancellation token. +// * Regular completion: on regular completion, users call complete(). +// This tries to unregister the cancellation_resolver from the cancellation event. +// If this succeeds, tryCancel() will never be called. +// If it fails, tryCancel() either was already called or it is called concurrently. +// * On cancellation, the tryCancel() callback is called. +// If tryCancel() returns true, complete() will never be called. +// If it fails, complete() either was already called or it is called concurrently. +// * resume() is called once all of the following hold: +// - listen() is done. +// - Either complete() is done or it will never be called. +// - Either tryCancel() is done or it will never be called. +template +struct cancellation_resolver final : private abstract_cancellation_callback { + cancellation_resolver(TryCancel tryCancel = TryCancel{}, Resume resume = Resume{}) + : tryCancel_{std::move(tryCancel)}, resume_{std::move(resume)} { } + + cancellation_resolver(const cancellation_resolver &) = delete; + + // TODO: we could do some sanity checking of the state in the destructor. + ~cancellation_resolver() = default; + + cancellation_resolver &operator= (const cancellation_resolver &) = delete; + + void listen(cancellation_token ct) { + event_ = ct._event; + if (!event_) { + transition_(done_listen | done_cancellation_path); + return; + } + + // Try to register a callback for cancellation. + // This will succeed unless the cancellation event is already triggered. 
+ bool registered = false; + { + frg::unique_lock guard{event_->_mutex}; + if(!event_->_was_requested) { + event_->_cbs.push_back(this); + registered = true; + } + } + if (registered) { + transition_(done_listen); + return; + } + + // If we get here, the cancellation event was already triggered. + // Do the equivalent of call(), except that we also set the done_listen bit. + if (tryCancel_(this)) { + transition_(done_listen | done_completion_path | done_cancellation_path); + return; + } + transition_(done_listen | done_cancellation_path); + } + + void complete() { + transition_(done_completion_path); + } + +private: + void call() override { + if (tryCancel_(this)) { + transition_(done_completion_path | done_cancellation_path); + return; + } + transition_(done_cancellation_path); + } + + void transition_(unsigned int new_bits) { + assert(!(new_bits & ~(done_listen | done_completion_path | done_cancellation_path))); + + auto old_st = state_.fetch_or(new_bits, std::memory_order_acq_rel); + assert(!(old_st & new_bits)); + + // Both resume() and cancellation event unregistration only happen once both + // listen() and the completion code path are done. + auto st = old_st | new_bits; + if (!(st & done_listen) || !(st & done_completion_path)) + return; + + if (!(st & done_cancellation_path)) { + // Try to unregister from the cancellation event once both listen() and complete() were called. + // Note that we enter this code path at most once since done_cancellation_path is the only missing + // bit in state_ and the next call to transition_() will necessarily set it. + assert(event_); + + frg::unique_lock guard{event_->_mutex}; + if (event_->_was_requested) + return; + auto it = event_->_cbs.iterator_to(this); + event_->_cbs.erase(it); + } + + // Call resume() when all code paths are done. This can only happen once. + resume_(this); + } + + // Set in state_ when listen() is done. 
+ static constexpr unsigned int done_listen = 1u << 0; + // Set in state_ when complete() is done or when we know that it will never be called. + static constexpr unsigned int done_completion_path = 1u << 1; + // Set in state_ when tryCancel() is done or when we know that it will never be called. + static constexpr unsigned int done_cancellation_path = 1u << 2; + + cancellation_event *event_{nullptr}; + std::atomic state_{0}; + [[no_unique_address]] TryCancel tryCancel_; + [[no_unique_address]] Resume resume_; +}; + inline void cancellation_event::cancel() { frg::intrusive_list< abstract_cancellation_callback, @@ -236,6 +359,7 @@ using detail::cancellation_event; using detail::cancellation_token; using detail::cancellation_callback; using detail::cancellation_observer; +using detail::cancellation_resolver; namespace { diff --git a/include/async/post-ack.hpp b/include/async/post-ack.hpp index 514b388..123a288 100644 --- a/include/async/post-ack.hpp +++ b/include/async/post-ack.hpp @@ -3,7 +3,7 @@ #include #include -#include +#include #include namespace async { @@ -54,6 +54,18 @@ struct post_ack_mechanism { ~poll_node() = default; }; + bool try_cancel(poll_node *pnd) { + frg::unique_lock lock(mutex_); + + if(!pnd->pending) { + assert(!pnd->nd); + pnd->pending = true; + poll_queue_.erase(poll_queue_.iterator_to(pnd)); + return true; + } + return false; + } + public: template struct [[nodiscard]] post_operation final : private node { @@ -310,6 +322,7 @@ struct post_ack_agent { auto seq = agnt_->poll_seq_; + bool fast_path = false; { frg::unique_lock lock(agnt_->mech_->mutex_); assert(!nd); @@ -323,52 +336,44 @@ struct post_ack_agent { assert(it != agnt_->mech_->queue_.end()); pending = true; nd = *it; - }else if(!cobs_.try_set(ct_)) { - // Fast path: cancellation. - pending = true; + fast_path = true; }else{ // Slow path. 
agnt_->mech_->poll_queue_.push_back(this); - return; } } - if(nd) + if(fast_path) { ++agnt_->poll_seq_; - execution::set_value(receiver_, post_ack_handle{agnt_->mech_, nd}); + return execution::set_value(receiver_, post_ack_handle{agnt_->mech_, nd}); + } + cr_.listen(ct_); } private: - void complete() override { - if(cobs_.try_reset()) { - ++agnt_->poll_seq_; - execution::set_value(receiver_, post_ack_handle{agnt_->mech_, nd}); + struct try_cancel_fn { + bool operator()(auto *cr) { + auto self = frg::container_of(cr, &poll_operation::cr_); + return self->agnt_->mech_->try_cancel(self); } - } - - void complete_cancel() { - { - frg::unique_lock lock(agnt_->mech_->mutex_); - - if(!pending) { - assert(!nd); - pending = true; - agnt_->mech_->poll_queue_.erase(agnt_->mech_->poll_queue_.iterator_to(this)); - } + }; + struct resume_fn { + void operator()(auto *cr) { + auto self = frg::container_of(cr, &poll_operation::cr_); + if(self->nd) + ++self->agnt_->poll_seq_; + execution::set_value(self->receiver_, post_ack_handle{self->agnt_->mech_, self->nd}); } + }; - if(nd) { - ++agnt_->poll_seq_; - execution::set_value(receiver_, post_ack_handle{agnt_->mech_, nd}); - }else{ - execution::set_value(receiver_, post_ack_handle{}); - } + void complete() override { + cr_.complete(); } post_ack_agent *agnt_; cancellation_token ct_; R receiver_; - cancellation_observer> cobs_{this}; + cancellation_resolver cr_; }; struct [[nodiscard]] poll_sender { diff --git a/include/async/promise.hpp b/include/async/promise.hpp index 7a54d32..c166227 100644 --- a/include/async/promise.hpp +++ b/include/async/promise.hpp @@ -8,7 +8,7 @@ #include #include -#include +#include namespace async { @@ -49,11 +49,14 @@ namespace detail { virtual void complete() = 0; + bool was_cancelled() const { return cancelled_; } + protected: virtual ~node() = default; private: frg::default_list_hook hook_; + bool cancelled_ = false; }; frg::intrusive_list< @@ -65,6 +68,18 @@ namespace detail { > > queue_; + bool 
try_cancel(node *nd) { + frg::unique_lock lock{mutex_}; + + if (!has_value()) { + nd->cancelled_ = true; + auto it = queue_.iterator_to(nd); + queue_.erase(it); + return true; + } + return false; + } + void wake() { assert(has_value_); frg::intrusive_list< @@ -233,69 +248,58 @@ struct future { template struct get_operation final : private detail::promise_state_base::node { get_operation(detail::promise_state *state, cancellation_token ct, Receiver r) - : state_{state}, ct_{std::move(ct)}, r_{std::move(r)}, cobs_{this} { } + : state_{state}, ct_{std::move(ct)}, r_{std::move(r)} { } void start() { - bool cancelled = false; + bool fast_path = false; { frg::unique_lock lock{state_->mutex_}; if (!state_->has_value()) { - if (!cobs_.try_set(ct_)) { - cancelled = true; - } else { - state_->queue_.push_back(this); - return; - } + state_->queue_.push_back(this); + }else{ + fast_path = true; } } - if constexpr (std::is_same_v) - return execution::set_value(r_, !cancelled); - else { - if (cancelled) - return execution::set_value(r_, frg::optional{frg::null_opt}); + if(fast_path) { + if constexpr (std::is_same_v) + return execution::set_value(r_, true); else return execution::set_value(r_, frg::optional{&state_->get()}); } + cr_.listen(ct_); } private: - void cancel() { - bool cancelled = false; - { - frg::unique_lock lock{state_->mutex_}; - - if (!state_->has_value()) { - cancelled = true; - auto it = state_->queue_.iterator_to(this); - state_->queue_.erase(it); - } + struct try_cancel_fn { + bool operator()(auto *cr) { + auto self = frg::container_of(cr, &get_operation::cr_); + return self->state_->try_cancel(self); } - - if constexpr (std::is_same_v) - execution::set_value(r_, !cancelled); - else { - if (cancelled) - execution::set_value(r_, frg::optional{frg::null_opt}); - else - execution::set_value(r_, frg::optional{&state_->get()}); + }; + struct resume_fn { + void operator()(auto *cr) { + auto self = frg::container_of(cr, &get_operation::cr_); + if constexpr 
(std::is_same_v) { + execution::set_value(self->r_, !self->was_cancelled()); + } else { + if (self->was_cancelled()) + execution::set_value(self->r_, frg::optional{frg::null_opt}); + else + execution::set_value(self->r_, frg::optional{&self->state_->get()}); + } } - } + }; void complete() override { - if (cobs_.try_reset()) { - if constexpr (std::is_same_v) - execution::set_value(r_, true); - else - execution::set_value(r_, frg::optional{&state_->get()}); - } + cr_.complete(); } detail::promise_state *state_; cancellation_token ct_; Receiver r_; - cancellation_observer> cobs_; + cancellation_resolver cr_; }; public: diff --git a/include/async/queue.hpp b/include/async/queue.hpp index af12624..fd4efb2 100644 --- a/include/async/queue.hpp +++ b/include/async/queue.hpp @@ -2,8 +2,8 @@ #include #include +#include #include -#include #include namespace async { @@ -21,17 +21,26 @@ struct queue { virtual ~sink() = default; public: - virtual void cancel() = 0; virtual void complete() = 0; protected: - cancellation_observer> cobs{this}; frg::optional value; private: frg::default_list_hook hook_; }; + bool try_cancel(sink *sp) { + frg::unique_lock lock{mutex_}; + + if(!sp->value) { + auto it = sinks_.iterator_to(sp); + sinks_.erase(it); + return true; + } + return false; + } + public: void put(T item) { emplace(std::move(item)); @@ -39,7 +48,7 @@ struct queue { template void emplace(Ts&&... 
arg) { - sink *retire_sp = nullptr; + sink *complete_sp = nullptr; { frg::unique_lock lock{mutex_}; @@ -47,15 +56,14 @@ struct queue { assert(buffer_.empty()); auto sp = sinks_.pop_front(); sp->value.emplace(std::forward(arg)...); - if(sp->cobs.try_reset()) - retire_sp = sp; + complete_sp = sp; }else{ buffer_.emplace_back(std::forward(arg)...); } } - if(retire_sp) - retire_sp->complete(); + if(complete_sp) + complete_sp->complete(); } // ---------------------------------------------------------------------------------- @@ -68,7 +76,7 @@ struct queue { : q_{q}, ct_{std::move(ct)}, r_{std::move(r)} { } void start() { - bool retire = false; + bool fast_path = false; { frg::unique_lock lock{q_->mutex_}; @@ -76,45 +84,41 @@ struct queue { assert(q_->sinks_.empty()); value = std::move(q_->buffer_.front()); q_->buffer_.pop_front(); - retire = true; + fast_path = true; }else{ - if(!cobs.try_set(ct_)) { - retire = true; - }else{ - q_->sinks_.push_back(this); - } + q_->sinks_.push_back(this); } } - if(retire) + if(fast_path) return execution::set_value(r_, std::move(value)); + cr_.listen(ct_); } private: - using sink::cobs; using sink::value; - void cancel() override { - { - frg::unique_lock lock{q_->mutex_}; - - // We either have a value, or we are not part of the list anymore. 
- if(!value) { - auto it = q_->sinks_.iterator_to(this); - q_->sinks_.erase(it); - } + struct try_cancel_fn { + bool operator()(auto *cr) { + auto self = frg::container_of(cr, &get_operation::cr_); + return self->q_->try_cancel(self); } - - execution::set_value(r_, std::move(value)); - } + }; + struct resume_fn { + void operator()(auto *cr) { + auto self = frg::container_of(cr, &get_operation::cr_); + execution::set_value(self->r_, std::move(self->value)); + } + }; void complete() override { - execution::set_value(r_, std::move(value)); + cr_.complete(); } queue *q_; cancellation_token ct_; Receiver r_; + cancellation_resolver cr_; }; struct get_sender { diff --git a/include/async/recurring-event.hpp b/include/async/recurring-event.hpp index 23681df..f54436c 100644 --- a/include/async/recurring-event.hpp +++ b/include/async/recurring-event.hpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include namespace async { @@ -15,7 +15,7 @@ struct recurring_event { none, submitted, pending, - retired + cancelled }; struct node { @@ -30,6 +30,8 @@ struct recurring_event { virtual void complete() = 0; + bool was_cancelled() const { return st_ == state::cancelled; } + protected: virtual ~node() = default; @@ -42,7 +44,7 @@ struct recurring_event { public: void raise() { - // Grab all items and mark them as retired while we hold the lock. + // Grab all items and mark them as pending while we hold the lock. frg::intrusive_list< node, frg::locate_member< @@ -69,6 +71,18 @@ struct recurring_event { } } + bool try_cancel(node *nd) { + frg::unique_lock lock(_mutex); + + if(nd->st_ == state::submitted) { + nd->st_ = state::cancelled; + auto it = queue_.iterator_to(nd); + queue_.erase(it); + return true; + } + return false; + } + // ---------------------------------------------------------------------------------- // async_wait_if() and its boilerplate. 
// ---------------------------------------------------------------------------------- @@ -76,70 +90,56 @@ struct recurring_event { template struct wait_if_operation final : private node { wait_if_operation(recurring_event *evt, C cond, cancellation_token ct, Receiver r) - : evt_{evt}, cond_{std::move(cond)}, ct_{std::move(ct)}, r_{std::move(r)}, cobs_{this} { } + : evt_{evt}, cond_{std::move(cond)}, ct_{std::move(ct)}, r_{std::move(r)} { } void start() { assert(st_ == state::none); - bool retire_condfail = false; - bool retire_cancelled = false; + bool fast_path = false; { frg::unique_lock lock(evt_->_mutex); if(!cond_()) { st_ = state::pending; - retire_condfail = true; - }else if(!cobs_.try_set(ct_)) { - st_ = state::pending; - cancelled_ = true; - retire_cancelled = true; + fast_path = true; }else{ st_ = state::submitted; evt_->queue_.push_back(this); } } - if(retire_condfail) { - st_ = state::retired; + if(fast_path) { return execution::set_value(r_, maybe_awaited::condition_failed); - }else if(retire_cancelled) { - st_ = state::retired; - return execution::set_value(r_, maybe_cancelled::cancelled); } + cr_.listen(ct_); } private: - void cancel() { - { - frg::unique_lock lock(evt_->_mutex); - - if(st_ == state::submitted) { - st_ = state::pending; - cancelled_ = true; - auto it = evt_->queue_.iterator_to(this); - evt_->queue_.erase(it); - }else{ - assert(st_ == state::pending); - } + struct try_cancel_fn { + bool operator()(auto *cr) { + auto self = frg::container_of(cr, &wait_if_operation::cr_); + return self->evt_->try_cancel(self); } - - st_ = state::retired; - execution::set_value(r_, maybe_cancelled::cancelled); - } + }; + struct resume_fn { + void operator()(auto *cr) { + auto self = frg::container_of(cr, &wait_if_operation::cr_); + if(self->was_cancelled()) + execution::set_value(self->r_, maybe_cancelled::cancelled); + else + execution::set_value(self->r_, maybe_awaited::awaited); + } + }; void complete() override { - if(cobs_.try_reset()) { - st_ = 
state::retired; - execution::set_value(r_, maybe_awaited::awaited); - } + cr_.complete(); } recurring_event *evt_; C cond_; cancellation_token ct_; Receiver r_; - cancellation_observer> cobs_; - bool cancelled_ = false; + cancellation_resolver cr_; }; template diff --git a/include/async/wait-group.hpp b/include/async/wait-group.hpp index e01b06f..d0055f4 100644 --- a/include/async/wait-group.hpp +++ b/include/async/wait-group.hpp @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include namespace async { @@ -24,12 +24,15 @@ struct wait_group { virtual void complete() = 0; + bool was_cancelled() const { return cancelled_; } + protected: virtual ~node() = default; private: // Protected by mutex_. frg::default_list_hook _hook; + bool cancelled_ = false; }; public: @@ -70,6 +73,19 @@ struct wait_group { } } + bool try_cancel(node *nd) { + frg::unique_lock lock(mutex_); + + // Relaxed since non-zero -> zero transitions cannot happen while the mutex is held. + if(ctr_.load(std::memory_order_relaxed) > 0) { + nd->cancelled_ = true; + auto it = queue_.iterator_to(nd); + queue_.erase(it); + return true; + } + return false; + } + void add(size_t new_members) { ctr_.fetch_add(new_members, std::memory_order_acq_rel); } @@ -81,53 +97,48 @@ struct wait_group { template struct wait_operation final : private node { wait_operation(wait_group *wg, cancellation_token ct, Receiver r) - : wg_{wg}, ct_{std::move(ct)}, r_{std::move(r)}, cobs_{this} { } + : wg_{wg}, ct_{std::move(ct)}, r_{std::move(r)} { } void start() { - bool cancelled = false; + bool fast_path = false; { frg::unique_lock lock(wg_->mutex_); // Relaxed since non-zero -> zero transitions cannot happen while the mutex is held. 
if(wg_->ctr_.load(std::memory_order_relaxed) > 0) { - if(!cobs_.try_set(ct_)) { - cancelled = true; - }else{ - wg_->queue_.push_back(this); - return; - } + wg_->queue_.push_back(this); + }else{ + fast_path = true; } } - return execution::set_value(r_, !cancelled); + if(fast_path) + return execution::set_value(r_, true); + cr_.listen(ct_); } private: - void cancel() { - bool cancelled = false; - { - frg::unique_lock lock(wg_->mutex_); - - // Relaxed since non-zero -> zero transitions cannot happen while the mutex is held. - if(wg_->ctr_.load(std::memory_order_relaxed) > 0) { - cancelled = true; - auto it = wg_->queue_.iterator_to(this); - wg_->queue_.erase(it); - } + struct try_cancel_fn { + bool operator()(auto *cr) { + auto self = frg::container_of(cr, &wait_operation::cr_); + return self->wg_->try_cancel(self); } - - execution::set_value(r_, !cancelled); - } + }; + struct resume_fn { + void operator()(auto *cr) { + auto self = frg::container_of(cr, &wait_operation::cr_); + execution::set_value(self->r_, !self->was_cancelled()); + } + }; void complete() override { - if(cobs_.try_reset()) - execution::set_value(r_, true); + cr_.complete(); } wait_group *wg_; cancellation_token ct_; Receiver r_; - cancellation_observer> cobs_; + cancellation_resolver cr_; }; struct [[nodiscard]] wait_sender {