Rewrite test for Entropy metric #3216

Merged · 16 commits · Mar 23, 2024
166 changes: 50 additions & 116 deletions in tests/ignite/metrics/test_entropy.py
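Note for readers: the assertions in the hunks below compare the metric's result against an np_entropy NumPy reference that is defined in an earlier, unshown part of test_entropy.py. As context, here is a minimal sketch of what such a reference plausibly looks like, assuming only the scipy helpers imported at the top of the file; the helper name matches the diff, but its body is inferred rather than quoted from the file.

# Sketch of a NumPy reference for the Entropy metric; the real np_entropy helper
# lives in the unshown part of test_entropy.py and may differ in detail.
import numpy as np
from scipy.special import softmax
from scipy.stats import entropy as scipy_entropy

def np_entropy(np_y_pred):
    prob = softmax(np_y_pred, axis=1)  # logits -> per-sample probabilities
    return np.mean(scipy_entropy(prob, axis=1))  # mean Shannon entropy over samples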
@@ -1,12 +1,12 @@
-import os
 
 import numpy as np
 import pytest
 import torch
 from scipy.special import softmax
 from scipy.stats import entropy as scipy_entropy
 
 import ignite.distributed as idist
 
+from ignite.engine import Engine
 from ignite.exceptions import NotComputableError
 from ignite.metrics import Entropy

@@ -65,68 +65,6 @@ def test_compute(n_times, test_case):
     assert pytest.approx(ent.compute()) == np_res
 
 
-def _test_distrib_integration(device, tol=1e-6):
-    from ignite.engine import Engine
-
-    rank = idist.get_rank()
-    torch.manual_seed(12 + rank)
-
-    def _test(metric_device):
-        n_iters = 100
-        batch_size = 10
-        n_cls = 50
-
-        y_true = torch.randint(0, n_cls, size=[n_iters * batch_size], dtype=torch.long).to(device)
-        y_preds = torch.normal(2.0, 3.0, size=(n_iters * batch_size, n_cls), dtype=torch.float).to(device)
-
-        def update(engine, i):
-            return (
-                y_preds[i * batch_size : (i + 1) * batch_size],
-                y_true[i * batch_size : (i + 1) * batch_size],
-            )
-
-        engine = Engine(update)
-
-        m = Entropy(device=metric_device)
-        m.attach(engine, "entropy")
-
-        data = list(range(n_iters))
-        engine.run(data=data, max_epochs=1)
-
-        y_preds = idist.all_gather(y_preds)
-        y_true = idist.all_gather(y_true)
-
-        assert "entropy" in engine.state.metrics
-        res = engine.state.metrics["entropy"]
-
-        true_res = np_entropy(y_preds.cpu().numpy())
-
-        assert pytest.approx(res, rel=tol) == true_res
-
-    _test("cpu")
-    if device.type != "xla":
-        _test(idist.device())
-
-
-def _test_distrib_accumulator_device(device):
-    metric_devices = [torch.device("cpu")]
-    if device.type != "xla":
-        metric_devices.append(idist.device())
-    for metric_device in metric_devices:
-        device = torch.device(device)
-        ent = Entropy(device=metric_device)
-
-        for dev in [ent._device, ent._sum_of_entropies.device]:
-            assert dev == metric_device, f"{type(dev)}:{dev} vs {type(metric_device)}:{metric_device}"
-
-        y_pred = torch.tensor([[2.0], [-2.0]])
-        y = torch.zeros(2)
-        ent.update((y_pred, y))
-
-        for dev in [ent._device, ent._sum_of_entropies.device]:
-            assert dev == metric_device, f"{type(dev)}:{dev} vs {type(metric_device)}:{metric_device}"
-
-
 def test_accumulator_detached():
     ent = Entropy()
 
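The test_accumulator_detached context shown around the next hunk guards against the metric's running sum keeping an autograd history. The short sketch below restates the failure mode it protects against, using only the public update API and the _sum_of_entropies attribute that the assertions already reference; it is an explanatory illustration, not code from the PR.

# If Entropy did not detach its running sum, updating with a y_pred that requires
# grad would pull _sum_of_entropies into the autograd graph.
import torch
from ignite.metrics import Entropy

ent = Entropy()
y_pred = torch.tensor([[2.0, 0.0], [-2.0, 1.0]], requires_grad=True)  # logits with grad
y = torch.zeros(2, dtype=torch.long)  # labels are not used by the entropy computation
ent.update((y_pred, y))
assert not ent._sum_of_entropies.requires_grad  # what test_accumulator_detached asserts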
@@ -137,70 +75,66 @@ def test_accumulator_detached():
     assert not ent._sum_of_entropies.requires_grad
 
 
-@pytest.mark.distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="Skip if no GPU")
-def test_distrib_nccl_gpu(distributed_context_single_node_nccl):
-    device = idist.device()
-    _test_distrib_integration(device)
-    _test_distrib_accumulator_device(device)
-
-
-@pytest.mark.distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-def test_distrib_gloo_cpu_or_gpu(distributed_context_single_node_gloo):
-    device = idist.device()
-    _test_distrib_integration(device)
-    _test_distrib_accumulator_device(device)
-
-
-@pytest.mark.distributed
-@pytest.mark.skipif(not idist.has_hvd_support, reason="Skip if no Horovod dist support")
-@pytest.mark.skipif("WORLD_SIZE" in os.environ, reason="Skip if launched as multiproc")
-def test_distrib_hvd(gloo_hvd_executor):
-    device = torch.device("cpu" if not torch.cuda.is_available() else "cuda")
-    nproc = 4 if not torch.cuda.is_available() else torch.cuda.device_count()
-
-    gloo_hvd_executor(_test_distrib_integration, (device,), np=nproc, do_init=True)
-    gloo_hvd_executor(_test_distrib_accumulator_device, (device,), np=nproc, do_init=True)
-
-
-@pytest.mark.multinode_distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-@pytest.mark.skipif("MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
-def test_multinode_distrib_gloo_cpu_or_gpu(distributed_context_multi_node_gloo):
-    device = idist.device()
-    _test_distrib_integration(device)
-    _test_distrib_accumulator_device(device)
-
-
-@pytest.mark.multinode_distributed
-@pytest.mark.skipif(not idist.has_native_dist_support, reason="Skip if no native dist support")
-@pytest.mark.skipif("GPU_MULTINODE_DISTRIB" not in os.environ, reason="Skip if not multi-node distributed")
-def test_multinode_distrib_nccl_gpu(distributed_context_multi_node_nccl):
-    device = idist.device()
-    _test_distrib_integration(device)
-    _test_distrib_accumulator_device(device)
-
-
-@pytest.mark.tpu
-@pytest.mark.skipif("NUM_TPU_WORKERS" in os.environ, reason="Skip if NUM_TPU_WORKERS is in env vars")
-@pytest.mark.skipif(not idist.has_xla_support, reason="Skip if no PyTorch XLA package")
-def test_distrib_single_device_xla():
-    device = idist.device()
-    _test_distrib_integration(device, tol=1e-4)
-    _test_distrib_accumulator_device(device)
-
-
-def _test_distrib_xla_nprocs(index):
-    device = idist.device()
-    _test_distrib_integration(device, tol=1e-4)
-    _test_distrib_accumulator_device(device)
-
-
-@pytest.mark.tpu
-@pytest.mark.skipif("NUM_TPU_WORKERS" not in os.environ, reason="Skip if no NUM_TPU_WORKERS in env vars")
-@pytest.mark.skipif(not idist.has_xla_support, reason="Skip if no PyTorch XLA package")
-def test_distrib_xla_nprocs(xmp_executor):
-    n = int(os.environ["NUM_TPU_WORKERS"])
-    xmp_executor(_test_distrib_xla_nprocs, args=(), nprocs=n)
+@pytest.mark.usefixtures("distributed")
+class TestDistributed:
+    def test_integration(self):
+        tol = 1e-6
+        device = idist.device()
+        rank = idist.get_rank()
+        torch.manual_seed(12 + rank)
+
+        n_iters = 100
+        batch_size = 10
+        n_cls = 50
+
+        metric_devices = [torch.device("cpu")]
+        if device.type != "xla":
+            metric_devices.append(idist.device())
+
+        for metric_device in metric_devices:
+            y_true = torch.randint(0, n_cls, size=[n_iters * batch_size], dtype=torch.long).to(device)
+            y_preds = torch.normal(2.0, 3.0, size=(n_iters * batch_size, n_cls), dtype=torch.float).to(device)
+
+            def update(engine, i):
+                return (
+                    y_preds[i * batch_size : (i + 1) * batch_size],
+                    y_true[i * batch_size : (i + 1) * batch_size],
+                )
+
+            engine = Engine(update)
+
+            m = Entropy(device=metric_device)
+            m.attach(engine, "entropy")
+
+            data = list(range(n_iters))
+            engine.run(data=data, max_epochs=1)
+
+            y_preds = idist.all_gather(y_preds)
+            y_true = idist.all_gather(y_true)
+
+            assert "entropy" in engine.state.metrics
+            res = engine.state.metrics["entropy"]
+
+            true_res = np_entropy(y_preds.cpu().numpy())
+
+            assert pytest.approx(res, rel=tol) == true_res
+
+    def test_accumulator_device(self):
+        device = idist.device()
+        metric_devices = [torch.device("cpu")]
+        if device.type != "xla":
+            metric_devices.append(idist.device())
+
+        for metric_device in metric_devices:
+            device = torch.device(device)
+            ent = Entropy(device=metric_device)
+
+            for dev in [ent._device, ent._sum_of_entropies.device]:
+                assert dev == metric_device, f"{type(dev)}:{dev} vs {type(metric_device)}:{metric_device}"
+
+            y_pred = torch.tensor([[2.0], [-2.0]])
+            y = torch.zeros(2)
+            ent.update((y_pred, y))
+
+            for dev in [ent._device, ent._sum_of_entropies.device]:
+                assert dev == metric_device, f"{type(dev)}:{dev} vs {type(metric_device)}:{metric_device}"
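A closing note on the distributed bookkeeping in test_integration above: each rank holds only its own shard of y_preds, while the attached Entropy metric reduces across ranks when the engine computes it, so the NumPy reference must be evaluated on the gathered predictions. The sketch below illustrates that pattern using only public idist helpers; on a single process, all_gather is effectively an identity.

# Sketch of the gather-before-reference pattern used in test_integration.
import ignite.distributed as idist
import torch

local_preds = torch.randn(10, 5)           # this rank's shard of predictions
all_preds = idist.all_gather(local_preds)  # concatenated along dim 0 across ranks
# The metric already aggregates over every rank, so any reference value must be
# computed from all_preds rather than from local_preds.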