76 changes: 38 additions & 38 deletions torch/testing/_internal/distributed/distributed_test.py
@@ -391,12 +391,12 @@ def get_timeout(test_id):
def require_backend(backends):
if BACKEND not in backends:
return skip_but_pass_in_sandcastle(
"Test requires backend to be one of %s" % backends
f"Test requires backend {BACKEND} to be one of {backends}"
)
return lambda func: func


def require_backends_available(backends):
def require_backend_is_available(backends):
def check(backend):
if backend == dist.Backend.GLOO:
return dist.is_gloo_available()
@@ -410,9 +410,9 @@ def check(backend):
return True
return False

if not all(check(dist.Backend(backend)) for backend in backends):
if not check(dist.Backend(BACKEND)):
return skip_but_pass_in_sandcastle(
"Test requires backends to be available %s" % backends
f"Test requires backend {BACKEND} to be available"
)
return lambda func: func
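
Below is a minimal, self-contained sketch (not part of this PR) of what the renamed decorator now checks: only the single backend selected for the test run (the module-level BACKEND) has to be available, rather than every backend named in the argument. The availability probe and the skip helper here are simplified stand-ins for the real dist.is_gloo_available()/is_nccl_available() calls and skip_but_pass_in_sandcastle used in distributed_test.py.

import os
import unittest

# Assumed stand-in for the module-level backend chosen for this test run.
BACKEND = os.environ.get("BACKEND", "gloo")

def skip_but_pass_in_sandcastle(reason):
    # Simplified stand-in: the real helper reports an internal "pass" in
    # sandcastle instead of a plain unittest skip.
    return unittest.skip(reason)

def require_backend_is_available(backends):
    def check(backend):
        # Stand-in availability probe; the real decorator asks torch.distributed
        # whether gloo/nccl/mpi/ucc support was built into this binary.
        return backend in {"gloo", "nccl", "mpi", "ucc"}

    # New behavior: only the backend actually under test must be available,
    # regardless of how many backends the test lists in `backends`.
    if not check(BACKEND):
        return skip_but_pass_in_sandcastle(
            f"Test requires backend {BACKEND} to be available"
        )
    return lambda func: func

class ExampleTest(unittest.TestCase):
    @require_backend_is_available({"gloo", "nccl"})
    def test_smoke(self):
        self.assertTrue(True)

if __name__ == "__main__":
    unittest.main()

Under the old require_backends_available, the all(...) check skipped such a test whenever any listed backend was unavailable, even if the run only exercised the configured BACKEND; checking BACKEND alone avoids those spurious skips.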

@@ -869,14 +869,14 @@ def _test_group_override_backend(self, initializer):
self.assertEqual(_build_tensor(2, value=0), tensor.to("cpu"))

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@require_world_size(3)
@skip_if_lt_x_gpu(2)
def test_backend_group(self):
self._test_group_override_backend(self._init_group_test)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(3)
def test_backend_full_group(self):
self._test_group_override_backend(self._init_full_group_test)
@@ -6655,7 +6655,7 @@ def _run_reduction_test(
self.assertEqual(tensor, expected_tensor)

@require_backend({"nccl"})
@require_backends_available({"nccl"})
@require_backend_is_available({"nccl"})
@skip_if_lt_x_gpu(2)
def test_nccl_backend_bool_allreduce(self):
torch.cuda.set_device(self.rank)
@@ -6683,7 +6683,7 @@ def test_nccl_backend_bool_allreduce(self):
# these once it is supported.

@require_backend({"nccl"})
@require_backends_available({"nccl"})
@require_backend_is_available({"nccl"})
@skip_if_lt_x_gpu(2)
def test_nccl_backend_bool_allgather(self):
torch.cuda.set_device(self.rank)
@@ -6706,7 +6706,7 @@ def test_nccl_backend_bool_allgather(self):
self.assertEqual(input_tensor_copy, input_tensor)

@require_backend({"nccl"})
@require_backends_available({"nccl"})
@require_backend_is_available({"nccl"})
@skip_if_lt_x_gpu(int(os.environ["WORLD_SIZE"]))
def test_nccl_backend_bool_reduce(self):
torch.cuda.set_device(self.rank)
@@ -6734,7 +6734,7 @@ def test_nccl_backend_bool_reduce(self):
self._run_reduction_test(input_tensor, expected, op, dist.reduce, dst=0)

@require_backend({"nccl"})
@require_backends_available({"nccl"})
@require_backend_is_available({"nccl"})
@skip_if_lt_x_gpu(2)
def test_nccl_backend_bool_broadcast(self):
tensor_size = 10
@@ -7128,14 +7128,14 @@ def _test_ddp_profiling(self, profiler_ctx):
self.assertEqual(len(events), 1)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_profiling_autograd_profiler(self):
autograd_profiler_ctx = torch.autograd.profiler.profile()
return self._test_ddp_profiling(profiler_ctx=autograd_profiler_ctx)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
@skip_but_pass_in_sandcastle_if(IS_FBCODE, "Kineto in fbcode code causes hang")
@skip_but_pass_in_sandcastle_if(
@@ -7803,15 +7803,15 @@ def forward(self, x):
torch.cuda.synchronize(device=self.rank)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_ignore_params_arg(self):
self._test_ddp_ignore_params_arg(static_graph=False)
self._test_ddp_ignore_params_arg(static_graph=True)

@with_dist_debug_levels(levels=["OFF", "INFO", "DETAIL"])
@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_unused_params_rebuild_buckets_exception(self):
class ToyModel(nn.Module):
@@ -7864,7 +7864,7 @@ def forward(self, x):
dist.barrier()

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_shared_grad_acc_unused_params(self):
# When find_unused_parameters=True, ensure we mark unused parameters
@@ -7901,7 +7901,7 @@ def forward(self, x):
loss.backward()

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_device(self):
m = nn.Linear(10, 10).to(self.rank)
@@ -8006,7 +8006,7 @@ def train_iter(inp, input_type):
train_iter(inp, type(inp))

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_namedtuple(self):
batch = 5
@@ -8043,7 +8043,7 @@ def forward(_self, input, expected_type):  # noqa: B902

@with_dist_debug_levels(levels=["OFF", "INFO", "DETAIL"])
@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_control_flow_same_across_ranks(self):
# Control flow that is the same across ranks.
@@ -8126,7 +8126,7 @@ def test_ddp_control_flow_same_across_ranks(self):
dist.barrier()

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_invalid_static_graph(self):
world_size = dist.get_world_size()
@@ -8176,7 +8176,7 @@ def test_invalid_static_graph(self):

@with_dist_debug_levels(levels=["OFF", "INFO", "DETAIL"])
@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_control_flow_different_across_ranks(self):
# Control flow that is different across ranks.
@@ -8365,13 +8365,13 @@ def _test_compute_bucket_assignment_by_size(self, use_logger):
dist.barrier(group_gloo)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_compute_bucket_assignment_by_size_sparse_error_without_logger(self):
self._test_compute_bucket_assignment_by_size(use_logger=False)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_compute_bucket_assignment_by_size_sparse_error_with_logger(self):
self._test_compute_bucket_assignment_by_size(use_logger=True)
@@ -8460,7 +8460,7 @@ def _test_verify_model_across_rank(self, use_logger):
dist.barrier(group_gloo)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_but_pass_in_sandcastle_if(
BACKEND == "ucc" and IS_SANDCASTLE, "Skipped internally"
)
@@ -8469,7 +8469,7 @@ def test_verify_model_across_rank_with_logger(self):
self._test_verify_model_across_rank(use_logger=True)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_but_pass_in_sandcastle_if(
BACKEND == "ucc" and IS_SANDCASTLE, "Skipped internally"
)
@@ -8493,7 +8493,7 @@ def _run_test_ddp_model_with_diff_params(self, ctx, net, ddp_group, group_gloo):
dist.barrier(group_gloo)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_but_pass_in_sandcastle_if(
BACKEND == "ucc" and IS_SANDCASTLE, "Skipped internally"
)
@@ -8520,7 +8520,7 @@ def test_ddp_model_diff_shape_across_ranks(self):
)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_but_pass_in_sandcastle_if(
BACKEND == "ucc" and IS_SANDCASTLE, "Skipped internally"
)
@@ -8738,7 +8738,7 @@ def forward(self, x):
return ddp_model

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_different_graph_across_ranks(self):
base_model = self._test_different_graph_across_ranks(
@@ -8755,7 +8755,7 @@ def test_different_graph_across_ranks(self):
self.assertEqual(i, j)

@require_backend({"gloo"})
@require_backends_available({"gloo"})
@require_backend_is_available({"gloo"})
@skip_but_pass_in_sandcastle_if(
IS_MACOS or IS_WINDOWS,
"MacOS uses uv transport which does not have as robust error handling as tcp transport",
@@ -8796,7 +8796,7 @@ def test_monitored_barrier_gloo(self):
self._barrier(timeout=30)

@require_backend({"gloo"})
@require_backends_available({"gloo"})
@require_backend_is_available({"gloo"})
def test_monitored_barrier_gloo_subgroup(self):
# Tests that monitored_barrier works as expected on non-default
# process groups.
@@ -8873,7 +8873,7 @@ def _test_monitored_barrier_allreduce_hang(self, wait_all_ranks):

@with_nccl_blocking_wait
@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(int(os.environ["WORLD_SIZE"]))
def test_monitored_barrier_allreduce_hang(self):
# tests expected behavior when nonzero rank hangs and we want to
@@ -8882,15 +8882,15 @@ def test_monitored_barrier_allreduce_hang(self):

@with_nccl_blocking_wait
@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(int(os.environ["WORLD_SIZE"]))
def test_monitored_barrier_allreduce_hang_wait_all_ranks(self):
# tests expected behavior when nonzero rank hangs and we want to
# report all timed out ranks.
self._test_monitored_barrier_allreduce_hang(wait_all_ranks=True)

@require_backend({"gloo"})
@require_backends_available({"gloo"})
@require_backend_is_available({"gloo"})
def test_monitored_barrier_gloo_rank_0_timeout(self):
# tests error when rank 0 exhausts its given timeout.
process_group = dist.new_group(ranks=list(range(int(self.world_size))))
@@ -8902,7 +8902,7 @@ def test_monitored_barrier_gloo_rank_0_timeout(self):
process_group.monitored_barrier(timeout)

@require_backend({"gloo"})
@require_backends_available({"gloo"})
@require_backend_is_available({"gloo"})
@skip_if_small_worldsize
@skip_but_pass_in_sandcastle_if(
IS_MACOS or IS_WINDOWS,
@@ -8930,7 +8930,7 @@ def test_monitored_barrier_failure_order(self):
dist.monitored_barrier(timeout=timeout)

@require_backend({"gloo"})
@require_backends_available({"gloo"})
@require_backend_is_available({"gloo"})
@skip_if_small_worldsize
def test_monitored_barrier_wait_all_ranks(self):
# Tests simple case where > 1 rank does not call into monitored
@@ -8943,7 +8943,7 @@ def test_monitored_barrier_wait_all_ranks(self):
dist.monitored_barrier(timeout=timeout, wait_all_ranks=True)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@with_dist_debug_levels(levels=["INFO"])
@skip_if_lt_x_gpu(2)
def test_ddp_build_debug_param_to_name_mapping(self):
@@ -9153,14 +9153,14 @@ def forward(self, x):

@with_dist_debug_levels(levels=["OFF", "INFO", "DETAIL"])
@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_multiple_nested_unused_params_error(self):
self._test_ddp_multiple_nested_unused_params_error(ignore_sparse=False)

@with_dist_debug_levels(levels=["OFF", "INFO", "DETAIL"])
@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_multiple_nested_unused_params_err_ignore_params(self):
# Tests unused parameter reporting when DDP is configured to ignore
@@ -9827,7 +9827,7 @@ def forward(self, x):
self.assertIsNone(module.module.buffer.grad)

@require_backend(DistTestCases.backend_feature["gpu"])
@require_backends_available(DistTestCases.backend_feature["gpu"])
@require_backend_is_available(DistTestCases.backend_feature["gpu"])
@skip_if_lt_x_gpu(2)
def test_ddp_forward_backward_hook(self):
class DummyTestModel(nn.Module):