Skip to content

Commit

Permalink
Separate profiling tests from p2p tests
Browse files Browse the repository at this point in the history
Pull Request resolved: #56412

We are investigating some flaky profiiling tests such as #56146. One issue is that the profiling tests are tightly coupled to these send/recv tests, hence if this test is disabled, we lose signal round send/recv collectives tests.

To mitigate this, separate the tests into ones that only test send/recv, and ones that test it with profiling. This way flakiness should not result in the send/recv only tests being disabled.
ghstack-source-id: 126920867

Differential Revision: [D27864845](https://our.internmc.facebook.com/intern/diff/D27864845/)
  • Loading branch information
rohan-varma committed Apr 20, 2021
1 parent 07653b7 commit c16e36c
Showing 1 changed file with 126 additions and 83 deletions.
209 changes: 126 additions & 83 deletions torch/testing/_internal/distributed/distributed_test.py
Expand Up @@ -331,6 +331,13 @@ def _build_multidim_tensor(dim, dim_size, value=None, dtype=torch.float):
value = size
return torch.empty(size=[dim_size for _ in range(dim)], dtype=dtype).fill_(value)

def _create_autograd_profiler(enable_profiling):
return (
torch.autograd.profiler.profile(record_shapes=True)
if enable_profiling else
suppress()
)


class Barrier(object):
barrier_id = 0
Expand Down Expand Up @@ -942,12 +949,12 @@ def test_send_recv_nccl(self):
self._barrier()

# SEND RECV
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
def test_send_recv(self):
def _test_send_recv(self, enable_profiling):
rank = dist.get_rank()
send_size = rank + 1
tensor = _build_tensor(send_size)
with torch.autograd.profiler.profile(record_shapes=True) as prof:
profiler_ctx = _create_autograd_profiler(enable_profiling)
with profiler_ctx as prof:
for src in range(0, dist.get_world_size()):
if src == rank:
# Send mode
Expand All @@ -963,36 +970,41 @@ def test_send_recv(self):
dist.recv(output_tensor, src)
self.assertEqual(output_tensor, expected_tensor)

self._barrier()
if enable_profiling:
backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recv"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from all other ranks.
event_count = sum(e.count for e in events)
expected_event_count = dist.get_world_size() - 1
self.assertEqual(event_count, expected_event_count)
# Event order is not deterministic, so simply assert their shape
# is found in the following list.
expected_shapes = [
[[rank + 1] * 3] for rank in range(dist.get_world_size())
]
for event in events:
self.assertTrue(event.input_shapes in expected_shapes)

backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recv"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from all other ranks.
event_count = sum(e.count for e in events)
expected_event_count = dist.get_world_size() - 1
self.assertEqual(event_count, expected_event_count)
# Event order is not deterministic, so simply assert their shape
# is found in the following list.
expected_shapes = [
[[rank + 1] * 3] for rank in range(dist.get_world_size())
]
for event in events:
self.assertTrue(event.input_shapes in expected_shapes)
@unittest.skipIf(BACKEND == "nccl", "Nccl send/recv tested by test_send_recv_nccl")
def test_send_recv(self):
self._test_send_recv(enable_profiling=False)

@unittest.skipIf(BACKEND == "nccl", "Nccl send/recv tested by test_send_recv_nccl")
def test_send_recv_autograd_profiler(self):
self._test_send_recv(enable_profiling=True)

# SEND RECV ANY SOURCE
@unittest.skipIf(
BACKEND == "nccl", "Nccl does not support send/recv from any source"
)
def test_send_recv_any_source(self):
def _test_send_recv_any_source(self, enable_profiling):
rank = dist.get_rank()
send_recv_size = 10
tensor = _build_tensor(send_recv_size, value=rank)
recv_ranks = list()
irecv_ranks = list()

with torch.autograd.profiler.profile(record_shapes=True) as prof:
profiler_ctx = _create_autograd_profiler(enable_profiling)
with profiler_ctx as prof:
for dst in range(0, dist.get_world_size()):
if dst == rank:
# Recv mode
Expand Down Expand Up @@ -1021,39 +1033,52 @@ def test_send_recv_any_source(self):
dist.send(tensor, dst) # recv
dist.send(tensor, dst) # irecv

backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recvAnySource"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from other rank twice.
self.assertEqual(sum(event.count for event in events), 2 * (dist.get_world_size() - 1))
for event in events:
self.assertEqual(event.input_shapes, [[send_recv_size] * 3])

# Each rank would have 2 * (world_size - 1) sends, verify that
# globally we receive the same amount on the other end.
recv_ranks_tensor = torch.cat((torch.tensor(recv_ranks), torch.tensor(irecv_ranks)), 0)
global_recv_ranks = [torch.empty_like(recv_ranks_tensor) for _ in range(dist.get_world_size())]
dist.all_gather(global_recv_ranks, recv_ranks_tensor)
global_recv_ranks_list = []
for tensor in global_recv_ranks:
global_recv_ranks_list += tensor.tolist()

from itertools import groupby
global_recv_ranks_list.sort()
frequency = [len(list(group)) for key, group in groupby(global_recv_ranks_list)]
self.assertEqual(dist.get_world_size(), len(frequency))
self.assertEqual([2 * (dist.get_world_size() - 1)] * dist.get_world_size(), frequency)
self._barrier()
if enable_profiling:
backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recvAnySource"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from other rank twice.
self.assertEqual(sum(event.count for event in events), 2 * (dist.get_world_size() - 1))
for event in events:
self.assertEqual(event.input_shapes, [[send_recv_size] * 3])

# Each rank would have 2 * (world_size - 1) sends, verify that
# globally we receive the same amount on the other end.
recv_ranks_tensor = torch.cat((torch.tensor(recv_ranks), torch.tensor(irecv_ranks)), 0)
global_recv_ranks = [torch.empty_like(recv_ranks_tensor) for _ in range(dist.get_world_size())]
dist.all_gather(global_recv_ranks, recv_ranks_tensor)
global_recv_ranks_list = []
for tensor in global_recv_ranks:
global_recv_ranks_list += tensor.tolist()

from itertools import groupby
global_recv_ranks_list.sort()
frequency = [len(list(group)) for key, group in groupby(global_recv_ranks_list)]
self.assertEqual(dist.get_world_size(), len(frequency))
self.assertEqual([2 * (dist.get_world_size() - 1)] * dist.get_world_size(), frequency)
self._barrier()

@unittest.skipIf(
BACKEND == "nccl", "Nccl does not support send/recv from any source"
)
def test_send_recv_any_source(self):
self._test_send_recv_any_source(enable_profiling=False)

@unittest.skipIf(
BACKEND == "nccl", "Nccl does not support send/recv from any source"
)
def test_send_recv_any_source_autograd_profiler(self):
self._test_send_recv_any_source(enable_profiling=True)

# SEND RECV WITH TAG
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
def test_send_recv_with_tag(self):
def _test_send_recv_with_tag(self, enable_profiling):
rank = dist.get_rank()
world_size = dist.get_world_size()
send_recv_size = 10
tensor = _build_tensor(send_recv_size, value=rank)
with torch.autograd.profiler.profile(record_shapes=True) as prof:
profiler_ctx = _create_autograd_profiler(enable_profiling)
with profiler_ctx as prof:
for dst in range(0, world_size):
if dst == rank:
# Recv mode
Expand All @@ -1067,24 +1092,33 @@ def test_send_recv_with_tag(self):
# Send mode
dist.send(tensor, dst, tag=rank)

backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recv"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from all other ranks
event_count = sum(e.count for e in events)
expected_event_count = dist.get_world_size() - 1
self.assertEqual(event_count, expected_event_count)
for event in events:
self.assertEqual(event.name, event_name)
self.assertEqual(event.input_shapes, [[send_recv_size] * 3])
if enable_profiling:
backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recv"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from all other ranks
event_count = sum(e.count for e in events)
expected_event_count = dist.get_world_size() - 1
self.assertEqual(event_count, expected_event_count)
for event in events:
self.assertEqual(event.name, event_name)
self.assertEqual(event.input_shapes, [[send_recv_size] * 3])

@unittest.skipIf(BACKEND == "nccl", "NCCL send/recv tested by test_send_recv_nccl")
def test_send_recv_with_tag(self):
self._test_send_recv_with_tag(enable_profiling=False)

@unittest.skipIf(BACKEND == "nccl", "NCCL send/recv tested by test_send_recv_nccl")
def test_send_recv_with_tag_autograd_profiler(self):
self._test_send_recv_with_tag(enable_profiling=True)

# ISEND
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
def test_isend(self):
def _test_isend(self, enable_profiling):
rank = dist.get_rank()
world_size = dist.get_world_size()
with torch.autograd.profiler.profile(record_shapes=True) as prof:
profiler_ctx = _create_autograd_profiler(enable_profiling)
with profiler_ctx as prof:
if rank == 0:
requests = [
dist.isend(_build_tensor(dest, 10), dest)
Expand All @@ -1100,24 +1134,33 @@ def test_isend(self):

self._barrier()

backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
expected_event_name = f"{backend}:send" if rank == 0 else f"{backend}:recv"
events = get_profiling_event(expected_event_name, prof)
event_count = sum(e.count for e in events)
expected_count = dist.get_world_size() - 1 if rank == 0 else 1
self.assertEqual(expected_count, event_count)
# Event ordering is not guaranteed, so simply ensure the shapes are
# found in the following map.
expected_shapes = {
r: [[r] * 3] for r in range(1, dist.get_world_size())
}
for event in events:
self.assertEqual(event.name, expected_event_name)
if rank == 0:
self.assertTrue(event.input_shapes in expected_shapes.values())
else:
self.assertEqual(event.input_shapes, expected_shapes[rank])
if enable_profiling:
backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
expected_event_name = f"{backend}:send" if rank == 0 else f"{backend}:recv"
events = get_profiling_event(expected_event_name, prof)
event_count = sum(e.count for e in events)
expected_count = dist.get_world_size() - 1 if rank == 0 else 1
self.assertEqual(expected_count, event_count)
# Event ordering is not guaranteed, so simply ensure the shapes are
# found in the following map.
expected_shapes = {
r: [[r] * 3] for r in range(1, dist.get_world_size())
}
for event in events:
self.assertEqual(event.name, expected_event_name)
if rank == 0:
self.assertTrue(event.input_shapes in expected_shapes.values())
else:
self.assertEqual(event.input_shapes, expected_shapes[rank])

@unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
def test_isend(self):
self._test_isend(enable_profiling=False)

@unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
def test_isend_autograd_profiler(self):
self._test_isend(enable_profiling=True)

# IRECV
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv")
Expand Down

0 comments on commit c16e36c

Please sign in to comment.