From 4edfcc77822420cb2e5a486ee9ca9531e5990907 Mon Sep 17 00:00:00 2001 From: Rohan Varma Date: Mon, 19 Apr 2021 13:58:11 -0700 Subject: [PATCH 1/4] Separate profiling tests from p2p tests We are investigating some flaky profiiling tests such as https://github.com/pytorch/pytorch/issues/56146. One issue is that the profiling tests are tightly coupled to these send/recv tests, hence if this test is disabled, we lose signal round send/recv collectives tests. To mitigate this, separate the tests into ones that only test send/recv, and ones that test it with profiling. This way flakiness should not result in the send/recv only tests being disabled. Differential Revision: [D27864845](https://our.internmc.facebook.com/intern/diff/D27864845/) [ghstack-poisoned] --- .../_internal/distributed/distributed_test.py | 202 +++++++++++------- 1 file changed, 120 insertions(+), 82 deletions(-) diff --git a/torch/testing/_internal/distributed/distributed_test.py b/torch/testing/_internal/distributed/distributed_test.py index e99cbe5b945b5..d3a1b903d948a 100644 --- a/torch/testing/_internal/distributed/distributed_test.py +++ b/torch/testing/_internal/distributed/distributed_test.py @@ -942,12 +942,12 @@ def test_send_recv_nccl(self): self._barrier() # SEND RECV - @unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv") - def test_send_recv(self): + def _test_send_recv(self, enable_profiling): rank = dist.get_rank() send_size = rank + 1 tensor = _build_tensor(send_size) - with torch.autograd.profiler.profile(record_shapes=True) as prof: + profile_ctx = torch.autograd.profiler.profile(record_shapes=True) if enable_profiling else contextlib.suppress() + with profile_ctx as prof: for src in range(0, dist.get_world_size()): if src == rank: # Send mode @@ -965,34 +965,41 @@ def test_send_recv(self): self._barrier() - backend = dist.get_backend() - if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS: - for event_name in [f"{backend}:send", f"{backend}:recv"]: - events = 
get_profiling_event(event_name, prof) - # Each rank sends/recvs from all other ranks. - event_count = sum(e.count for e in events) - expected_event_count = dist.get_world_size() - 1 - self.assertEqual(event_count, expected_event_count) - # Event order is not deterministic, so simply assert their shape - # is found in the following list. - expected_shapes = [ - [[rank + 1] * 3] for rank in range(dist.get_world_size()) - ] - for event in events: - self.assertTrue(event.input_shapes in expected_shapes) + if enable_profiling: + backend = dist.get_backend() + if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS: + for event_name in [f"{backend}:send", f"{backend}:recv"]: + events = get_profiling_event(event_name, prof) + # Each rank sends/recvs from all other ranks. + event_count = sum(e.count for e in events) + expected_event_count = dist.get_world_size() - 1 + self.assertEqual(event_count, expected_event_count) + # Event order is not deterministic, so simply assert their shape + # is found in the following list. 
+ expected_shapes = [ + [[rank + 1] * 3] for rank in range(dist.get_world_size()) + ] + for event in events: + self.assertTrue(event.input_shapes in expected_shapes) + + @unittest.skipIf(BACKEND == "nccl", "Nccl send/recv tested by test_send_recv_nccl") + def test_send_recv(self): + self._test_send_recv(enable_profiling=False) + + @unittest.skipIf(BACKEND == "nccl", "Nccl send/recv tested by test_send_recv_nccl") + def test_send_recv_autograd_profiler(self): + self._test_send_recv(enable_profiling=True) # SEND RECV ANY SOURCE - @unittest.skipIf( - BACKEND == "nccl", "Nccl does not support send/recv from any source" - ) - def test_send_recv_any_source(self): + def _test_send_recv_any_source(self, enable_profiling): rank = dist.get_rank() send_recv_size = 10 tensor = _build_tensor(send_recv_size, value=rank) recv_ranks = list() irecv_ranks = list() - with torch.autograd.profiler.profile(record_shapes=True) as prof: + profile_ctx = torch.autograd.profiler.profile(record_shapes=True) if enable_profiling else contextlib.suppress() + with profile_ctx as prof: for dst in range(0, dist.get_world_size()): if dst == rank: # Recv mode @@ -1021,39 +1028,52 @@ def test_send_recv_any_source(self): dist.send(tensor, dst) # recv dist.send(tensor, dst) # irecv - backend = dist.get_backend() - if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS: - for event_name in [f"{backend}:send", f"{backend}:recvAnySource"]: - events = get_profiling_event(event_name, prof) - # Each rank sends/recvs from other rank twice. - self.assertEqual(sum(event.count for event in events), 2 * (dist.get_world_size() - 1)) - for event in events: - self.assertEqual(event.input_shapes, [[send_recv_size] * 3]) - - # Each rank would have 2 * (world_size - 1) sends, verify that - # globally we receive the same amount on the other end. 
- recv_ranks_tensor = torch.cat((torch.tensor(recv_ranks), torch.tensor(irecv_ranks)), 0) - global_recv_ranks = [torch.empty_like(recv_ranks_tensor) for _ in range(dist.get_world_size())] - dist.all_gather(global_recv_ranks, recv_ranks_tensor) - global_recv_ranks_list = [] - for tensor in global_recv_ranks: - global_recv_ranks_list += tensor.tolist() - - from itertools import groupby - global_recv_ranks_list.sort() - frequency = [len(list(group)) for key, group in groupby(global_recv_ranks_list)] - self.assertEqual(dist.get_world_size(), len(frequency)) - self.assertEqual([2 * (dist.get_world_size() - 1)] * dist.get_world_size(), frequency) - self._barrier() + if enable_profiling: + backend = dist.get_backend() + if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS: + for event_name in [f"{backend}:send", f"{backend}:recvAnySource"]: + events = get_profiling_event(event_name, prof) + # Each rank sends/recvs from other rank twice. + self.assertEqual(sum(event.count for event in events), 2 * (dist.get_world_size() - 1)) + for event in events: + self.assertEqual(event.input_shapes, [[send_recv_size] * 3]) + + # Each rank would have 2 * (world_size - 1) sends, verify that + # globally we receive the same amount on the other end. 
+ recv_ranks_tensor = torch.cat((torch.tensor(recv_ranks), torch.tensor(irecv_ranks)), 0) + global_recv_ranks = [torch.empty_like(recv_ranks_tensor) for _ in range(dist.get_world_size())] + dist.all_gather(global_recv_ranks, recv_ranks_tensor) + global_recv_ranks_list = [] + for tensor in global_recv_ranks: + global_recv_ranks_list += tensor.tolist() + + from itertools import groupby + global_recv_ranks_list.sort() + frequency = [len(list(group)) for key, group in groupby(global_recv_ranks_list)] + self.assertEqual(dist.get_world_size(), len(frequency)) + self.assertEqual([2 * (dist.get_world_size() - 1)] * dist.get_world_size(), frequency) + self._barrier() + + @unittest.skipIf( + BACKEND == "nccl", "Nccl does not support send/recv from any source" + ) + def test_send_recv_any_source(self): + self._test_send_recv_any_source(enable_profiling=False) + + @unittest.skipIf( + BACKEND == "nccl", "Nccl does not support send/recv from any source" + ) + def test_send_recv_any_source_autograd_profiler(self): + self._test_send_recv_any_source(enable_profiling=True) # SEND RECV WITH TAG - @unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv") - def test_send_recv_with_tag(self): + def _test_send_recv_with_tag(self, enable_profiling): rank = dist.get_rank() world_size = dist.get_world_size() send_recv_size = 10 tensor = _build_tensor(send_recv_size, value=rank) - with torch.autograd.profiler.profile(record_shapes=True) as prof: + profiler_ctx = torch.autograd.profiler.profile(record_shapes=True) if enable_profiling else contextlib.suppress() + with profiler_ctx as prof: for dst in range(0, world_size): if dst == rank: # Recv mode @@ -1067,24 +1087,33 @@ def test_send_recv_with_tag(self): # Send mode dist.send(tensor, dst, tag=rank) - backend = dist.get_backend() - if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS: - for event_name in [f"{backend}:send", f"{backend}:recv"]: - events = get_profiling_event(event_name, prof) - # Each rank sends/recvs from all 
other ranks - event_count = sum(e.count for e in events) - expected_event_count = dist.get_world_size() - 1 - self.assertEqual(event_count, expected_event_count) - for event in events: - self.assertEqual(event.name, event_name) - self.assertEqual(event.input_shapes, [[send_recv_size] * 3]) + if enable_profiling: + backend = dist.get_backend() + if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS: + for event_name in [f"{backend}:send", f"{backend}:recv"]: + events = get_profiling_event(event_name, prof) + # Each rank sends/recvs from all other ranks + event_count = sum(e.count for e in events) + expected_event_count = dist.get_world_size() - 1 + self.assertEqual(event_count, expected_event_count) + for event in events: + self.assertEqual(event.name, event_name) + self.assertEqual(event.input_shapes, [[send_recv_size] * 3]) + + @unittest.skipIf(BACKEND == "nccl", "NCCL send/recv tested by test_send_recv_nccl") + def test_send_recv_with_tag(self): + self._test_send_recv_with_tag(enable_profiling=False) + + @unittest.skipIf(BACKEND == "nccl", "NCCL send/recv tested by test_send_recv_nccl") + def test_send_recv_with_tag_autograd_profiler(self): + self._test_send_recv_with_tag(enable_profiling=True) # ISEND - @unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend") - def test_isend(self): + def _test_isend(self, enable_profiling): rank = dist.get_rank() world_size = dist.get_world_size() - with torch.autograd.profiler.profile(record_shapes=True) as prof: + profiler_ctx = torch.autograd.profiler.profile(record_shapes=True) if enable_profiling else contextlib.suppress() + with profiler_ctx as prof: if rank == 0: requests = [ dist.isend(_build_tensor(dest, 10), dest) @@ -1100,24 +1129,33 @@ def test_isend(self): self._barrier() - backend = dist.get_backend() - if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS: - expected_event_name = f"{backend}:send" if rank == 0 else f"{backend}:recv" - events = get_profiling_event(expected_event_name, prof) - event_count = 
sum(e.count for e in events) - expected_count = dist.get_world_size() - 1 if rank == 0 else 1 - self.assertEqual(expected_count, event_count) - # Event ordering is not guaranteed, so simply ensure the shapes are - # found in the following map. - expected_shapes = { - r: [[r] * 3] for r in range(1, dist.get_world_size()) - } - for event in events: - self.assertEqual(event.name, expected_event_name) - if rank == 0: - self.assertTrue(event.input_shapes in expected_shapes.values()) - else: - self.assertEqual(event.input_shapes, expected_shapes[rank]) + if enable_profiling: + backend = dist.get_backend() + if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS: + expected_event_name = f"{backend}:send" if rank == 0 else f"{backend}:recv" + events = get_profiling_event(expected_event_name, prof) + event_count = sum(e.count for e in events) + expected_count = dist.get_world_size() - 1 if rank == 0 else 1 + self.assertEqual(expected_count, event_count) + # Event ordering is not guaranteed, so simply ensure the shapes are + # found in the following map. 
+ expected_shapes = { + r: [[r] * 3] for r in range(1, dist.get_world_size()) + } + for event in events: + self.assertEqual(event.name, expected_event_name) + if rank == 0: + self.assertTrue(event.input_shapes in expected_shapes.values()) + else: + self.assertEqual(event.input_shapes, expected_shapes[rank]) + + @unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend") + def test_isend(self): + self._test_isend(enable_profiling=False) + + @unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend") + def test_isend_autograd_profiler(self): + self._test_isend(enable_profiling=True) # IRECV @unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv") From cae61e11f4b63e7089d0028799c450e34c16814c Mon Sep 17 00:00:00 2001 From: Rohan Varma Date: Mon, 19 Apr 2021 14:02:54 -0700 Subject: [PATCH 2/4] Update on "Separate profiling tests from p2p tests" We are investigating some flaky profiiling tests such as https://github.com/pytorch/pytorch/issues/56146. One issue is that the profiling tests are tightly coupled to these send/recv tests, hence if this test is disabled, we lose signal round send/recv collectives tests. To mitigate this, separate the tests into ones that only test send/recv, and ones that test it with profiling. This way flakiness should not result in the send/recv only tests being disabled. 
Differential Revision: [D27864845](https://our.internmc.facebook.com/intern/diff/D27864845/) [ghstack-poisoned] --- .../_internal/distributed/distributed_test.py | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/torch/testing/_internal/distributed/distributed_test.py b/torch/testing/_internal/distributed/distributed_test.py index d3a1b903d948a..39f93537f4fa9 100644 --- a/torch/testing/_internal/distributed/distributed_test.py +++ b/torch/testing/_internal/distributed/distributed_test.py @@ -946,7 +946,11 @@ def _test_send_recv(self, enable_profiling): rank = dist.get_rank() send_size = rank + 1 tensor = _build_tensor(send_size) - profile_ctx = torch.autograd.profiler.profile(record_shapes=True) if enable_profiling else contextlib.suppress() + profile_ctx = ( + torch.autograd.profiler.profile(record_shapes=True) + if enable_profiling else + suppress() + ) with profile_ctx as prof: for src in range(0, dist.get_world_size()): if src == rank: @@ -998,7 +1002,11 @@ def _test_send_recv_any_source(self, enable_profiling): recv_ranks = list() irecv_ranks = list() - profile_ctx = torch.autograd.profiler.profile(record_shapes=True) if enable_profiling else contextlib.suppress() + profile_ctx = ( + torch.autograd.profiler.profile(record_shapes=True) + if enable_profiling else + suppress() + ) with profile_ctx as prof: for dst in range(0, dist.get_world_size()): if dst == rank: @@ -1072,7 +1080,11 @@ def _test_send_recv_with_tag(self, enable_profiling): world_size = dist.get_world_size() send_recv_size = 10 tensor = _build_tensor(send_recv_size, value=rank) - profiler_ctx = torch.autograd.profiler.profile(record_shapes=True) if enable_profiling else contextlib.suppress() + profiler_ctx = ( + torch.autograd.profiler.profile(record_shapes=True) + if enable_profiling else + suppress() + ) with profiler_ctx as prof: for dst in range(0, world_size): if dst == rank: @@ -1112,7 +1124,11 @@ def test_send_recv_with_tag_autograd_profiler(self): def 
_test_isend(self, enable_profiling): rank = dist.get_rank() world_size = dist.get_world_size() - profiler_ctx = torch.autograd.profiler.profile(record_shapes=True) if enable_profiling else contextlib.suppress() + profiler_ctx = ( + torch.autograd.profiler.profile(record_shapes=True) + if enable_profiling else + suppress() + ) with profiler_ctx as prof: if rank == 0: requests = [ From f8ef8175108f5f8955254f1b82a2ba91b511698f Mon Sep 17 00:00:00 2001 From: Rohan Varma Date: Mon, 19 Apr 2021 15:28:48 -0700 Subject: [PATCH 3/4] Update on "Separate profiling tests from p2p tests" We are investigating some flaky profiiling tests such as https://github.com/pytorch/pytorch/issues/56146. One issue is that the profiling tests are tightly coupled to these send/recv tests, hence if this test is disabled, we lose signal round send/recv collectives tests. To mitigate this, separate the tests into ones that only test send/recv, and ones that test it with profiling. This way flakiness should not result in the send/recv only tests being disabled. 
Differential Revision: [D27864845](https://our.internmc.facebook.com/intern/diff/D27864845/)

[ghstack-poisoned]
---
 .../_internal/distributed/distributed_test.py | 31 +++++++------------
 1 file changed, 11 insertions(+), 20 deletions(-)

diff --git a/torch/testing/_internal/distributed/distributed_test.py b/torch/testing/_internal/distributed/distributed_test.py
index 39f93537f4fa9..c690f8e24e7a4 100644
--- a/torch/testing/_internal/distributed/distributed_test.py
+++ b/torch/testing/_internal/distributed/distributed_test.py
@@ -331,6 +331,13 @@ def _build_multidim_tensor(dim, dim_size, value=None, dtype=torch.float):
         value = size
     return torch.empty(size=[dim_size for _ in range(dim)], dtype=dtype).fill_(value)
 
+def _create_autograd_profiler(enable_profiling):
+    return (
+        torch.autograd.profiler.profile(record_shapes=True)
+        if enable_profiling else
+        suppress()
+    )
+
 class Barrier(object):
     barrier_id = 0
 
@@ -946,11 +953,7 @@ def _test_send_recv(self, enable_profiling):
         rank = dist.get_rank()
         send_size = rank + 1
         tensor = _build_tensor(send_size)
-        profile_ctx = (
-            torch.autograd.profiler.profile(record_shapes=True)
-            if enable_profiling else
-            suppress()
-        )
+        profile_ctx = _create_autograd_profiler(enable_profiling)
         with profile_ctx as prof:
             for src in range(0, dist.get_world_size()):
                 if src == rank:
@@ -1002,11 +1005,7 @@ def _test_send_recv_any_source(self, enable_profiling):
         recv_ranks = list()
         irecv_ranks = list()
 
-        profile_ctx = (
-            torch.autograd.profiler.profile(record_shapes=True)
-            if enable_profiling else
-            suppress()
-        )
+        profile_ctx = _create_autograd_profiler(enable_profiling)
         with profile_ctx as prof:
             for dst in range(0, dist.get_world_size()):
                 if dst == rank:
@@ -1080,11 +1079,7 @@ def _test_send_recv_with_tag(self, enable_profiling):
         world_size = dist.get_world_size()
         send_recv_size = 10
         tensor = _build_tensor(send_recv_size, value=rank)
-        profiler_ctx = (
-            torch.autograd.profiler.profile(record_shapes=True)
-            if enable_profiling else
-            
suppress() - ) + profiler_ctx = _create_autograd_profiler(enable_profiling) with profiler_ctx as prof: for dst in range(0, world_size): if dst == rank: @@ -1124,11 +1119,7 @@ def test_send_recv_with_tag_autograd_profiler(self): def _test_isend(self, enable_profiling): rank = dist.get_rank() world_size = dist.get_world_size() - profiler_ctx = ( - torch.autograd.profiler.profile(record_shapes=True) - if enable_profiling else - suppress() - ) + profiler_ctx = _create_autograd_profiler(enable_profiling) with profiler_ctx as prof: if rank == 0: requests = [ From b18878ca799123a1b440a667647a3859a0fe8d8e Mon Sep 17 00:00:00 2001 From: Rohan Varma Date: Mon, 19 Apr 2021 15:55:04 -0700 Subject: [PATCH 4/4] Update on "Separate profiling tests from p2p tests" We are investigating some flaky profiiling tests such as https://github.com/pytorch/pytorch/issues/56146. One issue is that the profiling tests are tightly coupled to these send/recv tests, hence if this test is disabled, we lose signal round send/recv collectives tests. To mitigate this, separate the tests into ones that only test send/recv, and ones that test it with profiling. This way flakiness should not result in the send/recv only tests being disabled. Differential Revision: [D27864845](https://our.internmc.facebook.com/intern/diff/D27864845/) [ghstack-poisoned] --- torch/testing/_internal/distributed/distributed_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/torch/testing/_internal/distributed/distributed_test.py b/torch/testing/_internal/distributed/distributed_test.py index c690f8e24e7a4..575dc0fcd3df7 100644 --- a/torch/testing/_internal/distributed/distributed_test.py +++ b/torch/testing/_internal/distributed/distributed_test.py @@ -970,8 +970,6 @@ def _test_send_recv(self, enable_profiling): dist.recv(output_tensor, src) self.assertEqual(output_tensor, expected_tensor) - self._barrier() - if enable_profiling: backend = dist.get_backend() if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS: