Skip to content

Commit

Permalink
[MOD] Replace std::this_thread::sleep with customized waiter.
Browse files Browse the repository at this point in the history
  • Loading branch information
paintdream committed Jun 13, 2024
1 parent 0ec9626 commit 4b081b8
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 23 deletions.
22 changes: 10 additions & 12 deletions src/iris_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ SOFTWARE.
#include <functional>
#include <vector>
#include <mutex>
#include <chrono>
#include <thread>
#include <condition_variable>

Expand Down Expand Up @@ -290,9 +289,7 @@ namespace iris {
// a warp cannot be destructed in its context
IRIS_ASSERT(get_current_warp_internal() != this);

// execute remaining tasks on destruction
while (!join<true, true>()) {}

// must join manually before destruction!
IRIS_ASSERT(storage.empty());
IRIS_ASSERT(!has_parallel_task());
}
Expand Down Expand Up @@ -408,8 +405,8 @@ namespace iris {
}

// cleanup the dispatcher, pass true to 'execute_remaining' to make sure all tasks are executed finally.
template <bool execute_remaining = true, bool finalize = false, typename iterator_t = iris_warp_t*>
static bool join(iterator_t begin, iterator_t end) {
template <bool execute_remaining = true, bool finalize = false, typename iterator_t = iris_warp_t*, typename waiter_t>
static bool join(iterator_t begin, iterator_t end, waiter_t&& waiter) {
IRIS_PROFILE_SCOPE(__FUNCTION__);

if /* constexpr */ (!finalize) {
Expand All @@ -427,7 +424,7 @@ namespace iris {
while (true) {
preempt_guard_t preempt_guard(*p, ~size_t(0));
if (!preempt_guard) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
waiter();
} else {
if /* constexpr */ (execute_remaining) {
(*p).execute_parallel();
Expand Down Expand Up @@ -455,9 +452,9 @@ namespace iris {
return empty;
}

template <bool execute_remaining = true, bool finalize = false>
bool join() {
return join<execute_remaining, finalize>(this, this + 1);
template <bool execute_remaining = true, bool finalize = false, typename waiter_t>
bool join(waiter_t&& waiter) {
return join<execute_remaining, finalize>(this, this + 1, std::forward<waiter_t>(waiter));
}

// get current thread's warp binding instance
Expand Down Expand Up @@ -1155,10 +1152,11 @@ namespace iris {

// poll any task from thread poll manually with given priority in specified duration
// usually used in your customized thread procedures
bool poll_delay(size_t priority, size_t millseconds) {
template <typename duration_t>
bool poll_delay(size_t priority, duration_t&& delay) {
if (!poll(priority)) {
std::unique_lock<std::mutex> lock(mutex);
condition.wait_for(lock, std::chrono::milliseconds(millseconds));
condition.wait_for(lock, std::forward<duration_t>(delay));
lock.unlock();

if (!poll(priority)) {
Expand Down
4 changes: 2 additions & 2 deletions test/iris_coroutine_demo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static coroutine_t example_quota(quota_queue_t& q) {
bool b = q.acquire(req);
IRIS_ASSERT(b);
q.get_async_worker().queue([&q, req]() mutable {
std::this_thread::sleep_for(std::chrono::milliseconds{ 10 });
std::this_thread::sleep_for(std::chrono::milliseconds(10));
printf("Release quota holder!\n");
q.release(req);
});
Expand Down Expand Up @@ -231,7 +231,7 @@ int main(void) {
worker.join();

// finished!
while (!warp_t::join(warps.begin(), warps.end())) {}
while (!warp_t::join(warps.begin(), warps.end(), [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); })) {}
return 0;
}

12 changes: 6 additions & 6 deletions test/iris_dispatcher_demo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void external_poll() {
printf("[[ external thread running ... ]]\n");

while (!worker.is_terminated()) {
if (worker.poll_delay(0, 20)) {
if (worker.poll_delay(0, std::chrono::milliseconds(20))) {
// there is no 0 priority task, assert it
IRIS_ASSERT(false);
}
Expand Down Expand Up @@ -191,7 +191,7 @@ void simple_explosion(void) {
printf("[[ external thread running ... ]]\n");

while (!worker.is_terminated()) {
if (worker.poll_delay(0, 20)) {
if (worker.poll_delay(0, std::chrono::milliseconds(20))) {
printf("[[ external thread has polled a task ... ]]\n");
}
}
Expand Down Expand Up @@ -284,7 +284,7 @@ void simple_explosion(void) {
worker.join();

// finished!
while (!warp_t::join(warps.begin(), warps.end())) {}
while (!warp_t::join(warps.begin(), warps.end(), [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); })) {}

printf("after: \n");
for (size_t k = 0; k < warp_count; k++) {
Expand Down Expand Up @@ -395,7 +395,7 @@ void garbage_collection() {
worker.join();

// finished!
while (!warp_t::join(warps.begin(), warps.end())) {}
while (!warp_t::join(warps.begin(), warps.end(), [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); })) {}
}
}

Expand Down Expand Up @@ -500,7 +500,7 @@ void graph_dispatch() {
printf("sum of factors: %d\n", (int)sum_factors);

// finished!
while (!warp_t::join(warps.begin(), warps.end())) {}
while (!warp_t::join(warps.begin(), warps.end(), [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); })) {}
}

void graph_dispatch_exception() {
Expand Down Expand Up @@ -592,7 +592,7 @@ void acquire_release() {
}

worker.join();
main_warp.join();
main_warp.join([] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); });
}

void update_version() {
Expand Down
3 changes: 2 additions & 1 deletion test/iris_engine_demo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ struct engine_t {
worker.terminate();
worker.join();

while (!warp_audio.join() || !warp_script.join() || !warp_network.join() || !warp_render.join()) {
auto waiter = [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); };
while (!warp_audio.join(waiter) || !warp_script.join(waiter) || !warp_network.join(waiter) || !warp_render.join(waiter)) {
printf("finalizing ...\n");
}
}
Expand Down
4 changes: 2 additions & 2 deletions test/iris_lua_demo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,8 @@ end\n\

warp.yield();
worker.join();
warp.join();
warp2.join();
warp.join([] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); });
warp2.join([] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); });
preempt_guard.cleanup();
#endif
lua_close(L);
Expand Down

0 comments on commit 4b081b8

Please sign in to comment.