[wip][not for review] Fix ProcessGroupNCCL profiling when profiler is not run with use_cuda

Still does not work with use_cuda=True; currently debugging the deadlock.

Differential Revision: [D25368322](https://our.internmc.facebook.com/intern/diff/D25368322/)

ghstack-source-id: 118016665
Pull Request resolved: #48946
rohan-varma committed Dec 7, 2020
1 parent a39398b commit 27bf9c5
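
As a usage sketch of what this change is meant to enable (not code from the commit): a collective issued through a c10d process group should show up in the autograd profiler when it is run without use_cuda. The backend, init environment, and tensor below are illustrative assumptions; a single-process gloo group keeps the snippet runnable without GPUs, whereas the commit itself targets ProcessGroupNCCL.

```python
# Hypothetical usage sketch: profile a c10d collective with use_cuda=False.
# The gloo backend and single-process group are assumptions chosen so the
# snippet runs without GPUs; the commit itself concerns the NCCL backend.
import os

import torch
import torch.distributed as dist
from torch.autograd import profiler

os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")
dist.init_process_group(backend="gloo", rank=0, world_size=1)

t = torch.ones(4)
with profiler.profile(use_cuda=False) as prof:
    dist.all_reduce(t)

# The collective should appear as a profiler event (e.g. "gloo:all_reduce";
# "nccl:all_reduce" for the NCCL backend).
print([evt.name for evt in prof.function_events])

dist.destroy_process_group()
```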
Showing 2 changed files with 15 additions and 24 deletions.
24 changes: 12 additions & 12 deletions torch/lib/c10d/ProcessGroupNCCL.cpp
@@ -462,7 +462,7 @@ ProcessGroupNCCL::ProcessGroupNCCL(
if (blockingWait_ && asyncErrorHandling_) {
LOG(INFO) << "[Rank " << rank_
<< "] NCCL_BLOCKING_WAIT and NCCL_ASYNC_ERROR_HANDLING "
<< "should not both be enabled. "
<< "should not both be enabled. "
<< "Only NCCL_BLOCKING_WAIT is being used in this process.";
asyncErrorHandling_ = false;
}
@@ -1071,17 +1071,6 @@ c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::collective(
work->outputs_ = std::make_shared<std::vector<at::Tensor>>(outputs);
work->futureNCCLCallbackStreams_ = futureNCCLCallbackStreams_;

- if (work->recordFunctionEndCallback_) {
-   // recordFunctionEndCallback_ is normally called in finish() function by
-   // base class, but since finish is not called by WorkNCCL, we schedule this
-   // function to be run when work is done.
-   // Note that when can_profile is false, profilingTitle is not provided and so,
-   // recordFunctionEndCallback_ is not set.
-   work->getFuture()->addCallback(std::move(work->recordFunctionEndCallback_));
- }



at::cuda::OptionalCUDAGuard gpuGuard;

pre(ncclStreams_[key]);
@@ -1126,6 +1115,17 @@ c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupNCCL::collective(
work->opTimeout_ = opTimeout_;
work->store_ = store_;

+ if (work->recordFunctionEndCallback_) {
+   // recordFunctionEndCallback_ is normally called in finish() function by
+   // base class, but since finish is not called by WorkNCCL, we schedule this
+   // function to be run when work is done. Note that addCallback() onto the
+   // Work's futureNCCL is not useful here, as it would just run the callback
+   // inline.
+   // Note that when can_profile is false, profilingTitle is not provided and so,
+   // recordFunctionEndCallback_ is not set.
+   work->recordFunctionEndCallback_();
+ }

if (asyncErrorHandling_) {
workEnqueue(work);
}
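
The new comment above explains the design choice: addCallback() on the work's futureNCCL would just run the callback inline, so recordFunctionEndCallback_() is now invoked directly instead. As a rough Python-level analogy (using torch.futures.Future, not the C++ FutureNCCL, so this is illustrative only), a callback chained onto an already-completed future fires immediately on the calling thread rather than being deferred:

```python
# Illustrative analogy only: torch.futures.Future, not the C++ FutureNCCL.
# A callback attached to a future that is already marked complete runs
# inline, on the calling thread, instead of being deferred until later.
import torch

fut = torch.futures.Future()
fut.set_result(42)

chained = fut.then(lambda f: print("callback ran inline, value =", f.wait()))
chained.wait()
```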
15 changes: 3 additions & 12 deletions torch/testing/_internal/distributed/distributed_test.py
@@ -2474,25 +2474,19 @@ def _test_reduce_multigpu_helper(
_build_tensor(src + 1, master_value).cuda(device=i)
for i in rank_to_GPU[rank]
]
- # TODO: Setting expect_event=False to disable profiling
- # tests. Once https://github.com/pytorch/pytorch/issues/48127
- # is addressed, this should be reverted.
self.call_dist_op(
"reduce", False, dist.reduce_multigpu, tensors, src, op, group_id,
- expect_event=False)
+ expect_event=len(tensors) == 1)
expected_tensor = _build_tensor(src + 1, expected_value)
self.assertEqual(tensors[0], expected_tensor)
else:
tensors = [
_build_tensor(src + 1, worker_value).cuda(device=i)
for i in rank_to_GPU[rank]
]
- # TODO: Setting expect_event=False to disable profiling
- # tests. Once https://github.com/pytorch/pytorch/issues/48127
- # is addressed, this should be reverted.
self.call_dist_op(
"reduce", False, dist.reduce_multigpu, tensors, src, op, group_id,
- expect_event=False)
+ expect_event=len(tensors) == 1)

self._barrier()

@@ -2532,13 +2526,10 @@ def _test_all_gather_multigpu_helper(self, group, group_id, rank, rank_to_GPU, d
for gpu in rank_to_GPU[rank]:
output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])
- # TODO: Setting expect_event=False to disable profiling
- # tests. Once https://github.com/pytorch/pytorch/issues/48127
- # is addressed, this should be reverted.
self.call_dist_op(
"all_gather", False,
dist.all_gather_multigpu, output_tensors, tensors, group_id,
- expect_event=False)
+ expect_event=len(expected_output) == 1)
self.assertEqual(output_tensors, expected_output)

self._barrier()
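
For context on the expect_event change above: a hypothetical helper (not the repository's call_dist_op, whose signature is not reproduced here) might verify a profiler event only when one is expected, along these lines:

```python
# Hypothetical helper, not the call_dist_op used in distributed_test.py:
# run an op under the autograd profiler and, when an event is expected,
# assert that a matching profiler event was recorded.
from torch.autograd import profiler

def run_and_check_event(op, *args, expect_event=True, event_suffix="all_reduce"):
    with profiler.profile(use_cuda=False) as prof:
        op(*args)
    if expect_event:
        names = [evt.name for evt in prof.function_events]
        assert any(name.endswith(event_suffix) for name in names), (
            f"expected a profiler event ending in '{event_suffix}', got {names}"
        )

# e.g. run_and_check_event(dist.all_reduce, tensor, expect_event=True)
```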
