Skip to content

Commit

Permalink
[FIX] Fix low-priority task missing for external polling thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
paintdream committed Jun 9, 2023
1 parent 6f684de commit f0efdc8
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 16 deletions.
28 changes: 15 additions & 13 deletions src/iris_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ namespace iris {
template <typename callable_t>
void queue_routine_external(callable_t&& func) {
assert(async_worker.get_current_thread_index() == ~size_t(0));
async_worker.queue(external_t<typename std::remove_reference<callable_t>::type>(*this, std::forward<callable_t>(func)));
async_worker.queue(external_t<typename std::remove_reference<callable_t>::type>(*this, std::forward<callable_t>(func)), priority);
}

// queue task parallelly to async_worker, blocking the execution of current warp at the same time
Expand All @@ -410,7 +410,7 @@ namespace iris {
suspend();

suspend_guard_t guard(this);
async_worker.queue(suspend_t<typename std::remove_reference<callable_t>::type>(*this, std::forward<callable_t>(func)));
async_worker.queue(suspend_t<typename std::remove_reference<callable_t>::type>(*this, std::forward<callable_t>(func)), priority);
guard.cleanup();
}

Expand Down Expand Up @@ -998,6 +998,9 @@ namespace iris {

if (fetch(threads.size()).first == ~size_t(0)) {
condition.wait_for(lock, std::chrono::milliseconds(millseconds));
lock.unlock();

wakeup_one_with_priority(0);
}
}
}
Expand Down Expand Up @@ -1064,7 +1067,7 @@ namespace iris {
void queue(callable_t&& func, size_t priority = 0) {
if (!is_terminated()) {
assert(!threads.empty());
priority = std::min(priority, threads.size() - 1);
priority = std::min(priority, internal_thread_count - 1);
task_t* task = task_allocator.allocate(1);
new (task) task_t(std::forward<callable_t>(func), nullptr);
task_count.fetch_add(1, std::memory_order_relaxed);
Expand All @@ -1078,10 +1081,7 @@ namespace iris {
task_t* expected = nullptr;
if (task_head.compare_exchange_strong(expected, task, std::memory_order_release)) {
// dispatch immediately
if (waiting_thread_count > priority + limit_count) {
wakeup_one();
}

wakeup_one_with_priority(priority);
return;
} else {
ptrdiff_t diff = task - expected;
Expand All @@ -1103,9 +1103,7 @@ namespace iris {
} while (!task_head.compare_exchange_weak(node, task, std::memory_order_acq_rel, std::memory_order_relaxed));

// dispatch immediately
if (waiting_thread_count > priority + limit_count) {
wakeup_one();
}
wakeup_one_with_priority(priority);
} else {
// terminate requested, chain to default task_head at 0
if (!task_heads.empty()) {
Expand Down Expand Up @@ -1195,6 +1193,12 @@ namespace iris {
}

protected:
void wakeup_one_with_priority(size_t priority) {
if (waiting_thread_count > priority + limit_count) {
wakeup_one();
}
}

// cleanup all pending tasks
bool cleanup() {
IRIS_PROFILE_SCOPE(__FUNCTION__);
Expand Down Expand Up @@ -1263,9 +1267,7 @@ namespace iris {
} while (org != nullptr);

std::atomic_thread_fence(std::memory_order_acq_rel);
if (waiting_thread_count > priority + limit_count) {
wakeup_one();
}
wakeup_one_with_priority(priority);
}

task_count.fetch_sub(1, std::memory_order_release);
Expand Down
62 changes: 59 additions & 3 deletions test/iris_dispatcher_demo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <future>
using namespace iris;

static void external_poll();
static void stack_op();
static void not_pow_two();
static void framed_data();
Expand All @@ -13,6 +14,7 @@ static void graph_dispatch();
static void graph_dispatch_exception();

int main(void) {
external_poll();
stack_op();
not_pow_two();
framed_data();
Expand All @@ -24,6 +26,63 @@ int main(void) {
return 0;
}

template <typename element_t>
using worker_allocator_t = iris_object_allocator_t<element_t>;

void external_poll() {
static constexpr size_t thread_count = 2;
static constexpr size_t warp_count = 8;

using worker_t = iris_async_worker_t<std::thread, std::function<void()>, worker_allocator_t>;
using warp_t = iris_warp_t<worker_t>;

worker_t worker(thread_count);

std::promise<bool> started;

std::future<bool> future = started.get_future();
size_t i = worker.get_thread_count();
worker.append([&future, &started, &worker, i]() mutable {
// copied from iris_async_worker_t<>::start() thread routine
try {
future.get();

worker_t::get_current() = &worker;
worker_t::get_current_thread_index_internal() = i;
printf("[[ external thread running ... ]]\n");

while (!worker.is_terminated()) {
if (!worker.poll(0)) {
worker.delay(20);
} else {
// there is no 0 priority task, assert it
assert(false);
}
}

printf("[[ external thread exited ... ]]\n");
} catch (std::bad_alloc&) {
throw; // by default, terminate
} catch (std::exception&) {
throw;
}
});

worker.start();
started.set_value(true);

std::vector<warp_t> warps;
warps.reserve(warp_count);
for (size_t w = 0; w < warp_count; w++) {
warps.emplace_back(worker, 1);
}

warps[0].queue_routine_external([&worker]() {
worker.terminate();
});
worker.join();
}

void stack_op() {
static constexpr size_t thread_count = 4;
static constexpr size_t warp_count = 8;
Expand Down Expand Up @@ -110,9 +169,6 @@ void framed_data() {
}
}

template <typename element_t>
using worker_allocator_t = iris_object_allocator_t<element_t>;

void simple_explosion(void) {
static constexpr size_t thread_count = 4;
static constexpr size_t warp_count = 8;
Expand Down

0 comments on commit f0efdc8

Please sign in to comment.