22 changes: 11 additions & 11 deletions ignite/distributed/auto.py
@@ -188,23 +188,23 @@ def auto_model(model: nn.Module, sync_bn: bool = False, **kwargs: Any) -> nn.Mod
# distributed data parallel model
if idist.get_world_size() > 1:
bnd = idist.backend()
if idist.has_native_dist_support and bnd == idist_native.NCCL:
if idist.has_native_dist_support and bnd in (idist_native.NCCL, idist_native.GLOO, idist_native.MPI):
if sync_bn:
logger.info("Convert batch norm to sync batch norm")
model = nn.SyncBatchNorm.convert_sync_batchnorm(model)

if "device_ids" in kwargs:
raise ValueError(f"Argument kwargs should not contain 'device_ids', but got {kwargs}")
if torch.cuda.is_available():
if "device_ids" in kwargs:
raise ValueError(f"Argument kwargs should not contain 'device_ids', but got {kwargs}")

lrank = idist.get_local_rank()
logger.info(f"Apply torch DistributedDataParallel on model, device id: {lrank}")
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[lrank,], **kwargs)
elif idist.has_native_dist_support and bnd == idist_native.GLOO:
if sync_bn:
logger.info("Convert batch norm to sync batch norm")
model = nn.SyncBatchNorm.convert_sync_batchnorm(model)
lrank = idist.get_local_rank()
logger.info(f"Apply torch DistributedDataParallel on model, device id: {lrank}")
kwargs["device_ids"] = [
lrank,
]
else:
logger.info("Apply torch DistributedDataParallel on model")

logger.info("Apply torch DistributedDataParallel on model")
model = torch.nn.parallel.DistributedDataParallel(model, **kwargs)
elif idist.has_hvd_support and bnd == idist_hvd.HOROVOD:
import horovod.torch as hvd
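Note (illustrative, not part of the diff): after this change, idist.auto_model applies SyncBatchNorm conversion and DistributedDataParallel wrapping for all native backends (NCCL, GLOO, MPI), and only fills in device_ids when CUDA is available. A minimal usage sketch under that reading; the training function, model and backend choice below are made up for illustration, while idist.Parallel and idist.auto_model are the existing public API:

import torch.nn as nn
import ignite.distributed as idist

def training(local_rank, config):
    model = nn.Linear(10, 2)
    # sync_bn conversion and DDP wrapping now apply to NCCL, GLOO and MPI alike;
    # device_ids=[local_rank] is set internally only when CUDA is available
    model = idist.auto_model(model, sync_bn=True)
    print(local_rank, type(model).__name__, idist.device())

if __name__ == "__main__":
    # gloo runs on CPU-only machines and, with this PR, also drives GPUs
    with idist.Parallel(backend="gloo", nproc_per_node=2) as parallel:
        parallel.run(training, config={})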
10 changes: 5 additions & 5 deletions ignite/distributed/comp_models/native.py
@@ -31,7 +31,7 @@ class _NativeDistModel(ComputationModel):
In this implementation we assume the following mapping between backend and devices:

- NCCL <-> GPU
- GLOO <-> CPU
- GLOO <-> CPU or GPU
- MPI <-> CPU

"""
@@ -127,7 +127,7 @@ def _create_from_backend(
# https://github.com/facebookresearch/maskrcnn-benchmark/issues/172
dist.barrier()

if backend == dist.Backend.NCCL:
if torch.cuda.is_available():
torch.cuda.set_device(self._local_rank)

self._setup_attrs()
@@ -140,7 +140,7 @@ def _init_from_context(self) -> None:
def _compute_nproc_per_node(self) -> int:
local_rank = self.get_local_rank()
device = torch.device("cpu")
if self.backend() == dist.Backend.NCCL:
if torch.cuda.is_available():
# we manually set cuda device to local rank in order to avoid a hang on all_reduce
device = torch.device(f"cuda:{local_rank}")
tensor = torch.tensor([self.get_local_rank() + 1]).to(device)
@@ -151,7 +151,7 @@ def _get_all_hostnames(self) -> List[Tuple[str, ...]]:
import socket

device = "cpu"
if self.backend() == dist.Backend.NCCL:
if torch.cuda.is_available():
index = torch.cuda.current_device()
device = f"cuda:{index}"
hostname = socket.gethostname()
@@ -281,7 +281,7 @@ def get_node_rank(self) -> int:
return cast(int, self._node)

def device(self) -> torch.device:
if self.backend() == dist.Backend.NCCL:
if torch.cuda.is_available():
index = torch.cuda.current_device()
if index < self.get_local_rank():
warnings.warn(
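Reviewer note (not in the diff): device selection in _NativeDistModel is now keyed on torch.cuda.is_available() rather than on backend == NCCL, which is what lets GLOO drive either CPUs or GPUs. A simplified standalone sketch of the rule these hunks encode, not the actual class code:

import torch

def resolve_device() -> torch.device:
    if torch.cuda.is_available():
        # _create_from_backend pins the current CUDA device to the local rank,
        # so this resolves to cuda:<local_rank> for NCCL and GLOO alike
        return torch.device(f"cuda:{torch.cuda.current_device()}")
    return torch.device("cpu")

print(resolve_device())  # cuda:<index> on a GPU node, cpu otherwise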
7 changes: 2 additions & 5 deletions tests/ignite/conftest.py
@@ -94,7 +94,7 @@ def _create_dist_context(dist_info, lrank):

dist.init_process_group(**dist_info)
dist.barrier()
if dist_info["backend"] == "nccl":
if torch.cuda.is_available():
torch.cuda.set_device(lrank)

return {"local_rank": lrank, "world_size": dist_info["world_size"], "rank": dist_info["rank"]}
@@ -150,8 +150,6 @@ def distributed_context_single_node_nccl(local_rank, world_size):

free_port = _setup_free_port(local_rank)

print(local_rank, "Port:", free_port)

dist_info = {
"backend": "nccl",
"world_size": world_size,
@@ -174,7 +172,6 @@ def distributed_context_single_node_gloo(local_rank, world_size):
init_method = f'file:///{temp_file.name.replace(backslash, "/")}'
else:
free_port = _setup_free_port(local_rank)
print(local_rank, "Port:", free_port)
init_method = f"tcp://localhost:{free_port}"
temp_file = None

@@ -213,7 +210,7 @@ def _create_mnodes_dist_context(dist_info, mnodes_conf):

dist.init_process_group(**dist_info)
dist.barrier()
if dist_info["backend"] == "nccl":
if torch.cuda.is_available():
torch.cuda.device(mnodes_conf["local_rank"])
return mnodes_conf

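The fixture changes follow the same rule: initialize the process group, synchronize, then pin the CUDA device for the local rank whenever CUDA is present, whatever the backend. A minimal sketch of that pattern; the argument values come from the test launcher in the real conftest and are placeholders here:

import torch
import torch.distributed as dist

def create_dist_context(backend, init_method, world_size, rank, local_rank):
    dist.init_process_group(backend=backend, init_method=init_method,
                            world_size=world_size, rank=rank)
    dist.barrier()
    if torch.cuda.is_available():
        # applies to gloo as well as nccl after this PR
        torch.cuda.set_device(local_rank)
    return {"local_rank": local_rank, "world_size": world_size, "rank": rank}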
22 changes: 13 additions & 9 deletions tests/ignite/contrib/engines/test_common.py
@@ -56,7 +56,7 @@ def _test_setup_common_training_handlers(
num_epochs = 10

model = DummyModel().to(device)
if distributed and "cuda" in device:
if distributed and "cuda" in torch.device(device).type:
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank,], output_device=local_rank)
optimizer = torch.optim.SGD(model.parameters(), lr=lr)

@@ -581,17 +581,19 @@ def test_setup_neptune_logging(dirname):
@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_distrib_gpu(dirname, distributed_context_single_node_nccl):
def test_distrib_nccl_gpu(dirname, distributed_context_single_node_nccl):

local_rank = distributed_context_single_node_nccl["local_rank"]
device = f"cuda:{local_rank}"
device = idist.device()
_test_setup_common_training_handlers(dirname, device, rank=local_rank, local_rank=local_rank, distributed=True)
test_add_early_stopping_by_val_score()


@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
def test_distrib_cpu(dirname, distributed_context_single_node_gloo):
device = "cpu"
def test_distrib_gloo_cpu_or_gpu(dirname, distributed_context_single_node_gloo):

device = idist.device()
local_rank = distributed_context_single_node_gloo["local_rank"]
_test_setup_common_training_handlers(dirname, device, rank=local_rank, local_rank=local_rank, distributed=True)
_test_setup_common_training_handlers(
@@ -606,8 +608,9 @@ def test_distrib_cpu(dirname, distributed_context_single_node_gloo):
@pytest.mark.multinode_distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif("MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_cpu(dirname, distributed_context_multi_node_gloo):
device = "cpu"
def test_multinode_distrib_gloo_cpu_or_gpu(dirname, distributed_context_multi_node_gloo):

device = idist.device()
rank = distributed_context_multi_node_gloo["rank"]
_test_setup_common_training_handlers(dirname, device, rank=rank)
test_add_early_stopping_by_val_score()
@@ -616,9 +619,10 @@ def test_multinode_distrib_cpu(dirname, distributed_context_multi_node_gloo):
@pytest.mark.multinode_distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif("GPU_MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_gpu(dirname, distributed_context_multi_node_nccl):
def test_multinode_distrib_nccl_gpu(dirname, distributed_context_multi_node_nccl):

local_rank = distributed_context_multi_node_nccl["local_rank"]
rank = distributed_context_multi_node_nccl["rank"]
device = f"cuda:{local_rank}"
device = idist.device()
_test_setup_common_training_handlers(dirname, device, rank=rank, local_rank=local_rank, distributed=True)
test_add_early_stopping_by_val_score()
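The same renaming and device-resolution pattern repeats in the handler and metric test files below: test_distrib_cpu becomes test_distrib_gloo_cpu_or_gpu, test_distrib_gpu becomes test_distrib_nccl_gpu, and the device comes from idist.device() instead of a hard-coded string. A hypothetical shape of that pattern; _test_some_feature is a placeholder for whichever shared helper each file actually calls:

import pytest
import torch
import ignite.distributed as idist

def _test_some_feature(device):
    # placeholder for helpers such as _test_distrib_compute / _test_distrib_integration
    assert isinstance(device, torch.device)

@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
def test_distrib_gloo_cpu_or_gpu(distributed_context_single_node_gloo):
    device = idist.device()  # cuda:<local_rank> when CUDA is available, cpu otherwise
    _test_some_feature(device)

@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_distrib_nccl_gpu(distributed_context_single_node_nccl):
    device = idist.device()
    _test_some_feature(device)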
13 changes: 8 additions & 5 deletions tests/ignite/contrib/handlers/test_clearml_logger.py
@@ -888,18 +888,21 @@ def update_fn(engine, batch):

@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
def test_distrib_cpu(distributed_context_single_node_gloo):
_test_save_model_optimizer_lr_scheduler_with_state_dict("cpu")
_test_save_model_optimizer_lr_scheduler_with_state_dict("cpu", on_zero_rank=True)
def test_distrib_gloo_cpu_or_gpu(distributed_context_single_node_gloo):

device = idist.device()
_test_save_model_optimizer_lr_scheduler_with_state_dict(device)
_test_save_model_optimizer_lr_scheduler_with_state_dict(device, on_zero_rank=True)


@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_distrib_gpu(distributed_context_single_node_nccl):
def test_distrib_nccl_gpu(distributed_context_single_node_nccl):

device = idist.device()
_test_save_model_optimizer_lr_scheduler_with_state_dict(device)
_test_save_model_optimizer_lr_scheduler_with_state_dict("cpu", on_zero_rank=True)
_test_save_model_optimizer_lr_scheduler_with_state_dict(device, on_zero_rank=True)


@pytest.mark.tpu
10 changes: 6 additions & 4 deletions tests/ignite/contrib/handlers/test_lr_finder.py
@@ -539,17 +539,19 @@ def forward(self, x):

@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
def test_distrib_cpu(distributed_context_single_node_gloo):
device = torch.device("cpu")
def test_distrib_gloo_cpu_or_gpu(distributed_context_single_node_gloo):

device = idist.device()
_test_distrib_log_lr_and_loss(device)
_test_distrib_integration_mnist(device)


@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_distrib_gpu(distributed_context_single_node_nccl):
device = torch.device(f"cuda:{distributed_context_single_node_nccl['local_rank']}")
def test_distrib_nccl_gpu(distributed_context_single_node_nccl):

device = idist.device()
_test_distrib_log_lr_and_loss(device)
_test_distrib_integration_mnist(device)

9 changes: 6 additions & 3 deletions tests/ignite/contrib/handlers/test_neptune_logger.py
@@ -516,13 +516,16 @@ def test_no_neptune_client(no_site_packages):

@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
def test_distrib_cpu(distributed_context_single_node_gloo):
_test_neptune_saver_integration("cpu")
def test_distrib_gloo_cpu_or_gpu(distributed_context_single_node_gloo):

device = idist.device()
_test_neptune_saver_integration(device)


@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_distrib_gpu(distributed_context_single_node_nccl):
def test_distrib_nccl_gpu(distributed_context_single_node_nccl):

device = idist.device()
_test_neptune_saver_integration(device)
19 changes: 11 additions & 8 deletions tests/ignite/contrib/metrics/regression/test_canberra_metric.py
@@ -184,17 +184,18 @@ def update(engine, i):
@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_distrib_gpu(distributed_context_single_node_nccl):
device = torch.device(f"cuda:{distributed_context_single_node_nccl['local_rank']}")
def test_distrib_nccl_gpu(distributed_context_single_node_nccl):

device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)


@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
def test_distrib_cpu(distributed_context_single_node_gloo):
def test_distrib_gloo_cpu_or_gpu(distributed_context_single_node_gloo):

device = torch.device("cpu")
device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)

@@ -214,17 +215,19 @@ def test_distrib_hvd(gloo_hvd_executor):
@pytest.mark.multinode_distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif("MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_cpu(distributed_context_multi_node_gloo):
device = torch.device("cpu")
def test_multinode_distrib_gloo_cpu_or_gpu(distributed_context_multi_node_gloo):

device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)


@pytest.mark.multinode_distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif("GPU_MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_gpu(distributed_context_multi_node_nccl):
device = torch.device(f"cuda:{distributed_context_multi_node_nccl['local_rank']}")
def test_multinode_distrib_nccl_gpu(distributed_context_multi_node_nccl):

device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)

(additional test file; the file-name header was not captured in this view)
@@ -191,16 +191,18 @@ def update(engine, i):
@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_distrib_gpu(distributed_context_single_node_nccl):
device = torch.device(f"cuda:{distributed_context_single_node_nccl['local_rank']}")
def test_distrib_nccl_gpu(distributed_context_single_node_nccl):

device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)


@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
def test_distrib_cpu(distributed_context_single_node_gloo):
device = torch.device("cpu")
def test_distrib_gloo_cpu_or_gpu(distributed_context_single_node_gloo):

device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)

@@ -219,17 +221,19 @@ def test_distrib_hvd(gloo_hvd_executor):
@pytest.mark.multinode_distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif("MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_cpu(distributed_context_multi_node_gloo):
device = torch.device("cpu")
def test_multinode_distrib_gloo_cpu_or_gpu(distributed_context_multi_node_gloo):

device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)


@pytest.mark.multinode_distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif("GPU_MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_gpu(distributed_context_multi_node_nccl):
device = torch.device(f"cuda:{distributed_context_multi_node_nccl['local_rank']}")
def test_multinode_distrib_nccl_gpu(distributed_context_multi_node_nccl):

device = idist.device()
_test_distrib_compute(device)


19 changes: 11 additions & 8 deletions tests/ignite/contrib/metrics/regression/test_fractional_bias.py
@@ -197,17 +197,18 @@ def update(engine, i):
@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_distrib_gpu(distributed_context_single_node_nccl):
device = torch.device(f"cuda:{distributed_context_single_node_nccl['local_rank']}")
def test_distrib_nccl_gpu(distributed_context_single_node_nccl):

device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)


@pytest.mark.distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
def test_distrib_cpu(distributed_context_single_node_gloo):
def test_distrib_gloo_cpu_or_gpu(distributed_context_single_node_gloo):

device = torch.device("cpu")
device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)

@@ -227,17 +228,19 @@ def test_distrib_hvd(gloo_hvd_executor):
@pytest.mark.multinode_distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif("MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_cpu(distributed_context_multi_node_gloo):
device = torch.device("cpu")
def test_multinode_distrib_gloo_cpu_or_gpu(distributed_context_multi_node_gloo):

device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)


@pytest.mark.multinode_distributed
@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
@pytest.mark.skipif("GPU_MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_gpu(distributed_context_multi_node_nccl):
device = torch.device(f"cuda:{distributed_context_multi_node_nccl['local_rank']}")
def test_multinode_distrib_nccl_gpu(distributed_context_multi_node_nccl):

device = idist.device()
_test_distrib_compute(device)
_test_distrib_integration(device)
