Skip to content

Commit

Permalink
Let NCCL2 backend use ATen instead of the deprecated THPP
Browse files Browse the repository at this point in the history
  • Loading branch information
teng-li committed Nov 2, 2017
1 parent cda4ad4 commit ae2028e
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 138 deletions.
1 change: 1 addition & 0 deletions test/test_distributed.py
Expand Up @@ -577,6 +577,7 @@ class TestMPI(TestCase, _DistTestBase):
elif BACKEND == 'nccl':
dist.init_process_group(init_method=INIT_METHOD, backend='nccl')
# TODO

class TestNCCL(TestCase, _DistTestBase):
pass

Expand Down
32 changes: 15 additions & 17 deletions torch/csrc/distributed/Module.cpp
Expand Up @@ -374,8 +374,8 @@ PyObject* THDPModule_allReduceMultiGPU(PyObject *_unused, PyObject *args)
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
Py_ssize_t tmp_length;
std::size_t length;
std::vector<THDPTensorDesc> descriptors;
std::vector<THDTensorDescriptor*> raw_descriptors;
std::vector<at::Tensor> descriptors;
std::vector<at::Tensor> raw_descriptors;
THDGroup group;
THDReduceOp op;

Expand All @@ -396,7 +396,7 @@ PyObject* THDPModule_allReduceMultiGPU(PyObject *_unused, PyObject *args)
}

descriptors.push_back(
THDPTensorDesc(THDPModule_makeDescriptor(PySequence_ITEM(sequence, i)))
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
);
raw_descriptors.push_back(descriptors.back());
}
Expand Down Expand Up @@ -424,8 +424,8 @@ PyObject* THDPModule_reduceMultiGPU(PyObject *_unused, PyObject *args)
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
Py_ssize_t tmp_length;
std::size_t length;
std::vector<THDPTensorDesc> descriptors;
std::vector<THDTensorDescriptor*> raw_descriptors;
std::vector<at::Tensor> descriptors;
std::vector<at::Tensor> raw_descriptors;
THDGroup group;
THDReduceOp op;
int dst_rank;
Expand All @@ -448,7 +448,7 @@ PyObject* THDPModule_reduceMultiGPU(PyObject *_unused, PyObject *args)
}

descriptors.push_back(
THDPTensorDesc(THDPModule_makeDescriptor(PySequence_ITEM(sequence, i)))
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
);
raw_descriptors.push_back(descriptors.back());
}
Expand Down Expand Up @@ -478,8 +478,8 @@ PyObject* THDPModule_broadcastMultiGPU(PyObject *_unused, PyObject *args)
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
Py_ssize_t tmp_length;
std::size_t length;
std::vector<THDPTensorDesc> descriptors;
std::vector<THDTensorDescriptor*> raw_descriptors;
std::vector<at::Tensor> descriptors;
std::vector<at::Tensor> raw_descriptors;
THDGroup group;
int src_rank;

Expand All @@ -501,7 +501,7 @@ PyObject* THDPModule_broadcastMultiGPU(PyObject *_unused, PyObject *args)
}

descriptors.push_back(
THDPTensorDesc(THDPModule_makeDescriptor(PySequence_ITEM(sequence, i)))
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
);
raw_descriptors.push_back(descriptors.back());
}
Expand Down Expand Up @@ -535,11 +535,11 @@ PyObject* THDPModule_allGatherMultiGPU(PyObject *_unused, PyObject *args)
size_t length_one;
size_t length_two;

std::vector<THDPTensorDesc> output_descriptors;
std::vector<THDTensorDescriptor*> output_raw_descriptors;
std::vector<at::Tensor> output_descriptors;
std::vector<at::Tensor> output_raw_descriptors;

std::vector<THDPTensorDesc> input_descriptors;
std::vector<THDTensorDescriptor*> input_raw_descriptors;
std::vector<at::Tensor> input_descriptors;
std::vector<at::Tensor> input_raw_descriptors;

THDGroup group;

Expand Down Expand Up @@ -574,14 +574,12 @@ PyObject* THDPModule_allGatherMultiGPU(PyObject *_unused, PyObject *args)
}

input_descriptors.push_back(
THDPTensorDesc(THDPModule_makeDescriptor(
PySequence_ITEM(sequence_two, i)))
THDPModule_makeDescriptor(PySequence_ITEM(sequence_two, i))
);
input_raw_descriptors.push_back(input_descriptors.back());

output_descriptors.push_back(
THDPTensorDesc(THDPModule_makeDescriptor(
PySequence_ITEM(sequence_one, i)))
THDPModule_makeDescriptor(PySequence_ITEM(sequence_one, i))
);
output_raw_descriptors.push_back(output_descriptors.back());
}
Expand Down
9 changes: 5 additions & 4 deletions torch/distributed/__init__.py
Expand Up @@ -5,7 +5,7 @@
"""
import torch
import warnings
from torch._utils import _flatten_tensors, _unflatten_tensors
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors

_INITIALIZED_PG = 1
_INITIALIZED_MW = 2
Expand Down Expand Up @@ -311,6 +311,7 @@ def reduce_multigpu(tensor_list, dst, op=reduce_op.SUM, group=group.WORLD):
"Multi GPU collectives only supported in nccl backend"
return torch._C._dist_reduce_multigpu(tensor_list, dst, op, group)


def reduce(tensor, dst, op=reduce_op.SUM, group=group.WORLD):
"""Reduces the tensor data across all machines.
Expand Down Expand Up @@ -353,7 +354,7 @@ def all_gather_multigpu(output_tensor_lists,

flatten_tensor_list = []
for output_tensor_list in output_tensor_lists:
flatten_tensor_list.append(_flatten_tensors(output_tensor_list))
flatten_tensor_list.append(_flatten_dense_tensors(output_tensor_list))

ret = torch._C._dist_all_gather_multigpu(flatten_tensor_list,
input_tensor_list,
Expand All @@ -362,8 +363,8 @@ def all_gather_multigpu(output_tensor_lists,
for output_tensor_list, flatten_tensor in zip(output_tensor_lists,
flatten_tensor_list):
for tensor, value in zip(output_tensor_list,
_unflatten_tensors(flatten_tensor,
output_tensor_list)):
_unflatten_dense_tensors(flatten_tensor,
output_tensor_list)):
tensor.copy_(value)

return ret
Expand Down
1 change: 0 additions & 1 deletion torch/lib/THD/base/DataChannel.cpp
@@ -1,4 +1,3 @@
/* definition to expand macro then apply to pragma message */
#include "DataChannel.hpp"
#ifdef WITH_GLOO
#include "data_channels/DataChannelGloo.hpp"
Expand Down
12 changes: 6 additions & 6 deletions torch/lib/THD/base/data_channels/DataChannelGloo.cpp
Expand Up @@ -268,8 +268,8 @@ auto DataChannelGloo::ireceive(at::Tensor& data, rank_type src_rank) -> RequestG
}


void DataChannelGloo::allReduce(std::vector<thpp::Tensor*>& input,
std::vector<thpp::Tensor*>& output,
void DataChannelGloo::allReduce(std::vector<at::Tensor>& input,
std::vector<at::Tensor>& output,
THDReduceOp operation,
THDGroup groupId) {

Expand All @@ -278,16 +278,16 @@ void DataChannelGloo::allReduce(std::vector<thpp::Tensor*>& input,
}


void DataChannelGloo::allGather(std::vector<thpp::Tensor*>& input,
std::vector<thpp::Tensor*>& output,
void DataChannelGloo::allGather(std::vector<at::Tensor>& input,
std::vector<at::Tensor>& output,
THDGroup groupId) {

throw std::runtime_error("DataChannelGloo does not support mult-GPU cross "
"node allgather");
}


void DataChannelGloo::reduce(std::vector<thpp::Tensor*>& data,
void DataChannelGloo::reduce(std::vector<at::Tensor>& data,
THDReduceOp operation,
rank_type dstRank,
THDGroup groupId) {
Expand All @@ -297,7 +297,7 @@ void DataChannelGloo::reduce(std::vector<thpp::Tensor*>& data,
}


void DataChannelGloo::broadcast(std::vector<thpp::Tensor*>& data,
void DataChannelGloo::broadcast(std::vector<at::Tensor>& data,
rank_type srcRank,
THDGroup groupId) {

Expand Down
12 changes: 6 additions & 6 deletions torch/lib/THD/base/data_channels/DataChannelMPI.cpp
Expand Up @@ -508,8 +508,8 @@ THDGroup DataChannelMPI::newGroup(const std::vector<rank_type>& ranks) {
return new_group_id;
}

void DataChannelMPI::allReduce(std::vector<thpp::Tensor*>& input,
std::vector<thpp::Tensor*>& output,
void DataChannelMPI::allReduce(std::vector<at::Tensor>& input,
std::vector<at::Tensor>& output,
THDReduceOp operation,
THDGroup groupId) {

Expand All @@ -518,16 +518,16 @@ void DataChannelMPI::allReduce(std::vector<thpp::Tensor*>& input,
}


void DataChannelMPI::allGather(std::vector<thpp::Tensor*>& input,
std::vector<thpp::Tensor*>& output,
void DataChannelMPI::allGather(std::vector<at::Tensor>& input,
std::vector<at::Tensor>& output,
THDGroup groupId) {

throw std::runtime_error("DataChannelMPI does not support mult-GPU cross "
"node allgather");
}


void DataChannelMPI::reduce(std::vector<thpp::Tensor*>& data,
void DataChannelMPI::reduce(std::vector<at::Tensor>& data,
THDReduceOp operation,
rank_type dstRank,
THDGroup groupId) {
Expand All @@ -537,7 +537,7 @@ void DataChannelMPI::reduce(std::vector<thpp::Tensor*>& data,
}


void DataChannelMPI::broadcast(std::vector<thpp::Tensor*>& data,
void DataChannelMPI::broadcast(std::vector<at::Tensor>& data,
rank_type srcRank,
THDGroup groupId) {

Expand Down

0 comments on commit ae2028e

Please sign in to comment.