Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

engine: pool optimization #584

127 changes: 100 additions & 27 deletions core/src/engine/coro/pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <algorithm> // for std::max
#include <atomic>
#include <cerrno>
#include <iterator>
#include <optional>
Anton3 marked this conversation as resolved.
Show resolved Hide resolved
#include <utility>

#include <moodycamel/concurrentqueue.h>
Expand Down Expand Up @@ -34,9 +36,11 @@ class Pool final {
void PutCoroutine(CoroutinePtr&& coroutine_ptr);
PoolStats GetStats() const;
std::size_t GetStackSize() const;
void ClearLocalCache();

private:
Coroutine CreateCoroutine(bool quiet = false);
bool TryPopulateLocalCache();
void OnCoroutineDestruction() noexcept;

template <typename Token>
Expand All @@ -45,6 +49,17 @@ class Pool final {
const PoolConfig config_;
const Executor executor_;

// Maximum thread local coroutine cache size
static constexpr std::size_t kLocalCoroutineCacheMaxSize = 32;
// Maximum number of coroutines moved at once between
// used_coroutines_ and the thread-local coroutine cache;
// invariant: kLocalCoroutineMoveSize <= kLocalCoroutineCacheMaxSize
static constexpr std::size_t kLocalCoroutineMoveSize = 16;
static_assert(kLocalCoroutineMoveSize <= kLocalCoroutineCacheMaxSize);

static inline thread_local std::vector<std::optional<Coroutine>>
egor-bystepdev marked this conversation as resolved.
Show resolved Hide resolved
local_coro_buffer_;

boost::coroutines2::protected_fixedsize_stack stack_allocator_;

// We aim to reuse coroutines as much as possible,
Expand All @@ -53,8 +68,8 @@ class Pool final {
//
// The same could've been achieved with some LIFO container, but apparently
// we don't have a container handy enough to not just use 2 queues.
moodycamel::ConcurrentQueue<Coroutine> initial_coroutines_;
moodycamel::ConcurrentQueue<Coroutine> used_coroutines_;
moodycamel::ConcurrentQueue<std::optional<Coroutine>> initial_coroutines_;
moodycamel::ConcurrentQueue<std::optional<Coroutine>> used_coroutines_;

std::atomic<std::size_t> idle_coroutines_num_;
std::atomic<std::size_t> total_coroutines_num_;
Expand All @@ -64,7 +79,9 @@ template <typename Task>
class Pool<Task>::CoroutinePtr final {
public:
CoroutinePtr(Coroutine&& coro, Pool<Task>& pool) noexcept
: coro_(std::move(coro)), pool_(&pool) {}
: coro_(std::move(coro)), pool_(&pool) {
UASSERT(kLocalCoroutineMoveSize <= kLocalCoroutineCacheMaxSize);
egor-bystepdev marked this conversation as resolved.
Show resolved Hide resolved
}

CoroutinePtr(CoroutinePtr&&) noexcept = default;
CoroutinePtr& operator=(CoroutinePtr&&) noexcept = default;
Expand Down Expand Up @@ -111,39 +128,54 @@ Pool<Task>::~Pool() = default;

template <typename Task>
typename Pool<Task>::CoroutinePtr Pool<Task>::GetCoroutine() {
  std::optional<Coroutine> coroutine;

  // First try to take a coroutine from the thread-local cache, refilling it
  // from the shared 'working set' (used_coroutines_) when empty. Reusing a
  // coroutine from the working set saves on minor-page-faulting (thus
  // increasing resident memory usage) a not-yet-de-virtualized coroutine
  // stack.
  if (!local_coro_buffer_.empty() || TryPopulateLocalCache()) {
    coroutine = std::move(local_coro_buffer_.back());
    local_coro_buffer_.pop_back();
  } else if (initial_coroutines_.try_dequeue(coroutine)) {
    // The queues store std::optional<Coroutine>, so the dequeued element is
    // written straight into 'coroutine' — no mover adapter is needed.
    idle_coroutines_num_.fetch_sub(1, std::memory_order_release);
  } else {
    // Nothing cached anywhere: pay the full cost of creating a coroutine.
    coroutine.emplace(CreateCoroutine());
  }

  return CoroutinePtr(std::move(*coroutine), *this);
}

template <typename Task>
void Pool<Task>::PutCoroutine(CoroutinePtr&& coroutine_ptr) {
  // Coroutines are always returned into the thread-local cache first; the
  // shared 'working set' is only touched when the cache overflows.
  local_coro_buffer_.push_back(std::move(coroutine_ptr.Get()));
  if (local_coro_buffer_.size() <= kLocalCoroutineCacheMaxSize) {
    return;
  }

  const std::size_t current_idle_coroutines_num =
      idle_coroutines_num_.load(std::memory_order_acquire);
  if (current_idle_coroutines_num >= config_.max_size) {
    // The shared pool is already full: trim a batch from the local cache,
    // destroying those coroutines outright.
    local_coro_buffer_.resize(local_coro_buffer_.size() -
                              kLocalCoroutineMoveSize);
    return;
  }

  const std::size_t return_to_pool_from_local_cache_num = std::min(
      config_.max_size - current_idle_coroutines_num, kLocalCoroutineMoveSize);

  // Move the tail of the local cache into the shared 'working set'.
  if (used_coroutines_.enqueue_bulk(
          GetUsedPoolToken<moodycamel::ProducerToken>(),
          std::make_move_iterator(local_coro_buffer_.begin()) +
              local_coro_buffer_.size() - return_to_pool_from_local_cache_num,
          return_to_pool_from_local_cache_num)) {
    idle_coroutines_num_.fetch_add(return_to_pool_from_local_cache_num,
                                   std::memory_order_release);
  }

  // Shrink by the full batch size: this erases the moved-from optionals and,
  // when fewer than kLocalCoroutineMoveSize were transferred (or the enqueue
  // failed), destroys the remaining coroutines of the batch.
  local_coro_buffer_.resize(local_coro_buffer_.size() -
                            kLocalCoroutineMoveSize);
}

template <typename Task>
Expand All @@ -157,6 +189,26 @@ PoolStats Pool<Task>::GetStats() const {
return stats;
}

template <typename Task>
void Pool<Task>::ClearLocalCache() {
  // Flushes the thread-local coroutine cache. Called when a worker thread
  // finishes so cached coroutines are not silently lost with the thread.
  const std::size_t current_idle_coroutines_num =
      idle_coroutines_num_.load(std::memory_order_acquire);
  if (current_idle_coroutines_num < config_.max_size) {
    // Give back as many cached coroutines as the shared pool can accept.
    const std::size_t return_to_pool_from_local_cache_num =
        std::min(config_.max_size - current_idle_coroutines_num,
                 local_coro_buffer_.size());

    if (used_coroutines_.enqueue_bulk(
            GetUsedPoolToken<moodycamel::ProducerToken>(),
            std::make_move_iterator(local_coro_buffer_.begin()),
            return_to_pool_from_local_cache_num)) {
      idle_coroutines_num_.fetch_add(return_to_pool_from_local_cache_num,
                                     std::memory_order_release);
    }
  }
  // Destroy everything that was not transferred (plus moved-from remains).
  local_coro_buffer_.clear();
}

template <typename Task>
typename Pool<Task>::Coroutine Pool<Task>::CreateCoroutine(bool quiet) {
try {
Expand Down Expand Up @@ -184,6 +236,27 @@ typename Pool<Task>::Coroutine Pool<Task>::CreateCoroutine(bool quiet) {
}
}

template <typename Task>
bool Pool<Task>::TryPopulateLocalCache() {
  // size_approx() is cheap for initial_coroutines_ because that queue only
  // ever has a single producer.
  const std::size_t idle_num =
      idle_coroutines_num_.load(std::memory_order_acquire);
  const std::size_t initial_num = initial_coroutines_.size_approx();

  // Both counters are racy approximations, so initial_num may momentarily
  // exceed idle_num; guard against unsigned underflow in the subtraction.
  if (idle_num <= initial_num) return false;

  const std::size_t deque_num =
      std::min(idle_num - initial_num, kLocalCoroutineMoveSize);
  const std::size_t dequed_num = used_coroutines_.try_dequeue_bulk(
      GetUsedPoolToken<moodycamel::ConsumerToken>(),
      std::back_inserter(local_coro_buffer_), deque_num);

  if (dequed_num > 0) {
    idle_coroutines_num_.fetch_sub(dequed_num, std::memory_order_release);
    return true;
  }
  return false;
}

template <typename Task>
void Pool<Task>::OnCoroutineDestruction() noexcept {
--total_coroutines_num_;
Expand Down
5 changes: 5 additions & 0 deletions core/src/engine/task/task_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ TaskProcessor::TaskProcessor(TaskProcessorConfig config,
PrepareWorkerThread(i);
workers_left.count_down();
ProcessTasks();
FinalizeWorkerThread();
});
}

Expand Down Expand Up @@ -289,6 +290,10 @@ void TaskProcessor::PrepareWorkerThread(std::size_t index) noexcept {
TaskProcessorThreadStartedHook();
}

// Runs on a worker thread right before it exits: flushes that thread's
// local coroutine cache back into the shared coroutine pool.
void TaskProcessor::FinalizeWorkerThread() noexcept {
  pools_->GetCoroPool().ClearLocalCache();
}

void TaskProcessor::ProcessTasks() noexcept {
while (true) {
auto context = task_queue_.PopBlocking();
Expand Down
2 changes: 2 additions & 0 deletions core/src/engine/task/task_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class TaskProcessor final {

void PrepareWorkerThread(std::size_t index) noexcept;

void FinalizeWorkerThread() noexcept;

void ProcessTasks() noexcept;

void CheckWaitTime(impl::TaskContext& context);
Expand Down