-
Notifications
You must be signed in to change notification settings - Fork 15.2k
[libc++] stop_token
uses mutex
#69600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@llvm/pr-subscribers-libcxx Author: Hui (huixie90) Changes
Full diff: https://github.com/llvm/llvm-project/pull/69600.diff 2 Files Affected:
diff --git a/libcxx/benchmarks/stop_token.bench.cpp b/libcxx/benchmarks/stop_token.bench.cpp
index 293d55ed82a08cf..e059a1166af16bd 100644
--- a/libcxx/benchmarks/stop_token.bench.cpp
+++ b/libcxx/benchmarks/stop_token.bench.cpp
@@ -14,6 +14,81 @@
using namespace std::chrono_literals;
+// We have a single thread created by std::jthread consuming the stop_token:
+// polling for stop_requested.
+void BM_stop_token_single_thread_polling_stop_requested(benchmark::State& state) {
+ auto thread_func = [&](std::stop_token st, std::atomic<std::uint64_t>* loop_count) {
+ while (!st.stop_requested()) {
+ // doing some work
+ loop_count->fetch_add(1, std::memory_order_relaxed);
+ }
+ };
+
+ std::atomic<std::uint64_t> loop_count(0);
+ std::uint64_t total_loop_test_param = state.range(0);
+
+ auto thread = support::make_test_jthread(thread_func, &loop_count);
+
+ for (auto _ : state) {
+ auto start_total = loop_count.load(std::memory_order_relaxed);
+
+ while (loop_count.load(std::memory_order_relaxed) - start_total < total_loop_test_param) {
+ std::this_thread::yield();
+ }
+ }
+}
+
+BENCHMARK(BM_stop_token_single_thread_polling_stop_requested)->RangeMultiplier(2)->Range(1 << 10, 1 << 24);
+
+// We have multiple threads polling for stop_requested of the same stop_token.
+void BM_stop_token_multi_thread_polling_stop_requested(benchmark::State& state) {
+ std::atomic<bool> start{false};
+
+ auto thread_func = [&start](std::atomic<std::uint64_t>* loop_count, std::stop_token st) {
+ start.wait(false);
+ while (!st.stop_requested()) {
+ // doing some work
+ loop_count->fetch_add(1, std::memory_order_relaxed);
+ }
+ };
+
+ constexpr size_t thread_count = 20;
+
+ std::uint64_t total_loop_test_param = state.range(0);
+
+ std::vector<std::atomic<std::uint64_t>> loop_counts(thread_count);
+ std::stop_source ss;
+ std::vector<std::jthread> threads;
+ threads.reserve(thread_count);
+
+ for (size_t i = 0; i < thread_count; ++i) {
+ threads.emplace_back(support::make_test_jthread(thread_func, &loop_counts[i], ss.get_token()));
+ }
+
+ auto get_total_loop = [&loop_counts] {
+ std::uint64_t total = 0;
+ for (const auto& loop_count : loop_counts) {
+ total += loop_count.load(std::memory_order_relaxed);
+ }
+ return total;
+ };
+
+ start = true;
+ start.notify_all();
+
+ for (auto _ : state) {
+ auto start_total = get_total_loop();
+
+ while (get_total_loop() - start_total < total_loop_test_param) {
+ std::this_thread::yield();
+ }
+ }
+
+ ss.request_stop();
+}
+
+BENCHMARK(BM_stop_token_multi_thread_polling_stop_requested)->RangeMultiplier(2)->Range(1 << 10, 1 << 24);
+
// We have a single thread created by std::jthread consuming the stop_token:
// registering/deregistering callbacks, one at a time.
void BM_stop_token_single_thread_reg_unreg_callback(benchmark::State& state) {
@@ -59,11 +134,11 @@ void BM_stop_token_async_reg_unreg_callback(benchmark::State& state) {
std::atomic<bool> start{false};
std::uint64_t total_reg_test_param = state.range(0);
+ std::vector<std::atomic<std::uint64_t>> reg_counts(thread_count);
std::stop_source ss;
std::vector<std::jthread> threads;
threads.reserve(thread_count);
- std::vector<std::atomic<std::uint64_t>> reg_counts(thread_count);
auto thread_func = [&start](std::atomic<std::uint64_t>* count, std::stop_token st) {
std::vector<std::optional<std::stop_callback<dummy_stop_callback>>> cbs(concurrent_request_count);
@@ -84,8 +159,8 @@ void BM_stop_token_async_reg_unreg_callback(benchmark::State& state) {
auto get_total_reg = [&] {
std::uint64_t total = 0;
- for (const auto& reg_counts : reg_counts) {
- total += reg_counts.load(std::memory_order_relaxed);
+ for (const auto& reg_count : reg_counts) {
+ total += reg_count.load(std::memory_order_relaxed);
}
return total;
};
diff --git a/libcxx/include/__stop_token/stop_state.h b/libcxx/include/__stop_token/stop_state.h
index 462aa73952b84f9..f3fca6554b378a7 100644
--- a/libcxx/include/__stop_token/stop_state.h
+++ b/libcxx/include/__stop_token/stop_state.h
@@ -12,7 +12,7 @@
#include <__availability>
#include <__config>
-#include <__stop_token/atomic_unique_lock.h>
+#include <__mutex/mutex.h>
#include <__stop_token/intrusive_list_view.h>
#include <__thread/id.h>
#include <atomic>
@@ -37,10 +37,51 @@ struct __stop_callback_base : __intrusive_node_base<__stop_callback_base> {
bool* __destroyed_ = nullptr;
};
+// stop_token needs to lock with noexcept. mutex::lock can throw.
+// wrap it with a while loop and catch all exceptions
+class __nothrow_mutex_lock {
+ std::mutex& __mutex_;
+ bool __is_locked_;
+
+public:
+ _LIBCPP_HIDE_FROM_ABI explicit __nothrow_mutex_lock(std::mutex& __mutex) noexcept
+ : __mutex_(__mutex), __is_locked_(true) {
+ __lock();
+ }
+
+ __nothrow_mutex_lock(const __nothrow_mutex_lock&) = delete;
+ __nothrow_mutex_lock(__nothrow_mutex_lock&&) = delete;
+ __nothrow_mutex_lock& operator=(const __nothrow_mutex_lock&) = delete;
+ __nothrow_mutex_lock& operator=(__nothrow_mutex_lock&&) = delete;
+
+ _LIBCPP_HIDE_FROM_ABI ~__nothrow_mutex_lock() {
+ if (__is_locked_) {
+ __unlock();
+ }
+ }
+
+ _LIBCPP_HIDE_FROM_ABI bool __owns_lock() const noexcept { return __is_locked_; }
+
+ _LIBCPP_HIDE_FROM_ABI void __lock() noexcept {
+ while (true) {
+ try {
+ __mutex_.lock();
+ break;
+ } catch (...) {
+ }
+ }
+ __is_locked_ = true;
+ }
+
+ _LIBCPP_HIDE_FROM_ABI void __unlock() noexcept {
+ __mutex_.unlock(); // throws nothing
+ __is_locked_ = false;
+ }
+};
+
class __stop_state {
static constexpr uint32_t __stop_requested_bit = 1;
- static constexpr uint32_t __callback_list_locked_bit = 1 << 1;
- static constexpr uint32_t __stop_source_counter_shift = 2;
+ static constexpr uint32_t __stop_source_counter_shift = 1;
// The "stop_source counter" is not used for lifetime reference counting.
// When the number of stop_source reaches 0, the remaining stop_tokens's
@@ -49,9 +90,10 @@ class __stop_state {
// The "callback list locked" bit implements the atomic_unique_lock to
// guard the operations on the callback list
//
- // 31 - 2 | 1 | 0 |
- // stop_source counter | callback list locked | stop_requested |
+ // 31 - 1 | 0 |
+ // stop_source counter | stop_requested |
atomic<uint32_t> __state_ = 0;
+ std::mutex __mutex_;
// Reference count for stop_token + stop_callback + stop_source
// When the counter reaches zero, the state is destroyed
@@ -59,7 +101,7 @@ class __stop_state {
atomic<uint32_t> __ref_count_ = 0;
using __state_t = uint32_t;
- using __callback_list_lock = __atomic_unique_lock<__state_t, __callback_list_locked_bit>;
+ using __callback_list_lock = __nothrow_mutex_lock;
using __callback_list = __intrusive_list_view<__stop_callback_base>;
__callback_list __callback_list_;
@@ -101,8 +143,9 @@ class __stop_state {
}
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __request_stop() noexcept {
- auto __cb_list_lock = __try_lock_for_request_stop();
- if (!__cb_list_lock.__owns_lock()) {
+ __callback_list_lock __cb_list_lock(__mutex_);
+ auto __old = __state_.fetch_or(__stop_requested_bit, std::memory_order_release);
+ if ((__old & __stop_requested_bit) == __stop_requested_bit) {
return false;
}
__requesting_thread_ = this_thread::get_id();
@@ -138,20 +181,15 @@ class __stop_state {
}
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __add_callback(__stop_callback_base* __cb) noexcept {
- // If it is already stop_requested. Do not try to request it again.
- const auto __give_up_trying_to_lock_condition = [__cb](__state_t __state) {
- if ((__state & __stop_requested_bit) != 0) {
- // already stop requested, synchronously run the callback and no need to lock the list again
- __cb->__invoke();
- return true;
- }
- // no stop source. no need to lock the list to add the callback as it can never be invoked
- return (__state >> __stop_source_counter_shift) == 0;
- };
-
- __callback_list_lock __cb_list_lock(__state_, __give_up_trying_to_lock_condition);
+ __callback_list_lock __cb_list_lock(__mutex_);
+ auto __state = __state_.load(std::memory_order_acquire);
+ if ((__state & __stop_requested_bit) != 0) {
+ // already stop requested, synchronously run the callback and no need to lock the list again
+ __cb->__invoke();
+ return false;
+ }
- if (!__cb_list_lock.__owns_lock()) {
+ if ((__state >> __stop_source_counter_shift) == 0) {
return false;
}
@@ -165,7 +203,7 @@ class __stop_state {
// called by the destructor of stop_callback
_LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI void __remove_callback(__stop_callback_base* __cb) noexcept {
- __callback_list_lock __cb_list_lock(__state_);
+ __callback_list_lock __cb_list_lock(__mutex_);
// under below condition, the request_stop call just popped __cb from the list and could execute it now
bool __potentially_executing_now = __cb->__prev_ == nullptr && !__callback_list_.__is_head(__cb);
@@ -191,30 +229,6 @@ class __stop_state {
}
}
-private:
- _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI __callback_list_lock __try_lock_for_request_stop() noexcept {
- // If it is already stop_requested, do not try to request stop or lock the list again.
- const auto __lock_fail_condition = [](__state_t __state) { return (__state & __stop_requested_bit) != 0; };
-
- // set locked and requested bit at the same time
- const auto __after_lock_state = [](__state_t __state) {
- return __state | __callback_list_locked_bit | __stop_requested_bit;
- };
-
- // acq because [thread.stoptoken.intro] Registration of a callback synchronizes with the invocation of that
- // callback. We are going to invoke the callback after getting the lock, acquire so that we can see the
- // registration of a callback (and other writes that happens-before the add_callback)
- // Note: the rel (unlock) in the add_callback syncs with this acq
- // rel because [thread.stoptoken.intro] A call to request_stop that returns true synchronizes with a call
- // to stop_requested on an associated stop_token or stop_source object that returns true.
- // We need to make sure that all writes (including user code) before request_stop will be made visible
- // to the threads that waiting for `stop_requested == true`
- // Note: this rel syncs with the acq in `stop_requested`
- const auto __locked_ordering = std::memory_order_acq_rel;
-
- return __callback_list_lock(__state_, __lock_fail_condition, __after_lock_state, __locked_ordering);
- }
-
template <class _Tp>
friend struct __intrusive_shared_ptr_traits;
};
|
08fe05a
to
5860b7a
Compare
Closing since we decided not to do this. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
stop_token
(this commit has an in progress PR)mutex
in thestop_token
Benchmark results:
Old is origin/main
New is the code in this patch (uses
mutex
in the implementation ofstop_token
My platform is MacOS with an old Intel CPU (Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz)
Note that the most realistic test case is
BM_stop_token_async_reg_unreg_callback
(which was given by Lewis Baker)However, using
mutex
is 30% worse on my platformHowever, running on a MacOS with arm64 chip, the results seems quite opposite: