Skip to content

Commit

Permalink
[Feat] ElasticLauncher auto dist_backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuyao Huang committed Jan 25, 2022
1 parent aedd5bf commit 7f6741e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
2 changes: 1 addition & 1 deletion ice/core/hypergraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ice.llutil.config import configurable
from ice.llutil.logging import get_logger
from ice.llutil.multiprocessing import called_from_main
from ice.llutil.multiprocessing.launcher import ElasticLauncher
from ice.llutil.launcher import ElasticLauncher
from torch.autograd.grad_mode import set_grad_enabled


Expand Down
21 changes: 13 additions & 8 deletions ice/llutil/multiprocessing/launcher.py → ice/llutil/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,23 @@ def parse_min_max_nnodes(nnodes: str):
return min_nodes, max_nodes


def parse_devices(devices: str):
def parse_devices_and_backend(devices: str, dist_backend: str):

# determine device type
if devices[:3] == "cpu" or (
devices[:4] == "auto" and not torch.cuda.is_available()
):
device_type = "cpu"
ddp_backend = "gloo"
if dist_backend == "auto":
dist_backend = "gloo"
elif devices[:4] == "cuda" or (
devices[:4] == "auto" and torch.cuda.is_available()
):
if not torch.cuda.is_available():
raise ValueError("Cuda is not available.")
device_type = "cuda"
ddp_backend = "nccl"
if dist_backend == "auto":
dist_backend = "nccl"
else:
raise ValueError(f"Unsupported devices value: {devices}")

Expand Down Expand Up @@ -77,12 +79,14 @@ def parse_devices(devices: str):
out.append(torch.device(device_type, int(indices)))
if 0 == len(out):
raise ValueError("Empty devices indices.")
return out, ddp_backend
if dist_backend == "nccl" and len(set(out)) != len(out):
dist_backend = "gloo"
return out, dist_backend


def _wrap(launcher:"ElasticLauncher", entrypoint, *args):
dist.init_process_group(
backend=launcher.ddp_backend,
backend=launcher.dist_backend,
rank=launcher.rank,
world_size=launcher.world_size,
)
Expand Down Expand Up @@ -114,6 +118,7 @@ def __freeze__(self,
# Worker/node size related arguments.
devices="auto", # devices per node, e.g.: ["auto", "cpu", "cuda", "cuda:0", "cuda:*", "auto:*", "cuda:1,3", "cuda:0-2,7"]
nnodes="1:1", # Number of nodes, or the range of nodes in form <minimum_nodes>:<maximum_nodes>.
dist_backend="auto", # supports: ["nccl", "gloo", "mpi", "auto"]. If given "auto", will use "nccl" for "cuda" and "gloo" for "cpu" in general.

# Rendezvous related arguments
rdzv_id="none", # User-defined group id.
Expand Down Expand Up @@ -156,7 +161,7 @@ def __freeze__(self,
assert 0 < min_nodes <= max_nodes
assert max_restarts >= 0

self._devices, self._ddp_backend = parse_devices(devices)
self._devices, self._dist_backend = parse_devices_and_backend(devices, dist_backend)

nproc_per_node = len(self._devices)
logging.info(f"Using nproc_per_node={nproc_per_node}.")
Expand Down Expand Up @@ -214,8 +219,8 @@ def devices(self) -> List[torch.device]:
return self._devices

@property
def ddp_backend(self):
return self._ddp_backend
def dist_backend(self):
return self._dist_backend

#
# following properties should be called from the subprocesses, you can pass the launcher as argument for custom entrypoint function.
Expand Down
2 changes: 1 addition & 1 deletion tests/llutil/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest
import torch.nn as nn
from ice.llutil.config import configurable, make_configurable
from ice.llutil.multiprocessing.launcher import ElasticLauncher
from ice.llutil.launcher import ElasticLauncher

make_configurable(nn.Conv2d)
make_configurable(nn.Conv2d)
Expand Down
15 changes: 11 additions & 4 deletions tests/llutil/test_launcher.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
import pytest
import torch
import torch.distributed as dist
from ice.llutil.multiprocessing.launcher import ElasticLauncher
from ice.llutil.launcher import ElasticLauncher


def sum_rank(launcher:ElasticLauncher):
def sum_rank(launcher: ElasticLauncher):
# print(launcher.rank)
tensor = torch.Tensor([launcher.rank]).to(launcher.assigned_device)
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

assert tensor.item() == (0 + launcher.world_size - 1) * launcher.world_size // 2


def test_sum_rank():
launcher = ElasticLauncher(devices="cpu:0-3").freeze()
launcher(sum_rank, launcher)


@pytest.mark.cuda
def test_sum_rank_cuda():
launcher = ElasticLauncher(devices="cuda:0-1").freeze()
launcher = ElasticLauncher(devices="cuda:0,1").freeze()
launcher(sum_rank, launcher)


@pytest.mark.cuda
def test_same_gpu():
launcher = ElasticLauncher(devices="cuda:0,0").freeze()
launcher(sum_rank, launcher)

0 comments on commit 7f6741e

Please sign in to comment.