Collective dispatching from Process Group #91257
Conversation
[ghstack-poisoned]
🔗 Helpful Links
🧪 See artifacts and rendered test results at hud.pytorch.org/pr/91257
Note: Links to docs will display an error until the docs builds have been completed.
✅ No Failures as of commit f005be3
This comment was automatically generated by Dr. CI and updates every 15 minutes.
ghstack-source-id: 0b6bbfc15dbf592ad81d6ba7a5e096fff7a6e0b0 Pull Request resolved: #91257
ghstack-source-id: 0e019441fec7fadb9dc89704e8f9a582e25691a0 Pull Request resolved: #91257
ghstack-source-id: 7deb6e0bb34fcd6bbca9ce961296728466d3678c Pull Request resolved: #91257
ghstack-source-id: 288c7f9bff5913e8f58102ca89efe5522219ca5e Pull Request resolved: #91257
@H-Huang has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
Thanks for the clean-up! LGTM.
Please see my inline comment.
      output,
      input,
      process_group,
      output_split_sizes,
      input_split_sizes,
      opts.timeout.count());
}

void monitored_barrier(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    const BarrierOptions& opts,
    bool wait_all_ranks) {
  static auto op = c10::Dispatcher::singleton()
                       .findSchemaOrThrow("c10d::monitored_barrier_", "")
                       .typed<void(
                           at::Tensor,
                           const c10::intrusive_ptr<::c10d::ProcessGroup>&,
                           const std::vector<int64_t>&,
                           int64_t,
                           bool)>();
  // Default to using cpu implementation, monitored barrier is only for GLOO
  at::Tensor tensor = at::empty({0}, at::TensorOptions().device(at::kCPU));
  op.call(
      tensor,
      process_group,
      opts.device_ids,
      opts.timeout.count(),
      wait_all_ranks);
}

c10::intrusive_ptr<Work> barrier(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    const BarrierOptions& opts) {
  static at::Tensor tensor;
  // TODO: if nccl was specified then use it
  if (process_group->getBackendType() ==
      c10d::ProcessGroup::BackendType::NCCL) {
    // set cuda tensor
    tensor = at::empty(
        {1}, at::TensorOptions().device(at::DeviceType::CUDA).dtype(at::kByte));
  } else {
    // Default to using cpu implementation
    tensor = at::empty(
        {1}, at::TensorOptions().device(at::DeviceType::CPU).dtype(at::kByte));
  }

  static auto op = c10::Dispatcher::singleton()
                       .findSchemaOrThrow("c10d::barrier", "")
                       .typed<c10::intrusive_ptr<::c10d::Work>(
                           at::Tensor,
                           const c10::intrusive_ptr<::c10d::ProcessGroup>&,
                           const std::vector<int64_t>&,
                           int64_t)>();

  return op.call(tensor, process_group, opts.device_ids, opts.timeout.count());
}

c10::intrusive_ptr<Work> send(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    at::TensorList tensors,
    int64_t dstRank,
    int64_t tag) {
  static auto op = c10::Dispatcher::singleton()
                       .findSchemaOrThrow("c10d::send", "")
                       .typed<c10::intrusive_ptr<::c10d::Work>(
                           at::TensorList,
                           const c10::intrusive_ptr<::c10d::ProcessGroup>&,
                           int64_t,
                           int64_t)>();
  return op.call(tensors, process_group, dstRank, tag);
}

c10::intrusive_ptr<Work> recv(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    at::TensorList tensors,
    int64_t srcRank,
    int64_t tag) {
  static auto op = c10::Dispatcher::singleton()
                       .findSchemaOrThrow("c10d::recv_", "")
                       .typed<c10::intrusive_ptr<::c10d::Work>(
                           at::TensorList,
                           const c10::intrusive_ptr<::c10d::ProcessGroup>&,
                           int64_t,
                           int64_t)>();
  return op.call(tensors, process_group, srcRank, tag);
}

c10::intrusive_ptr<Work> recv_any_source(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    at::TensorList tensors,
    int64_t tag) {
  static auto op = c10::Dispatcher::singleton()
                       .findSchemaOrThrow("c10d::recv_any_source_", "")
                       .typed<c10::intrusive_ptr<::c10d::Work>(
                           at::TensorList,
                           const c10::intrusive_ptr<::c10d::ProcessGroup>&,
                           int64_t)>();
  return op.call(tensors, process_group, tag);
}
After removing this code block, should we also remove the corresponding API declarations in Ops.hpp?
Good point! Thanks
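For reference, the declarations being referred to would look roughly like the following. This is reconstructed from the definitions in the diff above rather than copied from Ops.hpp (export macros omitted); once the definitions are gone these declarations have no remaining users, which is what #94532 below cleans up.

// Illustration only: free-function declarations mirroring the removed
// definitions above (signatures taken from the diff shown earlier).
void monitored_barrier(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    const BarrierOptions& opts,
    bool wait_all_ranks);

c10::intrusive_ptr<Work> barrier(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    const BarrierOptions& opts);

c10::intrusive_ptr<Work> send(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    at::TensorList tensors,
    int64_t dstRank,
    int64_t tag);

c10::intrusive_ptr<Work> recv(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    at::TensorList tensors,
    int64_t srcRank,
    int64_t tag);

c10::intrusive_ptr<Work> recv_any_source(
    const c10::intrusive_ptr<ProcessGroup>& process_group,
    at::TensorList tensors,
    int64_t tag);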
Fixes #90932
Fixes #90659

Remove redundant collective operation definitions by calling the ops directly from `ProcessGroup`

Context: #86225

Differential Revision: [D42854676](https://our.internmc.facebook.com/intern/diff/D42854676)

[ghstack-poisoned]
ghstack-source-id: 75124aea6c1b79ed99b43f5790aa48b9b97d6c75 Pull Request resolved: #91257
@H-Huang has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
@pytorchbot merge
Merge started. Your change will be merged once all checks pass (ETA 0-4 hours). Learn more about merging in the wiki. Questions? Feedback? Please reach out to the PyTorch DevX Team.
In #91257, we removed direct calls to methods in ops.cpp, so this also removes ops.hpp.
Pull Request resolved: #94532
Approved by: https://github.com/kwen2501
Stack from ghstack (oldest at bottom):
Fixes #90932
Fixes #90659
Remove redundant collective operation definitions by calling the ops directly from ProcessGroup
Context:
#86225
Differential Revision: D42854676
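For callers the change is mostly transparent: anything already going through ProcessGroup methods keeps working, and only direct users of the removed Ops.cpp wrappers need to switch. A minimal sketch of the before/after call sites follows; the c10d::ops namespace for the old wrappers and the exact send() signature are assumptions for illustration, not taken from this PR.

#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>

c10::intrusive_ptr<c10d::Work> send_one(
    const c10::intrusive_ptr<c10d::ProcessGroup>& process_group) {
  std::vector<at::Tensor> tensors = {at::ones({8})};
  // Before: free-function wrapper removed by this PR (namespace assumed).
  // auto work = c10d::ops::send(process_group, tensors, /*dstRank=*/1, /*tag=*/0);
  // After: the ProcessGroup method, which dispatches to the registered
  // "c10d::send" op directly.
  return process_group->send(tensors, /*dstRank=*/1, /*tag=*/0);
}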