Skip to content

Commit

Permalink
[async] Add parallel task executor (#805)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanming-hu committed Apr 17, 2020
1 parent 73fe7c6 commit 74d8267
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 2 deletions.
3 changes: 1 addition & 2 deletions taichi/program/async_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "taichi/program/kernel.h"
#include "taichi/program/program.h"
#include "taichi/codegen/codegen_cpu.h"
#include "taichi/common/testing.h"

TLANG_NAMESPACE_BEGIN

Expand Down Expand Up @@ -33,8 +34,6 @@ void ExecutionQueue::synchronize() {
while (!task_queue.empty()) {
auto ker = task_queue.front();
std::string serialized;
irpass::re_id(ker.stmt);
irpass::print(ker.stmt);
auto h = hash(ker.stmt);
if (compiled_func.find(h) == compiled_func.end()) {
compiled_func[h] = CodeGenCPU(ker.kernel, ker.stmt).codegen();
Expand Down
88 changes: 88 additions & 0 deletions taichi/program/async_engine.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

#define TI_RUNTIME_HOST
#include "taichi/ir/ir.h"
Expand All @@ -10,6 +11,93 @@ TLANG_NAMESPACE_BEGIN

// TODO(yuanming-hu): split into multiple files

// A fixed-size pool of worker threads that runs enqueued tasks.
//
// Tasks are dequeued in FIFO order, but with more than one worker they may
// run concurrently and finish in any order. All shared state is guarded by
// |mut|; workers block on condition variables instead of polling.
//
// Notes:
//  - With num_threads <= 0 no worker is started, so enqueued tasks never run
//    and flush() (and therefore the destructor) blocks forever.
//  - flush() must not be called from inside a task: it waits for all running
//    tasks, including the caller, to finish.
//  - A task that throws terminates the program, since the exception escapes
//    the worker thread's entry function.
class ParallelExecutor {
 public:
  using TaskType = std::function<void()>;

  enum class ExecutorStatus {
    uninitialized,
    initialized,
    finalized,
  };

  // Starts |num_threads| worker threads. The workers begin accepting tasks
  // once construction has completed.
  explicit ParallelExecutor(int num_threads)
      : num_threads(num_threads), status(ExecutorStatus::uninitialized) {
    {
      // Hold the lock while spawning so that no worker can observe the
      // executor before |status| is set to initialized.
      std::lock_guard<std::mutex> guard(mut);
      for (int i = 0; i < num_threads; i++) {
        threads.emplace_back([this]() { this->task(); });
      }
      status = ExecutorStatus::initialized;
    }
    worker_cv.notify_all();
  }

  // Appends a copy of |func| to the task queue and wakes one idle worker.
  void enqueue(const TaskType &func) {
    {
      std::lock_guard<std::mutex> guard(mut);
      task_queue.push_back(func);
    }
    // Notify after releasing the lock so the woken worker can take it at once.
    worker_cv.notify_one();
  }

  // Blocks until the queue is empty AND no worker is still running a task.
  // (Checking the queue alone is not enough: a task is removed from the queue
  // before it runs, so an empty queue does not imply the work is finished.)
  void flush() {
    std::unique_lock<std::mutex> lock(mut);
    flush_cv.wait(lock, [this]() { return all_work_done(); });
  }

  // Waits for all pending work, then stops and joins every worker thread.
  ~ParallelExecutor() {
    flush();
    {
      std::lock_guard<std::mutex> guard(mut);
      status = ExecutorStatus::finalized;
    }
    // Wake every idle worker so that it observes the finalized status.
    worker_cv.notify_all();
    for (auto &th : threads) {
      th.join();
    }
  }

  // Returns the thread count passed to the constructor.
  int get_num_threads() const {
    return num_threads;
  }

 private:
  // True when nothing is queued and nothing is running. Requires |mut| held.
  bool all_work_done() const {
    return task_queue.empty() && running_tasks == 0;
  }

  // Worker thread entry point: runs tasks until the executor is finalized.
  void task() {
    TI_DEBUG("Starting worker thread.");
    std::unique_lock<std::mutex> lock(mut);
    while (true) {
      // Sleep until there is something to do: either a shutdown request, or
      // a runnable task once the executor has been initialized.
      worker_cv.wait(lock, [this]() {
        return status == ExecutorStatus::finalized ||
               (status == ExecutorStatus::initialized && !task_queue.empty());
      });
      if (status == ExecutorStatus::finalized) {
        break;  // finalized, exit
      }
      TaskType job = std::move(task_queue.front());
      task_queue.pop_front();
      running_tasks++;

      // Run the task without holding the lock so other workers can proceed.
      lock.unlock();
      job();
      lock.lock();

      running_tasks--;
      if (all_work_done()) {
        flush_cv.notify_all();
      }
    }
    // Release the lock before logging.
    lock.unlock();
    TI_DEBUG("Exiting worker thread.");
  }

  int num_threads;
  std::mutex mut;

  // The members below are guarded by |mut|.
  ExecutorStatus status;
  // Number of tasks that have been dequeued but have not finished running.
  int running_tasks{0};

  // Signalled when a task is enqueued or the executor status changes.
  std::condition_variable worker_cv;
  // Signalled when the queue is empty and the last running task completes.
  std::condition_variable flush_cv;

  std::vector<std::thread> threads;
  std::deque<TaskType> task_queue;
};

class KernelLaunchRecord {
public:
Context context;
Expand Down
25 changes: 25 additions & 0 deletions tests/cpp/test_parallel_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include "taichi/common/testing.h"
#include "taichi/program/async_engine.h"

TLANG_NAMESPACE_BEGIN

TI_TEST("parallel_executor") {
  // Constructing an executor and destroying it right away, with no tasks.
  SECTION("create_and_destruct") {
    ParallelExecutor exec(10);
  }

  // Every enqueued task writes to its own slot of a shared buffer; all slots
  // must be filled once the executor has been destroyed.
  SECTION("parallel_print") {
    const int N = 100;
    std::vector<int> buffer(N, 0);
    {
      // Scoped so that the executor is destroyed (its destructor flushes the
      // queue and joins the workers) before the results are inspected.
      ParallelExecutor exec(10);
      for (int task_id = 0; task_id < N; task_id++) {
        exec.enqueue([task_id, &buffer]() { buffer[task_id] = task_id + 1; });
      }
    }
    for (int i = 0; i < N; i++) {
      CHECK(buffer[i] == i + 1);
    }
  }
}

TLANG_NAMESPACE_END

0 comments on commit 74d8267

Please sign in to comment.