Skip to content

Commit

Permalink
[MOD] Use multiple allocators for iris_async_worker_t.
Browse files Browse the repository at this point in the history
  • Loading branch information
paintdream committed May 29, 2024
1 parent aead07e commit 0826af0
Showing 1 changed file with 20 additions and 24 deletions.
44 changes: 20 additions & 24 deletions src/iris_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ namespace iris {
}
} else {
// wait for current warp finish
task_t* task = async_worker.make_task(suspend_t<typename std::remove_reference<callable_t>::type>(*this, std::forward<callable_t>(func)));
task_t* task = async_worker.new_task(suspend_t<typename std::remove_reference<callable_t>::type>(*this, std::forward<callable_t>(func)));

// avoid legacy compiler bugs
// see https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
Expand Down Expand Up @@ -697,7 +697,7 @@ namespace iris {
// queue task from specified thread.
template <bool s, typename callable_t>
typename std::enable_if<s>::type push(callable_t&& func) {
task_t* task = async_worker.make_task(std::forward<callable_t>(func));
task_t* task = async_worker.new_task(std::forward<callable_t>(func));

// avoid legacy compiler bugs
// see https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
Expand Down Expand Up @@ -995,15 +995,15 @@ namespace iris {

// here we code a trivial worker demo
// could be replaced by your implementation
template <typename thread_t = std::thread, typename callback_t = std::function<void()>, template <typename...> class allocator_t = iris_default_object_allocator_t>
template <typename thread_t = std::thread, typename callback_t = std::function<void()>, template <typename...> class allocator_t = iris_default_object_allocator_t, size_t default_task_duplicate_count = 4, size_t default_sub_allocator_count = 4>
struct iris_async_worker_t {
// task wrapper
struct alignas(64) task_t {
template <typename func_t>
task_t(func_t&& func, task_t* n) noexcept(noexcept(callback_t(std::forward<func_t>(func))))
: task(std::forward<func_t>(func)), next(n) {}
task_t(func_t&& func, task_t* n, allocator_t<task_t>& alloc) noexcept(noexcept(callback_t(std::forward<func_t>(func))))
: task(std::forward<func_t>(func)), next(n), allocator(alloc) {}

task_t(task_t&& rhs) noexcept : task(std::move(rhs.task)), next(rhs.next) {
task_t(task_t&& rhs) noexcept : task(std::move(rhs.task)), next(rhs.next), allocator(rhs.allocator) {
rhs.next = nullptr;
}

Expand All @@ -1019,9 +1019,12 @@ namespace iris {

callback_t task;
task_t* next;
allocator_t<task_t>& allocator;
};

static constexpr size_t task_head_duplicate_count = 4;
static constexpr size_t task_head_duplicate_count = default_task_duplicate_count;
static constexpr size_t sub_allocator_count = default_sub_allocator_count;

template <typename element_t>
using general_allocator_t = allocator_t<element_t>;
using task_allocator_t = allocator_t<task_t>;
Expand All @@ -1036,15 +1039,6 @@ namespace iris {
resize(thread_count);
}

explicit iris_async_worker_t(const task_allocator_t& alloc) : task_allocator(alloc), waiting_thread_count(0), limit_count(0), internal_thread_count(0) {
running_count.store(0, std::memory_order_relaxed);
terminated.store(1, std::memory_order_release);
}

iris_async_worker_t(size_t thread_count, const task_allocator_t& alloc) : iris_async_worker_t(alloc) {
resize(thread_count);
}

void resize(size_t thread_count) {
IRIS_ASSERT(task_heads.empty()); // must not started

Expand Down Expand Up @@ -1122,14 +1116,14 @@ namespace iris {

// guard for exceptions on polling
struct poll_guard_t {
poll_guard_t(task_allocator_t& alloc, task_t* t) noexcept : allocator(alloc), task(t) {}
poll_guard_t(task_t* t) noexcept : task(t) {}
~poll_guard_t() noexcept {
// do cleanup work
task_allocator_t& allocator = task->allocator;
task->~task_t();
allocator.deallocate(task, 1);
}

task_allocator_t& allocator;
task_t* task;
};

Expand Down Expand Up @@ -1200,16 +1194,18 @@ namespace iris {

// queue a task to worker with given priority [0, thread_count - 1], which 0 is the highest priority
template <typename callable_t>
task_t* make_task(callable_t&& func) {
task_t* task = task_allocator.allocate(1);
new (task) task_t(std::forward<callable_t>(func), nullptr);
task_t* new_task(callable_t&& func) {
task_allocator_t& current_allocator = task_allocators[get_current_thread_index() % sub_allocator_count];
task_t* task = current_allocator.allocate(1);
new (task) task_t(std::forward<callable_t>(func), nullptr, current_allocator);
task_count.fetch_add(1, std::memory_order_relaxed);

return task;
}

void execute_task(task_t* task) {
task_count.fetch_sub(1, std::memory_order_release);
poll_guard_t guard(task_allocator, task);
poll_guard_t guard(task);
task->task();
}

Expand Down Expand Up @@ -1273,7 +1269,7 @@ namespace iris {

template <typename callable_t>
void queue(callable_t&& callable, size_t priority = 0) {
queue_task(make_task(std::forward<callable_t>(callable)), priority);
queue_task(new_task(std::forward<callable_t>(callable)), priority);
}

// mark as terminated
Expand Down Expand Up @@ -1452,7 +1448,7 @@ namespace iris {
}

protected:
task_allocator_t task_allocator; // default task allocator
task_allocator_t task_allocators[sub_allocator_count]; // default task allocator
std::vector<thread_t> threads; // worker
std::atomic<size_t> running_count; // running_count
std::atomic<size_t> task_count; // the count of total waiting tasks
Expand Down

0 comments on commit 0826af0

Please sign in to comment.