Skip to content

Commit

Permalink
use a dedicated thread for NCCL operations (#317) (#323)
Browse files Browse the repository at this point in the history
* wip

* NCCLThread

* use NCCLThread in LinearExecutor

* cleanup

* rm

* remove order_group
  • Loading branch information
lgarithm committed Sep 19, 2020
1 parent 80f3335 commit 2bc07d3
Show file tree
Hide file tree
Showing 15 changed files with 138 additions and 395 deletions.
7 changes: 0 additions & 7 deletions srcs/cpp/include/kungfu.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@
extern "C" {
#endif

typedef struct order_group_s order_group_t;

extern order_group_t *new_ranked_order_group(int n_names);
extern void del_order_group(order_group_t *);
extern void order_group_do_rank(order_group_t *, int rank, callback_t *task);
extern void order_group_wait(order_group_t *, int32_t *arrive_order);

extern void kungfu_run_main();

#ifdef __cplusplus
Expand Down
33 changes: 17 additions & 16 deletions srcs/cpp/include/kungfu/nccl/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,23 @@

#include <kungfu.h>
#include <kungfu/nccl/common.hpp>
#include <kungfu/utils/channel.hpp>

namespace kungfu
{
// order_group wraps order_group_t
class order_group
class NCCLThread
{
order_group_t *og_;
std::map<std::string, int> ranks_;

public:
using Task = DoneCallback;

order_group(const std::vector<std::string> &names,
const std::vector<int32_t> &order);
using Task = std::function<void()>;
using TaskQueue = MpscChannel<Task>;

~order_group();
TaskQueue queue_;
std::unique_ptr<std::thread> thread_;

void Start(const std::string &name, const Task &task);

std::vector<int32_t> Wait();
public:
NCCLThread();
~NCCLThread();
void Put(std::function<void()> task);
void Do(std::function<void()> task);
};

class LinearExecutor
Expand All @@ -44,10 +41,11 @@ class LinearExecutor
std::vector<bool> is_started_;
std::vector<DoneCallback> tasks_;
std::unique_ptr<std::thread> executor_;
NCCLThread *nccl_thread_;

public:
LinearExecutor(const std::vector<std::string> &names,
const std::vector<int32_t> &order);
const std::vector<int32_t> &order, NCCLThread *nccl_thread);

~LinearExecutor();

Expand All @@ -65,7 +63,7 @@ class NCCLScheduler
int counter_;
std::vector<int32_t> order_;

// std::unique_ptr<order_group> order_group_;
std::unique_ptr<NCCLThread> nccl_thread_;
std::unique_ptr<LinearExecutor> executor_;

void ResetOrder(int n);
Expand All @@ -76,5 +74,8 @@ class NCCLScheduler
void Reset(const std::vector<std::string> &names);

void Start(const std::string &name, const DoneCallback &task);

// Run a task in the dedicated NCCL thread
void Do(std::function<void()> task);
};
} // namespace kungfu
33 changes: 33 additions & 0 deletions srcs/cpp/include/kungfu/utils/channel.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

namespace kungfu
{
/// Unbounded multiple-producer, single-consumer FIFO channel.
///
/// put() appends an item and wakes one thread blocked in get(); get() blocks
/// until at least one item is queued and returns items in insertion order.
/// All access to the underlying queue is serialised by an internal mutex.
template <typename T>
class MpscChannel
{
    std::mutex mu_;         // guards buffer_
    std::queue<T> buffer_;  // pending items, oldest at the front

    std::condition_variable cv_;  // signalled by put() after an item is queued

  public:
    /// Blocks until the channel is non-empty, then removes and returns the
    /// oldest item.
    T get()
    {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [&]() { return !buffer_.empty(); });
        // The slot is popped on the next line, so move the element out rather
        // than copying it. The local is deliberately non-const so that the
        // return can move (or elide) as well.
        T x = std::move(buffer_.front());
        buffer_.pop();
        return x;
    }

    /// Appends x to the channel and wakes one waiting consumer.
    void put(T x)
    {
        std::lock_guard<std::mutex> lk(mu_);
        // x is already a private copy (taken by value); hand it to the queue
        // without copying it a second time.
        buffer_.push(std::move(x));
        cv_.notify_one();
    }
};
} // namespace kungfu
29 changes: 29 additions & 0 deletions srcs/cpp/include/kungfu/utils/waiter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once
#include <condition_variable>
#include <mutex>

namespace kungfu
{
/// One-shot completion flag.
///
/// wait() blocks the calling thread until done() has been called; once done()
/// has run, every later wait() returns immediately.
class Waiter
{
    std::mutex mu;               // guards _done
    std::condition_variable cv;  // signalled by done()
    bool _done = false;          // set once by done(), never reset

  public:
    Waiter() = default;

    /// Marks the flag as completed and wakes one thread blocked in wait().
    void done()
    {
        // The notification is issued while the mutex is still held, exactly
        // as before.
        std::unique_lock<std::mutex> guard(mu);
        _done = true;
        cv.notify_one();
    }

    /// Blocks until done() has been called.
    void wait()
    {
        std::unique_lock<std::mutex> guard(mu);
        // Re-check the flag after every wake-up so spurious wake-ups are
        // ignored.
        while (!_done) { cv.wait(guard); }
    }
};
} // namespace kungfu
23 changes: 0 additions & 23 deletions srcs/cpp/src/kungfu.cpp
Original file line number Diff line number Diff line change
@@ -1,27 +1,4 @@
#include <kungfu.h>
#include <libkungfu-comm.h>

// Allocates an order_group_t with new, passes it together with n_names to
// GoNewOrderGroup, and returns the pointer. The matching release in this file
// is del_order_group.
// NOTE(review): what GoNewOrderGroup stores in *og is defined on the Go side
// and is not visible here.
order_group_t *new_ranked_order_group(int n_names)
{
order_group_t *og = new order_group_t;
GoNewOrderGroup(GoInt(n_names), og);
return og;
}

// Releases an order group: GoFreeOrderGroup is called on it first, then the
// C++ allocation itself is deleted. og must not be used after this returns.
void del_order_group(order_group_t *og)
{
GoFreeOrderGroup(og);
delete og;
}

// Forwards og, rank and task unchanged to GoOrderGroupDoRank.
// NOTE(review): presumably this registers task to run at position `rank`
// within the group; the semantics (and who frees task) live on the Go side —
// confirm there.
void order_group_do_rank(order_group_t *og, int rank, callback_t *task)
{
GoOrderGroupDoRank(og, rank, task);
}

// Forwards og and arrive_order unchanged to GoOrderGroupWait.
// NOTE(review): arrive_order looks like an output buffer; the C++ caller in
// this codebase passes an array with one int32_t per registered name —
// confirm the required size against the Go implementation.
void order_group_wait(order_group_t *og, int32_t *arrive_order)
{
GoOrderGroupWait(og, arrive_order);
}

// C-linkage entry point (declared in kungfu.h) that simply forwards to the
// Go-exported GoKungfuRunMain.
void kungfu_run_main() { GoKungfuRunMain(); }
5 changes: 1 addition & 4 deletions srcs/cpp/src/nccl/helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ NCCLController *NCCLHelper::EnsureController(const KungFu_NCCLScope scope)
{
std::lock_guard<std::mutex> _lk(mu_);
auto &ptr = controllers_[scope];
if (ptr.get() == nullptr) {
ptr.reset(new NCCLController(scope));
ptr->InitOnce();
}
if (ptr.get() == nullptr) { ptr.reset(new NCCLController(scope)); }
return ptr.get();
}

Expand Down
54 changes: 33 additions & 21 deletions srcs/cpp/src/nccl/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,44 @@

#include <kungfu/nccl/scheduler.hpp>
#include <kungfu/python/init.h> // FIXME: remove
#include <kungfu/utils/waiter.hpp>

namespace kungfu
{
order_group::order_group(const std::vector<std::string> &names,
const std::vector<int32_t> &order)
: og_(new_ranked_order_group(names.size()))
NCCLThread::NCCLThread()
{
const int n = names.size();
for (int i = 0; i < n; ++i) { ranks_[names[order[i]]] = i; }
thread_.reset(new std::thread([&] {
for (;;) {
auto t = queue_.get();
if (t == nullptr) { break; }
t();
}
}));
}

order_group::~order_group()
NCCLThread::~NCCLThread()
{
Wait();
del_order_group(og_);
queue_.put(nullptr);
thread_->join();
}

void order_group::Start(const std::string &name, const Task &task)
{
order_group_do_rank(og_, ranks_.at(name), new CallbackWrapper(task));
}
// Enqueues task for the worker thread and returns immediately, without
// waiting for the task to run. Tasks are taken from the queue one at a time by
// the thread started in the constructor.
// Note: that worker loop treats an empty std::function as its stop signal, so
// callers should not pass an empty task here.
void NCCLThread::Put(std::function<void()> task)
{
    // task is already a by-value copy; move it into the queue instead of
    // copying the std::function a second time.
    queue_.put(std::move(task));
}

std::vector<int32_t> order_group::Wait()
void NCCLThread::Do(std::function<void()> task)
{
std::vector<int32_t> arrive_order(ranks_.size());
order_group_wait(og_, arrive_order.data());
return arrive_order;
Waiter waiter;
queue_.put([=, waiter = &waiter] {
task();
waiter->done();
});
waiter.wait();
}

LinearExecutor::LinearExecutor(const std::vector<std::string> &names,
const std::vector<int32_t> &order)
const std::vector<int32_t> &order,
NCCLThread *nccl_thread)
: size_(names.size()), started_(0), arrive_order_(size_),
is_started_(size_), tasks_(size_)
is_started_(size_), tasks_(size_), nccl_thread_(nccl_thread)
{
for (int i = 0; i < size_; ++i) { ranks_[names[order[i]]] = i; }
std::fill(is_started_.begin(), is_started_.end(), false);
Expand All @@ -44,8 +49,9 @@ LinearExecutor::LinearExecutor(const std::vector<std::string> &names,
std::unique_lock<std::mutex> lk(mu_);
cv_.wait(lk, [&] { return is_started_[i]; });
}
tasks_[i]();
nccl_thread_->Put(tasks_[i]);
}
nccl_thread_->Do([] {}); // do an empty task to wait all previous tasks
}));
}

Expand Down Expand Up @@ -74,7 +80,8 @@ std::vector<int32_t> LinearExecutor::Wait()
NCCLScheduler::NCCLScheduler(const KungFu_NCCLScope scope,
const bool auto_order)
: name_("NCCLScheduler_" + std::to_string(int(scope))),
auto_order_(auto_order), scope_(scope), counter_(0)
auto_order_(auto_order), scope_(scope), counter_(0),
nccl_thread_(new NCCLThread)
{
}

Expand Down Expand Up @@ -108,12 +115,17 @@ void NCCLScheduler::Reset(const std::vector<std::string> &names)
}
}
}
executor_.reset(new LinearExecutor(names, order_));
executor_.reset(new LinearExecutor(names, order_, nccl_thread_.get()));
++counter_;
}

// Hands the task registered under `name` to the current LinearExecutor.
// NOTE(review): in the code visible here executor_ is only assigned in
// Reset(), so this appears to require a prior Reset() call — confirm.
void NCCLScheduler::Start(const std::string &name, const DoneCallback &task)
{
executor_->Start(name, task);
}

// Runs task on the scheduler's dedicated NCCL thread by forwarding it to
// NCCLThread::Do, which blocks the caller until the task has finished.
// The task is moved, not copied, into the call.
void NCCLScheduler::Do(std::function<void()> task)
{
nccl_thread_->Do(std::move(task));
}
} // namespace kungfu
25 changes: 15 additions & 10 deletions srcs/cpp/src/tensorflow/ops/gpu/collective.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class ScheduledNcclAllReduce : public AsyncOpKernel

void ComputeAsync(OpKernelContext *context, DoneCallback done) override
{
auto scheduler_ = _default_nccl_helper->EnsureScheduler(nccl_scope_);
auto nccl_ = _default_nccl_helper->EnsureController(nccl_scope_);
auto scheduler_ = _default_nccl_helper->EnsureScheduler(nccl_scope_);
auto controller_ = _default_nccl_helper->EnsureController(nccl_scope_);
const Tensor &input = context->input(0);
Tensor *output = nullptr;
OP_REQUIRES_OK_ASYNC(
Expand All @@ -66,7 +66,7 @@ class ScheduledNcclAllReduce : public AsyncOpKernel
auto ready_event = create_init_ready_event(context);
scheduler_->Start(op_name_, [=] {
wait_delete_ready_event(ready_event);
nccl_->AllReduce(w, KungFu_SUM, done);
controller_->AllReduce(w, KungFu_SUM, done);
});
}
};
Expand All @@ -86,13 +86,17 @@ class NcclAllReduce : public AsyncOpKernel
public:
void ComputeAsync(OpKernelContext *context, DoneCallback done) override
{
auto nccl_ = _default_nccl_helper->EnsureController(KungFu_NCCL_GLOBAL);
auto scheduler_ =
_default_nccl_helper->EnsureScheduler(KungFu_NCCL_GLOBAL);
auto controller_ =
_default_nccl_helper->EnsureController(KungFu_NCCL_GLOBAL);
scheduler_->Do([=] { controller_->InitOnce(); });
const Tensor &input = context->input(0);
Tensor *output = nullptr;
OP_REQUIRES_OK_ASYNC(
context, context->allocate_output(0, input.shape(), &output), done);
wait_delete_ready_event(create_init_ready_event(context));
nccl_->AllReduce(make_workspace(input, output), KungFu_SUM, done);
controller_->AllReduce(make_workspace(input, output), KungFu_SUM, done);
}
};

Expand Down Expand Up @@ -126,8 +130,8 @@ class ScheduledHierarchicalNcclAllReduce : public AsyncOpKernel

void ComputeAsync(OpKernelContext *context, DoneCallback done) override
{
auto scheduler_ = _default_nccl_helper->EnsureScheduler(nccl_scope_);
auto nccl_ = _default_nccl_helper->EnsureController(nccl_scope_);
auto scheduler_ = _default_nccl_helper->EnsureScheduler(nccl_scope_);
auto controller_ = _default_nccl_helper->EnsureController(nccl_scope_);
const Tensor &input = context->input(0);
Tensor *output = nullptr;
OP_REQUIRES_OK_ASYNC(
Expand All @@ -138,10 +142,11 @@ class ScheduledHierarchicalNcclAllReduce : public AsyncOpKernel
auto w_bcast = make_workspace(*output, output);
scheduler_->Start(reduce_op_, [=] {
wait_delete_ready_event(ready_event);
nccl_->Reduce(w_reduce, KungFu_SUM, [=] {
controller_->Reduce(w_reduce, KungFu_SUM, [=] {
CrossAllReduceGpu(w_all_reduce, KungFu_SUM, name(), [=] {
scheduler_->Start(bcast_op_,
[=] { nccl_->Broadcast(w_bcast, done); });
scheduler_->Start(bcast_op_, [=] {
controller_->Broadcast(w_bcast, done);
});
});
});
});
Expand Down
4 changes: 3 additions & 1 deletion srcs/cpp/src/tensorflow/ops/gpu/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ class StartNcclScheduler : public OpKernel

void Compute(OpKernelContext *context) override
{
auto scheduler_ = _default_nccl_helper->EnsureScheduler(nccl_scope_);
auto scheduler_ = _default_nccl_helper->EnsureScheduler(nccl_scope_);
auto controller_ = _default_nccl_helper->EnsureController(nccl_scope_);
const Tensor &input = context->input(0);
const auto t_names = input.vec<std::string>();
std::vector<std::string> names;
for (int i = 0; i < t_names.size(); ++i) {
names.push_back(t_names(i));
}
scheduler_->Reset(names);
scheduler_->Do([=] { controller_->InitOnce(); });
}
};

Expand Down
Loading

0 comments on commit 2bc07d3

Please sign in to comment.