fix accuracy bug when enabling multi-stream. (#191)
* fix thread pool of multi-stream

* fix race of memory

* update way of initializing thread pool
zhenwei-intel committed Aug 24, 2022
1 parent 59232b9 commit f9a2f6a
Showing 4 changed files with 27 additions and 16 deletions.
@@ -193,6 +193,8 @@ class MemoryAllocator {
   }
 
   static void* GetMemory(size_t size, const int life_count) {
+    static std::mutex getmem_lock;
+    std::lock_guard<std::mutex> lock(getmem_lock);
     if (size == 0) {
       LOG(INFO) << "please set the tensor size...";
       return nullptr;
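The race fixed here is inside the allocator: with multiple streams, two threads can enter GetMemory at once and corrupt its shared bookkeeping, so the commit serializes the whole function with a function-local static mutex. A minimal sketch of that pattern (names and the counter below are illustrative, not the real allocator state):

    #include <cstdlib>
    #include <mutex>

    // A function-local static mutex is initialized exactly once (thread-safe
    // since C++11) and shared by every caller, so the body runs one thread
    // at a time; lock_guard releases it on any return path.
    void* GetMemorySketch(size_t size) {
      static std::mutex getmem_lock;
      std::lock_guard<std::mutex> lock(getmem_lock);
      static size_t total_allocated = 0;  // illustrative shared state
      if (size == 0) return nullptr;
      total_allocated += size;
      return std::malloc(size);  // placeholder for the real buffer reuse logic
    }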
@@ -18,6 +18,7 @@
 #include <algorithm>
 #include <string>
 #include <vector>
+#include <mutex>
 
 #include "common.hpp"
 #include "operator_registry.hpp"
@@ -60,6 +61,8 @@ class Operator {
                            const vector<Tensor*>& output) = 0;
 
   inline void unref_tensors(const vector<Tensor*>& input) {
+    static std::mutex unref_lock;
+    std::lock_guard<std::mutex> lock(unref_lock);
     for (size_t i = 0; i < input.size(); ++i) {
       auto status = input[i]->unref_data();
       // (TODO) maybe check the tensors
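unref_tensors gets the same treatment: without a lock, two streams sharing an input tensor could decrement its life count concurrently and release the buffer twice. A sketch of the hazard and the fix, with a hypothetical TensorLike standing in for executor::Tensor:

    #include <cstdlib>
    #include <mutex>
    #include <vector>

    struct TensorLike {  // hypothetical stand-in for executor::Tensor
      int life_count = 2;
      void* data = nullptr;
      bool unref_data() {  // unsafe alone: read-modify-write on life_count
        if (--life_count <= 0) { std::free(data); data = nullptr; return true; }
        return false;
      }
    };

    static std::mutex unref_lock;
    void UnrefTensorsSketch(const std::vector<TensorLike*>& input) {
      std::lock_guard<std::mutex> lock(unref_lock);  // serialize across streams
      for (auto* t : input) t->unref_data();
    }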
24 changes: 12 additions & 12 deletions nlp_toolkit/backends/neural_engine/executor/include/thread_pool.hpp
@@ -43,7 +43,7 @@ class ThreadPool {
   std::queue<Task> tasks;
   // sync of multi stream
   std::mutex tasks_lock;
-  std::condition_variable task_cond_var;
+  std::condition_variable task_cond_var, cond_finish_;
   std::atomic<unsigned int> idle_thread_num;
   std::atomic<unsigned int> work_thread_num;
   // is the thread pool stopped
@@ -54,7 +54,7 @@
     unsigned int index = pool.size();
     stoped.emplace_back(false);
     idle_thread_num++;
-    pool.emplace_back([this, index] {
+    pool.emplace_back(std::thread([this, index] {
       while (true) {
         // capture the task
         std::function<void()> task;
@@ -67,28 +67,33 @@
           idle_thread_num--;
           return;
         }
+        idle_thread_num--; work_thread_num++;
         task = std::move(this->tasks.front());
         this->tasks.pop();
       }
 
       {
-        idle_thread_num--, work_thread_num++;
         task();  // run the task
-        idle_thread_num++, work_thread_num--;
+        std::unique_lock<std::mutex> lock(this->tasks_lock);
+        idle_thread_num++; work_thread_num--;
+        cond_finish_.notify_one();
       }
     }
-    });
+    }));
   }
 
  public:
   inline ThreadPool() {
     work_thread_num = 0;
     idle_thread_num = 0;
+    pool_stoped = false;
   }
   // wait for all threads to finish and stop all threads
   inline ~ThreadPool() {
     std::unique_lock<std::mutex> lock(tasks_lock);
     for (auto item : stoped) item = true;
     task_cond_var.notify_all();  // wake up all threads to run
+    lock.unlock();
     for (auto& th : pool) {
       if (th.joinable()) th.join();
     }
@@ -161,13 +166,8 @@
   bool hasStopedPool() { return pool_stoped; }
   // wait for all tasks to be completed
   void waitAllTaskRunOver() {
-    while (true) {
-      if (work_thread_num == 0) {
-        return;
-      } else {
-        std::this_thread::yield();
-      }
-    }
+    std::unique_lock<std::mutex> lock(tasks_lock);
+    cond_finish_.wait(lock, [this] { return tasks.empty() && (work_thread_num == 0); });
   }
};

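Two coordinated changes fix the wait: workers now flip the idle/work counters under tasks_lock and notify cond_finish_ after each task, and waitAllTaskRunOver blocks on the condition variable instead of spinning on work_thread_num, which could read zero in the window between a task being popped and the counters being updated. A condensed, self-contained sketch of the handshake, assuming the same member names:

    #include <atomic>
    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <queue>

    std::mutex tasks_lock;
    std::condition_variable cond_finish_;
    std::queue<std::function<void()>> tasks;
    std::atomic<unsigned int> work_thread_num{0};

    // Worker side: after running a task, update the counter under the lock
    // and wake a waiter so it can re-check the predicate.
    void FinishOneTask() {
      std::lock_guard<std::mutex> lock(tasks_lock);
      work_thread_num--;
      cond_finish_.notify_one();
    }

    // Waiter side: the predicate guards against spurious wakeups, and no CPU
    // time is burned while tasks are queued or still running.
    void WaitAllTaskRunOver() {
      std::unique_lock<std::mutex> lock(tasks_lock);
      cond_finish_.wait(lock, [] { return tasks.empty() && work_thread_num == 0; });
    }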
14 changes: 10 additions & 4 deletions nlp_toolkit/backends/neural_engine/executor/src/model.cpp
@@ -93,6 +93,16 @@ void Model::Init(const ModelConfig& conf) {
       multi_stream_tasks_.insert({i, StringToNum<int64_t>(it->second)});
     }
   }
+  auto max_tasks = std::max_element(multi_stream_tasks_.begin(), multi_stream_tasks_.end(),
+                                    [](const std::pair<int, int64_t>& a, const std::pair<int, int64_t>& b)
+                                        -> bool { return a.second < b.second; });
+  int tp_max_threads = max_tasks->second + (max_tasks->second & 1);
+  int total_available_threads = omp_get_num_procs();
+  tp_max_threads = tp_max_threads > total_available_threads ?
+                   total_available_threads : tp_max_threads;
+  tp.begin(tp_max_threads);
+  LOG(INFO) << "Thread pool is initialized with " << tp_max_threads << " threads. ("
+            << "Total available threads: " << total_available_threads << ")";
 }
 
 engine_profiling_ = (getenv("ENGINE_PROFILING") != NULL);  // profiling env
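Init now sizes the pool once, up front: take the largest per-stream task count, round it up to an even number via (n & 1), and clamp it to the core count OpenMP reports. The same arithmetic in isolation (the function name and the assumption of a non-empty map are mine):

    #include <algorithm>
    #include <cstdint>
    #include <map>
    #include <omp.h>

    // Pool size for an {operator index -> task count} map; assumes non-empty.
    int ComputePoolSize(const std::map<int, int64_t>& multi_stream_tasks) {
      auto max_tasks = std::max_element(
          multi_stream_tasks.begin(), multi_stream_tasks.end(),
          [](const auto& a, const auto& b) { return a.second < b.second; });
      // Round the largest task count up to even: 7 -> 8, 8 -> 8.
      int tp_max_threads = static_cast<int>(max_tasks->second + (max_tasks->second & 1));
      return std::min(tp_max_threads, omp_get_num_procs());  // don't oversubscribe
    }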
@@ -280,15 +290,13 @@ vector<Tensor>& Model::Forward(vector<Tensor>& input_data) {
   for (int i = 0; i < operators_.size(); ++i) {
     LOG(INFO) << "operator " << operators_[i]->name() << " gonna forward with type " << operators_[i]->type();
     if (multi_stream_flag && multi_stream_tasks_.find(i) != multi_stream_tasks_.end()) {
-      tp.resize(thread_count);
       float start = Time("start");
       tp.commitTask(std::bind(&executor::Dispatcher::Forward, operators_[i], input_vecs_[i], output_vecs_[i]));
       float end = Time("end");
       operators_[i]->set_latency(end - start);
       LOG(INFO) << "operator: " << operators_[i]->name() << ", latency: " << end - start << " ms";
       if (thread_count >= multi_stream_tasks_[i]) {
         tp.waitAllTaskRunOver();
-        tp.close();
         thread_count = 0;
       }
       thread_count++;
@@ -304,11 +312,9 @@
   for (int i = 0; i < operators_.size(); ++i) {
     LOG(INFO) << "operator " << operators_[i]->name() << " gonna forward with type " << operators_[i]->type();
     if (multi_stream_flag && multi_stream_tasks_.find(i) != multi_stream_tasks_.end()) {
-      tp.resize(thread_count);
       tp.commitTask(std::bind(&executor::Dispatcher::Forward, operators_[i], input_vecs_[i], output_vecs_[i]));
       if (thread_count >= multi_stream_tasks_[i]) {
         tp.waitAllTaskRunOver();
-        tp.close();
         thread_count = 0;
       }
       thread_count++;
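With the per-iteration resize and close removed, Forward treats the pool as persistent: commit tasks until the stream's budget is reached, drain with waitAllTaskRunOver, and reuse the same worker threads for the next batch. A sketch of the batching loop with hypothetical stand-ins for the pool and operator tasks:

    #include <functional>
    #include <vector>

    struct PoolLike {  // hypothetical stand-in for the executor ThreadPool
      void commitTask(std::function<void()> task) { task(); }  // sketch: run inline
      void waitAllTaskRunOver() {}
    };

    // Commit up to `budget` tasks between drains; the pool itself is never
    // resized or closed inside the loop.
    void ForwardBatchedSketch(PoolLike& tp,
                              const std::vector<std::function<void()>>& ops,
                              int budget) {
      int thread_count = 1;
      for (const auto& op : ops) {
        tp.commitTask(op);
        if (thread_count >= budget) {
          tp.waitAllTaskRunOver();  // drain the current batch
          thread_count = 0;
        }
        thread_count++;
      }
      tp.waitAllTaskRunOver();  // drain any tail tasks
    }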
