Skip to content

Commit

Permalink
Add multi-GPU support to FutureNCCL (#48500)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #48500

This commit is part of a stack that reworks FutureNCCL in order to extract a generic CUDA-aware Future subclass. The stack deliberately breaks up this transition into elementary changes, to make it easier to verify that the behavior is preserved (or to highlight how it gets changed).

 ---

After the previous changes, this is now much simpler than it sounds. For the most part it just consists in repeating some operations multiple times, once for device (e.g., recording and blocking on events). Funnily, we already had a vector of events, even though we only ever stored one element in it (this probably comes from the fact that this is shared with WorkNCCL, which can hold more than one event). Here, we now also store a vector of device indices.

Perhaps the only non-trivial part of this is that now, for "follow-up" Futures (for callbacks), we can't know in advance which device the result will be on so we must determine it dynamically when we receive the result, by inspecting it. That's also easier than it sound because we already have a dataptr extractor.
ghstack-source-id: 118180022

Test Plan: Unit tests (I should probably add new ones)

Reviewed By: mrshenli

Differential Revision: D25177556

fbshipit-source-id: 41ef39ec0dc458e341aa1564f2b9f2b573d7fa9f
  • Loading branch information
lw authored and facebook-github-bot committed Dec 10, 2020
1 parent 91ad3ed commit e294c2d
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 41 deletions.
14 changes: 7 additions & 7 deletions torch/lib/c10d/ProcessGroupNCCL.cpp
Expand Up @@ -1008,15 +1008,15 @@ std::vector<at::Tensor> ProcessGroupNCCL::WorkNCCL::result() {

c10::intrusive_ptr<c10::ivalue::Future> ProcessGroupNCCL::WorkNCCL::
getFuture() {
TORCH_INTERNAL_ASSERT(
outputs_->size() == 1,
"WorkNCCL's getFuture API is only supported for single-process single-device mode.");
auto deviceIndex = (*outputs_)[0].device().index();
// Create a new FutureNCCL object after checking for single-process
// single-device mode.
std::vector<c10::DeviceIndex> deviceIndices;
for (const c10::Device& device : devices_) {
TORCH_INTERNAL_ASSERT(device.is_cuda());
deviceIndices.push_back(device.index());
}

return c10::make_intrusive<FutureNCCL>(
at::IValue(*outputs_),
deviceIndex,
std::move(deviceIndices),
cudaEvents_);
}

Expand Down
116 changes: 82 additions & 34 deletions torch/lib/c10d/ProcessGroupNCCL.hpp
Expand Up @@ -13,6 +13,7 @@

#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/CUDAEvent.h>
#include <ATen/cuda/CUDAMultiStreamGuard.h>
#include <c10/core/Stream.h>
#include <c10/core/StreamGuard.h>
#include <c10/cuda/CUDACachingAllocator.h>
Expand Down Expand Up @@ -196,10 +197,8 @@ class ProcessGroupNCCL : public ProcessGroup {
// or NCCL's barrier().
//
// If created by WorkNCCL's getFuture API, FutureNCCL has a reference to
// WorkNCCL's cudaEvents, NCCL collective's outputs, and the device index of
// outputs' device. Its value is NCCL collective's
// outputs. FutureNCCL only supports single-process single-device mode where
// the size of outputs is equal to 1.
// WorkNCCL's cudaEvents, NCCL collective's outputs, and the device indices of
// outputs' devices. Its value is NCCL collective's outputs.
//
// If created by FutureNCCL's then callback, its value becomes the value of
// callback() and its cudaEvents will record the NCCL stream that runs that
Expand All @@ -212,28 +211,46 @@ class ProcessGroupNCCL : public ProcessGroup {
public:
explicit FutureNCCL(
at::IValue value,
c10::DeviceIndex deviceIndex,
std::vector<c10::DeviceIndex> deviceIndices,
std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents)
: at::ivalue::Future(c10::ListType::create(c10::TensorType::get())),
value_(std::move(value)),
deviceIndex_(deviceIndex),
deviceIndices_(std::move(deviceIndices)),
cudaEvents_(std::move(cudaEvents)) {
// Check that the device indices are distinct
std::unordered_set<c10::DeviceIndex> uniqueDeviceIndices;
for (const auto& deviceIndex : deviceIndices_) {
uniqueDeviceIndices.insert(deviceIndex);
}
TORCH_INTERNAL_ASSERT(
deviceIndices_.size() == uniqueDeviceIndices.size(),
"Got ", deviceIndices_.size(), " devices, but only ",
uniqueDeviceIndices.size(), " distinct ones");
TORCH_INTERNAL_ASSERT(
cudaEvents_->size() == 1,
"FutureNCCL only supports single-process single-device mode.");
cudaEvents_->size() == deviceIndices_.size(),
"The device indices and the events must be paired up. Got ",
deviceIndices_.size(), " devices and ", cudaEvents_->size(),
" events.");
for (const at::cuda::CUDAEvent& event : *cudaEvents_) {
TORCH_INTERNAL_ASSERT(event.isCreated());
TORCH_INTERNAL_ASSERT(event.device_index() == deviceIndex_);
TORCH_INTERNAL_ASSERT(
std::find(
deviceIndices_.begin(),
deviceIndices_.end(),
event.device_index()) != deviceIndices_.end());
}
for (const at::DataPtr& data_ptr : extractDataPtrs(value_)) {
TORCH_INTERNAL_ASSERT(data_ptr.device().index() == deviceIndex_);
TORCH_INTERNAL_ASSERT(
std::find(
deviceIndices_.begin(),
deviceIndices_.end(),
data_ptr.device().index()) != deviceIndices_.end());
}
}

private:
explicit FutureNCCL(c10::DeviceIndex deviceIndex)
: at::ivalue::Future(c10::ListType::create(c10::TensorType::get())),
deviceIndex_(deviceIndex) {}
FutureNCCL()
: at::ivalue::Future(c10::ListType::create(c10::TensorType::get())) {}
// We need this because it will be the ::make() static method that actually
// creates the instance. This is a brittle approach and the passkey idiom
// would be a more robust solution. However, this will go away in #48505.
Expand All @@ -248,11 +265,17 @@ class ProcessGroupNCCL : public ProcessGroup {
if (error_) {
throw *error_;
}
auto stream = at::cuda::getCurrentCUDAStream(deviceIndex_);
(*cudaEvents_)[0].block(stream);

for (int i = 0; i < deviceIndices_.size(); i++) {
(*cudaEvents_)[i].block(
at::cuda::getCurrentCUDAStream(deviceIndices_[i]));
}

for (const at::DataPtr& data_ptr : extractDataPtrs(value_)) {
c10::cuda::CUDACachingAllocator::recordStream(data_ptr, stream);
if (data_ptr.device().is_cuda()) {
c10::cuda::CUDACachingAllocator::recordStream(
data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index()));
}
}
}

Expand All @@ -265,18 +288,25 @@ class ProcessGroupNCCL : public ProcessGroup {
"Attempting to set value of a FutureNCCL which has a value."
"FutureNCCL's value was internally set to NCCL collective's "
"outputs or the return value of the callback.");
for (const at::DataPtr& data_ptr : extractDataPtrs(value)) {
TORCH_INTERNAL_ASSERT(data_ptr.device().index() == deviceIndex_);
}
value_ = std::move(value);

TORCH_INTERNAL_ASSERT(cudaEvents_ == nullptr);
// Create a new cudaEvents object of size 1 that will record the current
// stream after callback and will be passed to the new FutureNCCL.
cudaEvents_ = std::make_shared<std::vector<at::cuda::CUDAEvent>>(1);
// In case of chained then callback calls, cudaEvents
// records callback's stream.
(*cudaEvents_)[0].record(at::cuda::getCurrentCUDAStream(deviceIndex_));
std::vector<bool> isCudaDeviceUsed(c10::cuda::device_count(), false);
for (const at::DataPtr& data_ptr : extractDataPtrs(value_)) {
if (data_ptr.device().is_cuda()) {
isCudaDeviceUsed[data_ptr.device().index()] = true;
}
}

cudaEvents_ = std::make_shared<std::vector<at::cuda::CUDAEvent>>();
for (c10::DeviceIndex idx = 0; idx < isCudaDeviceUsed.size(); idx++) {
if (isCudaDeviceUsed[idx]) {
at::cuda::CUDAEvent cudaEvent;
cudaEvent.record(at::cuda::getCurrentCUDAStream(idx));
deviceIndices_.push_back(idx);
(*cudaEvents_).push_back(std::move(cudaEvent));
}
}
}

// Just returns FutureNCCL's value after wait returns.
Expand All @@ -297,19 +327,37 @@ class ProcessGroupNCCL : public ProcessGroup {
// this callback. This new FutureNCCL's cudaEvents will record the
// callback's stream and will have the result value of the callback.
void addCallback(std::function<void(void)> callback) override {
// FIXME Should we find a way to allow to change the priority of streams?
at::cuda::CUDAStream stream =
at::cuda::getStreamFromPool(/*isHighPriority=*/false, deviceIndex_);
// We'd love to get a stream for all devices, even those that are not used
// by the value, because the callback could use those other devices, but
// unfortunately this could cause a deadlock with NCCL. See
// https://github.com/pytorch/pytorch/pull/48500#issuecomment-735395414
// In general, if some devices haven't been used yet, by getting a stream
// for them we'd initialize them, and in addition to causing NCCL to
// misbehaving this also ends up using memory on those devices, which the
// user might not want.
std::vector<at::cuda::CUDAStream> streams;
for (int i = 0; i < deviceIndices_.size(); i++) {
c10::DeviceIndex idx = deviceIndices_[i];
// FIXME Should we find a way to allow to change the priority of
// streams?
at::cuda::CUDAStream stream =
at::cuda::getStreamFromPool(/*isHighPriority=*/false, idx);
(*cudaEvents_)[i].block(stream);
streams.push_back(stream);
}

// Use the dedicated callback stream to run callback.
at::cuda::CUDAMultiStreamGuard streamGuard(streams);

// Do not free the underlying data storage of value_ before its
// usage on the stream finishes.
for (const at::DataPtr& data_ptr : extractDataPtrs(value_)) {
c10::cuda::CUDACachingAllocator::recordStream(data_ptr, stream);
if (data_ptr.device().is_cuda()) {
c10::cuda::CUDACachingAllocator::recordStream(
data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index()));
}
}

(*cudaEvents_)[0].block(stream);
// Use the dedicated callback stream to run callback.
c10::StreamGuard streamGuard{stream};
callback();
}

Expand All @@ -319,7 +367,7 @@ class ProcessGroupNCCL : public ProcessGroup {
c10::intrusive_ptr<Future> then(
std::function<at::IValue(void)> callback,
at::TypePtr /* unused */) override {
auto fut = c10::make_intrusive<FutureNCCL>(deviceIndex_);
auto fut = c10::make_intrusive<FutureNCCL>();
// The new future needs the DataPtr extractor when it gets marked complete
// but this might happen immediately inline or in parallel by another
// thread. In both these cases this would/might happen before the user has
Expand Down Expand Up @@ -358,7 +406,7 @@ class ProcessGroupNCCL : public ProcessGroup {

private:
at::IValue value_;
c10::DeviceIndex deviceIndex_;
std::vector<c10::DeviceIndex> deviceIndices_;
std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents_;
DataPtrExtractor dataPtrExtractor_;
std::mutex dataPtrExtractorMutex_;
Expand Down

0 comments on commit e294c2d

Please sign in to comment.