[NCCL][Test Only] test send/recv on OSS
Pull Request resolved: #45140

Test only
ghstack-source-id: 113383015

Differential Revision: [D23844388](https://our.internmc.facebook.com/intern/diff/D23844388/)
mingzhe0908 committed Oct 1, 2020
1 parent 91c876e commit 83707a4
Showing 4 changed files with 71 additions and 18 deletions.
34 changes: 32 additions & 2 deletions torch/csrc/cuda/nccl.cpp
@@ -16,6 +16,10 @@
#include <type_traits>
#include <unordered_map>

#include <unistd.h>
#include <sys/syscall.h>
#define gettid() syscall(SYS_gettid)


ncclComm_t* to_nccl_comm(torch::cuda::nccl::ncclComm_t* var) {
return reinterpret_cast<ncclComm_t*>(var);
@@ -99,6 +103,31 @@ ncclRedOp_t to_nccl_red_op(int var) {
return (ncclRedOp_t)(var);
}

#define NCCL_GROUP_START() \
do { \
NCCL_CHECK(from_nccl_result(ncclGroupStart())); \
fprintf( \
stdout, \
"[%d:%d] NCCL group start in: %s:%d\n", \
getpid(), \
gettid(), \
__FILE__, \
__LINE__); \
} while (0)


#define NCCL_GROUP_END() \
do { \
NCCL_CHECK(from_nccl_result(ncclGroupEnd())); \
fprintf( \
stdout, \
"[%d:%d] NCCL group end in: %s:%d\n", \
getpid(), \
gettid(), \
__FILE__, \
__LINE__); \
} while (0)

namespace torch {
namespace cuda {
namespace nccl {
@@ -111,12 +140,12 @@ struct AutoNcclGroup {
AutoNcclGroup() {
(c10::cuda::CUDACachingAllocator::getFreeMutex())->lock();
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
NCCL_CHECK(from_nccl_result(ncclGroupStart()));
NCCL_GROUP_START();
#endif
}
~AutoNcclGroup() {
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
NCCL_CHECK(from_nccl_result(ncclGroupEnd()));
NCCL_GROUP_END();
#endif
(c10::cuda::CUDACachingAllocator::getFreeMutex())->unlock();
}
@@ -420,6 +449,7 @@ void broadcast(
count_max,
")");
ncclComm_t comm = comms[i];
std::cout << "[" << getpid() << ":"<< gettid() << "]issue input: " << i << "num_tensors:" << num_tensors << std::endl;
NCCL_CHECK(from_nccl_result(ncclBcast(
tensors[i].data_ptr(), numel, data_type, 0, *(to_nccl_comm(&comm)), stream)));
}
3 changes: 0 additions & 3 deletions torch/lib/c10d/NCCLUtils.hpp
@@ -17,8 +17,6 @@
#define ENABLE_NCCL_ERROR_CHECKING
#endif

// Fix build issues with NCCL P2P - until then disable NCCL send/recv.
#if defined(ENABLE_NCCL_A2A) && (ENABLE_NCCL_A2A == 1)
// P2P is enabled only for NCCL versions 2.7+ since ncclSend()
// and ncclRecv() are not supported in earlier versions.
#if defined(NCCL_MAJOR) && (NCCL_MAJOR == 2) && defined(NCCL_MINOR) && \
@@ -27,7 +25,6 @@
#elif defined(NCCL_MAJOR) && (NCCL_MAJOR >= 3)
#define ENABLE_NCCL_P2P_SUPPORT
#endif
#endif

// Macro to throw on a non-successful NCCL return value.
#define C10D_NCCL_CHECK(cmd) \
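The net effect of this NCCLUtils.hpp hunk is to drop the ENABLE_NCCL_A2A guard so that ENABLE_NCCL_P2P_SUPPORT is gated only by the NCCL version. A minimal sketch of the effective gate after the change, assuming the collapsed continuation line checks (NCCL_MINOR >= 7), which matches the requires_nccl_version(2700, ...) decorator in the test change below:

// Sketch of the effective logic after this diff, not the verbatim file:
// ncclSend()/ncclRecv() are compiled in only for NCCL 2.7+ or NCCL 3+.
#if defined(NCCL_MAJOR) && (NCCL_MAJOR == 2) && defined(NCCL_MINOR) && \
    (NCCL_MINOR >= 7)
#define ENABLE_NCCL_P2P_SUPPORT
#elif defined(NCCL_MAJOR) && (NCCL_MAJOR >= 3)
#define ENABLE_NCCL_P2P_SUPPORT
#endif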
50 changes: 38 additions & 12 deletions torch/lib/c10d/ProcessGroupNCCL.cpp
@@ -10,6 +10,28 @@
#include <c10/cuda/CUDAGuard.h>

#include <c10d/Utils.hpp>

#define NCCL_GROUP_START() \
do { \
C10D_NCCL_CHECK(ncclGroupStart()); \
fprintf( \
stdout, \
"NCCL group start in: %s:%d\n", \
__FILE__, \
__LINE__); \
} while (0)


#define NCCL_GROUP_END() \
do { \
C10D_NCCL_CHECK(ncclGroupEnd()); \
fprintf( \
stdout, \
"NCCL group end in: %s:%d\n", \
__FILE__, \
__LINE__); \
} while (0)

namespace c10d {

constexpr const char* const kNCCLAbortedCommStoreKey = "NCCLABORTEDCOMM";
@@ -23,12 +45,12 @@ struct AutoNcclGroup {
AutoNcclGroup() {
(c10::cuda::CUDACachingAllocator::getFreeMutex())->lock();
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
C10D_NCCL_CHECK(ncclGroupStart());
NCCL_GROUP_START();
#endif
}
~AutoNcclGroup() noexcept(false) {
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
C10D_NCCL_CHECK(ncclGroupEnd());
NCCL_GROUP_END();
#endif
(c10::cuda::CUDACachingAllocator::getFreeMutex())->unlock();
}
@@ -173,10 +195,11 @@ ncclResult_t ncclAlltoall(
ncclDataType_t type,
ncclComm_t comm,
cudaStream_t stream) {
std::cout << "ncclAlltoall?" << std::endl;
int numranks;
size_t rankdiff = count * size;
C10D_NCCL_CHECK(ncclCommCount(comm, &numranks));
C10D_NCCL_CHECK(ncclGroupStart());
NCCL_GROUP_START();
for (int r = 0; r < numranks; r++) {
// NCCL uses 0 byte message for synchronization
// Avoid send/recv when message size is zero
@@ -187,7 +210,7 @@
((char*)recvbuff) + r * rankdiff, count, type, r, comm, stream));
}
}
C10D_NCCL_CHECK(ncclGroupEnd());
NCCL_GROUP_END();
return ncclSuccess;
}

@@ -202,9 +225,10 @@ ncclResult_t ncclAlltoallv(
ncclDataType_t type,
ncclComm_t comm,
cudaStream_t stream) {
std::cout << "alltoallv triggered" << std::endl;
int numranks;
C10D_NCCL_CHECK(ncclCommCount(comm, &numranks));
C10D_NCCL_CHECK(ncclGroupStart());
NCCL_GROUP_START();
for (int r = 0; r < numranks; r++) {
// NCCL uses 0 byte message for synchronization
// Avoid send/recv when message size is zero
@@ -227,7 +251,7 @@
stream));
}
}
C10D_NCCL_CHECK(ncclGroupEnd());
NCCL_GROUP_END();
return ncclSuccess;
}
#endif
@@ -783,11 +807,11 @@ std::vector<std::shared_ptr<NCCLComm>>& ProcessGroupNCCL::getNCCLComm(
// nccl communicator is actually created before encountering any communication calls.
// This is why we need the following for loop.
for (size_t i = 0; i < ncclActiveGroupCounter_; ++i) {
C10D_NCCL_CHECK(ncclGroupEnd());
NCCL_GROUP_END();
}

// [Note 1] Create the NCCL communicators for each GPU
C10D_NCCL_CHECK(ncclGroupStart());
NCCL_GROUP_START();

for (size_t i = 0; i < devices.size(); ++i) {
// GPU world size and GPU rank
@@ -821,11 +845,11 @@ std::vector<std::shared_ptr<NCCLComm>>& ProcessGroupNCCL::getNCCLComm(
}

// [Note 2 ]
C10D_NCCL_CHECK(ncclGroupEnd());
NCCL_GROUP_END();

// See [Group Start/End Note]
for (size_t i = 0; i < ncclActiveGroupCounter_; ++i) {
C10D_NCCL_CHECK(ncclGroupStart());
NCCL_GROUP_START();
}

ncclStreams_.emplace(devicesKey, std::move(streamVal));
@@ -1027,6 +1051,7 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::collective(
for (size_t i = 0; i < inputs.size(); ++i) {
gpuGuard.set_index(devices[i].index());
at::cuda::CUDAStream& ncclStream = ncclStreams_[key][i];
std::cout << "schedule input" << i << std::endl;
C10D_NCCL_CHECK(
fn(inputs[i], outputs[i], ncclComms[i]->getNcclComm(), ncclStream));
}
@@ -1370,6 +1395,7 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::alltoall_base(
std::vector<int64_t>& outputSplitSizes,
std::vector<int64_t>& inputSplitSizes,
const AllToAllOptions& /* unused */) {
std::cout << " alltoall_base triggered" << std::endl;
check_gpu_single_tensor(outputTensor);
check_gpu_single_tensor(inputTensor);
if (outputSplitSizes.size() == 0 && inputSplitSizes.size() == 0) {
@@ -1503,14 +1529,14 @@ std::shared_ptr<ProcessGroup::Work> ProcessGroupNCCL::recv(

void ProcessGroupNCCL::groupStart() {
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
C10D_NCCL_CHECK(ncclGroupStart());
NCCL_GROUP_START();
#endif
++ncclActiveGroupCounter_;
}

void ProcessGroupNCCL::groupEnd() {
#if defined(NCCL_MAJOR) && (NCCL_MAJOR >= 2)
C10D_NCCL_CHECK(ncclGroupEnd());
NCCL_GROUP_END();
#endif
--ncclActiveGroupCounter_;
}
2 changes: 1 addition & 1 deletion torch/testing/_internal/distributed/distributed_test.py
@@ -577,7 +577,7 @@ def test_backend_full_group(self):

# NCCL Batch SEND RECV
@skip_if_no_gpu
@unittest.skip("NCCL P2P is not enabled for OSS builds")
#@unittest.skip("NCCL P2P is not enabled for OSS builds")
@unittest.skipIf(BACKEND != "nccl", "NCCL Batch Send Recv Only")
@requires_nccl_version(2700, "Need NCCL 2.7+ for send/recv")
def test_batch_isend_irecv_nccl(self):
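Re-enabling this test means OSS builds now exercise NCCL batch send/recv through the torch.distributed P2P API. The test body is not shown in this diff; the sketch below only illustrates the kind of call pattern such a test drives (the helper name, tensor shape, and one-GPU-per-rank device mapping are illustrative assumptions, not the actual test code):

import torch
import torch.distributed as dist

def batch_send_recv_sketch(rank, world_size):
    # Assumes dist.init_process_group("nccl", ...) has already run and that
    # rank i drives GPU i.
    device = torch.device(f"cuda:{rank}")
    send_tensor = torch.full((4,), float(rank), device=device)
    recv_tensor = torch.zeros(4, device=device)
    # Batch one isend and one irecv so the backend can group them
    # (ncclGroupStart/ncclGroupEnd under the hood, as in the C++ changes above).
    ops = [
        dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size),
        dist.P2POp(dist.irecv, recv_tensor, (rank - 1) % world_size),
    ]
    reqs = dist.batch_isend_irecv(ops)  # needs NCCL 2.7+ on the nccl backend
    for req in reqs:
        req.wait()
    return recv_tensor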
