diff --git a/src/iris_dispatcher.h b/src/iris_dispatcher.h index 3c0c5f5..5c59596 100644 --- a/src/iris_dispatcher.h +++ b/src/iris_dispatcher.h @@ -33,7 +33,6 @@ SOFTWARE. #include #include #include -#include #include #include @@ -290,9 +289,7 @@ namespace iris { // a warp cannot be destructed in its context IRIS_ASSERT(get_current_warp_internal() != this); - // execute remaining tasks on destruction - while (!join()) {} - + // must join manually before destruction! IRIS_ASSERT(storage.empty()); IRIS_ASSERT(!has_parallel_task()); } @@ -408,8 +405,8 @@ namespace iris { } // cleanup the dispatcher, pass true to 'execute_remaining' to make sure all tasks are executed finally. - template - static bool join(iterator_t begin, iterator_t end) { + template + static bool join(iterator_t begin, iterator_t end, waiter_t&& waiter) { IRIS_PROFILE_SCOPE(__FUNCTION__); if /* constexpr */ (!finalize) { @@ -427,7 +424,7 @@ namespace iris { while (true) { preempt_guard_t preempt_guard(*p, ~size_t(0)); if (!preempt_guard) { - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + waiter(); } else { if /* constexpr */ (execute_remaining) { (*p).execute_parallel(); @@ -455,9 +452,9 @@ namespace iris { return empty; } - template - bool join() { - return join(this, this + 1); + template + bool join(waiter_t&& waiter) { + return join(this, this + 1, std::forward(waiter)); } // get current thread's warp binding instance @@ -1155,10 +1152,11 @@ namespace iris { // poll any task from thread poll manually with given priority in specified duration // usually used in your customized thread procedures - bool poll_delay(size_t priority, size_t millseconds) { + template + bool poll_delay(size_t priority, duration_t&& delay) { if (!poll(priority)) { std::unique_lock lock(mutex); - condition.wait_for(lock, std::chrono::milliseconds(millseconds)); + condition.wait_for(lock, std::forward(delay)); lock.unlock(); if (!poll(priority)) { diff --git a/test/iris_coroutine_demo.cpp b/test/iris_coroutine_demo.cpp index a056e50..50de512 100644 --- a/test/iris_coroutine_demo.cpp +++ b/test/iris_coroutine_demo.cpp @@ -150,7 +150,7 @@ static coroutine_t example_quota(quota_queue_t& q) { bool b = q.acquire(req); IRIS_ASSERT(b); q.get_async_worker().queue([&q, req]() mutable { - std::this_thread::sleep_for(std::chrono::milliseconds{ 10 }); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); printf("Release quota holder!\n"); q.release(req); }); @@ -231,7 +231,7 @@ int main(void) { worker.join(); // finished! - while (!warp_t::join(warps.begin(), warps.end())) {} + while (!warp_t::join(warps.begin(), warps.end(), [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); })) {} return 0; } diff --git a/test/iris_dispatcher_demo.cpp b/test/iris_dispatcher_demo.cpp index 44d29e9..2c78c1c 100644 --- a/test/iris_dispatcher_demo.cpp +++ b/test/iris_dispatcher_demo.cpp @@ -53,7 +53,7 @@ void external_poll() { printf("[[ external thread running ... ]]\n"); while (!worker.is_terminated()) { - if (worker.poll_delay(0, 20)) { + if (worker.poll_delay(0, std::chrono::milliseconds(20))) { // there is no 0 priority task, assert it IRIS_ASSERT(false); } @@ -191,7 +191,7 @@ void simple_explosion(void) { printf("[[ external thread running ... ]]\n"); while (!worker.is_terminated()) { - if (worker.poll_delay(0, 20)) { + if (worker.poll_delay(0, std::chrono::milliseconds(20))) { printf("[[ external thread has polled a task ... ]]\n"); } } @@ -284,7 +284,7 @@ void simple_explosion(void) { worker.join(); // finished! - while (!warp_t::join(warps.begin(), warps.end())) {} + while (!warp_t::join(warps.begin(), warps.end(), [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); })) {} printf("after: \n"); for (size_t k = 0; k < warp_count; k++) { @@ -395,7 +395,7 @@ void garbage_collection() { worker.join(); // finished! - while (!warp_t::join(warps.begin(), warps.end())) {} + while (!warp_t::join(warps.begin(), warps.end(), [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); })) {} } } @@ -500,7 +500,7 @@ void graph_dispatch() { printf("sum of factors: %d\n", (int)sum_factors); // finished! - while (!warp_t::join(warps.begin(), warps.end())) {} + while (!warp_t::join(warps.begin(), warps.end(), [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); })) {} } void graph_dispatch_exception() { @@ -592,7 +592,7 @@ void acquire_release() { } worker.join(); - main_warp.join(); + main_warp.join([] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); }); } void update_version() { diff --git a/test/iris_engine_demo.cpp b/test/iris_engine_demo.cpp index faaa216..34260f7 100644 --- a/test/iris_engine_demo.cpp +++ b/test/iris_engine_demo.cpp @@ -28,7 +28,8 @@ struct engine_t { worker.terminate(); worker.join(); - while (!warp_audio.join() || !warp_script.join() || !warp_network.join() || !warp_render.join()) { + auto waiter = [] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); }; + while (!warp_audio.join(waiter) || !warp_script.join(waiter) || !warp_network.join(waiter) || !warp_render.join(waiter)) { printf("finalizing ...\n"); } } diff --git a/test/iris_lua_demo.cpp b/test/iris_lua_demo.cpp index 640d29a..a5fcbd1 100644 --- a/test/iris_lua_demo.cpp +++ b/test/iris_lua_demo.cpp @@ -495,8 +495,8 @@ end\n\ warp.yield(); worker.join(); - warp.join(); - warp2.join(); + warp.join([] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); }); + warp2.join([] { std::this_thread::sleep_for(std::chrono::milliseconds(50)); }); preempt_guard.cleanup(); #endif lua_close(L);