Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

engine: pool optimization #584

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Manager;
/// coro_pool.initial_size | amount of coroutines to preallocate on startup | 1000
/// coro_pool.max_size | max amount of coroutines to keep preallocated | 4000
/// coro_pool.stack_size | size of a single coroutine | 256 * 1024
/// coro_pool.local_cache_size | local coroutine cache size per thread | 32
/// event_thread_pool.threads | number of threads to process low level IO system calls (number of ev loops to start in libev) | 2
/// event_thread_pool.thread_name | set OS thread name to this value | 'event-worker'
/// components | dictionary of "component name": "options" | -
Expand Down
4 changes: 4 additions & 0 deletions core/src/components/manager_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ additionalProperties: false
type: integer
description: size of a single coroutine, bytes
defaultDescription: 256 * 1024
local_cache_size:
type: integer
description: local coroutine cache size per thread
defaultDescription: 32
event_thread_pool:
type: object
description: event thread pool options
Expand Down
1 change: 1 addition & 0 deletions core/src/components/manager_config_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ constexpr char kConfig[] = R"(
max_size: $coro_pool_max_size
max_size#fallback: 50000
stack_size#env: USERVER_STACK_SIZE
local_cache_size: 32
default_task_processor: main-task-processor
mlock_debug_info: $variable_does_not_exist
mlock_debug_info#env: MLOCK_DEBUG_INFO
Expand Down
117 changes: 106 additions & 11 deletions core/src/engine/coro/pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <algorithm> // for std::max
#include <atomic>
#include <cerrno>
#include <iterator>
#include <optional>
Anton3 marked this conversation as resolved.
Show resolved Hide resolved
#include <utility>

#include <moodycamel/concurrentqueue.h>
Expand Down Expand Up @@ -34,9 +36,13 @@ class Pool final {
void PutCoroutine(CoroutinePtr&& coroutine_ptr);
PoolStats GetStats() const;
std::size_t GetStackSize() const;
void PrepareLocalCache();
void ClearLocalCache();

private:
Coroutine CreateCoroutine(bool quiet = false);
bool TryPopulateLocalCache();
void ReduceLocalCacheBufferSize(std::size_t new_size);
void OnCoroutineDestruction() noexcept;

template <typename Token>
Expand All @@ -45,6 +51,13 @@ class Pool final {
const PoolConfig config_;
const Executor executor_;

  // Maximum number of coroutines moved at once between used_coroutines_
  // and the thread-local coroutine cache.
  // Invariant: local_coroutine_move_size_ <= config_.local_cache_size
const std::size_t local_coroutine_move_size_;

static inline thread_local std::vector<Coroutine> local_coro_buffer_;

boost::coroutines2::protected_fixedsize_stack stack_allocator_;

// We aim to reuse coroutines as much as possible,
Expand Down Expand Up @@ -93,6 +106,7 @@ template <typename Task>
Pool<Task>::Pool(PoolConfig config, Executor executor)
: config_(std::move(config)),
executor_(executor),
local_coroutine_move_size_((config_.local_cache_size + 1) / 2),
stack_allocator_(config_.stack_size),
initial_coroutines_(config_.initial_size),
used_coroutines_(config_.max_size),
Expand All @@ -119,31 +133,61 @@ typename Pool<Task>::CoroutinePtr Pool<Task>::GetCoroutine() {
return *this;
}
};

std::optional<Coroutine> coroutine;
CoroutineMover mover{coroutine};

// First try to dequeue from 'working set': if we can get a coroutine
egor-bystepdev marked this conversation as resolved.
Show resolved Hide resolved
// from there we are happy, because we saved on minor-page-faulting (thus
// increasing resident memory usage) a not-yet-de-virtualized coroutine stack.
if (used_coroutines_.try_dequeue(
GetUsedPoolToken<moodycamel::ConsumerToken>(), mover) ||
initial_coroutines_.try_dequeue(mover)) {
--idle_coroutines_num_;
if (!local_coro_buffer_.empty() || TryPopulateLocalCache()) {
coroutine = std::move(local_coro_buffer_.back());
local_coro_buffer_.pop_back();
} else if (initial_coroutines_.try_dequeue(mover)) {
idle_coroutines_num_.fetch_sub(1);
} else {
coroutine.emplace(CreateCoroutine());
}

return CoroutinePtr(std::move(*coroutine), *this);
}

template <typename Task>
void Pool<Task>::PutCoroutine(CoroutinePtr&& coroutine_ptr) {
  // Fast path: keep the finished coroutine in the thread-local cache while
  // the cache is below its configured bound.
  // BUG FIX: the bound must be config_.local_cache_size, not
  // config_.max_size. With max_size every worker thread could hoard up to
  // max_size coroutines without ever sharing them, and the
  // local_cache_size == 0 branch below would be unreachable.
  if (local_coro_buffer_.size() < config_.local_cache_size) {
    local_coro_buffer_.push_back(std::move(coroutine_ptr.Get()));
    return;
  }

  const std::size_t current_idle_coroutines_num = idle_coroutines_num_.load();
  if (current_idle_coroutines_num >= config_.max_size) {
    // Shared pool is already full: drop part of the local cache so the
    // total number of idle coroutines stays bounded. The incoming
    // coroutine is destroyed together with coroutine_ptr.
    ReduceLocalCacheBufferSize(local_coro_buffer_.size() -
                               local_coroutine_move_size_);
    return;
  }

  if (config_.local_cache_size == 0) {
    // Local caching disabled: return the coroutine straight to the shared
    // 'working set'.
    if (used_coroutines_.enqueue(GetUsedPoolToken<moodycamel::ProducerToken>(),
                                 std::move(coroutine_ptr.Get()))) {
      ++idle_coroutines_num_;
    }
    return;
  }

  // Local cache is full and the shared pool has spare capacity: move the
  // newest local_coroutine_move_size_ coroutines (bounded by the pool's
  // remaining capacity) into the shared pool in one bulk operation.
  const std::size_t return_to_pool_from_local_cache_num =
      std::min(config_.max_size - current_idle_coroutines_num,
               local_coroutine_move_size_);

  if (used_coroutines_.enqueue_bulk(
          GetUsedPoolToken<moodycamel::ProducerToken>(),
          std::make_move_iterator(local_coro_buffer_.begin()) +
              local_coro_buffer_.size() - return_to_pool_from_local_cache_num,
          return_to_pool_from_local_cache_num)) {
    idle_coroutines_num_.fetch_add(return_to_pool_from_local_cache_num);
  }

  // Discard the moved-from shells (and any surplus, if the bulk enqueue
  // transferred fewer than local_coroutine_move_size_), then cache the
  // incoming coroutine locally.
  ReduceLocalCacheBufferSize(local_coro_buffer_.size() -
                             local_coroutine_move_size_);
  local_coro_buffer_.push_back(std::move(coroutine_ptr.Get()));
}

template <typename Task>
Expand All @@ -157,6 +201,29 @@ PoolStats Pool<Task>::GetStats() const {
return stats;
}

template <typename Task>
void Pool<Task>::PrepareLocalCache() {
  // Called once per worker thread on startup (see
  // TaskProcessor::PrepareWorkerThread): pre-reserves the thread-local
  // cache so later cache operations do not reallocate.
  local_coro_buffer_.reserve(config_.local_cache_size);
}

/// Flushes this thread's local coroutine cache back into the shared pool.
/// Called on worker thread shutdown so cached coroutines are not leaked.
/// (Defect fixed here: scraped review-UI chrome lines were embedded inside
/// the function body; logic is otherwise preserved.)
template <typename Task>
void Pool<Task>::ClearLocalCache() {
  const std::size_t current_idle_coroutines_num = idle_coroutines_num_.load();
  if (current_idle_coroutines_num < config_.max_size) {
    // Hand back as many coroutines as the shared pool can still hold.
    const std::size_t return_to_pool_from_local_cache_num =
        std::min(config_.max_size - current_idle_coroutines_num,
                 local_coro_buffer_.size());

    if (used_coroutines_.enqueue_bulk(
            GetUsedPoolToken<moodycamel::ProducerToken>(),
            std::make_move_iterator(local_coro_buffer_.begin()),
            return_to_pool_from_local_cache_num)) {
      idle_coroutines_num_.fetch_add(return_to_pool_from_local_cache_num);
    }
  }
  // Destroy whatever was not transferred: moved-from shells, plus live
  // surplus coroutines when the shared pool is full.
  ReduceLocalCacheBufferSize(0);
}

template <typename Task>
typename Pool<Task>::Coroutine Pool<Task>::CreateCoroutine(bool quiet) {
try {
Expand Down Expand Up @@ -184,6 +251,34 @@ typename Pool<Task>::Coroutine Pool<Task>::CreateCoroutine(bool quiet) {
}
}

/// Attempts to refill the thread-local cache from the shared 'working set'.
/// Returns true iff at least one coroutine was moved into the local cache.
template <typename Task>
bool Pool<Task>::TryPopulateLocalCache() {
  // Approximate count of idle coroutines that live in used_coroutines_
  // (as opposed to initial_coroutines_). size_approx() is cheap here
  // because initial_coroutines_ has a single producer queue.
  const std::size_t idle_num = idle_coroutines_num_.load();
  const std::size_t initial_num = initial_coroutines_.size_approx();

  // BUG FIX: the two counters are not read atomically together, so the
  // plain difference could underflow to a huge unsigned value; saturate
  // the subtraction at zero instead.
  const std::size_t available_num =
      idle_num > initial_num ? idle_num - initial_num : std::size_t{0};
  const std::size_t dequeue_num =
      std::min(available_num, config_.local_cache_size);
  if (dequeue_num == 0) return false;

  const std::size_t dequeued_num = used_coroutines_.try_dequeue_bulk(
      GetUsedPoolToken<moodycamel::ConsumerToken>(),
      std::back_inserter(local_coro_buffer_), dequeue_num);
  if (dequeued_num == 0) return false;

  idle_coroutines_num_.fetch_sub(dequeued_num);
  return true;
}

/// Shrinks the thread-local cache to new_size elements, destroying the
/// trailing coroutines. Precondition: new_size <= current size.
template <typename Task>
void Pool<Task>::ReduceLocalCacheBufferSize(std::size_t new_size) {
  UASSERT(new_size <= local_coro_buffer_.size());
  // Single bulk erase instead of a hand-rolled pop_back loop; erase only
  // requires MoveAssignable, so no default-constructibility is needed.
  local_coro_buffer_.erase(local_coro_buffer_.begin() + new_size,
                           local_coro_buffer_.end());
}

template <typename Task>
void Pool<Task>::OnCoroutineDestruction() noexcept {
--total_coroutines_num_;
Expand Down
2 changes: 2 additions & 0 deletions core/src/engine/coro/pool_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ PoolConfig Parse(const yaml_config::YamlConfig& value,
config.initial_size = value["initial_size"].As<size_t>(config.initial_size);
config.max_size = value["max_size"].As<size_t>(config.max_size);
config.stack_size = value["stack_size"].As<size_t>(config.stack_size);
config.local_cache_size =
value["local_cache_size"].As<size_t>(config.stack_size);
return config;
}

Expand Down
1 change: 1 addition & 0 deletions core/src/engine/coro/pool_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct PoolConfig {
std::size_t initial_size = 1000;
std::size_t max_size = 4000;
std::size_t stack_size = 256 * 1024ULL;
std::size_t local_cache_size = 32;
};

PoolConfig Parse(const yaml_config::YamlConfig& value,
Expand Down
21 changes: 21 additions & 0 deletions core/src/engine/task/task_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,25 @@ void thread_yield(benchmark::State& state) {
}
BENCHMARK(thread_yield)->RangeMultiplier(2)->ThreadRange(1, 32);

// Benchmarks spawn-and-wait throughput of no-op tasks on a standalone
// engine with state.range(0) task-processor threads; exercises the
// coroutine pool's get/put fast paths under concurrency.
void engine_multiple_tasks_multiple_threads(benchmark::State& state) {
  engine::RunStandalone(state.range(0), [&] {
    // Aggregated across all parallel drivers; atomic because
    // RunParallelBenchmark presumably invokes the lambda from several
    // concurrent tasks — TODO confirm its contract.
    std::atomic<std::uint64_t> tasks_count_total = 0;
    RunParallelBenchmark(state, [&](auto& range) {
      std::uint64_t tasks_count = 0;
      for ([[maybe_unused]] auto _ : range) {
        // Spawn a no-op task and block until it completes.
        engine::AsyncNoSpan([] {}).Wait();
        tasks_count++;
      }
      tasks_count_total += tasks_count;
      // Keep the per-driver counter observable to the optimizer.
      benchmark::DoNotOptimize(tasks_count);
    });
    benchmark::DoNotOptimize(tasks_count_total);
  });
}
// Thread counts: powers of two from 1 to 32, plus common core counts 6/12.
BENCHMARK(engine_multiple_tasks_multiple_threads)
    ->RangeMultiplier(2)
    ->Range(1, 32)
    ->Arg(6)
    ->Arg(12);

USERVER_NAMESPACE_END
7 changes: 7 additions & 0 deletions core/src/engine/task/task_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ TaskProcessor::TaskProcessor(TaskProcessorConfig config,
PrepareWorkerThread(i);
workers_left.count_down();
ProcessTasks();
FinalizeWorkerThread();
});
}

Expand Down Expand Up @@ -282,13 +283,19 @@ void TaskProcessor::PrepareWorkerThread(std::size_t index) noexcept {
break;
}

pools_->GetCoroPool().PrepareLocalCache();

utils::SetCurrentThreadName(fmt::format("{}_{}", config_.thread_name, index));

impl::SetLocalTaskCounterData(task_counter_, index);

TaskProcessorThreadStartedHook();
}

void TaskProcessor::FinalizeWorkerThread() noexcept {
  // Counterpart of PrepareWorkerThread: runs after ProcessTasks() returns,
  // flushing this worker's thread-local coroutine cache back into the
  // shared pool before the thread exits.
  pools_->GetCoroPool().ClearLocalCache();
}

void TaskProcessor::ProcessTasks() noexcept {
while (true) {
auto context = task_queue_.PopBlocking();
Expand Down
2 changes: 2 additions & 0 deletions core/src/engine/task/task_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class TaskProcessor final {

void PrepareWorkerThread(std::size_t index) noexcept;

void FinalizeWorkerThread() noexcept;

void ProcessTasks() noexcept;

void CheckWaitTime(impl::TaskContext& context);
Expand Down