Used PySequence_Fast in distributed csrc
teng-li committed Nov 28, 2017
1 parent 538d6f6 commit 5c3a91c
Showing 3 changed files with 88 additions and 45 deletions.
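
For context, each handler in the diff below moves from per-item PySequence_ITEM calls (which return a new reference on every iteration) to a single PySequence_Fast conversion followed by PySequence_Fast_GET_SIZE and PySequence_Fast_GET_ITEM, which give direct indexed access with borrowed references once the argument has been normalized to a list or tuple. A minimal, self-contained sketch of that access pattern follows; the function name sum_longs and the error message are illustrative only and are not part of the patch:

/* Sketch of the PySequence_Fast access pattern (not from the patch). */
#include <Python.h>

static long sum_longs(PyObject *arg)
{
  /* PySequence_Fast returns a new reference to a list or tuple view of
   * arg, or NULL (with TypeError set) if arg is not iterable. */
  PyObject *seq = PySequence_Fast(arg, "expected a sequence of ints");
  if (!seq) {
    return -1;
  }

  Py_ssize_t n = PySequence_Fast_GET_SIZE(seq);
  long total = 0;
  for (Py_ssize_t i = 0; i < n; ++i) {
    /* Borrowed reference: no per-item Py_DECREF, unlike PySequence_ITEM. */
    PyObject *item = PySequence_Fast_GET_ITEM(seq, i);
    if (!PyLong_Check(item)) {
      PyErr_SetString(PyExc_TypeError, "expected a sequence of ints");
      Py_DECREF(seq);
      return -1;
    }
    total += PyLong_AsLong(item);
  }

  Py_DECREF(seq);  /* release the reference returned by PySequence_Fast */
  return total;
}
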
129 changes: 86 additions & 43 deletions torch/csrc/distributed/Module.cpp
@@ -381,8 +381,13 @@ PyObject* THDPModule_allReduceMultiGPU(PyObject *_unused, PyObject *args)
if (PyTuple_GET_SIZE(args) != 3) {
goto invalid_arguments;
}
sequence = PyTuple_GET_ITEM(args, 0);
if (!PySequence_Check(sequence)) {

if (!PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

@@ -395,12 +400,12 @@ PyObject* THDPModule_allReduceMultiGPU(PyObject *_unused, PyObject *args)
descriptors.reserve(length);

for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i))) {
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i))) {
goto invalid_arguments;
}

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
}

@@ -436,12 +441,16 @@ PyObject* THDPModule_reduceMultiGPU(PyObject *_unused, PyObject *args)
goto invalid_arguments;
}

sequence = PyTuple_GET_ITEM(args, 0);
if (!PySequence_Check(sequence) ||
if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));
@@ -450,12 +459,12 @@ PyObject* THDPModule_reduceMultiGPU(PyObject *_unused, PyObject *args)
descriptors.reserve(length);

for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i))) {
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i))) {
goto invalid_arguments;
}

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
}

@@ -492,13 +501,16 @@ PyObject* THDPModule_broadcastMultiGPU(PyObject *_unused, PyObject *args)
goto invalid_arguments;
}

sequence = PyTuple_GET_ITEM(args, 0);

if (!PySequence_Check(sequence) ||
if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));
@@ -507,12 +519,12 @@ PyObject* THDPModule_broadcastMultiGPU(PyObject *_unused, PyObject *args)
descriptors.reserve(length);

for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i))) {
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i))) {
goto invalid_arguments;
}

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
}

@@ -554,10 +566,15 @@ PyObject* THDPModule_allGatherMultiGPU(PyObject *_unused, PyObject *args)
goto invalid_arguments;
}

sequence_one = PyTuple_GET_ITEM(args, 0);
sequence_two = PyTuple_GET_ITEM(args, 1);
if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!PySequence_Check(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

sequence_one = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
sequence_two = PySequence_Fast(PyTuple_GET_ITEM(args, 1), nullptr);

if (!PySequence_Check(sequence_one) || !PySequence_Check(sequence_two)) {
if (!sequence_one || !sequence_two) {
goto invalid_arguments;
}

@@ -580,17 +597,17 @@ PyObject* THDPModule_allGatherMultiGPU(PyObject *_unused, PyObject *args)

// Get the input list
for (size_t i = 0; i < length_two; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence_two, i)) ||
!THPModule_isTensor(PySequence_ITEM(sequence_one, i))) {
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence_two, i)) ||
!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence_one, i))) {
goto invalid_arguments;
}

input_descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence_two, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence_two, i))
);

output_descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence_one, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence_one, i))
);
}

@@ -680,31 +697,38 @@ PyObject* THDPModule_broadcast(PyObject *_unused, PyObject *args)
PyObject* THDPModule_allGather(PyObject *_unused, PyObject *args)
{
HANDLE_TH_ERRORS
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
PyObject* sequence;
Py_ssize_t tmp_length;
std::size_t length;
std::vector<at::Tensor> descriptors;
std::vector<at::Tensor> raw_descriptors;
THDGroup group;
at::Tensor desc;

if (PyTuple_GET_SIZE(args) != 3 || !PySequence_Check(sequence) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
if (PyTuple_GET_SIZE(args) != 3 ||
!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {

goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Length(sequence);
tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));

length = static_cast<std::size_t>(tmp_length);
descriptors.reserve(length);
for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i)))
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i)))
goto invalid_arguments;

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
raw_descriptors.push_back(descriptors.back());
}
@@ -747,31 +771,37 @@ PyObject* THDPModule_gatherSend(PyObject *_unused, PyObject *args)
PyObject* THDPModule_gatherRecv(PyObject *_unused, PyObject *args)
{
HANDLE_TH_ERRORS
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
PyObject* sequence;
Py_ssize_t tmp_length;
std::size_t length;
std::vector<at::Tensor> descriptors;
std::vector<at::Tensor> raw_descriptors;
THDGroup group;
at::Tensor desc;

if (PyTuple_GET_SIZE(args) != 3 || !PySequence_Check(sequence) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
if (PyTuple_GET_SIZE(args) != 3 ||
!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

tmp_length = PySequence_Length(sequence);
sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));

length = static_cast<std::size_t>(tmp_length);
descriptors.reserve(length);
for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i)))
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i)))
goto invalid_arguments;

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
raw_descriptors.push_back(descriptors.back());
}
@@ -794,31 +824,37 @@ PyObject* THDPModule_gatherRecv(PyObject *_unused, PyObject *args)
PyObject* THDPModule_scatterSend(PyObject *_unused, PyObject *args)
{
HANDLE_TH_ERRORS
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
PyObject* sequence;
Py_ssize_t tmp_length;
std::size_t length;
std::vector<at::Tensor> descriptors;
std::vector<at::Tensor> raw_descriptors;
THDGroup group;
at::Tensor desc;

if (PyTuple_GET_SIZE(args) != 3 || !PySequence_Check(sequence) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
if (PyTuple_GET_SIZE(args) != 3 ||
!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Length(sequence);
tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));

length = static_cast<std::size_t>(tmp_length);
descriptors.reserve(length);
for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i)))
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i)))
goto invalid_arguments;

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
raw_descriptors.push_back(descriptors.back());
}
@@ -873,25 +909,32 @@ PyObject* THDPModule_barrier(PyObject *_unused, PyObject *_group)
PyObject* THDPModule_newGroup(PyObject *_unused, PyObject *args)
{
HANDLE_TH_ERRORS
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
PyObject* sequence;
Py_ssize_t tmp_length;
std::size_t length;
std::vector<int> ranks;

if (PyTuple_GET_SIZE(args) != 1 || !PySequence_Check(sequence))
if (PyTuple_GET_SIZE(args) != 1 ||
!PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
goto invalid_arguments;
}

tmp_length = PySequence_Length(sequence);
sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));

length = static_cast<std::size_t>(tmp_length);
ranks.reserve(length);
for (std::size_t i = 0; i < length; ++i) {
if (!THPUtils_checkLong(PySequence_ITEM(sequence, i)))
if (!THPUtils_checkLong(PySequence_Fast_GET_ITEM(sequence, i)))
goto invalid_arguments;

ranks.push_back(THPUtils_unpackLong(PySequence_ITEM(sequence, i)));
ranks.push_back(THPUtils_unpackLong(PySequence_Fast_GET_ITEM(sequence, i)));
for (std::size_t j = 0; j < i; ++j)
THPUtils_assert(ranks[i] != ranks[j], "ranks should be unique");
}
2 changes: 1 addition & 1 deletion torch/distributed/__init__.py
@@ -85,7 +85,7 @@ def init_process_group(backend, init_method='env://', **kwargs):
elif backend == "nccl":
_backend = dist_backend.NCCL
else:
raise RuntimeError("Invalid distributed backend name detected")
raise RuntimeError("Invalid distributed backend name: " + backend)

if _backend == dist_backend.NCCL:
warnings.warn("""
2 changes: 1 addition & 1 deletion torch/nn/parallel/distributed.py
@@ -22,7 +22,7 @@


class DistributedDataParallel(Module):
"""Implements distributed data parallelism at the module level.
r"""Implements distributed data parallelism at the module level.
This container parallelizes the application of the given module by
splitting the input across the specified devices by chunking in the batch
