Split out reusable CUDAFuture from FutureNCCL
Pull Request resolved: #48506

This commit is part of a stack that reworks FutureNCCL in order to extract a generic CUDA-aware Future subclass. The stack deliberately breaks up this transition into elementary changes, to make it easier to verify that the behavior is preserved (or to highlight how it gets changed).

---

FutureNCCL is now a general-purpose, type-agnostic, multi-device class, so in this commit I extract it from ProcessGroupNCCL to make it available for wider use (notably by the RPC module). We'll call this new class CUDAFuture. We'll keep FutureNCCL as a subclass of CUDAFuture to deal with one NCCL peculiarity, namely that the future becomes complete immediately upon creation. We can clean this up for good once we're done merging Future and Work.
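
Condensed, the end state looks roughly like this (the full code is in the diff below; the constructor's sanity assertions are omitted here), with CUDAFuture owning all of the CUDA event and stream bookkeeping:

struct FutureNCCL : at::cuda::CUDAFuture {
 public:
  FutureNCCL(
      at::IValue value,
      std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents)
      : at::cuda::CUDAFuture(c10::ListType::create(c10::TensorType::get())) {
    // NCCL peculiarity: the future is complete as soon as it is constructed,
    // reusing the events that were already recorded for the collective's outputs.
    cudaEvents_ = std::move(cudaEvents);
    markCompleted(std::move(value));
  }

 protected:
  void postMarkCompletedHook(const at::IValue& value) override {
    // Do nothing: the constructor already stored the events.
  }
};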

I'm not exactly sure where to put CUDAFuture. It needs to be available to both c10d and RPC (which lives under torch/csrc). If I've figured CMake out correctly (and that's a big if), I think c10d can only depend on ATen (I may add a comment explaining how I tracked that down). Hence we cannot put CUDAFuture in torch/csrc. On the other hand, RPC currently depends on c10d, because RPC agents use ProcessGroups internally, so it would be "ok" to put CUDAFuture in c10d. However, we want to get rid of ProcessGroups in RPC, and at that point RPC should in principle not depend on c10d. In that case, the only shared dependency between the two that I see is ATen itself.

While I'm a bit wary of putting it right in ATen, I think it might actually make sense. CUDAFuture is intended to be a general-purpose component that can be reused in all settings and is not particularly tied to c10d or RPC. Moreover, ATen already contains ivalue::Future, plus a lot of CUDA helpers, so CUDAFuture definitely belongs to the "closure" of what's already there.
ghstack-source-id: 117439444

Differential Revision: [D25180532](https://our.internmc.facebook.com/intern/diff/D25180532/)
lw committed Nov 29, 2020
1 parent f7cff2f commit 3ebbf3a
Showing 2 changed files with 159 additions and 118 deletions.
149 changes: 149 additions & 0 deletions aten/src/ATen/cuda/CUDAFuture.h
@@ -0,0 +1,149 @@
#pragma once

#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include <ATen/core/ivalue.h>
#include <ATen/core/ivalue_inl.h>
#include <ATen/core/jit_type.h>
#include <ATen/cuda/CUDAEvent.h>
#include <ATen/cuda/CUDAMultiStreamGuard.h>
#include <c10/core/Allocator.h>
#include <c10/core/Device.h>
#include <c10/cuda/CUDACachingAllocator.h>
#include <c10/cuda/CUDAFunctions.h>
#include <c10/cuda/CUDAStream.h>
#include <c10/macros/Export.h>
#include <c10/util/intrusive_ptr.h>

namespace at { namespace cuda {

struct TORCH_CUDA_API CUDAFuture : at::ivalue::Future {
public:
using at::ivalue::Future::Future;

void setDataPtrExtractor(DataPtrExtractor data_ptr_extractor) override {
// To avoid races with other threads that may be using the extractor, we
// won't modify it after it's first set.
if (dataPtrExtractor_ == nullptr) {
dataPtrExtractor_ = std::move(data_ptr_extractor);
}
}

protected:
c10::intrusive_ptr<Future> createInstance(at::TypePtr type) override {
auto fut = c10::make_intrusive<CUDAFuture>(std::move(type));
// The new future needs the DataPtr extractor when it gets marked complete
// but this might happen immediately inline or in parallel by another
// thread. In both these cases this would/might happen before the user has
// time to set their own DataPtr extractor, which might lead to failures
// if the default extractor can't handle some of the user's types.
// Therefore we propagate our extractor.
fut->setDataPtrExtractor(dataPtrExtractor_);
return fut;
}

void postMarkCompletedHook(const at::IValue& value) override {
std::vector<bool> isCudaDeviceUsed(c10::cuda::device_count(), false);
for (const at::DataPtr& data_ptr : extractDataPtrs(value)) {
if (data_ptr.device().is_cuda()) {
isCudaDeviceUsed[data_ptr.device().index()] = true;
}
}

cudaEvents_ = std::make_shared<std::vector<at::cuda::CUDAEvent>>();
for (c10::DeviceIndex idx = 0; idx < isCudaDeviceUsed.size(); idx++) {
if (isCudaDeviceUsed[idx]) {
at::cuda::CUDAEvent cudaEvent;
cudaEvent.record(at::cuda::getCurrentCUDAStream(idx));
(*cudaEvents_).push_back(std::move(cudaEvent));
}
}
}

std::function<void(void)> wrapCallback(
std::function<void(void)> callback) override {
return [this, callback{std::move(callback)}]() {
// We'd love to get a stream for all devices, even those that are not used
// by the value, because the callback could use those other devices, but
// unfortunately this could cause a deadlock with NCCL. See
// https://github.com/pytorch/pytorch/pull/48500#issuecomment-735395414
// In general, if some devices haven't been used yet, by getting a stream
// for them we'd initialize them, and in addition to causing NCCL to
// misbehave this also ends up using memory on those devices, which the
// user might not want.
std::vector<at::cuda::CUDAStream> streams;
for (at::cuda::CUDAEvent& cudaEvent : *cudaEvents_) {
c10::DeviceIndex idx = cudaEvent.device_index();
// FIXME Should we find a way to allow to change the priority of
// streams?
at::cuda::CUDAStream stream =
at::cuda::getStreamFromPool(/*isHighPriority=*/false, idx);
cudaEvent.block(stream);
streams.push_back(stream);
}

// Use the dedicated callback stream to run callback.
at::cuda::CUDAMultiStreamGuard streamGuard(streams);

// Do not free the underlying data storage of value_ before its
// usage on the stream finishes.
for (const at::DataPtr& data_ptr : extractDataPtrs(constValue())) {
if (data_ptr.device().is_cuda()) {
c10::cuda::CUDACachingAllocator::recordStream(
data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index()));
}
}

callback();
};
}

void postWaitHook(const at::IValue& value) override {
for (at::cuda::CUDAEvent& cudaEvent : *cudaEvents_) {
cudaEvent.block(
at::cuda::getCurrentCUDAStream(cudaEvent.device_index()));
}

for (const at::DataPtr& data_ptr : extractDataPtrs(value)) {
if (data_ptr.device().is_cuda()) {
c10::cuda::CUDACachingAllocator::recordStream(
data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index()));
}
}
}

// FIXME This field is protected (rather than private) and wrapped in a
// shared_ptr in order to support the FutureNCCL subclass, which wants to set
// the events on its own in order to use the same ones as its WorkNCCL class.
// Once WorkNCCL is gone (as part of the Future and Work merge) this should be
// fixed.
protected:
std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents_;

private:
DataPtrExtractor dataPtrExtractor_;

// FIXME This too is protected so that it can be used by FutureNCCL. Please
// undo that once FutureNCCL is dropped in favor of a "vanilla" CUDAFuture.
protected:
std::vector<std::reference_wrapper<const at::DataPtr>> extractDataPtrs(
const at::IValue& value) {
std::vector<std::reference_wrapper<const at::DataPtr>> data_ptrs;
if (dataPtrExtractor_ != nullptr) {
// If a Python communication hook is used, dataPtrExtractor_ will be
// set in torch/csrc/jit/python/pybind_utils.h, which allows Python
// dependency to be imported.
data_ptrs = dataPtrExtractor_(value);
} else {
// If a C++ communication hook is used, use the default extractor.
data_ptrs = at::ivalue::Future::defaultDataPtrExtractor(value);
}
return data_ptrs;
}
};

} // namespace cuda
} // namespace at
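
To make the intended behavior of the hooks above concrete, here is a hedged usage sketch (not part of this commit): the example() wrapper, tensor size, and single-GPU assumption are illustrative only, and it presumes a CUDA device is available.

#include <ATen/ATen.h>
#include <ATen/cuda/CUDAFuture.h>

void example() {
  // A future that will hold a single tensor.
  auto fut = c10::make_intrusive<at::cuda::CUDAFuture>(c10::TensorType::get());

  // Producer: compute a value on the current CUDA stream and mark the future
  // complete. postMarkCompletedHook() then records one CUDA event per device
  // actually used by the value.
  at::Tensor t = at::ones({1024}, at::device(at::kCUDA));
  fut->markCompleted(t);

  // Consumer: the callback is wrapped by wrapCallback(), so it runs under a
  // CUDAMultiStreamGuard on fresh streams from the pool that have been made to
  // wait on the recorded events.
  fut->addCallback([fut]() {
    at::Tensor result = fut->value().toTensor();
    // Safe to launch work that consumes `result` on the current stream here.
  });

  // postWaitHook() makes the caller's current streams wait on the recorded
  // events before wait() returns control to the caller.
  fut->wait();
}
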
128 changes: 10 additions & 118 deletions torch/lib/c10d/ProcessGroupNCCL.hpp
@@ -13,6 +13,7 @@

#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/CUDAEvent.h>
+#include <ATen/cuda/CUDAFuture.h>
#include <ATen/cuda/CUDAMultiStreamGuard.h>
#include <c10/core/Stream.h>
#include <c10/core/StreamGuard.h>
@@ -207,140 +208,31 @@ class ProcessGroupNCCL : public ProcessGroup {
  // enables synchronizing the appropriate streams and avoids stalling PyTorch's
  // default stream while running the callback. In case of multiple then
  // callbacks, each will be executed on its own fresh stream.
-  struct FutureNCCL : at::ivalue::Future {
+  struct FutureNCCL : at::cuda::CUDAFuture {
   public:
-    explicit FutureNCCL(
+    FutureNCCL(
        at::IValue value,
        std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents)
-        : at::ivalue::Future(c10::ListType::create(c10::TensorType::get())),
-          cudaEvents_(std::move(cudaEvents)) {
-      for (const at::cuda::CUDAEvent& event : *cudaEvents_) {
+        : at::cuda::CUDAFuture(c10::ListType::create(c10::TensorType::get())){
+      for (const at::cuda::CUDAEvent& event : *cudaEvents) {
        TORCH_INTERNAL_ASSERT(event.isCreated());
      }
      for (const at::DataPtr& data_ptr : extractDataPtrs(value)) {
        TORCH_INTERNAL_ASSERT(
            std::find_if(
-                cudaEvents_->begin(),
-                cudaEvents_->end(),
+                cudaEvents->begin(),
+                cudaEvents->end(),
                [&](const at::cuda::CUDAEvent& ev) {
                  return ev.device_index() == data_ptr.device().index();
-                }) != cudaEvents_->end());
+                }) != cudaEvents->end());
      }
+      cudaEvents_ = std::move(cudaEvents);
      markCompleted(std::move(value));
    }

-    using at::ivalue::Future::Future;
-
-    void setDataPtrExtractor(DataPtrExtractor data_ptr_extractor) override {
-      // To avoid races with other threads that may be using the extractor, we
-      // won't modify it after it's first set.
-      if (dataPtrExtractor_ == nullptr) {
-        dataPtrExtractor_ = std::move(data_ptr_extractor);
-      }
-    }
-
   protected:
-    c10::intrusive_ptr<Future> createInstance(at::TypePtr type) override {
-      auto fut = c10::make_intrusive<FutureNCCL>(std::move(type));
-      // The new future needs the DataPtr extractor when it gets marked complete
-      // but this might happen immediately inline or in parallel by another
-      // thread. In both these cases this would/might happen before the user has
-      // time to set their own DataPtr extractor, which might lead to failures
-      // if the default extractor can't handle some of the user's types.
-      // Therefore we propagate our extractor.
-      fut->setDataPtrExtractor(dataPtrExtractor_);
-      return fut;
-    }
-
    void postMarkCompletedHook(const at::IValue& value) override {
-      // Check whether the first or second constructor created this instance.
-      if (cudaEvents_ == nullptr) {
-        std::vector<bool> isCudaDeviceUsed(c10::cuda::device_count(), false);
-        for (const at::DataPtr& data_ptr : extractDataPtrs(value)) {
-          if (data_ptr.device().is_cuda()) {
-            isCudaDeviceUsed[data_ptr.device().index()] = true;
-          }
-        }
-
-        cudaEvents_ = std::make_shared<std::vector<at::cuda::CUDAEvent>>();
-        for (c10::DeviceIndex idx = 0; idx < isCudaDeviceUsed.size(); idx++) {
-          if (isCudaDeviceUsed[idx]) {
-            at::cuda::CUDAEvent cudaEvent;
-            cudaEvent.record(at::cuda::getCurrentCUDAStream(idx));
-            (*cudaEvents_).push_back(std::move(cudaEvent));
-          }
-        }
-      }
-    }
-
-    std::function<void(void)> wrapCallback(std::function<void(void)> callback) override {
-      return [this, callback{std::move(callback)}]() {
-        // We'd love to get a stream for all devices, even those that are not used
-        // by the value, because the callback could use those other devices, but
-        // unfortunately this could cause a deadlock with NCCL. See
-        // https://github.com/pytorch/pytorch/pull/48500#issuecomment-735395414
-        // In general, if some devices haven't been used yet, by getting a stream
-        // for them we'd initialize them, and in addition to causing NCCL to
-        // misbehaving this also ends up using memory on those devices, which the
-        // user might not want.
-        std::vector<at::cuda::CUDAStream> streams;
-        for (at::cuda::CUDAEvent& cudaEvent : *cudaEvents_) {
-          c10::DeviceIndex idx = cudaEvent.device_index();
-          // FIXME Should we find a way to allow to change the priority of
-          // streams?
-          at::cuda::CUDAStream stream =
-              at::cuda::getStreamFromPool(/*isHighPriority=*/false, idx);
-          cudaEvent.block(stream);
-          streams.push_back(stream);
-        }
-
-        // Use the dedicated callback stream to run callback.
-        at::cuda::CUDAMultiStreamGuard streamGuard(streams);
-
-        // Do not free the underlying data storage of value_ before its
-        // usage on the stream finishes.
-        for (const at::DataPtr& data_ptr : extractDataPtrs(constValue())) {
-          if (data_ptr.device().is_cuda()) {
-            c10::cuda::CUDACachingAllocator::recordStream(
-                data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index()));
-          }
-        }
-
-        callback();
-      };
-    }
-
-    void postWaitHook(const at::IValue& value) override {
-      for (at::cuda::CUDAEvent& cudaEvent : *cudaEvents_) {
-        cudaEvent.block(
-            at::cuda::getCurrentCUDAStream(cudaEvent.device_index()));
-      }
-
-      for (const at::DataPtr& data_ptr : extractDataPtrs(value)) {
-        if (data_ptr.device().is_cuda()) {
-          c10::cuda::CUDACachingAllocator::recordStream(
-              data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index()));
-        }
-      }
-    }
-
-   private:
-    std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents_;
-    DataPtrExtractor dataPtrExtractor_;
-
-    std::vector<std::reference_wrapper<const at::DataPtr>> extractDataPtrs(
-        const at::IValue& value) {
-      std::vector<std::reference_wrapper<const at::DataPtr>> data_ptrs;
-      if (dataPtrExtractor_ != nullptr) {
-        // If a Python communication hook is used, dataPtrExtractor_ will be
-        // set in torch/csrc/jit/python/pybind_utils.h, which allows Python
-        // dependency to be imported.
-        data_ptrs = dataPtrExtractor_(value);
-      } else {
-        // If a C++ communication hook is used, use the default extractor.
-        data_ptrs = at::ivalue::Future::defaultDataPtrExtractor(value);
-      }
-      return data_ptrs;
+      // Do nothing because the constructor already stored the events.
    }
  };
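
The DataPtrExtractor plumbing that FutureNCCL now inherits from CUDAFuture (setDataPtrExtractor(), extractDataPtrs(), and the propagation in createInstance()) can also be driven from C++. Below is a hedged sketch, not part of this commit: the extractor's signature is inferred from extractDataPtrs() above, extractorExample()/myExtractor are made-up names, and the same includes as the previous sketch are assumed.

void extractorExample() {
  auto fut = c10::make_intrusive<at::cuda::CUDAFuture>(c10::TensorType::get());

  // Maps an IValue to the DataPtrs of the tensors it contains. A Python-aware
  // extractor (installed from torch/csrc/jit/python/pybind_utils.h) would also
  // unwrap Python objects before collecting DataPtrs.
  at::ivalue::Future::DataPtrExtractor myExtractor =
      [](const at::IValue& value)
          -> std::vector<std::reference_wrapper<const at::DataPtr>> {
    std::vector<std::reference_wrapper<const at::DataPtr>> data_ptrs;
    if (value.isTensor()) {
      data_ptrs.emplace_back(value.toTensor().storage().data_ptr());
    } else if (value.isTensorList()) {
      for (const at::Tensor& t : value.toTensorVector()) {
        data_ptrs.emplace_back(t.storage().data_ptr());
      }
    }
    return data_ptrs;
  };

  // Install it before the future can complete; createInstance() then
  // propagates it to any child future produced by then().
  fut->setDataPtrExtractor(std::move(myExtractor));
}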

