31 changes: 24 additions & 7 deletions ignite/distributed/comp_models/native.py
@@ -223,21 +223,21 @@ def finalize(self):

@staticmethod
def _dist_worker_task_fn(
local_rank, backend, fn, world_size, num_procs_per_node, node_rank, master_addr, master_port, args, kwargs_dict
local_rank, backend, fn, args, kw_dict, world_size, nprocs_per_node, node_rank, master_addr, master_port, kw
):
from ignite.distributed.utils import _set_model, finalize

copy_env_vars = dict(os.environ)

os.environ["LOCAL_RANK"] = str(local_rank)
os.environ["RANK"] = str(node_rank * num_procs_per_node + local_rank)
os.environ["RANK"] = str(node_rank * nprocs_per_node + local_rank)
os.environ["WORLD_SIZE"] = str(world_size)
os.environ["MASTER_ADDR"] = str(master_addr)
os.environ["MASTER_PORT"] = str(master_port)

model = _NativeDistModel.create_from_backend(backend)
model = _NativeDistModel.create_from_backend(backend, **kw)
_set_model(model)
fn(local_rank, *args, **kwargs_dict)
fn(local_rank, *args, **kw_dict)
finalize()

os.environ = copy_env_vars
@@ -250,17 +250,34 @@ def spawn(
num_procs_per_node: int = 1,
num_nodes: int = 1,
node_rank: int = 0,
master_addr: str = "0.0.0.0",
master_addr: str = "127.0.0.1",
master_port: int = 2222,
backend: str = "nccl",
**kwargs
):
world_size = num_nodes * num_procs_per_node

spawn_kwargs = {
"join": kwargs.get("join", True),
"daemon": kwargs.get("daemon", False),
"start_method": kwargs.get("start_method", "spawn"),
}
mp.spawn(
_NativeDistModel._dist_worker_task_fn,
nprocs=num_procs_per_node,
args=(backend, fn, world_size, num_procs_per_node, node_rank, master_addr, master_port, args, kwargs_dict),
daemon=False,
args=(
backend,
fn,
args,
kwargs_dict,
world_size,
num_procs_per_node,
node_rank,
master_addr,
master_port,
kwargs,
),
**spawn_kwargs,
)

_reduce_op_map = {
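A minimal sketch of the kwargs-splitting pattern used above (the helper name split_spawn_kwargs is hypothetical, not part of this PR): the options understood by mp.spawn are read out with the same defaults as in the diff, while the full keyword mapping still travels to the worker so backend options such as timeout can reach dist.init_process_group.

# Hypothetical helper mirroring the spawn_kwargs extraction in the diff above.
def split_spawn_kwargs(**kwargs):
    # Options understood by torch.multiprocessing.spawn, with the same defaults as above.
    spawn_kwargs = {
        "join": kwargs.get("join", True),
        "daemon": kwargs.get("daemon", False),
        "start_method": kwargs.get("start_method", "spawn"),
    }
    # The full mapping is still forwarded to the worker for backend initialization.
    return spawn_kwargs, kwargs

spawn_kwargs, worker_kwargs = split_spawn_kwargs(start_method="fork", timeout=30)
assert spawn_kwargs["start_method"] == "fork"
assert worker_kwargs["timeout"] == 30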
5 changes: 2 additions & 3 deletions ignite/distributed/comp_models/xla.py
@@ -120,15 +120,14 @@ def spawn(

import os

spawn_kwargs = {}
if "COLAB_TPU_ADDR" in os.environ:
spawn_kwargs["start_method"] = "fork"
kwargs["start_method"] = "fork"

xmp.spawn(
_XlaDistModel._dist_worker_task_fn,
args=(backend, fn, args, kwargs_dict),
nprocs=num_procs_per_node,
**spawn_kwargs,
**kwargs,
)

_collective_op_dtype = torch.float32
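A short sketch of the Colab-specific override above (the standalone function is hypothetical): on a Colab TPU runtime the "fork" start method is forced, otherwise the caller's kwargs are forwarded to xmp.spawn unchanged.

import os

# Hypothetical standalone version of the check above; not part of the PR.
def resolve_xla_spawn_kwargs(**kwargs):
    if "COLAB_TPU_ADDR" in os.environ:
        # Colab TPU runtimes require processes to be forked rather than spawned.
        kwargs["start_method"] = "fork"
    return kwargs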
26 changes: 16 additions & 10 deletions ignite/distributed/utils.py
@@ -175,8 +175,8 @@ def spawn(
num_procs_per_node: int = 1,
**kwargs
):
"""Spawns `num_procs_per_node` processes that run `fn` with `args` and initialize distributed configuration
defined by `backend`.
"""Spawns ``num_procs_per_node`` processes that run ``fn`` with ``args``/``kwargs_dict`` and initialize
distributed configuration defined by ``backend``.

Examples:

@@ -259,16 +259,22 @@ def train_fn(local_rank, a, b, c, d=12):
backend (str): backend to use: `nccl`, `gloo`, `xla-tpu`
fn (function): function to be called as the entrypoint of the spawned process.
This function must be defined at the top level of a module so it can be pickled and spawned.
This is a requirement imposed by multiprocessing. The function is called as `fn(i, *args, **kwargs_dict)`,
This is a requirement imposed by multiprocessing. The function is called as ``fn(i, *args, **kwargs_dict)``,
where `i` is the process index and `args` is the tuple of arguments passed through.
args (tuple): arguments passed to `fn`.
kwargs_dict (Mapping): kwargs passed to `fn`.
num_procs_per_node (int): number of processes to spawn on a single node. Default, 1.
**kwargs: acceptable kwargs according to provided backend:

- "nccl" or "gloo" : `num_nodes` (=1), `node_rank` (=0), `master_addr` ("127.0.0.1"), `master_port` (2222)
- | "nccl" or "gloo" : `num_nodes` (default, 1), `node_rank` (default, 0), `master_addr`
| (default, "127.0.0.1"), `master_port` (default, 2222), `timeout` to `dist.init_process_group`_ function
| and kwargs for `mp.spawn`_ function.

- "xla-tpu" : `num_nodes` (=1), `node_rank` (=0)
- "xla-tpu" : `num_nodes` (default, 1), `node_rank` (default, 0) and kwargs to `xmp.spawn`_ function.

.. _dist.init_process_group: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
.. _mp.spawn: https://pytorch.org/docs/stable/multiprocessing.html#torch.multiprocessing.spawn
.. _xmp.spawn: http://pytorch.org/xla/release/1.5/index.html#torch_xla.distributed.xla_multiprocessing.spawn

"""
_assert_backend(backend)
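To make the documented signature concrete, a minimal usage sketch (values are illustrative and not taken from this PR): fn receives the process index first, args and kwargs_dict are forwarded to it, and master_port is one of the extra kwargs accepted for the native backends.

import ignite.distributed as idist

def train_fn(local_rank, a, b, c, d=12):
    # Runs inside each spawned process; local_rank is injected by spawn.
    print(local_rank, a + b + c + d)

if __name__ == "__main__":
    # Spawns 2 processes on this node using the "gloo" backend.
    idist.spawn(
        "gloo",
        train_fn,
        args=(1, 2, 3),
        kwargs_dict={"d": 4},
        num_procs_per_node=2,
        master_port=2223,
    )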
@@ -323,7 +329,7 @@ def barrier():

def set_local_rank(index: int):
"""Method to hint the local rank in case if torch native distributed context is created by user
without using :meth:`~ignite.distributed.utils.initialize` or :meth:`~ignite.distributed.utils.spawn`.
without using :meth:`~ignite.distributed.initialize` or :meth:`~ignite.distributed.spawn`.

Usage:

@@ -364,11 +370,11 @@ def _assert_backend(backend):


def initialize(backend: str, **kwargs):
"""Initializes distributed configuration according to provided `backend`
"""Initializes distributed configuration according to provided ``backend``

Examples:

Launch single node multi-GPU training with `torch.distributed.launch` utility.
Launch single node multi-GPU training with ``torch.distributed.launch`` utility.

.. code-block:: python

@@ -416,14 +422,14 @@ def train_fn(local_rank, a, b, c):

def finalize():
"""Finalizes distributed configuration. For example, in case of native pytorch distributed configuration,
it calls `dist.destroy_process_group()`.
it calls ``dist.destroy_process_group()``.
"""
_model.finalize()
_set_model(_SerialModel())


def show_config():
"""Helper method to display distributed configuration via `logging`.
"""Helper method to display distributed configuration via ``logging``.
"""

# setup parallel logger
6 changes: 1 addition & 5 deletions tests/ignite/conftest.py
@@ -50,11 +50,7 @@ def world_size():
remove_env_var = False

if "WORLD_SIZE" not in os.environ:
if torch.cuda.is_available():
ws = torch.cuda.device_count()
else:
ws = 1
os.environ["WORLD_SIZE"] = "{}".format(ws)
os.environ["WORLD_SIZE"] = "1"
remove_env_var = True

yield int(os.environ["WORLD_SIZE"])
6 changes: 3 additions & 3 deletions tests/ignite/engine/__init__.py
@@ -34,15 +34,15 @@ def __call__(self, engine):

def setup_sampler(sampler_type, num_iters, batch_size):
if sampler_type is None:
return None
return None, batch_size

if sampler_type == "weighted":
from torch.utils.data.sampler import WeightedRandomSampler

w = torch.ones(num_iters * batch_size, dtype=torch.float)
for i in range(num_iters):
w[batch_size * i : batch_size * (i + 1)] += i * 1.0
return WeightedRandomSampler(w, num_samples=num_iters * batch_size, replacement=True)
return WeightedRandomSampler(w, num_samples=num_iters * batch_size, replacement=True), batch_size

if sampler_type == "distributed":
from torch.utils.data.distributed import DistributedSampler
@@ -55,4 +55,4 @@ def setup_sampler(sampler_type, num_iters, batch_size):
rank = dist.get_rank()

dataset = torch.zeros(num_iters * batch_size)
return DistributedSampler(dataset, num_replicas=num_replicas, rank=rank)
return DistributedSampler(dataset, num_replicas=num_replicas, rank=rank), batch_size // num_replicas
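A brief usage sketch of the new return contract (illustrative values): setup_sampler now returns a (sampler, batch_size) pair, where the batch size comes back divided by the number of replicas for the "distributed" sampler type, so callers unpack both values before building a DataLoader.

import torch
from torch.utils.data import DataLoader

num_iters, total_batch_size = 17, 4
data = torch.zeros(num_iters * total_batch_size)

# setup_sampler is the helper defined in this module; with sampler_type=None it
# returns (None, total_batch_size), i.e. the batch size is passed through unchanged.
sampler, batch_size = setup_sampler(None, num_iters, total_batch_size)
dataloader = DataLoader(data, batch_size=batch_size, sampler=sampler)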
49 changes: 21 additions & 28 deletions tests/ignite/engine/test_deterministic.py
@@ -22,12 +22,12 @@
def test_update_dataloader():
def _test(sampler_type=None):
num_epochs = 3
batch_size = 4
total_batch_size = 4
num_iters = 17
data = torch.randint(0, 1000, size=(num_iters * batch_size,))
data = torch.randint(0, 1000, size=(num_iters * total_batch_size,))
num_workers = 4

sampler = setup_sampler(sampler_type, num_iters, batch_size)
sampler, batch_size = setup_sampler(sampler_type, num_iters, total_batch_size)
dataloader = DataLoader(
data,
batch_size=batch_size,
@@ -48,7 +48,7 @@ def _test(sampler_type=None):
t.append(b)
seen_batches.append(t)

sampler = setup_sampler(sampler_type, num_iters, batch_size)
sampler, batch_size = setup_sampler(sampler_type, num_iters, total_batch_size)
dataloader = DataLoader(
data,
batch_size=batch_size,
@@ -240,18 +240,19 @@ def _test_resume_random_dataloader_from_epoch(device, _setup_sampler, sampler_ty
def _test(epoch_length=None):

max_epochs = 5
batch_size = 4
total_batch_size = 4
num_iters = 21
torch.manual_seed(0)
data = torch.randint(0, 1000, size=(num_iters * batch_size,))
data = torch.randint(0, 1000, size=(num_iters * total_batch_size,))

if epoch_length is None:
epoch_length = num_iters

for resume_epoch in range(1, max_epochs):

for num_workers in [0, 4]:
sampler = _setup_sampler(sampler_type, num_iters, batch_size)
sampler, batch_size = _setup_sampler(sampler_type, num_iters, total_batch_size)

orig_dataloader = DataLoader(
data,
batch_size=batch_size,
@@ -283,7 +284,7 @@ def _(engine):

batch_checker = BatchChecker(seen_batchs, init_counter=resume_epoch * epoch_length)

sampler = _setup_sampler(sampler_type, num_iters, batch_size)
sampler, batch_size = _setup_sampler(sampler_type, num_iters, total_batch_size)
resume_dataloader = DataLoader(
data,
batch_size=batch_size,
@@ -346,10 +347,10 @@ def __len__(self):
def _test_resume_random_dataloader_from_iter(device, _setup_sampler, sampler_type=None):
def _test(epoch_length=None):
max_epochs = 3
batch_size = 4
total_batch_size = 4
num_iters = 17
torch.manual_seed(0)
data = torch.randint(0, 1000, size=(num_iters * batch_size,))
data = torch.randint(0, 1000, size=(num_iters * total_batch_size,))

if epoch_length is None:
epoch_length = num_iters
@@ -358,7 +359,7 @@ def _test(epoch_length=None):

for num_workers in [0, 4]:

sampler = _setup_sampler(sampler_type, num_iters, batch_size)
sampler, batch_size = _setup_sampler(sampler_type, num_iters, total_batch_size)
orig_dataloader = DataLoader(
data,
batch_size=batch_size,
@@ -389,7 +390,7 @@ def _(engine):

batch_checker = BatchChecker(seen_batchs, init_counter=resume_iteration)

sampler = _setup_sampler(sampler_type, num_iters, batch_size)
sampler, batch_size = _setup_sampler(sampler_type, num_iters, total_batch_size)
resume_dataloader = DataLoader(
data,
batch_size=batch_size,
@@ -555,39 +556,31 @@ def test_resume_random_data_iterator_from_iter():
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
def test_distrib_gpu(distributed_context_single_node_nccl):
device = "cuda:{}".format(distributed_context_single_node_nccl["local_rank"])
_test_resume_random_data_iterator_from_iter(device)
_test_resume_random_data_iterator_from_epoch(device)
_test_resume_random_dataloader_from_iter(device, setup_sampler)
_test_resume_random_dataloader_from_epoch(device, setup_sampler)
_test_resume_random_dataloader_from_iter(device, setup_sampler, sampler_type="distributed")
_test_resume_random_dataloader_from_epoch(device, setup_sampler, sampler_type="distributed")


@pytest.mark.distributed
def test_distrib_cpu(distributed_context_single_node_gloo):
device = "cpu"
_test_resume_random_data_iterator_from_iter(device)
_test_resume_random_data_iterator_from_epoch(device)
_test_resume_random_dataloader_from_iter(device, setup_sampler)
_test_resume_random_dataloader_from_epoch(device, setup_sampler)
_test_resume_random_dataloader_from_iter(device, setup_sampler, sampler_type="distributed")
_test_resume_random_dataloader_from_epoch(device, setup_sampler, sampler_type="distributed")


@pytest.mark.multinode_distributed
@pytest.mark.skipif("MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_cpu(distributed_context_multi_node_gloo):
device = "cpu"
_test_resume_random_data_iterator_from_iter(device)
_test_resume_random_data_iterator_from_epoch(device)
_test_resume_random_dataloader_from_iter(device, setup_sampler)
_test_resume_random_dataloader_from_epoch(device, setup_sampler)
_test_resume_random_dataloader_from_iter(device, setup_sampler, sampler_type="distributed")
_test_resume_random_dataloader_from_epoch(device, setup_sampler, sampler_type="distributed")


@pytest.mark.multinode_distributed
@pytest.mark.skipif("GPU_MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
def test_multinode_distrib_gpu(distributed_context_multi_node_nccl):
device = "cuda:{}".format(distributed_context_multi_node_nccl["local_rank"])
_test_resume_random_data_iterator_from_iter(device)
_test_resume_random_data_iterator_from_epoch(device)
_test_resume_random_dataloader_from_iter(device, setup_sampler)
_test_resume_random_dataloader_from_epoch(device, setup_sampler)
_test_resume_random_dataloader_from_iter(device, setup_sampler, sampler_type="distributed")
_test_resume_random_dataloader_from_epoch(device, setup_sampler, sampler_type="distributed")


def test_concepts_snippet_resume():
2 changes: 1 addition & 1 deletion tests/ignite/engine/test_engine.py
@@ -510,7 +510,7 @@ def test_multinode_distrib_gpu(distributed_context_multi_node_nccl):

def test_engine_with_iterable_dataloader():

from torch.utils.data import DataLoader, IterableDataset, get_worker_info
from torch.utils.data import DataLoader, IterableDataset

class MyIterableDataset(IterableDataset):
def __init__(self, start, end):
12 changes: 7 additions & 5 deletions tests/run_gpu_tests.sh
@@ -1,22 +1,24 @@
#!/bin/bash

if [ -z "$1" ]; then
ws=1
ngpus=1
else
ws=$1
ngpus=$1
fi

set -xeu

py.test --cov ignite --cov-report term-missing -vvv tests/ -k 'on_cuda'

if [ "$ws" -eq "1" ]; then
if [ "${ngpus}" -eq "1" ]; then

py.test --cov ignite --cov-append --cov-report term-missing -vvv tests/ -m distributed

else

export WORLD_SIZE=$ws
py.test --cov ignite --cov-append --cov-report term-missing --dist=each --tx $WORLD_SIZE*popen//python=python3.7 tests -m distributed -vvv
py.test --cov ignite --cov-append --cov-report term-missing -vvv tests/ -m distributed

export WORLD_SIZE=${ngpus}
py.test --cov ignite --cov-append --cov-report term-missing --dist=each --tx ${WORLD_SIZE}*popen//python=python3.7 tests -m distributed -vvv

fi