Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented NCCL Distributed Backend for PyTorch with new dist APIs #3435

Merged
merged 12 commits on Nov 29, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
129 changes: 86 additions & 43 deletions torch/csrc/distributed/Module.cpp
Expand Up @@ -381,8 +381,13 @@ PyObject* THDPModule_allReduceMultiGPU(PyObject *_unused, PyObject *args)
if (PyTuple_GET_SIZE(args) != 3) {
goto invalid_arguments;
}
sequence = PyTuple_GET_ITEM(args, 0);
if (!PySequence_Check(sequence)) {

if (!PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);

This comment was marked as off-topic.

This comment was marked as off-topic.

if (!sequence) {
goto invalid_arguments;
}

Expand All @@ -395,12 +400,12 @@ PyObject* THDPModule_allReduceMultiGPU(PyObject *_unused, PyObject *args)
descriptors.reserve(length);

for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i))) {
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i))) {
goto invalid_arguments;
}

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
}

Expand Down Expand Up @@ -436,12 +441,16 @@ PyObject* THDPModule_reduceMultiGPU(PyObject *_unused, PyObject *args)
goto invalid_arguments;
}

sequence = PyTuple_GET_ITEM(args, 0);
if (!PySequence_Check(sequence) ||
if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));
Expand All @@ -450,12 +459,12 @@ PyObject* THDPModule_reduceMultiGPU(PyObject *_unused, PyObject *args)
descriptors.reserve(length);

for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i))) {
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i))) {
goto invalid_arguments;
}

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
}

Expand Down Expand Up @@ -492,13 +501,16 @@ PyObject* THDPModule_broadcastMultiGPU(PyObject *_unused, PyObject *args)
goto invalid_arguments;
}

sequence = PyTuple_GET_ITEM(args, 0);

if (!PySequence_Check(sequence) ||
if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);

This comment was marked as off-topic.

if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));
Expand All @@ -507,12 +519,12 @@ PyObject* THDPModule_broadcastMultiGPU(PyObject *_unused, PyObject *args)
descriptors.reserve(length);

for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i))) {
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i))) {
goto invalid_arguments;
}

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
}

Expand Down Expand Up @@ -554,10 +566,15 @@ PyObject* THDPModule_allGatherMultiGPU(PyObject *_unused, PyObject *args)
goto invalid_arguments;
}

sequence_one = PyTuple_GET_ITEM(args, 0);
sequence_two = PyTuple_GET_ITEM(args, 1);
if (!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!PySequence_Check(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

sequence_one = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
sequence_two = PySequence_Fast(PyTuple_GET_ITEM(args, 1), nullptr);

if (!PySequence_Check(sequence_one) || !PySequence_Check(sequence_two)) {
if (!sequence_one || !sequence_two) {
goto invalid_arguments;
}

Expand All @@ -580,17 +597,17 @@ PyObject* THDPModule_allGatherMultiGPU(PyObject *_unused, PyObject *args)

// Get the input list
for (size_t i = 0; i < length_two; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence_two, i)) ||
!THPModule_isTensor(PySequence_ITEM(sequence_one, i))) {
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence_two, i)) ||
!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence_one, i))) {
goto invalid_arguments;
}

input_descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence_two, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence_two, i))
);

output_descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence_one, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence_one, i))
);
}

Expand Down Expand Up @@ -680,31 +697,38 @@ PyObject* THDPModule_broadcast(PyObject *_unused, PyObject *args)
PyObject* THDPModule_allGather(PyObject *_unused, PyObject *args)
{
HANDLE_TH_ERRORS
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
PyObject* sequence;
Py_ssize_t tmp_length;
std::size_t length;
std::vector<at::Tensor> descriptors;
std::vector<at::Tensor> raw_descriptors;
THDGroup group;
at::Tensor desc;

if (PyTuple_GET_SIZE(args) != 3 || !PySequence_Check(sequence) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
if (PyTuple_GET_SIZE(args) != 3 ||
!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {

goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Length(sequence);
tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));

length = static_cast<std::size_t>(tmp_length);
descriptors.reserve(length);
for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i)))
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i)))
goto invalid_arguments;

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
raw_descriptors.push_back(descriptors.back());
}
Expand Down Expand Up @@ -747,31 +771,37 @@ PyObject* THDPModule_gatherSend(PyObject *_unused, PyObject *args)
PyObject* THDPModule_gatherRecv(PyObject *_unused, PyObject *args)
{
HANDLE_TH_ERRORS
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
PyObject* sequence;
Py_ssize_t tmp_length;
std::size_t length;
std::vector<at::Tensor> descriptors;
std::vector<at::Tensor> raw_descriptors;
THDGroup group;
at::Tensor desc;

if (PyTuple_GET_SIZE(args) != 3 || !PySequence_Check(sequence) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
if (PyTuple_GET_SIZE(args) != 3 ||
!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

tmp_length = PySequence_Length(sequence);
sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));

length = static_cast<std::size_t>(tmp_length);
descriptors.reserve(length);
for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i)))
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i)))
goto invalid_arguments;

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
raw_descriptors.push_back(descriptors.back());
}
Expand All @@ -794,31 +824,37 @@ PyObject* THDPModule_gatherRecv(PyObject *_unused, PyObject *args)
PyObject* THDPModule_scatterSend(PyObject *_unused, PyObject *args)
{
HANDLE_TH_ERRORS
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
PyObject* sequence;
Py_ssize_t tmp_length;
std::size_t length;
std::vector<at::Tensor> descriptors;
std::vector<at::Tensor> raw_descriptors;
THDGroup group;
at::Tensor desc;

if (PyTuple_GET_SIZE(args) != 3 || !PySequence_Check(sequence) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
if (PyTuple_GET_SIZE(args) != 3 ||
!PySequence_Check(PyTuple_GET_ITEM(args, 0)) ||
!THPModule_isTensor(PyTuple_GET_ITEM(args, 1))) {
goto invalid_arguments;
}

sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Length(sequence);
tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));

length = static_cast<std::size_t>(tmp_length);
descriptors.reserve(length);
for (std::size_t i = 0; i < length; ++i) {
if (!THPModule_isTensor(PySequence_ITEM(sequence, i)))
if (!THPModule_isTensor(PySequence_Fast_GET_ITEM(sequence, i)))
goto invalid_arguments;

descriptors.push_back(
THDPModule_makeDescriptor(PySequence_ITEM(sequence, i))
THDPModule_makeDescriptor(PySequence_Fast_GET_ITEM(sequence, i))
);
raw_descriptors.push_back(descriptors.back());
}
Expand Down Expand Up @@ -873,25 +909,32 @@ PyObject* THDPModule_barrier(PyObject *_unused, PyObject *_group)
PyObject* THDPModule_newGroup(PyObject *_unused, PyObject *args)
{
HANDLE_TH_ERRORS
PyObject* sequence = PyTuple_GET_ITEM(args, 0);
PyObject* sequence;
Py_ssize_t tmp_length;
std::size_t length;
std::vector<int> ranks;

if (PyTuple_GET_SIZE(args) != 1 || !PySequence_Check(sequence))
if (PyTuple_GET_SIZE(args) != 1 ||
!PySequence_Check(PyTuple_GET_ITEM(args, 0))) {
goto invalid_arguments;
}

tmp_length = PySequence_Length(sequence);
sequence = PySequence_Fast(PyTuple_GET_ITEM(args, 0), nullptr);
if (!sequence) {
goto invalid_arguments;
}

tmp_length = PySequence_Fast_GET_SIZE(sequence);
THPUtils_assert(tmp_length >= 0, "couldn't obtain the length of %s",
THPUtils_typename(sequence));

length = static_cast<std::size_t>(tmp_length);
ranks.reserve(length);
for (std::size_t i = 0; i < length; ++i) {
if (!THPUtils_checkLong(PySequence_ITEM(sequence, i)))
if (!THPUtils_checkLong(PySequence_Fast_GET_ITEM(sequence, i)))
goto invalid_arguments;

ranks.push_back(THPUtils_unpackLong(PySequence_ITEM(sequence, i)));
ranks.push_back(THPUtils_unpackLong(PySequence_Fast_GET_ITEM(sequence, i)));
for (std::size_t j = 0; j < i; ++j)
THPUtils_assert(ranks[i] != ranks[j], "ranks should be unique");
}
Expand Down
14 changes: 8 additions & 6 deletions torch/distributed/__init__.py
Expand Up @@ -67,11 +67,6 @@ def init_process_group(backend, init_method='env://', **kwargs):
global _initialized
if _initialized:
raise RuntimeError("trying to initialize torch.distributed twice!")
torch._C._dist_init_process_group(backend, init_method, world_size,
group_name, rank)
_initialized = _INITIALIZED_PG
if not torch._C._dist_init_extension(False, reduce_op, group):
raise RuntimeError("distributed module initialization failed")

# Checking and assigning the distributed backend
global _backend
Expand All @@ -85,7 +80,11 @@ def init_process_group(backend, init_method='env://', **kwargs):
elif backend == "nccl":
_backend = dist_backend.NCCL
else:
raise RuntimeError("Invalid distributed backend name detected")
raise RuntimeError("Invalid distributed backend name: " + backend)

This comment was marked as off-topic.


torch._C._dist_init_process_group(backend, init_method, world_size,
group_name, rank)
_initialized = _INITIALIZED_PG

if _backend == dist_backend.NCCL:
warnings.warn("""
Expand All @@ -98,6 +97,9 @@ def init_process_group(backend, init_method='env://', **kwargs):
""")
atexit.register(destroy_process_group)

if not torch._C._dist_init_extension(False, reduce_op, group):
raise RuntimeError("distributed module initialization failed")


def init_master_worker(backend, init_method='env://', **kwargs):
warnings.warn("""
Expand Down
2 changes: 1 addition & 1 deletion torch/nn/parallel/distributed.py
Expand Up @@ -22,7 +22,7 @@


class DistributedDataParallel(Module):
"""Implements distributed data parallelism at the module level.
r"""Implements distributed data parallelism at the module level.

This container parallelizes the application of the given module by
splitting the input across the specified devices by chunking in the batch
Expand Down