
Remove NCCL dependency from PythonFutureWrapper #48495

Closed
wants to merge 8 commits
34 changes: 28 additions & 6 deletions aten/src/ATen/core/ivalue_inl.h
@@ -417,12 +417,34 @@ struct C10_EXPORT ivalue::Future : c10::intrusive_ptr_target {
return fut;
}

// Since this file cannot import CUDA dependency, the type of the second arg
// in the callback is c10::Stream instead of at::cuda::CUDAStream, and
// CUDAStream is constructed on the fly. The default implementation
// is a no-op, since it does not deal with any CUDA streams.
virtual void setRecordStreamCallback(
std::function<void(const at::IValue&, const c10::Stream&)> record_stream_cb) {}
// Some subclasses deal with CUDA tensors and must inform the CUDA caching
// allocator of which CUDA streams each DataPtr is used in. If the value held
// by the future is a Python object, we need to acquire the GIL when extracting
// these DataPtrs. Since this file cannot depend on Python, we allow users to
// provide a "custom" extractor. See, for example, the PythonFutureWrapper.
using DataPtrExtractor =
std::function<std::vector<std::reference_wrapper<const at::DataPtr>>(
const at::IValue&)>;
virtual void setDataPtrExtractor(DataPtrExtractor data_ptr_extractor) {}
Contributor:
I assume this is an intermediate state, as setDataPtrExtractor exists in the base class, but dataPtrExtractor_ only lives in subclasses? If this will be the long-term solution, do we need to rename this function? Otherwise setDataPtrExtractor doesn't do what its name suggests in non-FutureNCCL classes.

Contributor (Author):
To be honest, I was thinking of this as a long-term solution. Well, actually, I didn't think about it too much, because this is basically how it was already done (the setRecordStreamCallback was a no-op in ivalue::Future and was only implemented by FutureNCCL). I was fine with such a solution as I read the semantics of this method basically as "setDataPtrExtractorIfNeeded".

Also, later on I'll do something similar to this in order to merge some FutureNCCL logic into ivalue::Future: I'll define (protected) virtual methods that are left unimplemented in ivalue::Future and only do something when overridden by the FutureNCCL subclass. Admittedly that's not exactly the same, as those hooks are not part of the public interface.

I recognize that these solutions are not the nicest ones, but the hook one was the safest one I could find (minimum code duplication and protection from later updates to ivalue::Future). I'm not as attached to the DataPtrExtractor though, and I'd be happy to hear alternative proposals.

I've also only just realized that DataPtrExtractor will run into another issue once we support multi-GPU (in #48500), since it will then be used in two places (by the "parent" future, inside then, and by the "child" future, inside markCompleted). Thus we'll probably need the parent future to propagate its DataPtrExtractor to the child future, so that if the child future completes immediately (before it's wrapped in a PythonFutureWrapper) it already has the right DataPtrExtractor. This will be a bit tricky to get right, especially if multiple threads are at play and we need to protect against race conditions.
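A minimal sketch of the hook pattern described above, with illustrative names rather than the real ivalue::Future/FutureNCCL types: the base class exposes a virtual setter that is a no-op, and only the CUDA-aware subclass stores the extractor and forwards it to child futures it creates.

#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Stand-ins for at::IValue and at::DataPtr; this is a sketch, not PyTorch code.
struct IValue {};
struct DataPtr {};

struct BaseFuture {
  using DataPtrExtractor =
      std::function<std::vector<std::reference_wrapper<const DataPtr>>(
          const IValue&)>;

  // No-op in the base class: a CPU-only future has no streams to record,
  // so it simply ignores the extractor ("setDataPtrExtractorIfNeeded").
  virtual void setDataPtrExtractor(DataPtrExtractor /*extractor*/) {}
  virtual ~BaseFuture() = default;
};

struct CudaAwareFuture : BaseFuture {
  void setDataPtrExtractor(DataPtrExtractor extractor) override {
    dataPtrExtractor_ = std::move(extractor);
  }

  // A then()-like helper would also need to forward the extractor to the
  // child future, so that a child completing before it is wrapped in a
  // PythonFutureWrapper already knows how to extract DataPtrs.
  std::shared_ptr<CudaAwareFuture> createChild() {
    auto child = std::make_shared<CudaAwareFuture>();
    child->setDataPtrExtractor(dataPtrExtractor_);
    return child;
  }

 private:
  DataPtrExtractor dataPtrExtractor_;
};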

Contributor:
> And thus we'll probably need the parent future to propagate its DataPtrExtractor to the child future, so that if the child future completes immediately (before it's wrapped in a PythonFutureWrapper) it already has the right DataPtrExtractor.

I assume this means the child Future created by .then() would always be the same type (CPU/CUDA) as the parent Future?

Contributor (Author):
> I assume this means the child Future created by .then() would always be the same type (CPU/CUDA) as the parent Future?

That's indeed the case. I didn't give it much thought; do you think it could present a problem? Since the CUDAFuture is a "generalization" of ivalue::Future (and, in fact, it behaves exactly the same when the vector of CUDAEvents is empty), it should be perfectly fine to attach a CPU-only callback to a CUDAFuture. Issues would start to arise if one wants to attach a CUDA callback to a CPU-only ivalue::Future. I'm not sure how we would tackle that...

Contributor:
> Issues would start to arise if one wants to attach a CUDA callback to a CPU-only ivalue::Future.

The current version LGTM. If users hit this, we can fix it later.


// Expose the default implementation so that external ones can defer to it.
static std::vector<std::reference_wrapper<const at::DataPtr>>
defaultDataPtrExtractor(const at::IValue& value) {
// FIXME Should we support more types than just tensors and tensor lists?
Contributor:
Yes, if we are going to use this as a general CUDAFuture. But it can come in follow-up PRs.

Contributor (Author):
Indeed, I'm doing this in #48502

TORCH_INTERNAL_ASSERT(
value.isTensorList() || value.isTensor(),
"the future value must be either a tensor list or a tensor.");
at::Tensor tensor;
if (value.isTensorList()) {
const auto tensors = value.toTensorVector();
TORCH_INTERNAL_ASSERT(tensors.size() == 1, "expected exactly 1 tensor");
tensor = tensors[0];
} else {
tensor = value.toTensor();
}

return {tensor.storage().data_ptr()};
};

// Tries to retrieve the error message from std::exception_ptr.
std::string tryRetrieveErrorMessage() {
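As a side note on the FIXME thread above: a hypothetical generalization of defaultDataPtrExtractor might accept a single tensor or a tensor list of any length, rather than asserting exactly one element. The name generalizedDataPtrExtractor and the code below are only a sketch of what such a follow-up could look like, not the actual code of #48502.

// Hypothetical sketch only: walk every tensor in the value instead of
// requiring exactly one.
static std::vector<std::reference_wrapper<const at::DataPtr>>
generalizedDataPtrExtractor(const at::IValue& value) {
  std::vector<std::reference_wrapper<const at::DataPtr>> data_ptrs;
  if (value.isTensor()) {
    data_ptrs.emplace_back(value.toTensor().storage().data_ptr());
  } else if (value.isTensorList()) {
    for (const at::Tensor& tensor : value.toTensorVector()) {
      data_ptrs.emplace_back(tensor.storage().data_ptr());
    }
  } else {
    TORCH_INTERNAL_ASSERT(
        false, "the future value must be a tensor or a list of tensors");
  }
  return data_ptrs;
}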
45 changes: 19 additions & 26 deletions torch/csrc/jit/python/pybind_utils.h
@@ -119,32 +119,7 @@ struct VISIBILITY_HIDDEN PythonFutureWrapper
// vector, but Future does not acquire GIL on destruction.
auto pf = std::make_shared<PythonFunctionGuard>(std::move(cb));

#ifdef USE_C10D_NCCL
// This callback is only used by NCCL backend, so skip this code on other
// backends and avoid importing cuda dependency.
// By default, assume that the input value is or can be casted into a tensor
// vector that has exactly one tensor.
auto record_stream_cb = [](const at::IValue& value,
const c10::Stream& stream) {
if (value.isTensorList() || value.isPyObject()) {
std::vector<at::Tensor> tensors;
if (value.isTensorList()) {
tensors = value.toTensorVector();
} else {
pybind11::gil_scoped_acquire gil;
py::object obj = torch::jit::toPyObject(value);
tensors = torch::jit::toIValue(
obj, c10::ListType::create(c10::TensorType::get()))
.toTensorVector();
}
TORCH_INTERNAL_ASSERT(tensors.size() == 1, "expected exactly 1 tensor");
at::cuda::CUDAStream cuda_stream(stream);
c10::cuda::CUDACachingAllocator::recordStream(
tensors[0].storage().data_ptr(), cuda_stream);
}
};
fut->setRecordStreamCallback(record_stream_cb);
#endif
fut->setDataPtrExtractor(&PythonFutureWrapper::dataPtrExtractor);

return std::make_shared<jit::PythonFutureWrapper>(fut->then(
// Capture a copy of the ivalue::Future instead of the `this` pointer
@@ -241,6 +216,24 @@ struct VISIBILITY_HIDDEN PythonFutureWrapper
std::shared_ptr<PythonFutureWrapper> getPtr() {
return shared_from_this();
}

// This callback is only used by subclasses of Future that deal with CUDA,
// in order to register the pointers on the right streams with the caching
// allocator.
// By default, assume that the input value is or can be cast into a tensor
// vector that has exactly one tensor.
static std::vector<std::reference_wrapper<const at::DataPtr>> dataPtrExtractor(
const at::IValue& value) {
if (value.isPyObject()) {
pybind11::gil_scoped_acquire gil;
py::object obj = torch::jit::toPyObject(value);
// FIXME Should we support more types than just tensor lists?
auto new_value = torch::jit::toIValue(
obj, c10::ListType::create(c10::TensorType::get()));
return at::ivalue::Future::defaultDataPtrExtractor(new_value);
}
return at::ivalue::Future::defaultDataPtrExtractor(value);
};
};

// error reporting: when reporting user-caused errors, these functions should
51 changes: 24 additions & 27 deletions torch/lib/c10d/ProcessGroupNCCL.hpp
@@ -314,28 +314,9 @@ class ProcessGroupNCCL : public ProcessGroup {

// Do not free the underlying data storage of value_ before its
// usage on futureNCCLCallbackStream_ finish.
if (record_stream_cb_ != nullptr) {
// If a Python communication hook is used, record_stream_cb_ will be
// set in torch/csrc/jit/python/pybind_utils.h, which allows Python
// dependency to be imported.
record_stream_cb_(value_, futureNCCLCallbackStream_->unwrap());
} else {
// If a C++ communication hook is used, create and set a record stream
// callback.
TORCH_INTERNAL_ASSERT(
value_.isTensorList() || value_.isTensor(),
"the future value must be either a tensor list or a tensor.");
at::Tensor tensor;
if (value_.isTensorList()) {
const auto tensors = value_.toTensorVector();
TORCH_INTERNAL_ASSERT(
tensors.size() == 1, "expected exactly 1 tensor");
tensor = tensors[0];
} else {
tensor = value_.toTensor();
}
for (const at::DataPtr& data_ptr : extractDataPtrs(value_)) {
c10::cuda::CUDACachingAllocator::recordStream(
Contributor:

Curious about the implications for RPC use cases. Does RPC also need to call recordStream? If yes, when? Is it that, when the tensors are retrieved from the Future (through result or wait), we should call recordStream on the current stream?

Contributor (Author):
I'm still figuring out the correct usage of the caching allocator, but I think this should work in the same way for RPC. The model I have in mind for RPC is the one we discussed in #44084 (comment). In that case, on the receiver (bottom-right quadrant of the diagram), I think we need to record streams with the caching allocator whenever we "transfer" the result to streams other than the ones we used to receive it. This would happen both when using .wait()/.value() and in callbacks (basically the points in the diagram where we say "record events"). And this is exactly what we're doing here. Does this make sense?

Contributor:
Yep, makes sense to me.

tensor.storage().data_ptr(), *futureNCCLCallbackStream_);
data_ptr, *futureNCCLCallbackStream_);
}

// Use the dedicated callback stream to run callback.
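For the recordStream discussion above, a rough sketch of what a consumer (for example an RPC callback or a wait() caller) might do when it uses the future's value on the current stream. This is illustrative only and not part of this PR; the helper name recordUseOnCurrentStream is made up.

#include <ATen/ATen.h>
#include <c10/cuda/CUDACachingAllocator.h>
#include <c10/cuda/CUDAStream.h>

// Illustrative sketch: tell the caching allocator that `tensor` is used on the
// current stream, so its memory is not reused until that stream's work is done.
void recordUseOnCurrentStream(const at::Tensor& tensor) {
  c10::cuda::CUDAStream current_stream = c10::cuda::getCurrentCUDAStream();
  c10::cuda::CUDACachingAllocator::recordStream(
      tensor.storage().data_ptr(), current_stream);
  // ... then launch kernels that read `tensor` on current_stream ...
}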
@@ -372,20 +353,36 @@
return !value_.isNone();
}

void setRecordStreamCallback(
std::function<void(const at::IValue&, const c10::Stream&)>
record_stream_cb) override {
record_stream_cb_ = std::move(record_stream_cb);
void setDataPtrExtractor(DataPtrExtractor dataPtrExtractor) override {
std::unique_lock<std::mutex> lock(dataPtrExtractorMutex_);
dataPtrExtractor_ = std::move(dataPtrExtractor);
}

private:
at::IValue value_;
c10::DeviceIndex deviceIndex_;
std::shared_ptr<std::vector<at::cuda::CUDAEvent>> cudaEvents_;
std::shared_ptr<at::cuda::CUDAStream> futureNCCLCallbackStream_;
std::function<void(const at::IValue&, const c10::Stream&)>
record_stream_cb_;
DataPtrExtractor dataPtrExtractor_;
std::mutex dataPtrExtractorMutex_;
c10::optional<FutureError> error_;

std::vector<std::reference_wrapper<const at::DataPtr>> extractDataPtrs(
const at::IValue& value) {
std::unique_lock<std::mutex> lock(dataPtrExtractorMutex_);
std::vector<std::reference_wrapper<const at::DataPtr>> data_ptrs;
if (dataPtrExtractor_ != nullptr) {
// If a Python communication hook is used, dataPtrExtractor_ will be
// set in torch/csrc/jit/python/pybind_utils.h, which is allowed to
// depend on Python.
data_ptrs = dataPtrExtractor_(value);
} else {
// If a C++ communication hook is used, use the default extractor.
data_ptrs = at::ivalue::Future::defaultDataPtrExtractor(value);
}
TORCH_INTERNAL_ASSERT(data_ptrs.size() == 1, "expected exactly 1 tensor");
return data_ptrs;
}
};

// If you wish to create multiple process groups, each with a potentially