From 030fa6cfba69da6342f89483ceca8c898b93f165 Mon Sep 17 00:00:00 2001 From: Luca Wehrstedt Date: Thu, 10 Dec 2020 03:45:30 -0800 Subject: [PATCH] Split out reusable CUDAFuture from FutureNCCL (#48506) Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/48506 This commit is part of a stack that reworks FutureNCCL in order to extract a generic CUDA-aware Future subclass. The stack deliberately breaks up this transition into elementary changes, to make it easier to verify that the behavior is preserved (or to highlight how it gets changed). --- FutureNCCL is now a general-purpose type-agnostic multi-device class, so in this commit I extract it from ProcessGroupNCCL to make it available for wider use (notably by the RPC module). We'll call this new class CUDAFuture. We'll keep FutureNCCL as a subclass of CUDAFuture to deal with some NCCL peculiarity, namely the fact that the future becomes complete immediately upon creation. We can clean this up for good once we're done merging Future and Work. I'm not exactly sure of where to put CUDAFuture. It needs to be available to both c10d and RPC (which lives under torch/csrc). If I figured CMake out correctly (and that's a big if) I think c10d can only depend on ATen (I'll maybe add a comment with how I tracked that down). Hence we cannot put CUDAFuture in torch/csrc. On the other hand, RPC currently depends on c10d, because RPC agents use ProcessGroups internally, so it would be "ok" to put CUDAFuture in c10d. However, we want to get rid of ProcessGroups in RPC, and at that point RPC should in principle not depend on c10d. In that case, the only shared dep between the two that I see is ATen itself. While I'm a bit wary of putting it right in ATen, I think it might actually make sense. CUDAFuture is intended to be a general-purpose component that can be reused in all settings and is not particularly tied to c10d or RPC. Moreover, ATen already contains ivalue::Future, and it contains a lot of CUDA helpers, so CUDAFuture definitely belongs to the "closure" of what's already there. ghstack-source-id: 118180030 Test Plan: Unit tests? Reviewed By: wanchaol Differential Revision: D25180532 fbshipit-source-id: 697f655240dbdd3be22a568d5102ab27691f86d4 --- aten/src/ATen/cuda/CUDAFuture.h | 153 ++++++++++++++++++++++++++++ torch/lib/c10d/ProcessGroupNCCL.hpp | 131 +++--------------------- 2 files changed, 165 insertions(+), 119 deletions(-) create mode 100644 aten/src/ATen/cuda/CUDAFuture.h diff --git a/aten/src/ATen/cuda/CUDAFuture.h b/aten/src/ATen/cuda/CUDAFuture.h new file mode 100644 index 000000000000..7db95ba3f734 --- /dev/null +++ b/aten/src/ATen/cuda/CUDAFuture.h @@ -0,0 +1,153 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace at { namespace cuda { + +struct TORCH_CUDA_API CUDAFuture : at::ivalue::Future { + public: + using at::ivalue::Future::Future; + + void setDataPtrExtractor(DataPtrExtractor dataPtrExtractor) override { + std::unique_lock lock(dataPtrExtractorMutex_); + dataPtrExtractor_ = std::move(dataPtrExtractor); + } + + protected: + c10::intrusive_ptr createInstance(at::TypePtr type) override { + auto fut = c10::make_intrusive(std::move(type)); + // The new future needs the DataPtr extractor when it gets marked complete + // but this might happen immediately inline or in parallel by another + // thread. In both these cases this would/might happen before the user has + // time to set their own DataPtr extractor, which might lead to failures + // if the default extractor can't handle some of the user's types. + // Therefore we propagate our extractor. + fut->setDataPtrExtractor(dataPtrExtractor_); + return fut; + } + + void postMarkCompletedHook(const at::IValue& value) override { + std::vector isCudaDeviceUsed(c10::cuda::device_count(), false); + for (const at::DataPtr& data_ptr : extractDataPtrs(value)) { + if (data_ptr.device().is_cuda()) { + isCudaDeviceUsed[data_ptr.device().index()] = true; + } + } + + cudaEvents_ = std::make_shared>(); + for (c10::DeviceIndex idx = 0; idx < isCudaDeviceUsed.size(); idx++) { + if (isCudaDeviceUsed[idx]) { + at::cuda::CUDAEvent cudaEvent; + cudaEvent.record(at::cuda::getCurrentCUDAStream(idx)); + (*cudaEvents_).push_back(std::move(cudaEvent)); + } + } + } + + std::function wrapCallback( + std::function callback) override { + return [this, callback{std::move(callback)}]() { + // We'd love to get a stream for all devices, even those that are not used + // by the value, because the callback could use those other devices, but + // unfortunately this could cause a deadlock with NCCL. See + // https://github.com/pytorch/pytorch/pull/48500#issuecomment-735395414 + // In general, if some devices haven't been used yet, by getting a stream + // for them we'd initialize them, and in addition to causing NCCL to + // misbehaving this also ends up using memory on those devices, which the + // user might not want. + std::vector streams; + for (at::cuda::CUDAEvent& cudaEvent : *cudaEvents_) { + c10::DeviceIndex idx = cudaEvent.device_index(); + // FIXME Should we find a way to allow to change the priority of + // streams? + at::cuda::CUDAStream stream = + at::cuda::getStreamFromPool(/*isHighPriority=*/false, idx); + cudaEvent.block(stream); + streams.push_back(stream); + } + + // Use the dedicated callback stream to run callback. + at::cuda::CUDAMultiStreamGuard streamGuard(streams); + + // Do not free the underlying data storage of value_ before its + // usage on the stream finishes. + for (const at::DataPtr& data_ptr : extractDataPtrs(constValue())) { + if (data_ptr.device().is_cuda()) { + c10::cuda::CUDACachingAllocator::recordStream( + data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index())); + } + } + + callback(); + }; + } + + void postWaitHook(const at::IValue& value) override { + for (at::cuda::CUDAEvent& cudaEvent : *cudaEvents_) { + cudaEvent.block( + at::cuda::getCurrentCUDAStream(cudaEvent.device_index())); + } + + for (const at::DataPtr& data_ptr : extractDataPtrs(value)) { + if (data_ptr.device().is_cuda()) { + c10::cuda::CUDACachingAllocator::recordStream( + data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index())); + } + } + } + + // FIXME This field is protected (rather than private) and wrapped in a + // shared_ptr in order to support the FutureNCCL subclass, which wants to set + // the events on its own in order to use the same ones as its WorkNCCL class. + // Once WorkNCCL is gone (as part of the Future and Work merge) this should be + // fixed. + protected: + // The events that correspond to the completion of the async I/O kernels. They + // are recorded on the appropriate streams when the future is marked completed + // and can then be queried/waited/blocked on. There is one event for each + // distinct device on which the value's tensors reside. + std::shared_ptr> cudaEvents_; + + private: + DataPtrExtractor dataPtrExtractor_; + std::mutex dataPtrExtractorMutex_; + + // FIXME This too is protected so that it can be used by FutureNCCL. Please + // undo that once FutureNCCL is dropped in favor of a "vanilla" CUDAFuture. + protected: + std::vector> extractDataPtrs( + const at::IValue& value) { + std::unique_lock lock(dataPtrExtractorMutex_); + std::vector> data_ptrs; + if (dataPtrExtractor_ != nullptr) { + // If a Python communication hook is used, dataPtrExtractor_ will be + // set in torch/csrc/jit/python/pybind_utils.h, which allows Python + // dependency to be imported. + data_ptrs = dataPtrExtractor_(value); + } else { + // If a C++ communication hook is used, use the default extractor. + data_ptrs = at::ivalue::Future::defaultDataPtrExtractor(value); + } + return data_ptrs; + } +}; + +} // namespace cuda +} // namespace at diff --git a/torch/lib/c10d/ProcessGroupNCCL.hpp b/torch/lib/c10d/ProcessGroupNCCL.hpp index 23bc12390541..0bd04645dac2 100644 --- a/torch/lib/c10d/ProcessGroupNCCL.hpp +++ b/torch/lib/c10d/ProcessGroupNCCL.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -207,146 +208,38 @@ class ProcessGroupNCCL : public ProcessGroup { // enables synchronizing the appropriate streams and avoids stalling PyTorch's // default stream while running the callback. In case of multiple then // callbacks, each will be executed on its own fresh stream. - struct FutureNCCL : at::ivalue::Future { + struct FutureNCCL : at::cuda::CUDAFuture { public: - explicit FutureNCCL( + FutureNCCL( at::IValue value, std::shared_ptr> cudaEvents) - : at::ivalue::Future(c10::ListType::create(c10::TensorType::get())), - cudaEvents_(std::move(cudaEvents)) { + : at::cuda::CUDAFuture(c10::ListType::create(c10::TensorType::get())){ // Check that the device indices are distinct std::unordered_set uniqueDeviceIndices; - for (const at::cuda::CUDAEvent& event : *cudaEvents_) { + for (const at::cuda::CUDAEvent& event : *cudaEvents) { TORCH_INTERNAL_ASSERT(event.isCreated()); uniqueDeviceIndices.insert(event.device_index()); } TORCH_INTERNAL_ASSERT( - cudaEvents_->size() == uniqueDeviceIndices.size(), - "Got ", cudaEvents_->size(), " events, but only ", + cudaEvents->size() == uniqueDeviceIndices.size(), + "Got ", cudaEvents->size(), " events, but only ", uniqueDeviceIndices.size(), " distinct devices"); for (const at::DataPtr& data_ptr : extractDataPtrs(value)) { TORCH_INTERNAL_ASSERT( std::find_if( - cudaEvents_->begin(), - cudaEvents_->end(), + cudaEvents->begin(), + cudaEvents->end(), [&](const at::cuda::CUDAEvent& ev) { return ev.device_index() == data_ptr.device().index(); - }) != cudaEvents_->end()); + }) != cudaEvents->end()); } + cudaEvents_ = std::move(cudaEvents); markCompleted(std::move(value)); } - using at::ivalue::Future::Future; - - void setDataPtrExtractor(DataPtrExtractor dataPtrExtractor) override { - std::unique_lock lock(dataPtrExtractorMutex_); - dataPtrExtractor_ = std::move(dataPtrExtractor); - } - protected: - c10::intrusive_ptr createInstance(at::TypePtr type) override { - auto fut = c10::make_intrusive(std::move(type)); - // The new future needs the DataPtr extractor when it gets marked complete - // but this might happen immediately inline or in parallel by another - // thread. In both these cases this would/might happen before the user has - // time to set their own DataPtr extractor, which might lead to failures - // if the default extractor can't handle some of the user's types. - // Therefore we propagate our extractor. - fut->setDataPtrExtractor(dataPtrExtractor_); - return fut; - } - void postMarkCompletedHook(const at::IValue& value) override { - // Check whether the first or second constructor created this instance. - if (cudaEvents_ == nullptr) { - std::vector isCudaDeviceUsed(c10::cuda::device_count(), false); - for (const at::DataPtr& data_ptr : extractDataPtrs(value)) { - if (data_ptr.device().is_cuda()) { - isCudaDeviceUsed[data_ptr.device().index()] = true; - } - } - - cudaEvents_ = std::make_shared>(); - for (c10::DeviceIndex idx = 0; idx < isCudaDeviceUsed.size(); idx++) { - if (isCudaDeviceUsed[idx]) { - at::cuda::CUDAEvent cudaEvent; - cudaEvent.record(at::cuda::getCurrentCUDAStream(idx)); - (*cudaEvents_).push_back(std::move(cudaEvent)); - } - } - } - } - - std::function wrapCallback(std::function callback) override { - return [this, callback{std::move(callback)}]() { - // We'd love to get a stream for all devices, even those that are not used - // by the value, because the callback could use those other devices, but - // unfortunately this could cause a deadlock with NCCL. See - // https://github.com/pytorch/pytorch/pull/48500#issuecomment-735395414 - // In general, if some devices haven't been used yet, by getting a stream - // for them we'd initialize them, and in addition to causing NCCL to - // misbehaving this also ends up using memory on those devices, which the - // user might not want. - std::vector streams; - for (at::cuda::CUDAEvent& cudaEvent : *cudaEvents_) { - c10::DeviceIndex idx = cudaEvent.device_index(); - // FIXME Should we find a way to allow to change the priority of - // streams? - at::cuda::CUDAStream stream = - at::cuda::getStreamFromPool(/*isHighPriority=*/false, idx); - cudaEvent.block(stream); - streams.push_back(stream); - } - - // Use the dedicated callback stream to run callback. - at::cuda::CUDAMultiStreamGuard streamGuard(streams); - - // Do not free the underlying data storage of value_ before its - // usage on the stream finishes. - for (const at::DataPtr& data_ptr : extractDataPtrs(constValue())) { - if (data_ptr.device().is_cuda()) { - c10::cuda::CUDACachingAllocator::recordStream( - data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index())); - } - } - - callback(); - }; - } - - void postWaitHook(const at::IValue& value) override { - for (at::cuda::CUDAEvent& cudaEvent : *cudaEvents_) { - cudaEvent.block( - at::cuda::getCurrentCUDAStream(cudaEvent.device_index())); - } - - for (const at::DataPtr& data_ptr : extractDataPtrs(value)) { - if (data_ptr.device().is_cuda()) { - c10::cuda::CUDACachingAllocator::recordStream( - data_ptr, at::cuda::getCurrentCUDAStream(data_ptr.device().index())); - } - } - } - - private: - std::shared_ptr> cudaEvents_; - DataPtrExtractor dataPtrExtractor_; - std::mutex dataPtrExtractorMutex_; - - std::vector> extractDataPtrs( - const at::IValue& value) { - std::unique_lock lock(dataPtrExtractorMutex_); - std::vector> data_ptrs; - if (dataPtrExtractor_ != nullptr) { - // If a Python communication hook is used, dataPtrExtractor_ will be - // set in torch/csrc/jit/python/pybind_utils.h, which allows Python - // dependency to be imported. - data_ptrs = dataPtrExtractor_(value); - } else { - // If a C++ communication hook is used, use the default extractor. - data_ptrs = at::ivalue::Future::defaultDataPtrExtractor(value); - } - return data_ptrs; + // Do nothing because the constructor already stored the events. } };