diff --git a/core/include/userver/components/manager_controller_component.hpp b/core/include/userver/components/manager_controller_component.hpp index c50f4f60dbd..850dd88dc46 100644 --- a/core/include/userver/components/manager_controller_component.hpp +++ b/core/include/userver/components/manager_controller_component.hpp @@ -33,6 +33,7 @@ class Manager; /// coro_pool.initial_size | amount of coroutines to preallocate on startup | 1000 /// coro_pool.max_size | max amount of coroutines to keep preallocated | 4000 /// coro_pool.stack_size | size of a single coroutine | 256 * 1024 +/// coro_pool.local_cache_size | local coroutine cache size per thread | 32 /// event_thread_pool.threads | number of threads to process low level IO system calls (number of ev loops to start in libev) | 2 /// event_thread_pool.thread_name | set OS thread name to this value | 'event-worker' /// components | dictionary of "component name": "options" | - diff --git a/core/src/components/manager_config.cpp b/core/src/components/manager_config.cpp index 7833223b3a7..220eea2a789 100644 --- a/core/src/components/manager_config.cpp +++ b/core/src/components/manager_config.cpp @@ -103,6 +103,10 @@ additionalProperties: false type: integer description: size of a single coroutine, bytes defaultDescription: 256 * 1024 + local_cache_size: + type: integer + description: local coroutine cache size per thread + defaultDescription: 32 event_thread_pool: type: object description: event thread pool options diff --git a/core/src/components/manager_config_test.cpp b/core/src/components/manager_config_test.cpp index 2246ecb90c4..cdd8818149b 100644 --- a/core/src/components/manager_config_test.cpp +++ b/core/src/components/manager_config_test.cpp @@ -20,6 +20,7 @@ constexpr char kConfig[] = R"( max_size: $coro_pool_max_size max_size#fallback: 50000 stack_size#env: USERVER_STACK_SIZE + local_cache_size: 32 default_task_processor: main-task-processor mlock_debug_info: $variable_does_not_exist 
mlock_debug_info#env: MLOCK_DEBUG_INFO diff --git a/core/src/engine/coro/pool.hpp b/core/src/engine/coro/pool.hpp index d568bd7c937..724a460b0b7 100644 --- a/core/src/engine/coro/pool.hpp +++ b/core/src/engine/coro/pool.hpp @@ -3,6 +3,8 @@ #include // for std::max #include #include +#include +#include #include #include @@ -34,9 +36,13 @@ class Pool final { void PutCoroutine(CoroutinePtr&& coroutine_ptr); PoolStats GetStats() const; std::size_t GetStackSize() const; + void PrepareLocalCache(); + void ClearLocalCache(); private: Coroutine CreateCoroutine(bool quiet = false); + bool TryPopulateLocalCache(); + void ReduceLocalCacheBufferSize(std::size_t new_size); void OnCoroutineDestruction() noexcept; template @@ -45,6 +51,13 @@ class Pool final { const PoolConfig config_; const Executor executor_; + // Batch size for draining a full thread-local coroutine cache: at most + // this many cached coroutines are handed back to used_coroutines_ at once; + // local_coroutine_move_size_ <= config_.local_cache_size + const std::size_t local_coroutine_move_size_; + + static inline thread_local std::vector local_coro_buffer_; /* per-thread cache; static, so not tied to one Pool object */ + boost::coroutines2::protected_fixedsize_stack stack_allocator_; // We aim to reuse coroutines as much as possible, @@ -93,6 +106,7 @@ template Pool::Pool(PoolConfig config, Executor executor) : config_(std::move(config)), executor_(executor), + local_coroutine_move_size_((config_.local_cache_size + 1) / 2), stack_allocator_(config_.stack_size), initial_coroutines_(config_.initial_size), used_coroutines_(config_.max_size), @@ -119,31 +133,61 @@ typename Pool::CoroutinePtr Pool::GetCoroutine() { return *this; } }; - std::optional coroutine; CoroutineMover mover{coroutine}; // First try to dequeue from 'working set': if we can get a coroutine // from there we are happy, because we saved on minor-page-faulting (thus // increasing resident memory usage) a not-yet-de-virtualized coroutine stack. 
- if (used_coroutines_.try_dequeue( - GetUsedPoolToken(), mover) || - initial_coroutines_.try_dequeue(mover)) { - --idle_coroutines_num_; + if (!local_coro_buffer_.empty() || TryPopulateLocalCache()) { + coroutine = std::move(local_coro_buffer_.back()); + local_coro_buffer_.pop_back(); + } else if (initial_coroutines_.try_dequeue(mover)) { + idle_coroutines_num_.fetch_sub(1); } else { coroutine.emplace(CreateCoroutine()); } + return CoroutinePtr(std::move(*coroutine), *this); } template void Pool::PutCoroutine(CoroutinePtr&& coroutine_ptr) { - if (idle_coroutines_num_.load() >= config_.max_size) return; - auto& token = GetUsedPoolToken(); - const bool ok = - // We only ever return coroutines into our 'working set'. - used_coroutines_.enqueue(token, std::move(coroutine_ptr.Get())); - if (ok) ++idle_coroutines_num_; + if (local_coro_buffer_.size() < config_.local_cache_size) { /* per-thread cap, not the pool-wide max_size */ + local_coro_buffer_.push_back(std::move(coroutine_ptr.Get())); + return; + } + + const std::size_t current_idle_coroutines_num = idle_coroutines_num_.load(); + if (current_idle_coroutines_num >= config_.max_size) { + ReduceLocalCacheBufferSize(local_coro_buffer_.size() - + local_coroutine_move_size_); + return; + } + + if (config_.local_cache_size == 0) { + if (used_coroutines_.enqueue(GetUsedPoolToken(), + std::move(coroutine_ptr.Get()))) { + ++idle_coroutines_num_; + } + return; + } + + const std::size_t return_to_pool_from_local_cache_num = + std::min(config_.max_size - current_idle_coroutines_num, + local_coroutine_move_size_); + + if (used_coroutines_.enqueue_bulk( + GetUsedPoolToken(), + std::make_move_iterator(local_coro_buffer_.begin()) + + local_coro_buffer_.size() - return_to_pool_from_local_cache_num, + return_to_pool_from_local_cache_num)) { + idle_coroutines_num_.fetch_add(return_to_pool_from_local_cache_num); + } + + ReduceLocalCacheBufferSize(local_coro_buffer_.size() - + local_coroutine_move_size_); + local_coro_buffer_.push_back(std::move(coroutine_ptr.Get())); } template @@ -157,6 
+201,29 @@ PoolStats Pool::GetStats() const { return stats; } +template +void Pool::PrepareLocalCache() { + local_coro_buffer_.reserve(config_.local_cache_size); +} + +template +void Pool::ClearLocalCache() { + const std::size_t current_idle_coroutines_num = idle_coroutines_num_.load(); + if (current_idle_coroutines_num < config_.max_size) { + const std::size_t return_to_pool_from_local_cache_num = + std::min(config_.max_size - current_idle_coroutines_num, + local_coro_buffer_.size()); + + if (used_coroutines_.enqueue_bulk( + GetUsedPoolToken(), + std::make_move_iterator(local_coro_buffer_.begin()), + return_to_pool_from_local_cache_num)) { + idle_coroutines_num_.fetch_add(return_to_pool_from_local_cache_num); + } + } + ReduceLocalCacheBufferSize(0); +} + template typename Pool::Coroutine Pool::CreateCoroutine(bool quiet) { try { @@ -184,6 +251,34 @@ typename Pool::Coroutine Pool::CreateCoroutine(bool quiet) { } } +template +bool Pool::TryPopulateLocalCache() { + // initial_coroutines_.size_approx() is cheap here + // because that queue has a single producer + const std::size_t deque_num = + std::min(idle_coroutines_num_.load() - initial_coroutines_.size_approx(), + std::max(config_.local_cache_size, std::size_t{1})); /* dequeue at least one so a disabled cache (size 0) still reuses used_coroutines_ */ + if (deque_num > 0) { + const std::size_t dequed_num = used_coroutines_.try_dequeue_bulk( + GetUsedPoolToken(), + std::back_inserter(local_coro_buffer_), deque_num); + + if (dequed_num > 0) { + idle_coroutines_num_.fetch_sub(dequed_num); + return true; + } + } + return false; +} + +template +void Pool::ReduceLocalCacheBufferSize(std::size_t new_size) { + UASSERT(new_size <= local_coro_buffer_.size()); + while (local_coro_buffer_.size() > new_size) { + local_coro_buffer_.pop_back(); + } +} + template void Pool::OnCoroutineDestruction() noexcept { --total_coroutines_num_; diff --git a/core/src/engine/coro/pool_config.cpp b/core/src/engine/coro/pool_config.cpp index 46c35aa65ef..c4b0f267bfb 100644 --- a/core/src/engine/coro/pool_config.cpp +++ b/core/src/engine/coro/pool_config.cpp @@ 
-10,6 +10,8 @@ PoolConfig Parse(const yaml_config::YamlConfig& value, config.initial_size = value["initial_size"].As(config.initial_size); config.max_size = value["max_size"].As(config.max_size); config.stack_size = value["stack_size"].As(config.stack_size); + config.local_cache_size = + value["local_cache_size"].As(config.local_cache_size); /* falls back to the struct default */ return config; } diff --git a/core/src/engine/coro/pool_config.hpp b/core/src/engine/coro/pool_config.hpp index bf2f3ac68fc..a6bd12f4531 100644 --- a/core/src/engine/coro/pool_config.hpp +++ b/core/src/engine/coro/pool_config.hpp @@ -13,6 +13,7 @@ struct PoolConfig { std::size_t initial_size = 1000; std::size_t max_size = 4000; std::size_t stack_size = 256 * 1024ULL; + std::size_t local_cache_size = 32; }; PoolConfig Parse(const yaml_config::YamlConfig& value, diff --git a/core/src/engine/task/task_benchmark.cpp b/core/src/engine/task/task_benchmark.cpp index a4e5fa1440f..e8d75700e60 100644 --- a/core/src/engine/task/task_benchmark.cpp +++ b/core/src/engine/task/task_benchmark.cpp @@ -107,4 +107,25 @@ void thread_yield(benchmark::State& state) { } BENCHMARK(thread_yield)->RangeMultiplier(2)->ThreadRange(1, 32); +void engine_multiple_tasks_multiple_threads(benchmark::State& state) { + engine::RunStandalone(state.range(0), [&] { + std::atomic tasks_count_total = 0; + RunParallelBenchmark(state, [&](auto& range) { + std::uint64_t tasks_count = 0; + for ([[maybe_unused]] auto _ : range) { + engine::AsyncNoSpan([] {}).Wait(); + tasks_count++; + } + tasks_count_total += tasks_count; + benchmark::DoNotOptimize(tasks_count); + }); + benchmark::DoNotOptimize(tasks_count_total); + }); +} +BENCHMARK(engine_multiple_tasks_multiple_threads) + ->RangeMultiplier(2) + ->Range(1, 32) + ->Arg(6) + ->Arg(12); + USERVER_NAMESPACE_END diff --git a/core/src/engine/task/task_processor.cpp b/core/src/engine/task/task_processor.cpp index e3d0db99d9c..b225232ed28 100644 --- a/core/src/engine/task/task_processor.cpp +++ 
b/core/src/engine/task/task_processor.cpp @@ -102,6 +102,7 @@ TaskProcessor::TaskProcessor(TaskProcessorConfig config, PrepareWorkerThread(i); workers_left.count_down(); ProcessTasks(); + FinalizeWorkerThread(); /* called once ProcessTasks() has returned */ }); } @@ -282,6 +283,8 @@ void TaskProcessor::PrepareWorkerThread(std::size_t index) noexcept { break; } + pools_->GetCoroPool().PrepareLocalCache(); /* set up the calling thread's coroutine cache */ + utils::SetCurrentThreadName(fmt::format("{}_{}", config_.thread_name, index)); impl::SetLocalTaskCounterData(task_counter_, index); @@ -289,6 +292,10 @@ void TaskProcessor::PrepareWorkerThread(std::size_t index) noexcept { TaskProcessorThreadStartedHook(); } +void TaskProcessor::FinalizeWorkerThread() noexcept { + pools_->GetCoroPool().ClearLocalCache(); /* empty the calling thread's coroutine cache */ +} + void TaskProcessor::ProcessTasks() noexcept { while (true) { auto context = task_queue_.PopBlocking(); diff --git a/core/src/engine/task/task_processor.hpp b/core/src/engine/task/task_processor.hpp index 88285e1b1a5..b4a12296f91 100644 --- a/core/src/engine/task/task_processor.hpp +++ b/core/src/engine/task/task_processor.hpp @@ -92,6 +92,8 @@ class TaskProcessor final { void PrepareWorkerThread(std::size_t index) noexcept; + void FinalizeWorkerThread() noexcept; + void ProcessTasks() noexcept; void CheckWaitTime(impl::TaskContext& context);