Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat engine: add WorkStealingTaskQueue #528

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Manager;
/// worker_threads | threads count for the task processor | -
/// os-scheduling | OS scheduling mode for the task processor threads. 'idle' sets the lowest priority. 'low-priority' sets the priority below 'normal' but higher than 'idle'. | normal
/// spinning-iterations | tunes the number of spin-wait iterations in case of an empty task queue before threads go to sleep | 10000
/// task-processor-queue | Task queue mode for the task processor. `global-task-queue` is the default task queue. `work-stealing-task-queue` is experimental, with potentially better scalability than `global-task-queue`. | global-task-queue
/// task-trace | optional dictionary of tracing options | empty (disabled)
/// task-trace.every | set N to trace each Nth task | 1000
/// task-trace.max-context-switch-count | set upper limit of context switches to trace for a single task | 1000
Expand Down
1 change: 1 addition & 0 deletions core/src/components/common_component_list_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ constexpr std::string_view kStaticConfig = R"(
thread_name: bg-worker
worker_threads: 2
os-scheduling: idle
task-processor-queue: global-task-queue
task-trace:
every: 1000
max-context-switch-count: 1000
Expand Down
11 changes: 11 additions & 0 deletions core/src/components/manager_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ additionalProperties: false
tunes the number of spin-wait iterations in case of
an empty task queue before threads go to sleep
defaultDescription: 10000
task-processor-queue:
type: string
description: |
Task queue mode for the task processor.
`global-task-queue` default task queue.
`work-stealing-task-queue` experimental with
potentially better scalability than `global-task-queue`.
defaultDescription: global-task-queue
enum:
- global-task-queue
- work-stealing-task-queue
task-trace:
type: object
description: .
Expand Down
1 change: 1 addition & 0 deletions core/src/components/manager_config_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ constexpr char kConfig[] = R"(
worker_threads: $bg_worker_threads
worker_threads#fallback: 2
os-scheduling: low-priority
task-processor-queue: global-task-queue
fs-task-processor:
thread_name: fs-worker
worker_threads: $fs_worker_threads
Expand Down
4 changes: 4 additions & 0 deletions core/src/engine/sleep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <userver/engine/task/cancel.hpp>

#include <engine/task/task_context.hpp>
#include <userver/utils/fast_scope_guard.hpp>

USERVER_NAMESPACE_BEGIN

Expand All @@ -23,6 +24,9 @@ class CommonSleepWaitStrategy final : public WaitStrategy {

/// Puts the current task to sleep until `deadline`. The task context is
/// flagged as "background" while it sleeps and the flag is cleared again
/// before this function returns.
void InterruptibleSleepUntil(Deadline deadline) {
  auto& context = current_task::GetCurrentTaskContext();

  // The guard is armed before the flag is raised, so the flag is cleared on
  // every way out of this scope, including an exception leaving Sleep().
  const utils::FastScopeGuard clear_background_flag(
      [&context]() noexcept { context.SetBackground(false); });
  context.SetBackground(true);

  impl::CommonSleepWaitStrategy sleep_strategy{};
  context.Sleep(sleep_strategy, deadline);
}
Expand Down
84 changes: 84 additions & 0 deletions core/src/engine/task/task_benchmark.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
#include <benchmark/benchmark.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <thread>

#include <engine/impl/standalone.hpp>
#include <engine/task/task_processor.hpp>
#include <engine/task/task_processor_config.hpp>
#include <engine/task/work_stealing_queue/task_queue.hpp>
#include <userver/engine/async.hpp>
#include <userver/engine/run_standalone.hpp>
#include <userver/engine/sleep.hpp>
Expand Down Expand Up @@ -107,4 +113,82 @@ void thread_yield(benchmark::State& state) {
}
BENCHMARK(thread_yield)->RangeMultiplier(2)->ThreadRange(1, 32);

// Benchmarks spawning an empty task with engine::AsyncNoSpan and waiting for
// it to finish. The loop body is driven through RunParallelBenchmark inside an
// engine started with engine::RunStandalone(state.range(0), ...).
// NOTE(review): state.range(0) is presumably the worker thread count — confirm
// against the RunStandalone overload.
void engine_multiple_tasks_multiple_threads(benchmark::State& state) {
  engine::RunStandalone(state.range(0), [&state] {
    std::atomic<std::uint64_t> total_spawned{0};

    RunParallelBenchmark(state, [&total_spawned](auto& range) {
      std::uint64_t spawned = 0;
      for ([[maybe_unused]] auto _ : range) {
        engine::AsyncNoSpan([] {}).Wait();
        ++spawned;
      }
      total_spawned += spawned;
      // Keep the per-runner counter observable for the optimizer.
      benchmark::DoNotOptimize(spawned);
    });

    benchmark::DoNotOptimize(total_spawned);
  });
}
// Powers of two from 1 to 32, plus the non-power-of-two sizes 6 and 12.
BENCHMARK(engine_multiple_tasks_multiple_threads)
    ->RangeMultiplier(2)
    ->Range(1, 32)
    ->Arg(6)
    ->Arg(12);

// Benchmarks engine::Yield() with two additional task processors, each
// configured with state.range(0) worker threads.
//
// Processor 0 runs `state.range(0) / 2` tasks that yield in a loop until told
// to stop. Processor 1 runs one such task fewer, plus a measuring task that
// yields once per benchmark iteration and then stops everyone. The total
// number of yields performed by all tasks is reported through the "yields"
// and "yields/thread" rate counters.
void engine_multiple_yield_two_task_processor_no_extra_wakeups(
    benchmark::State& state) {
  engine::RunStandalone([&] {
    std::vector<std::unique_ptr<engine::TaskProcessor>> processors;
    for (int i = 0; i < 2; i++) {
      engine::TaskProcessorConfig proc_config;
      proc_config.name = std::to_string(i);
      proc_config.thread_name = std::to_string(i);
      proc_config.worker_threads = state.range(0);
      processors.push_back(std::make_unique<engine::TaskProcessor>(
          std::move(proc_config),
          engine::current_task::GetTaskProcessor().GetTaskProcessorPools()));
    }

    const auto tasks_count = state.range(0) / 2;
    // The last processor gets one spinning task fewer because the measuring
    // task below is scheduled on it as well.
    const std::vector<std::int64_t> tasks_per_tp{tasks_count, tasks_count - 1};
    std::vector<engine::TaskWithResult<std::uint64_t>> tasks;
    std::atomic<bool> keep_running{true};
    for (int i = 0; i < 2; i++) {
      // Spawn exactly tasks_per_tp[i] tasks on processor i. Iterating over the
      // vector itself would always spawn two tasks per processor.
      for (std::int64_t j = 0; j < tasks_per_tp[i]; ++j) {
        tasks.push_back(engine::AsyncNoSpan(*processors[i], [&] {
          std::uint64_t yields_performed = 0;
          while (keep_running) {
            engine::Yield();
            ++yields_performed;
          }
          return yields_performed;
        }));
      }
    }

    // Measuring task: drives the benchmark loop, then stops the spinners.
    tasks.push_back(engine::AsyncNoSpan(*processors.back(), [&] {
      std::uint64_t yields_performed = 0;
      for ([[maybe_unused]] auto _ : state) {
        engine::Yield();
        ++yields_performed;
      }
      keep_running = false;
      return yields_performed;
    }));

    std::uint64_t yields_performed = 0;
    for (auto& task : tasks) {
      yields_performed += task.Get();
    }
    state.counters["yields"] =
        benchmark::Counter(yields_performed, benchmark::Counter::kIsRate);
    state.counters["yields/thread"] = benchmark::Counter(
        static_cast<double>(yields_performed) / state.range(0),
        benchmark::Counter::kIsRate);
  });
}
// Powers of two from 2 to 32, plus the non-power-of-two sizes 6 and 12.
BENCHMARK(engine_multiple_yield_two_task_processor_no_extra_wakeups)
    ->RangeMultiplier(2)
    ->Range(2, 32)
    ->Arg(6)
    ->Arg(12);

USERVER_NAMESPACE_END
4 changes: 4 additions & 0 deletions core/src/engine/task/task_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ bool TaskContext::SetCancellable(bool value) {
return std::exchange(is_cancellable_, value);
}

// Stores the "background" flag of this task context; the value is later
// reported by IsBackground().
// NOTE(review): the consumer of this flag is not visible here — presumably
// the task queue implementation; confirm.
void TaskContext::SetBackground(bool is_background) {
is_background_ = is_background;
}

TaskContext::WakeupSource TaskContext::Sleep(WaitStrategy& wait_strategy,
Deadline deadline) {
UASSERT(IsCurrent());
Expand Down
4 changes: 4 additions & 0 deletions core/src/engine/task/task_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ class TaskContext final : public ContextAccessor {
return IsCancelRequested() && IsCancellable();
}

// Stores the "background" flag of this task context.
void SetBackground(bool);
// Returns the "background" flag of this task context.
bool IsBackground() const noexcept { return is_background_; }

// causes this to yield and wait for wakeup
// must only be called from this context
// "spurious wakeups" may be caused by wakeup queueing
Expand Down Expand Up @@ -218,6 +221,7 @@ class TaskContext final : public ContextAccessor {
TaskCounter::Token task_counter_token_;
const bool is_critical_;
bool is_cancellable_{true};
bool is_background_{false};
bool within_sleep_{false};
EhGlobals eh_globals_;

Expand Down
21 changes: 17 additions & 4 deletions core/src/engine/task/task_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <engine/task/counted_coroutine_ptr.hpp>
#include <engine/task/task_context.hpp>
#include <engine/task/task_processor_pools.hpp>
#include <engine/task/work_stealing_queue/task_queue.hpp>

USERVER_NAMESPACE_BEGIN

Expand Down Expand Up @@ -63,12 +64,23 @@ void TaskProcessorThreadStartedHook() {
EmitMagicNanosleep();
}

auto MakeTaskQueue(TaskProcessorConfig config) {
using ResultType = std::variant<TaskQueue, WorkStealingTaskQueue>;
switch (config.task_processor_queue) {
case TaskQueueType::kGlobalTaskQueue:
return ResultType{std::in_place_index<0>, std::move(config)};
case TaskQueueType::kWorkStealingTaskQueue:
return ResultType{std::in_place_index<1>, std::move(config)};
}
UINVARIANT(false, "Unexpected value of ... enum");
}

} // namespace

TaskProcessor::TaskProcessor(TaskProcessorConfig config,
std::shared_ptr<impl::TaskProcessorPools> pools)
: task_counter_(config.worker_threads),
task_queue_(config),
task_queue_(MakeTaskQueue(config)),
config_(std::move(config)),
pools_(std::move(pools)) {
utils::impl::FinishStaticRegistration();
Expand Down Expand Up @@ -105,7 +117,7 @@ void TaskProcessor::Cleanup() noexcept {
// Some tasks may be bound but not scheduled yet
task_counter_.WaitForExhaustion();

task_queue_.StopProcessing();
std::visit([](auto&& arg) { return arg.StopProcessing(); }, task_queue_);

for (auto& w : workers_) {
w.join();
Expand Down Expand Up @@ -136,7 +148,7 @@ void TaskProcessor::Schedule(impl::TaskContext* context) {

SetTaskQueueWaitTimepoint(context);

task_queue_.Push(context);
std::visit([&context](auto&& arg) { return arg.Push(context); }, task_queue_);
}

void TaskProcessor::Adopt(impl::TaskContext& context) {
Expand Down Expand Up @@ -249,7 +261,8 @@ void TaskProcessor::PrepareWorkerThread(std::size_t index) noexcept {

void TaskProcessor::ProcessTasks() noexcept {
while (true) {
auto context = task_queue_.PopBlocking();
auto context =
std::visit([](auto&& arg) { return arg.PopBlocking(); }, task_queue_);
if (!context) break;

GetTaskCounter().AccountTaskSwitchSlow();
Expand Down
8 changes: 6 additions & 2 deletions core/src/engine/task/task_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <engine/task/task_counter.hpp>
#include <engine/task/task_processor_config.hpp>
#include <engine/task/task_queue.hpp>
#include <engine/task/work_stealing_queue/task_queue.hpp>
#include <utils/statistics/thread_statistics.hpp>

#include <userver/engine/impl/detached_tasks_sync_block.hpp>
Expand Down Expand Up @@ -57,7 +58,10 @@ class TaskProcessor final {

const impl::TaskCounter& GetTaskCounter() const { return task_counter_; }

size_t GetTaskQueueSize() const { return task_queue_.GetSizeApproximate(); }
// Approximate size of the task queue, as reported by whichever queue
// implementation is currently held in `task_queue_`.
size_t GetTaskQueueSize() const {
  const auto approximate_size = [](const auto& queue) {
    return queue.GetSizeApproximate();
  };
  return std::visit(approximate_size, task_queue_);
}

size_t GetWorkerCount() const { return workers_.size(); }

Expand Down Expand Up @@ -95,7 +99,7 @@ class TaskProcessor final {
detached_contexts_{impl::DetachedTasksSyncBlock::StopMode::kCancel};
concurrent::impl::InterferenceShield<std::atomic<bool>>
task_queue_wait_time_overloaded_{false};
TaskQueue task_queue_;
std::variant<TaskQueue, WorkStealingTaskQueue> task_queue_;

const TaskProcessorConfig config_;
const std::shared_ptr<impl::TaskProcessorPools> pools_;
Expand Down
14 changes: 14 additions & 0 deletions core/src/engine/task/task_processor_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ OsScheduling Parse(const yaml_config::YamlConfig& value,
return utils::ParseFromValueString(value, kMap);
}

/// Parses a TaskQueueType from its string spelling in the YAML config:
/// `global-task-queue` or `work-stealing-task-queue`.
TaskQueueType Parse(const yaml_config::YamlConfig& value,
                    formats::parse::To<TaskQueueType>) {
  // Mapping between the enumerators and their config spellings.
  static constexpr utils::TrivialBiMap kQueueTypeNames([](auto selector) {
    return selector()
        .Case(TaskQueueType::kGlobalTaskQueue, "global-task-queue")
        .Case(TaskQueueType::kWorkStealingTaskQueue,
              "work-stealing-task-queue");
  });

  return utils::ParseFromValueString(value, kQueueTypeNames);
}

TaskProcessorConfig Parse(const yaml_config::YamlConfig& value,
formats::parse::To<TaskProcessorConfig>) {
TaskProcessorConfig config;
Expand All @@ -56,6 +68,8 @@ TaskProcessorConfig Parse(const yaml_config::YamlConfig& value,
value["os-scheduling"].As<OsScheduling>(config.os_scheduling);
config.spinning_iterations =
value["spinning-iterations"].As<int>(config.spinning_iterations);
config.task_processor_queue = value["task-processor-queue"].As<TaskQueueType>(
config.task_processor_queue);

const auto task_trace = value["task-trace"];
if (!task_trace.IsMissing()) {
Expand Down
6 changes: 6 additions & 0 deletions core/src/engine/task/task_processor_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ enum class OsScheduling {
kIdle,
};

/// Task queue implementation used by a task processor.
enum class TaskQueueType {
  /// Written as `global-task-queue` in the config.
  kGlobalTaskQueue,
  /// Written as `work-stealing-task-queue` in the config.
  kWorkStealingTaskQueue,
};

OsScheduling Parse(const yaml_config::YamlConfig& value,
formats::parse::To<OsScheduling>);

/// Parses a TaskQueueType from a YAML config value.
TaskQueueType Parse(const yaml_config::YamlConfig& value,
formats::parse::To<TaskQueueType>);

struct TaskProcessorConfig {
std::string name;

Expand All @@ -29,6 +34,7 @@ struct TaskProcessorConfig {
std::string thread_name;
OsScheduling os_scheduling{OsScheduling::kNormal};
int spinning_iterations{10000};
TaskQueueType task_processor_queue{TaskQueueType::kGlobalTaskQueue};

std::size_t task_trace_every{1000};
std::size_t task_trace_max_csw{0};
Expand Down
Loading
Loading