Add Futures to ProcessGroupGloo #57818
Conversation
[ghstack-poisoned]
💊 CI failures summary and remediations
As of commit ca3a079 (more details on the Dr. CI page):
🕵️ 1 new failure recognized by patterns
The following CI failures do not appear to be due to upstream breakages: test (1/1), Step: "Test PyTorch" (full log | diagnosis details | 🔁 rerun)
ghstack-source-id: a45d96b1ca65dfdb83ccfe3f8d84c6fea8533d60 Pull Request resolved: #57818
ghstack-source-id: ab4f05c1ad0888d067994001d24a0b9f139c2323 Pull Request resolved: #57818
ghstack-source-id: 1985c72ce50b896b287308f31cfa959fc72f9b0c Pull Request resolved: #57818
ghstack-source-id: 9347ea446747d672c8c4bda81e033d9e60774c94 Pull Request resolved: #57818
@agolynski has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
// FIXME: We need to call it here since Future completion requires all
// the work to be synchronized to CUDA.
work->synchronize();
Is this only specific to GLOO?
I think it concerns all backends that implement synchronize().
You can call synchronize() to make sure that tensors arrive on the GPU and are usable there without resorting to stream/event synchronization, or you can use CUDA mechanisms to synchronize.
@mrshenli requested that we use CUDA futures to deal with such synchronizations, which would be more efficient.
torch/lib/c10d/ProcessGroupGloo.cpp
Outdated
}

namespace {
c10::intrusive_ptr<c10::ivalue::Future> CreateFutureForOutput(
nit: AsOutput?
Done
try {
  fut->wait();
} catch (const std::exception& ex) {
  std::cerr << "Exception received: " << ex.what() << std::endl;
I understand that this is not new code, but shouldn't we use LOG(ERROR) here?
Shall we abort here?
> LOG(ERROR) here

Done

> Shall we abort here?

Then we won't be able to test exception throwing?
Thanks for the merge!
Thanks for adding this! I left some questions about consolidating the Work/Future APIs and fields. I am OK with addressing some of those comments in follow-up PRs, but let's not stop here and call the consolidation done.
One example: after the consolidation, we at least shouldn't have both Work and Future keeping a completed_ / exception_ field.
cc @lw
w = pg.broadcast(xs)
w.wait()
output = w.result()
fut = pg.broadcast(xs).get_future()
Curious, do we still have test coverage for work that makes sure the APIs exposed from work still behave the same way?
good point. Added c++ and Py tests for work API
@@ -3591,8 +3602,8 @@ def allreduce_hook(
    )

    @unittest.skipIf(
-       BACKEND != "mpi" and BACKEND != 'nccl',
-       "get_future is only supported on mpi and nccl"
+       BACKEND != "mpi" and BACKEND != "nccl" and BACKEND != "gloo",
Do we still need this skip if we support all three backends now? Or does any 3rd-party backend use this test file?
I don't think any 3rd-party backends use this file, but we also test the 'test' and RR (round-robin) backends. Although they are not covered by this test, I prefer to be explicit.
I see
torch/lib/c10d/ProcessGroupGloo.cpp
Outdated
}

namespace {
c10::intrusive_ptr<c10::ivalue::Future> CreateFutureForOutput(
ReturnFutureWithOutput -> returnFutureWithOutput to stay consistent with other function names in this file.
Done
torch/lib/c10d/ProcessGroupGloo.cpp
Outdated
    c10::ListType::create(c10::TensorType::get()));
}

void ReturnFutureWithOutput(
ditto
Done
    std::vector<std::vector<at::Tensor>> outputTensors,
    const char* profilingTitle,
    const c10::optional<std::vector<at::Tensor>>& inputTensors)
    : ProcessGroup::Work(-1, OpType::UNKNOWN, profilingTitle, inputTensors),
(This is prior to this PR): Curious, any reason for using UNKNOWN OpType here? I saw UNKNOWN is also used for recv, but I don't recall the reason for that.
I've looked around, and it seems like OpType is only used in the NCCL process group for p2p comm in #45873. Maybe there are other uses that I haven't found?
Since this already predates this PR, it shouldn't block this PR from landing. Let's add a BE issue to track this.
work->finish(eptr);

// FIXME: We need to call it here since Future completion requires all
// the work to be synchronized to CUDA.
Since we now have CUDA support in ivalue::Future, should this synchronization work be done by ivalue::Future instead? cc @lw
Completely agree.
BTW, we used to do some other things in synchronize() (e.g. copying tensors); however, it seems like all derived versions of synchronize() now match ivalue::Future::synchronizeWithCurrentStreams().
The plan is to deprecate the Work API in 1.10, so we don't have any more versions of synchronize(), and can then use ivalue::Future::synchronizeWithCurrentStreams() directly.
cc @lw
void ProcessGroupGloo::AsyncWork::finishWorkGlooError(std::exception_ptr eptr) {
  future_->setError(eptr);
  finish(eptr);
Similarly, since ivalue::Future has similar concepts and states for completed_ / exception_, shall we just use that instead? cc @lw
We exposed completed_ and exception_ as protected members of ProcessGroup::Work, so potentially they can be referred to by 3rd parties. Let's make a deprecation announcement for 1.10; then it'd be easier to do a better Work/Future merge, which can also include CUDA synchronization.
let's add a tracking issue for this as well.
I don't get it: how can third parties access the completed_ and exception_ fields? If they are protected it means they can only be accessed by subclasses, but I don't see the point of users subclassing Work: even if they did, ProcessGroupGloo would still return its own classes, and not the user ones, so what does one gain? Moreover, a trailing underscore in a field name is C++'s "convention" for private fields. All in all, I don't think we should worry about users depending on it.
I agree with Shen that now, effectively, we're doing the same thing twice within Work: we have two ways to mark completion (Work's own one, and the one within Future), etc. One way to go about it is to redefine Work so that it just "forwards" each of its methods to the Future it holds (e.g., Work::wait() basically just calls this->future_->wait()).
> I don't get it: how can third-parties access the completed_ and exception_ fields? If they are protected it means they can only be accessed by subclasses, but I don't see the point of users subclassing Work: even if they did, ProcessGroupGloo would still return its own classes, and not the user ones, so what does one gain?

MPI/Gloo subclass Work, and in the same way 3rd-party implementers can also subclass Work and use completed_ and exception_. The ProcessGroupGloo and ProcessGroupMPI Work subclasses use (and 3rd parties can use) ProcessGroup::Work's completed_ and exception_, so if we remove them, 3rd-party Work classes will break.

> Moreover, the trailing underscore in the fields names is C++'s "convention" for private fields.

I thought a trailing underscore just means it's a member, not necessarily private?

> All in all, I don't think we should worry about users depending on it.

I think you are right, but it's good to be cautious.

> I agree with Shen that now, effectively, we're doing the same thing twice within Work: we have two ways to mark completion (Work's own one, and the one within Future), etc. One way to go about it is to redefine Work so that it just "forwards" each of its methods to the Future it holds (e.g., Work::wait() basically just calls this->future_->wait()).

The plan is to remove the fields from ProcessGroup::Work and replace them with a future. Note that right now, the MPI and Gloo subclasses already hold a future.
Right, I hadn't understood that the completed_ and exception_ fields came from the root "abstract" base class Work (which, well, isn't actually abstract it seems). Indeed, people who created their own ProcessGroup implementations (as extensions) would be broken by any change there. But I think we will eventually need to do it, right?
void ProcessGroupGloo::AsyncWork::finishWorkGloo() {
  ReturnFutureWithOutput(future_, outputTensors_);
  finish();
ditto
let's leave it for 1.10?
Yep, looks OK to me, as we are too close to the branch cut.
Differential Revision: [D28304171](https://our.internmc.facebook.com/intern/diff/D28304171) [ghstack-poisoned]
ghstack-source-id: 536de83ce4543a5bfb8f64409317675b404192be Pull Request resolved: #57818
Differential Revision: [D28304171](https://our.internmc.facebook.com/intern/diff/D28304171) [ghstack-poisoned]
ghstack-source-id: 9441cd0457c2d49b287b6833fb305012282819d4 Pull Request resolved: #57818
It's a good first step to add the getFuture method to Gloo too, but we agree that the final solution will be to remove that method and make Work itself a Future subclass, right?
torch/lib/c10d/ProcessGroupGloo.cpp
Outdated
}

void returnFutureWithOutput(
    c10::intrusive_ptr<c10::ivalue::Future> future,
Nit: no need to pass the pointer by value; this function could just take a Future& and thus avoid the copy and the consequent refcount increase and decrease.
done
if (outputTensors.size() > 1) {
  return c10::make_intrusive<c10::ivalue::Future>(
      c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
}
return c10::make_intrusive<c10::ivalue::Future>(
    c10::ListType::create(c10::TensorType::get()));
Is there a reason for this? I find it odd that type information depends on a runtime condition. Types are typically useful to "predict what would happen before it happens" (e.g., static analysis), but by doing this we defeat that purpose.
@lw agreed that this is not good for static analysis.
Right now, the only return value Work supports is a list of tensors; however, some operations need to return a list of lists (the c10d lib has been very spotty on return-value coverage, and we never properly encountered this case). Also, I prefer this PR to be BC, i.e. in most cases we still need to return a list.
How would you prefer to address this?
I'm not very knowledgeable about the PG API. My first impression is that it would be more "regular" and predictable if here we always used a list of tensors, even in the case of a singleton.
If, however, you say that different methods of a PG need to return different types, that's fine, and some can return a list of tensors while others can return a singleton tensor. However, I'd like it if a given method always returns the same type, no matter its arguments.
Agreed. I don't mind returning a singleton tensor wrapped in a list (and we already do), but how would we deal with returning a list of lists?
> One way to go about it is to redefine Work so that it just "forwards" each of its methods to the Future it holds (e.g., Work::wait() basically just calls this->future_->wait()).

Yes.
Differential Revision: [D28304171](https://our.internmc.facebook.com/intern/diff/D28304171) [ghstack-poisoned]
ghstack-source-id: 7a3a006bf6cc234425b2a6d8828fdbd711ccef9d Pull Request resolved: #57818
@agolynski has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
@agolynski merged this pull request in 4ef9426.
A bit late to this, but are we planning to also add support for send/recv futures and testing? It looks like we would need to add that to Send/RecvWork.
Yes, we could add handling for that in 1.10. Send/RecvWork (both Gloo and MPI) run in async mode, so they would require some more work (maybe sacrificing some efficiency, or API, or a combination). @rohan-varma, are there use cases for send/recv futures?
Summary: Pull Request resolved: pytorch#57818
Test Plan: Imported from OSS
Reviewed By: SciPioneer
Differential Revision: D28304171
Pulled By: agolynski
fbshipit-source-id: dbf7f5538890d138582831aa0279ede89619ea1e
Stack from ghstack:
Differential Revision: D28304171