feat engine: reduce contention in coro::Pool

Part 2/2 of the effort to reduce contention in `TaskProcessor`.

Uses a thread-local `Coroutine` cache with bulk push/pop to reduce contention on the global `coro::Pool` coroutine queue.
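
The gist, as a minimal self-contained sketch (a hypothetical `BulkCachedPool`, not the actual userver class; the real logic is in `core/src/engine/coro/pool.hpp` below): each worker thread keeps a small local buffer and exchanges items with the shared `moodycamel::ConcurrentQueue` only in batches of about half the cache size, so the shared queue is touched far less often.

```cpp
#include <cstddef>
#include <iterator>
#include <vector>

#include <moodycamel/concurrentqueue.h>

template <typename T>
class BulkCachedPool {
 public:
  explicit BulkCachedPool(std::size_t local_cache_size)
      : cache_size_(local_cache_size),
        move_size_((local_cache_size + 1) / 2) {}

  // Fast path: serve from the thread-local buffer; refill it from the
  // shared queue with a single bulk dequeue only when it runs dry.
  bool TryPop(T& out) {
    if (local_.empty() &&
        global_.try_dequeue_bulk(std::back_inserter(local_), move_size_) == 0) {
      return false;  // both the local buffer and the shared queue are empty
    }
    out = std::move(local_.back());
    local_.pop_back();
    return true;
  }

  // Fast path: stash into the thread-local buffer; flush half of it back
  // to the shared queue with a single bulk enqueue only on overflow.
  void Push(T&& item) {
    if (cache_size_ == 0) {
      global_.enqueue(std::move(item));  // local caching disabled
      return;
    }
    if (local_.size() >= cache_size_) {
      global_.enqueue_bulk(
          std::make_move_iterator(local_.end() - move_size_), move_size_);
      local_.erase(local_.end() - move_size_, local_.end());
    }
    local_.push_back(std::move(item));
  }

 private:
  const std::size_t cache_size_;
  const std::size_t move_size_;
  moodycamel::ConcurrentQueue<T> global_;
  // Shared across instances, like the real local_coro_buffer_; fine as
  // long as there is effectively one pool per process.
  static inline thread_local std::vector<T> local_;
};
```

The real `Pool` adds what the sketch leaves out: the `max_size` cap, idle-coroutine accounting, and per-thread moodycamel tokens.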

Before:

```
Benchmark                                          Time             CPU   Iterations
------------------------------------------------------------------------------------
engine_multiple_tasks_multiple_threads/1         480 ns          480 ns     15230177
engine_multiple_tasks_multiple_threads/2         721 ns          721 ns      9664295
engine_multiple_tasks_multiple_threads/4        1036 ns         1036 ns      6668418
engine_multiple_tasks_multiple_threads/6        1406 ns         1406 ns      5204157
```

After:

```
Benchmark                                          Time             CPU   Iterations
------------------------------------------------------------------------------------
engine_multiple_tasks_multiple_threads/1         440 ns          440 ns     15851717
engine_multiple_tasks_multiple_threads/2         590 ns          589 ns     11844265
engine_multiple_tasks_multiple_threads/4         838 ns          838 ns      8269024
engine_multiple_tasks_multiple_threads/6        1249 ns         1249 ns      5681120
```
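
That is roughly an 8-19% reduction in time per task, with the largest wins at 2 and 4 worker threads.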
7fb381685354e92514a20aff7d41c808ccea8cac

Pull Request resolved: #584
egor-bystepdev authored and Anton3 committed May 23, 2024
1 parent bbe9f5d commit f392f6b
Showing 10 changed files with 152 additions and 11 deletions.
@@ -33,6 +33,7 @@ class Manager;
/// coro_pool.initial_size | amount of coroutines to preallocate on startup | 1000
/// coro_pool.max_size | max amount of coroutines to keep preallocated | 4000
/// coro_pool.stack_size | size of a single coroutine | 256 * 1024
/// coro_pool.local_cache_size | local coroutine cache size per thread | 32
/// event_thread_pool.threads | number of threads to process low level IO system calls (number of ev loops to start in libev) | 2
/// event_thread_pool.thread_name | set OS thread name to this value | 'event-worker'
/// components | dictionary of "component name": "options" | -
11 changes: 11 additions & 0 deletions core/src/components/manager_config.cpp
@@ -103,6 +103,17 @@ additionalProperties: false
type: integer
description: size of a single coroutine, bytes
defaultDescription: 256 * 1024
local_cache_size:
type: integer
description: |
Tunes the size of the local coroutine cache per TaskProcessor
worker thread. The current coro pool size is computed with an
inaccuracy of up to local_cache_size * total_worker_threads,
which may be relevant when comparing against max_size.
Lower values of local_cache_size hurt performance under heavy
contention in the engine, while higher values increase the
inaccuracy of the coro pool size estimate.
defaultDescription: 32
event_thread_pool:
type: object
description: event thread pool options
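For example (illustrative numbers): with the default local_cache_size of 32 and 16 worker threads, up to 16 * 32 = 512 coroutines may sit in the thread-local caches. They are still counted as used by the pool, so the pool-size estimate can be off by up to 512, which is noticeable against the default max_size of 4000.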
1 change: 1 addition & 0 deletions core/src/components/manager_config_test.cpp
@@ -20,6 +20,7 @@ constexpr char kConfig[] = R"(
max_size: $coro_pool_max_size
max_size#fallback: 50000
stack_size#env: USERVER_STACK_SIZE
local_cache_size: 32
default_task_processor: main-task-processor
mlock_debug_info: $variable_does_not_exist
mlock_debug_info#env: MLOCK_DEBUG_INFO
116 changes: 105 additions & 11 deletions core/src/engine/coro/pool.hpp
@@ -3,10 +3,13 @@
#include <algorithm> // for std::max
#include <atomic>
#include <cerrno>
#include <cstddef>
#include <iterator>
#include <optional>
#include <utility>
#include <vector>

#include <moodycamel/concurrentqueue.h>

#include <coroutines/coroutine.hpp>

#include <userver/logging/log.hpp>
@@ -34,17 +34,37 @@ class Pool final {
void PutCoroutine(CoroutinePtr&& coroutine_ptr);
PoolStats GetStats() const;
std::size_t GetStackSize() const;
void PrepareLocalCache();
void ClearLocalCache();

private:
Coroutine CreateCoroutine(bool quiet = false);
void OnCoroutineDestruction() noexcept;

bool TryPopulateLocalCache();
void DepopulateLocalCache();

template <typename Token>
Token& GetUsedPoolToken();

const PoolConfig config_;
const Executor executor_;

// Maximum number of coroutines exchanged between used_coroutines_ and the
// thread-local coroutine cache at a time.
const std::size_t local_coroutine_move_size_;

// Reduces contention by allowing bulk operations on used_coroutines_.
// Coroutines in local_coro_buffer_ are counted as used in statistics.
// Unprotected thread_local is OK here, because coro::Pool is always used
// outside of any coroutine.
static inline thread_local std::vector<Coroutine> local_coro_buffer_;

boost::coroutines2::protected_fixedsize_stack stack_allocator_;

// We aim to reuse coroutines as much as possible,
@@ -93,11 +111,13 @@ template <typename Task>
Pool<Task>::Pool(PoolConfig config, Executor executor)
: config_(std::move(config)),
executor_(executor),
local_coroutine_move_size_((config_.local_cache_size + 1) / 2),
stack_allocator_(config_.stack_size),
initial_coroutines_(config_.initial_size),
used_coroutines_(config_.max_size),
idle_coroutines_num_(config_.initial_size),
total_coroutines_num_(0) {
UASSERT(local_coroutine_move_size_ <= config_.local_cache_size);
moodycamel::ProducerToken token(initial_coroutines_);
for (std::size_t i = 0; i < config_.initial_size; ++i) {
bool ok =
@@ -119,31 +139,44 @@ typename Pool<Task>::CoroutinePtr Pool<Task>::GetCoroutine() {
return *this;
}
};

std::optional<Coroutine> coroutine;
CoroutineMover mover{coroutine};

// First try to dequeue from 'working set': if we can get a coroutine
// from there we are happy, because we saved on minor-page-faulting (thus
// increasing resident memory usage) a not-yet-de-virtualized coroutine stack.
if (used_coroutines_.try_dequeue(
GetUsedPoolToken<moodycamel::ConsumerToken>(), mover) ||
initial_coroutines_.try_dequeue(mover)) {
if (!local_coro_buffer_.empty() || TryPopulateLocalCache()) {
coroutine = std::move(local_coro_buffer_.back());
local_coro_buffer_.pop_back();
} else if (initial_coroutines_.try_dequeue(mover)) {
--idle_coroutines_num_;
} else {
coroutine.emplace(CreateCoroutine());
}

return CoroutinePtr(std::move(*coroutine), *this);
}

template <typename Task>
void Pool<Task>::PutCoroutine(CoroutinePtr&& coroutine_ptr) {
if (idle_coroutines_num_.load() >= config_.max_size) return;
auto& token = GetUsedPoolToken<moodycamel::ProducerToken>();
const bool ok =
// We only ever return coroutines into our 'working set'.
used_coroutines_.enqueue(token, std::move(coroutine_ptr.Get()));
if (ok) ++idle_coroutines_num_;
if (local_coro_buffer_.size() < config_.local_cache_size) {
local_coro_buffer_.push_back(std::move(coroutine_ptr.Get()));
return;
}

if (config_.local_cache_size == 0) {
const bool ok =
// We only ever return coroutines into our 'working set'.
used_coroutines_.enqueue(GetUsedPoolToken<moodycamel::ProducerToken>(),
std::move(coroutine_ptr.Get()));
if (ok) {
++idle_coroutines_num_;
}
return;
}

DepopulateLocalCache();
local_coro_buffer_.push_back(std::move(coroutine_ptr.Get()));
}

template <typename Task>
Expand All @@ -157,6 +190,30 @@ PoolStats Pool<Task>::GetStats() const {
return stats;
}

template <typename Task>
void Pool<Task>::PrepareLocalCache() {
local_coro_buffer_.reserve(config_.local_cache_size);
}

template <typename Task>
void Pool<Task>::ClearLocalCache() {
const std::size_t current_idle_coroutines_num = idle_coroutines_num_.load();
if (current_idle_coroutines_num < config_.max_size) {
const std::size_t return_to_pool_from_local_cache_num =
std::min(config_.max_size - current_idle_coroutines_num,
local_coro_buffer_.size());

const bool ok = used_coroutines_.enqueue_bulk(
GetUsedPoolToken<moodycamel::ProducerToken>(),
std::make_move_iterator(local_coro_buffer_.begin()),
return_to_pool_from_local_cache_num);
if (ok) {
idle_coroutines_num_.fetch_add(return_to_pool_from_local_cache_num);
}
}
local_coro_buffer_.clear();
}

template <typename Task>
typename Pool<Task>::Coroutine Pool<Task>::CreateCoroutine(bool quiet) {
try {
@@ -189,6 +246,43 @@ void Pool<Task>::OnCoroutineDestruction() noexcept {
--total_coroutines_num_;
}

template <typename Task>
bool Pool<Task>::TryPopulateLocalCache() {
if (local_coroutine_move_size_ == 0) return false;

const std::size_t dequed_num = used_coroutines_.try_dequeue_bulk(
GetUsedPoolToken<moodycamel::ConsumerToken>(),
std::back_inserter(local_coro_buffer_), local_coroutine_move_size_);
if (dequed_num == 0) return false;

idle_coroutines_num_.fetch_sub(dequed_num);
return true;
}

template <typename Task>
void Pool<Task>::DepopulateLocalCache() {
const std::size_t current_idle_coroutines_num = idle_coroutines_num_.load();

if (current_idle_coroutines_num < config_.max_size) {
const std::size_t return_to_pool_from_local_cache_num =
std::min(config_.max_size - current_idle_coroutines_num,
local_coroutine_move_size_);

const bool ok = used_coroutines_.enqueue_bulk(
GetUsedPoolToken<moodycamel::ProducerToken>(),
std::make_move_iterator(local_coro_buffer_.end() -
return_to_pool_from_local_cache_num),
return_to_pool_from_local_cache_num);
if (ok) {
idle_coroutines_num_.fetch_add(return_to_pool_from_local_cache_num);
}
}

local_coro_buffer_.erase(
local_coro_buffer_.end() - local_coroutine_move_size_,
local_coro_buffer_.end());
}

template <typename Task>
std::size_t Pool<Task>::GetStackSize() const {
return config_.stack_size;
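The bulk calls above all go through `GetUsedPoolToken`, whose body is outside this diff. A plausible implementation (an assumption, not taken from the commit) caches one moodycamel token per thread per token type:

```cpp
template <typename Task>
template <typename Token>
Token& Pool<Task>::GetUsedPoolToken() {
  // Token is moodycamel::ProducerToken or moodycamel::ConsumerToken; a
  // per-thread token lets enqueue/dequeue skip the queue's internal
  // producer lookup. Unprotected thread_local is OK here for the same
  // reason as local_coro_buffer_, assuming one Pool instance per process.
  thread_local Token token(used_coroutines_);
  return token;
}
```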
2 changes: 2 additions & 0 deletions core/src/engine/coro/pool_config.cpp
@@ -10,6 +10,8 @@ PoolConfig Parse(const yaml_config::YamlConfig& value,
config.initial_size = value["initial_size"].As<size_t>(config.initial_size);
config.max_size = value["max_size"].As<size_t>(config.max_size);
config.stack_size = value["stack_size"].As<size_t>(config.stack_size);
config.local_cache_size =
value["local_cache_size"].As<size_t>(config.local_cache_size);
return config;
}

1 change: 1 addition & 0 deletions core/src/engine/coro/pool_config.hpp
@@ -13,6 +13,7 @@ struct PoolConfig {
std::size_t initial_size = 1000;
std::size_t max_size = 4000;
std::size_t stack_size = 256 * 1024ULL;
std::size_t local_cache_size = 32;
};

PoolConfig Parse(const yaml_config::YamlConfig& value,
21 changes: 21 additions & 0 deletions core/src/engine/task/task_benchmark.cpp
@@ -107,4 +107,25 @@ void thread_yield(benchmark::State& state) {
}
BENCHMARK(thread_yield)->RangeMultiplier(2)->ThreadRange(1, 32);

void engine_multiple_tasks_multiple_threads(benchmark::State& state) {
engine::RunStandalone(state.range(0), [&] {
std::atomic<std::uint64_t> tasks_count_total = 0;
RunParallelBenchmark(state, [&](auto& range) {
std::uint64_t tasks_count = 0;
for ([[maybe_unused]] auto _ : range) {
engine::AsyncNoSpan([] {}).Wait();
tasks_count++;
}
tasks_count_total += tasks_count;
benchmark::DoNotOptimize(tasks_count);
});
benchmark::DoNotOptimize(tasks_count_total);
});
}
BENCHMARK(engine_multiple_tasks_multiple_threads)
->RangeMultiplier(2)
->Range(1, 32)
->Arg(6)
->Arg(12);

USERVER_NAMESPACE_END
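
Each iteration of the new benchmark spawns a no-op coroutine task and waits for its completion, so it stresses exactly the GetCoroutine()/PutCoroutine() path that the local cache optimizes; the /1, /2, /4 and /6 rows quoted in the commit message come from this benchmark, where the argument is the number of worker threads passed to engine::RunStandalone().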
7 changes: 7 additions & 0 deletions core/src/engine/task/task_processor.cpp
@@ -102,6 +102,7 @@ TaskProcessor::TaskProcessor(TaskProcessorConfig config,
PrepareWorkerThread(i);
workers_left.count_down();
ProcessTasks();
FinalizeWorkerThread();
});
}

@@ -282,13 +283,19 @@ void TaskProcessor::PrepareWorkerThread(std::size_t index) noexcept {
break;
}

pools_->GetCoroPool().PrepareLocalCache();

utils::SetCurrentThreadName(fmt::format("{}_{}", config_.thread_name, index));

impl::SetLocalTaskCounterData(task_counter_, index);

TaskProcessorThreadStartedHook();
}

void TaskProcessor::FinalizeWorkerThread() noexcept {
pools_->GetCoroPool().ClearLocalCache();
}

void TaskProcessor::ProcessTasks() noexcept {
while (true) {
auto context = task_queue_.PopBlocking();
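Worker threads now bracket task processing with the cache lifecycle: PrepareWorkerThread() reserves the thread-local buffer via PrepareLocalCache(), and after ProcessTasks() returns, FinalizeWorkerThread() calls ClearLocalCache() to flush cached coroutines back to the shared queue, so they are not stranded when the thread exits.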
2 changes: 2 additions & 0 deletions core/src/engine/task/task_processor.hpp
@@ -92,6 +92,8 @@ class TaskProcessor final {

void PrepareWorkerThread(std::size_t index) noexcept;

void FinalizeWorkerThread() noexcept;

void ProcessTasks() noexcept;

void CheckWaitTime(impl::TaskContext& context);
1 change: 1 addition & 0 deletions scripts/docs/en/userver/deploy_env.md
@@ -392,6 +392,7 @@ components_manager:
coro_pool:
initial_size: 100 # Save memory and do not allocate many coroutines at start.
max_size: 200 # Do not keep more than 200 preallocated coroutines.
local_cache_size: 8 # Reduce thread-local coroutine cache size to avoid consuming extra coroutines.
task_processors:
main-task-processor:
