Synchronize RRef.to_here() CUDA Streams properly #54932
Conversation
@@ -411,6 +413,7 @@ void RequestCallbackImpl::processPythonRRefFetchCall(
  Message m =
this `processPythonRemoteCall` function should take the ctx as an argument and insert streams into that ctx accordingly for each device used by the RRef.
@@ -411,6 +413,7 @@ void RequestCallbackImpl::processPythonRRefFetchCall(
   Message m =
       PythonRRefFetchRet(std::move(*result).toIValues()).toMessage();
   m.setId(messageId);
+  rref->waitAllDevices();
we shouldn't call `waitAllDevices` in this way, because it currently only blocks the current streams, but we haven't set up the current streams using the ctx, which means that after doing this, the streams are still not synchronized.
@@ -411,6 +413,7 @@ void RequestCallbackImpl::processPythonRRefFetchCall(
   Message m =
       PythonRRefFetchRet(std::move(*result).toIValues()).toMessage();
   m.setId(messageId);
+  rref->waitAllDevices();
this is the owner RRef, and after accessing the value from this RRef, we know that the `rpc.remote` request has already recorded CUDAEvents on this RRef, so we can access those events here. Then, we need to do two things:
1. use the `ctx->getStream(device)` API to grab a stream for each device this owner RRef uses.
2. use the owner RRef's CUDAEvents to block the streams allocated in (1).

The reason that we need to block those streams instead of the current streams is that: 1) later, when TensorPipe sends out the response, it will use the same ctx to get streams, and we need to make sure that the comm ops inserted into those streams wait for the pending computations inserted by the `rpc.remote` request. So one solution is to use the ctx as a proxy, where `processFetch` adds streams here and the TensorPipe write reuses those streams; 2) we don't want to mess with the current/default streams unless those streams were set by us; otherwise, the internal logic might interfere with user logic in the user's main thread.
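To make the "ctx as a proxy" idea concrete, here is a minimal, hypothetical sketch (the type name and shape are illustrative, not the actual `LazyStreamContext` implementation) of a context that lazily takes one stream per device from the CUDA stream pool and returns that same stream on later calls, so the fetch handler and the subsequent TensorPipe write end up on identical streams:

```cpp
#include <map>

#include <c10/cuda/CUDAStream.h>

// Hypothetical stand-in for the shared stream context discussed above: the
// first getStream(device) call grabs a fresh stream from the pool; later
// calls for the same device return that exact stream, so every component
// sharing this context uses the same streams.
struct StreamContextSketch {
  std::map<c10::DeviceIndex, c10::cuda::CUDAStream> streams_;

  c10::cuda::CUDAStream getStream(c10::DeviceIndex index) {
    auto it = streams_.find(index);
    if (it == streams_.end()) {
      // Lazily allocate a stream from the global pool the first time a
      // device shows up.
      it = streams_
               .emplace(index,
                        c10::cuda::getStreamFromPool(
                            /*isHighPriority=*/false, index))
               .first;
    }
    return it->second;
  }
};
```

With a context like this, whichever side calls `getStream` first effectively picks the streams that both the synchronization and the later communication will use.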
Hey @pbelevich, apologies, the simpler approach mentioned in #54771 won't work, and we will have to go with the solution that passes the ctx instead of the device list to `cb_->operator()(message, ctx)` and then propagates the ctx to both `processPythonRemoteCall` and `processPythonRRefFetchCall`.
The reason is that we will need to make sure that `processPythonRRefFetchCall` synchronizes the same streams used by the TensorPipe `pipe->write` for the fetched tensors. More specifically, `processPythonRRefFetchCall` runs the following steps:
1. Enter from `cb_->operator()`:
   futureResponseMessage = cb_->operator()(requestMessage);
2. Get the RRef value in `processPythonRRefFetchCall`:
   pythonRpcHandler.serialize(jit::toPyObject(rref->getValue())));
3. Call `pipeWrite` to send the response:
   pipeWrite(
4. When serializing the response in `tensorpipeSerialize`, grab streams from the ctx. If the ctx does not contain a stream for the requested device, it will grab a new stream from the stream pool, and TensorPipe will then use this stream to communicate tensors to the caller. [IMPORTANT] So, we will need to make sure that step 2 above has already created such streams in the ctx and that these streams are properly synchronized using the owner RRef's CUDAEvents.
   pytorch/torch/csrc/distributed/rpc/tensorpipe_agent.cpp, lines 645 to 646 in faa4da4:
   std::tie(tpMessage, tpBuffers) =
       tensorpipeSerialize(std::move(rpcMessage), std::move(devices), ctx);
   auto stream = ctx->getStream(tensorDataVec[i].device().index());
@@ -493,7 +493,7 @@ bool ProcessGroupAgent::handleRecv(RecvWork& work) {
   ++serverActiveCalls_;
   std::shared_ptr<JitFuture> futureResponse;
   try {
-    futureResponse = cb_->operator()(message);
+    futureResponse = cb_->operator()(message, {}); // TODO: deviceIndices
it should be fine to pass a dummy context to the process group agent, as it won't support CUDA anyway.
@@ -9,13 +9,13 @@ namespace rpc {

 using namespace torch::distributed::autograd;

-std::shared_ptr<JitFuture> RequestCallback::operator()(Message& request) const {
+std::shared_ptr<JitFuture> RequestCallback::operator()(Message& request, const std::set<c10::DeviceIndex>& deviceIndices) const {
we will need to pass the ctx instead of `deviceIndices`, and this should be the same context used by the TensorPipeAgent. When this operator ends up in `processPythonRRefFetchCall`, it will allocate streams for each device used by the owner RRef.
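For illustration, this is roughly what the suggested signatures could look like once the ctx replaces the device set; treat the exact `LazyStreamContext` type and ownership here as assumptions of the sketch rather than the final API:

```cpp
// Declaration-level sketch only: the std::set<c10::DeviceIndex> parameter is
// replaced by the shared stream context, so the agent and the callback pass
// the same ctx around.
class RequestCallback {
 public:
  std::shared_ptr<JitFuture> operator()(
      Message& request,
      std::shared_ptr<LazyStreamContext> ctx) const;

  virtual ~RequestCallback() = default;

  // The same ctx is forwarded so that processPythonRRefFetchCall can allocate
  // streams in it for each device used by the owner RRef.
  virtual std::shared_ptr<JitFuture> processMessage(
      Message& request,
      std::shared_ptr<LazyStreamContext> ctx) const = 0;
};
```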
sorry that I misled you with the simpler solution mentioned in #54771
 // NB: cannot clear autograd context id here because the processMessage method
 // might pause waiting for all RRefs in the arguments to be confirmed by their
 // owners and resume processing in a different thread. Hence, the
 // thread_local context id needs to be set and cleared in the thread that
 // indeed carries out the processing logic.
-  return processMessage(request);
+  return processMessage(request, deviceIndices);
propagate ctx here as well.
@@ -12,7 +12,7 @@ namespace rpc {
 class TORCH_API RequestCallback {
  public:
   // Invoke the callback.
-  std::shared_ptr<JitFuture> operator()(Message& request) const;
+  std::shared_ptr<JitFuture> operator()(Message& request, const std::set<c10::DeviceIndex>& deviceIndices) const;
ditto
@@ -24,7 +24,7 @@ class TORCH_API RequestCallback {
   // message containing an exception. Different rpc agent implementations are
   // expected to ensure delivery of the response/exception based on their
   // implementation specific mechanisms.
-  virtual std::shared_ptr<JitFuture> processMessage(Message& request) const = 0;
+  virtual std::shared_ptr<JitFuture> processMessage(Message& request, const std::set<c10::DeviceIndex>& deviceIndices) const = 0;
ditto
@@ -356,7 +357,7 @@ void RequestCallbackImpl::processPythonRemoteCall(
       messageId,
       responseFuture,
       uprc.isAsyncExecution(),
-      [ownerRRef, rrefId, forkId, markComplete](
+      [ownerRRef, rrefId, forkId, markComplete, deviceIndices](
this can be `deviceIndices` if it's simpler, as `processPythonRemoteCall` won't modify the ctx; it just needs to know which devices/streams to record on. And yes, we can record on the current streams, as the TensorPipeAgent has already set up the current streams to the ones in the ctx for the `processPythonRemoteCall` message.
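A small sketch of that recording step, assuming (as noted above) that the agent has already made the ctx streams current for this message; the free-function shape and name are illustrative only:

```cpp
#include <set>
#include <vector>

#include <ATen/cuda/CUDAContext.h>
#include <ATen/cuda/CUDAEvent.h>

// Record one event per device on the current streams, right after the value
// for the owner RRef has been computed. Because the current streams are the
// ctx streams for this message, these events capture exactly the work queued
// by the rpc.remote request.
std::vector<at::cuda::CUDAEvent> recordOnCurrentStreams(
    const std::set<c10::DeviceIndex>& deviceIndices) {
  std::vector<at::cuda::CUDAEvent> events;
  events.reserve(deviceIndices.size());
  for (c10::DeviceIndex deviceIndex : deviceIndices) {
    at::cuda::CUDAEvent event;
    event.record(at::cuda::getCurrentCUDAStream(deviceIndex));
    events.push_back(std::move(event));
  }
  return events;
}
```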
@@ -411,6 +413,7 @@ void RequestCallbackImpl::processPythonRRefFetchCall(
  Message m =
`processPythonRRefFetchCall` will need to take the ctx, because it needs to modify the ctx. By modify I mean call `ctx->getStream(device)` on all devices used by the OwnerRRef.
@@ -411,6 +413,7 @@ void RequestCallbackImpl::processPythonRRefFetchCall(
   Message m =
       PythonRRefFetchRet(std::move(*result).toIValues()).toMessage();
   m.setId(messageId);
+  rref->waitAllDevices();
the correct ordering should be:

On `processPythonRemoteCall`:
1. compute the value for the RRef
2. record events and store the events in the RRef
3. set the RRef value; this will unblock (a) below

On `processPythonRRefFetchCall`:
a. get the RRef value
b. when (a) is done, we know that (3) is done and hence (2) is done too, so we can use the CUDA events on the RRef.
c. call `ctx->getStream(device)` for each device used by the RRef, and then let the RRef's CUDAEvents block the streams returned by `ctx->getStream(device)`. With this, the subsequent TensorPipe `pipe->write` will use the same streams and hence the ordering will be preserved, i.e., TensorPipe comm must happen after all pending CUDA ops that contribute to the RRef value.
  cudaEvents_.clear();
  for (auto deviceIndex : deviceIndices) {
    at::cuda::CUDAEvent cudaEvent;
    cudaEvent.record(at::cuda::getCurrentCUDAStream(deviceIndex));
yep, it's OK to record on current streams, as the agent has set up current streams using the ctx.
void OwnerRRef::waitAllDevices() {
  for (at::cuda::CUDAEvent& cudaEvent : cudaEvents_) {
    cudaEvent.block(at::cuda::getCurrentCUDAStream(cudaEvent.device_index()));
here, we shouldn't block the current streams, because this will be called in `processPythonRRefFetchCall`, and for that message there are no CUDA arguments, so the agent won't have set up the current streams from the ctx. Instead, we need to grab streams from the ctx and block those streams.
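A hedged sketch of that suggestion, assuming the ctx exposes a `getStream(device)` that returns a CUDA stream and that `cudaEvents_` were recorded as shown earlier; the signature is illustrative, not necessarily the one that landed:

```cpp
// Instead of blocking the current streams, block the streams owned by the
// shared ctx: the later TensorPipe write grabs the same streams from this
// ctx, so its comm ops are ordered after the recorded rpc.remote work.
void OwnerRRef::waitAllDevices(const std::shared_ptr<LazyStreamContext>& ctx) {
  for (at::cuda::CUDAEvent& cudaEvent : cudaEvents_) {
    cudaEvent.block(ctx->getStream(cudaEvent.device_index()));
  }
}
```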
PR #54932 fixed the CUDA RPC for RRef when the RRef is created through RPC. But besides that use case, an RRef can also be created locally by directly passing in a value, which would bypass the CUDA stream synchronization in #54932. This commit covers the above gap by adding a `devices` argument to the RRef constructor. The RRef will then use this argument to choose between `CUDAFuture` and `ivalue::Future` to hold the value. When `devices` is specified and non-empty, `CUDAFuture` will be used, and the `devices` will be passed to that `CUDAFuture`. Differential Revision: [D28050001](https://our.internmc.facebook.com/intern/diff/D28050001)
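As a rough illustration of that choice, here is a hypothetical helper; the exact `CUDAFuture` constructor arguments are assumed for the sketch and may not match the real signature:

```cpp
#include <vector>

#include <ATen/core/ivalue.h>
#include <ATen/cuda/CUDAFuture.h>
#include <c10/core/Device.h>

// Sketch: pick a CUDA-aware future when the RRef ctor received a non-empty
// `devices` list, otherwise fall back to a plain ivalue::Future.
c10::intrusive_ptr<c10::ivalue::Future> makeRRefValueFuture(
    c10::TypePtr type,
    std::vector<c10::Device> devices) {
  if (!devices.empty()) {
    // CUDAFuture records events when the value is set and blocks consumer
    // streams on them, giving locally created RRefs the same synchronization
    // as RPC-created ones. (Constructor arguments assumed for illustration.)
    return c10::make_intrusive<at::cuda::CUDAFuture>(
        std::move(type), std::move(devices));
  }
  return c10::make_intrusive<c10::ivalue::Future>(std::move(type));
}
```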
Summary: Pull Request resolved: #57085 PR #54932 fixed the CUDA RPC for RRef when the RRef is created through RPC. But besides that use case, an RRef can also be created locally by directly passing in a value, which would bypass the CUDA stream synchronization in #54932. This commit covers the above gap by adding a `devices` argument to the RRef constructor. The RRef will then use this argument to choose between `CUDAFuture` and `ivalue::Future` to hold the value. When `devices` is specified and non-empty, `CUDAFuture` will be used, and the `devices` will be passed to that `CUDAFuture`. Test Plan: Imported from OSS Reviewed By: lw Differential Revision: D28050001 Pulled By: mrshenli fbshipit-source-id: 2316b419fa69aa4dcd444050f0b74e61c3d0af1e
Summary: Pull Request resolved: pytorch#56895 PR pytorch#54932 fixes CUDA stream synchronization between the RPC-created OwnerRRef and UserRRef when `to_here()` is invoked. However, there are two more gaps. 1. The RRef value can be accessed on the owner directly through `local_value`, which bypasses the fix in pytorch#54932. 2. When an RRef is created directly through the RRef ctor instead of RPC, the OwnerRRef won't be able to correctly record CUDA events. This PR fixes 1 by letting the current streams wait for RRef-recorded CUDA events before returning the value in `RRef::getValue()`. For 2, more discussion is needed to decide whether we should add a `devices` argument to the RRef ctor, or whether the RRef ctor should inspect the given values. Test Plan: Imported from OSS Reviewed By: lw Differential Revision: D27992775 Pulled By: mrshenli fbshipit-source-id: ed0e5bfbf715460208c85e46dd3317deef17f8fe
Summary: Pull Request resolved: pytorch#54932 Test Plan: Imported from OSS Reviewed By: mrshenli Differential Revision: D27684022 Pulled By: pbelevich fbshipit-source-id: 2bae51ab6649258d0219ca4e9dbbf45ac6a76c28
This PR fixes #54771
- `c10::Event`s are stored in OwnerRRef, and `OwnerRRef::recordAllStreams`, `OwnerRRef::blockAllStreams` are introduced.
- `LazyStreamContext` is passed from `tensorpipe_agent.cpp` to `RequestCallbackImpl::processPythonRRefFetchCall` and `RequestCallbackImpl::processPythonRemoteCall` to make proper stream synchronization.
- `CUDAEvent` was replaced with `c10::Event`, `CUDAStream` with `c10::Stream`; `CudaLazyStreamContext` was deleted and `LazyStreamContext` made generic, device-agnostic.

The follow-up is #55757
Stack from ghstack:
Differential Revision: D27684022