Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added args/kwargs to idist barrier #2353

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
fb2b897
#2213 pass args and kwargs to idist.barrier method
fco-dv Dec 11, 2021
87e3157
#2213 rm typing.Dict import
fco-dv Dec 11, 2021
4e30043
#2213 add HOROVOD_VERSION
fco-dv Dec 11, 2021
131ac8d
Merge remote-tracking branch 'upstream/master' into idist_barrier
fco-dv Dec 12, 2021
1b62160
minor fix in warning message (#2213)
fco-dv Dec 12, 2021
4c27588
Merge branch 'master' into idist_barrier
fco-dv Dec 12, 2021
f08d8d8
Merge remote-tracking branch 'upstream/master' into idist_barrier
fco-dv Dec 16, 2021
4265809
Merge remote-tracking branch 'upstream/master' into idist_barrier
fco-dv Dec 17, 2021
a2dfd40
remove _check_signature method
fco-dv Dec 22, 2021
007d6ec
Merge remote-tracking branch 'upstream/master' into idist_barrier
fco-dv Dec 22, 2021
2115544
rm unused import
fco-dv Dec 22, 2021
4309356
replace packaging version by distutils.version LooseVersion
fco-dv Dec 22, 2021
840174b
Update ignite/distributed/utils.py
fco-dv Dec 22, 2021
d1f7163
Update ignite/distributed/utils.py
vfdev-5 Dec 22, 2021
9266a91
rm warnings from barrier tpu tests , add tpu test with args
fco-dv Dec 23, 2021
6e4f635
fix args param
fco-dv Dec 23, 2021
3d85c31
Merge remote-tracking branch 'upstream/master' into idist_barrier
fco-dv Jan 13, 2022
ac45365
xla tag explicit
fco-dv Jan 13, 2022
02b2b7a
autopep8 fix
fco-dv Jan 13, 2022
4679671
add type hint / rm warning in test
fco-dv Jan 14, 2022
6c5964b
rm warning in test
fco-dv Jan 14, 2022
26adc91
revert test_xla
fco-dv Jan 14, 2022
2f2b607
fix bad import / add kwargs to _test_distrib_barrier
fco-dv Jan 14, 2022
45fd369
add tests for native and xla
fco-dv Jan 16, 2022
925ec46
add tests for hvd
fco-dv Jan 16, 2022
63a4291
fix hvd import
fco-dv Jan 16, 2022
ecbfe33
fix import for hvd
fco-dv Jan 16, 2022
f9d4e93
Update tests/ignite/distributed/utils/test_native.py
fco-dv Jan 17, 2022
1c59636
autopep8 fix
fco-dv Jan 17, 2022
8d1af22
rm trailing commas in tests
fco-dv Jan 17, 2022
a81b064
update xla test params
fco-dv Jan 17, 2022
5f90e6d
Merge branch 'master' into idist_barrier
vfdev-5 Jan 17, 2022
c0ae8fd
rm test params
fco-dv Jan 17, 2022
9db5db2
Merge branch 'master' into idist_barrier
fco-dv Jan 17, 2022
a697696
revert failing hvd test
fco-dv Jan 17, 2022
36fa945
rm test params
fco-dv Jan 17, 2022
07daca5
Merge branch 'master' into idist_barrier
fco-dv Jan 22, 2022
5d7600a
Merge branch 'master' into idist_barrier
fco-dv Jan 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ignite/distributed/comp_models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def _do_broadcast(self, tensor: torch.Tensor, src: int) -> torch.Tensor:
pass

@abstractmethod
def barrier(self) -> None:
def barrier(self, *args: Any, **kwargs: Any) -> None:
pass


Expand Down Expand Up @@ -357,5 +357,5 @@ def _do_all_gather(self, tensor: torch.Tensor) -> torch.Tensor:
def _do_broadcast(self, tensor: torch.Tensor, src: int) -> torch.Tensor:
return tensor

def barrier(self) -> None:
def barrier(self, *args: Any, **kwargs: Any) -> None:
pass
19 changes: 15 additions & 4 deletions ignite/distributed/comp_models/horovod.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import warnings
from distutils.version import LooseVersion
from typing import Any, Callable, Mapping, Optional, Tuple, cast

import torch

from ignite.distributed.comp_models.base import ComputationModel

try:
import horovod
import horovod.torch as hvd

try:
Expand All @@ -23,6 +25,7 @@
if has_hvd_support:

HOROVOD = "horovod"
HOROVOD_VERSION = horovod.__version__

class _HorovodDistModel(ComputationModel):
"""Private class for `Horovod <https://horovod.readthedocs.io/en/stable/>`_ distributed computation model."""
Expand Down Expand Up @@ -192,7 +195,15 @@ def _do_all_gather(self, tensor: torch.Tensor) -> torch.Tensor:
def _do_broadcast(self, tensor: torch.Tensor, src: int) -> torch.Tensor:
return hvd.broadcast(tensor, root_rank=src)

def barrier(self) -> None:
# https://github.com/horovod/horovod/issues/159#issuecomment-424834603
# hvd.allreduce(torch.tensor(0, device=self.device()), name="barrier")
hvd.allreduce(torch.tensor(0, device="cpu"), name="barrier")
def barrier(self, *args: Any, **kwargs: Any) -> None:
if LooseVersion(HOROVOD_VERSION) < LooseVersion("0.23.0"):
if len(args) or len(kwargs):
warnings.warn(
f"Arguments {list(args) + list(kwargs)} are not passed to horovod barrier method. "
f"Please use horovod version>='0.23.0'"
)
# https://github.com/horovod/horovod/issues/159#issuecomment-424834603
# hvd.allreduce(torch.tensor(0, device=self.device()), name="barrier")
hvd.allreduce(torch.tensor(0, device="cpu"), name="barrier")
else:
hvd.barrier(*args, **kwargs)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vfdev-5 not a lot on this PR remaining I think, just wanted to check the hvd 2 gpus tests for this because it seems to fail here.

4 changes: 2 additions & 2 deletions ignite/distributed/comp_models/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ def _do_broadcast(self, tensor: torch.Tensor, src: int) -> torch.Tensor:
dist.broadcast(tensor, src=src)
return tensor

def barrier(self) -> None:
dist.barrier()
def barrier(self, *args: Any, **kwargs: Any) -> None:
dist.barrier(*args, **kwargs)

def _expand_hostlist(nodelist: str) -> List[str]:
"""Expand a compressed hostlist string and returns all hosts listed.
Expand Down
4 changes: 2 additions & 2 deletions ignite/distributed/comp_models/xla.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,5 @@ def _do_broadcast(self, tensor: torch.Tensor, src: int) -> torch.Tensor:
xm.all_reduce("sum", [tensor])
return tensor

def barrier(self) -> None:
xm.rendezvous("barrier")
def barrier(self, *args: Any, tag: str = "barrier", **kwargs: Any) -> None:
xm.rendezvous(tag, *args, **kwargs)
22 changes: 19 additions & 3 deletions ignite/distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,28 @@ def broadcast(
return _model.broadcast(tensor, src=src, safe_mode=safe_mode)


def barrier() -> None:
"""Helper method to synchronize all processes."""
def barrier(*args: Any, **kwargs: Any) -> None:
"""Helper method to synchronize all processes.

Args:
args: acceptable args according to provided backend
kwargs: acceptable kwargs according to provided backend

- | "nccl" or "gloo" : ``group`` (default, GroupMember.WORLD), ``async_op`` (default, False),
| ``device_ids`` (default, None).

- | "horovod" : for version >= "0.23.0", ``process_set`` (default, global_process_set).

- | "xla-tpu" : ``tag``, ``payload`` (default, b""), ``replicas`` (default, []).

.. versionchanged:: 0.5.1
Method now accepts ``args`` and ``kwargs`` for all supported backends.

"""
if _need_to_sync and isinstance(_model, _SerialModel):
sync(temporary=True)

_model.barrier()
_model.barrier(*args, **kwargs)


def set_local_rank(index: int) -> None:
Expand Down
4 changes: 2 additions & 2 deletions tests/ignite/distributed/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,14 @@ def _test(data_src, data_others, safe_mode):
idist.broadcast(None, src=0)


def _test_distrib_barrier(device):
def _test_distrib_barrier(device, *args, **kwargs):

t = torch.tensor([idist.get_rank()], device=device, dtype=torch.float)
true_res = sum([i for i in range(idist.get_world_size())])

if idist.get_rank() == 0:
t += 10.0
idist.barrier()
idist.barrier(*args, **kwargs)

tt = idist.all_reduce(t)
assert tt.item() == true_res + 10.0
Expand Down
27 changes: 23 additions & 4 deletions tests/ignite/distributed/utils/test_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,40 @@ def test_idist_broadcast_gloo(distributed_context_single_node_gloo):
_test_distrib_broadcast(device)


from torch.distributed import GroupMember


@pytest.mark.distributed
@pytest.mark.skipif(not has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_idist_barrier_nccl(distributed_context_single_node_nccl):
@pytest.mark.parametrize(
"args,kwargs",
[
([], {}),
([GroupMember.WORLD, False], {}),
([GroupMember.WORLD, True], {}),
],
)
def test_idist_barrier_nccl(distributed_context_single_node_nccl, args, kwargs):

device = idist.device()
_test_distrib_barrier(device)
_test_distrib_barrier(device, *args, **kwargs)


@pytest.mark.distributed
@pytest.mark.skipif(not has_native_dist_support, reason="Skip if no native dist support")
def test_idist_barrier_gloo(distributed_context_single_node_gloo):
@pytest.mark.parametrize(
"args,kwargs",
[
([], {}),
([GroupMember.WORLD, False], {}),
([GroupMember.WORLD, True], {}),
],
)
def test_idist_barrier_gloo(distributed_context_single_node_gloo, args, kwargs):

device = idist.device()
_test_distrib_barrier(device)
_test_distrib_barrier(device, *args, **kwargs)


def _test_idist_methods_overhead(ok_factor):
Expand Down
30 changes: 24 additions & 6 deletions tests/ignite/distributed/utils/test_xla.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,23 +178,41 @@ def test_idist_broadcast_xla_in_child_proc(xmp_executor):
@pytest.mark.tpu
@pytest.mark.skipif("NUM_TPU_WORKERS" in os.environ, reason="Skip if NUM_TPU_WORKERS is in env vars")
@pytest.mark.skipif(not has_xla_support, reason="Skip if no PyTorch XLA package")
def test_idist_barrier_xla():
@pytest.mark.parametrize(
"args,kwargs",
[
([], {}),
([b"test_payload", []], {}),
([b"test_payload", []], {"tag": "test_barrier"}),
([], {"payload": b"test_payload", "replicas": [], "tag": "test_barrier"}),
],
)
def test_idist_barrier_xla(args, kwargs):

device = idist.device()
_test_distrib_barrier(device)
_test_distrib_barrier(device, *args, **kwargs)


def _test_idist_barrier_xla_in_child_proc(index):
def _test_idist_barrier_xla_in_child_proc(index, args, kwargs):
device = idist.device()
_test_distrib_barrier(device)
_test_distrib_barrier(device, *args, **kwargs)


@pytest.mark.tpu
@pytest.mark.skipif("NUM_TPU_WORKERS" not in os.environ, reason="Skip if no NUM_TPU_WORKERS in env vars")
@pytest.mark.skipif(not has_xla_support, reason="Skip if no PyTorch XLA package")
def test_idist_barrier_xla_in_child_proc(xmp_executor):
@pytest.mark.parametrize(
"args,kwargs",
[
([], {}),
([b"test_payload", []], {}),
([b"test_payload", []], {"tag": "test_barrier"}),
([], {"payload": b"test_payload", "replicas": [], "tag": "test_barrier"}),
],
)
def test_idist_barrier_xla_in_child_proc(xmp_executor, args, kwargs):
n = int(os.environ["NUM_TPU_WORKERS"])
xmp_executor(_test_idist_barrier_xla_in_child_proc, args=(), nprocs=n)
xmp_executor(_test_idist_barrier_xla_in_child_proc, args=(args, kwargs), nprocs=n)


@pytest.mark.tpu
Expand Down