Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate profiling tests from p2p tests #56412

Closed
wants to merge 5 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
218 changes: 136 additions & 82 deletions torch/testing/_internal/distributed/distributed_test.py
Expand Up @@ -942,12 +942,16 @@ def test_send_recv_nccl(self):
self._barrier()

# SEND RECV
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
def test_send_recv(self):
def _test_send_recv(self, enable_profiling):
rank = dist.get_rank()
send_size = rank + 1
tensor = _build_tensor(send_size)
with torch.autograd.profiler.profile(record_shapes=True) as prof:
profile_ctx = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This `profile_ctx` construction occurs multiple times — shall we dedupe it into a shared helper?

torch.autograd.profiler.profile(record_shapes=True)
if enable_profiling else
suppress()
)
with profile_ctx as prof:
for src in range(0, dist.get_world_size()):
if src == rank:
# Send mode
Expand All @@ -965,34 +969,45 @@ def test_send_recv(self):

self._barrier()

backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recv"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from all other ranks.
event_count = sum(e.count for e in events)
expected_event_count = dist.get_world_size() - 1
self.assertEqual(event_count, expected_event_count)
# Event order is not deterministic, so simply assert their shape
# is found in the following list.
expected_shapes = [
[[rank + 1] * 3] for rank in range(dist.get_world_size())
]
for event in events:
self.assertTrue(event.input_shapes in expected_shapes)
if enable_profiling:
backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recv"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from all other ranks.
event_count = sum(e.count for e in events)
expected_event_count = dist.get_world_size() - 1
self.assertEqual(event_count, expected_event_count)
# Event order is not deterministic, so simply assert their shape
# is found in the following list.
expected_shapes = [
[[rank + 1] * 3] for rank in range(dist.get_world_size())
]
for event in events:
self.assertTrue(event.input_shapes in expected_shapes)

@unittest.skipIf(BACKEND == "nccl", "Nccl send/recv tested by test_send_recv_nccl")
def test_send_recv(self):
    """Exercise blocking send/recv with the autograd profiler disabled.

    The profiler-enabled variant lives in
    ``test_send_recv_autograd_profiler``; both delegate to
    ``_test_send_recv``.
    """
    self._test_send_recv(enable_profiling=False)

@unittest.skipIf(BACKEND == "nccl", "Nccl send/recv tested by test_send_recv_nccl")
def test_send_recv_autograd_profiler(self):
    """Exercise blocking send/recv under torch.autograd.profiler.

    Same communication pattern as ``test_send_recv``, but with profiling
    enabled so ``_test_send_recv`` additionally validates the recorded
    send/recv events and their input shapes.
    """
    self._test_send_recv(enable_profiling=True)

# SEND RECV ANY SOURCE
@unittest.skipIf(
BACKEND == "nccl", "Nccl does not support send/recv from any source"
)
def test_send_recv_any_source(self):
def _test_send_recv_any_source(self, enable_profiling):
rank = dist.get_rank()
send_recv_size = 10
tensor = _build_tensor(send_recv_size, value=rank)
recv_ranks = list()
irecv_ranks = list()

with torch.autograd.profiler.profile(record_shapes=True) as prof:
profile_ctx = (
torch.autograd.profiler.profile(record_shapes=True)
if enable_profiling else
suppress()
)
with profile_ctx as prof:
for dst in range(0, dist.get_world_size()):
if dst == rank:
# Recv mode
Expand Down Expand Up @@ -1021,39 +1036,56 @@ def test_send_recv_any_source(self):
dist.send(tensor, dst) # recv
dist.send(tensor, dst) # irecv

backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recvAnySource"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from other rank twice.
self.assertEqual(sum(event.count for event in events), 2 * (dist.get_world_size() - 1))
for event in events:
self.assertEqual(event.input_shapes, [[send_recv_size] * 3])

# Each rank would have 2 * (world_size - 1) sends, verify that
# globally we receive the same amount on the other end.
recv_ranks_tensor = torch.cat((torch.tensor(recv_ranks), torch.tensor(irecv_ranks)), 0)
global_recv_ranks = [torch.empty_like(recv_ranks_tensor) for _ in range(dist.get_world_size())]
dist.all_gather(global_recv_ranks, recv_ranks_tensor)
global_recv_ranks_list = []
for tensor in global_recv_ranks:
global_recv_ranks_list += tensor.tolist()

from itertools import groupby
global_recv_ranks_list.sort()
frequency = [len(list(group)) for key, group in groupby(global_recv_ranks_list)]
self.assertEqual(dist.get_world_size(), len(frequency))
self.assertEqual([2 * (dist.get_world_size() - 1)] * dist.get_world_size(), frequency)
self._barrier()
if enable_profiling:
backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recvAnySource"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from other rank twice.
self.assertEqual(sum(event.count for event in events), 2 * (dist.get_world_size() - 1))
for event in events:
self.assertEqual(event.input_shapes, [[send_recv_size] * 3])

# Each rank would have 2 * (world_size - 1) sends, verify that
# globally we receive the same amount on the other end.
recv_ranks_tensor = torch.cat((torch.tensor(recv_ranks), torch.tensor(irecv_ranks)), 0)
global_recv_ranks = [torch.empty_like(recv_ranks_tensor) for _ in range(dist.get_world_size())]
dist.all_gather(global_recv_ranks, recv_ranks_tensor)
global_recv_ranks_list = []
for tensor in global_recv_ranks:
global_recv_ranks_list += tensor.tolist()

from itertools import groupby
global_recv_ranks_list.sort()
frequency = [len(list(group)) for key, group in groupby(global_recv_ranks_list)]
self.assertEqual(dist.get_world_size(), len(frequency))
self.assertEqual([2 * (dist.get_world_size() - 1)] * dist.get_world_size(), frequency)
self._barrier()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this barrier is only needed for the profiler enabled case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like test_send_recv_nccl includes this barrier even though it has no profiling checks, so I thought we should keep it here as well. Although it is possible that test copied the barrier from this one, in which case it may not be needed when the profiler is disabled.

Actually looking at this a bit more, I don't think _barrier() is needed at all since send/recv are issued in blocking fashion and _barrier doesn't provide anything extra here. I think it's better to just remove it.


@unittest.skipIf(
    BACKEND == "nccl", "Nccl does not support send/recv from any source"
)
def test_send_recv_any_source(self):
    """Exercise recv-from-any-source with the autograd profiler disabled.

    Delegates to ``_test_send_recv_any_source``; the profiler-enabled
    variant is ``test_send_recv_any_source_autograd_profiler``.
    """
    self._test_send_recv_any_source(enable_profiling=False)

@unittest.skipIf(
    BACKEND == "nccl", "Nccl does not support send/recv from any source"
)
def test_send_recv_any_source_autograd_profiler(self):
    """Exercise recv-from-any-source under torch.autograd.profiler.

    Same pattern as ``test_send_recv_any_source`` but with profiling
    enabled, so ``_test_send_recv_any_source`` additionally checks the
    recorded ``send``/``recvAnySource`` events.
    """
    self._test_send_recv_any_source(enable_profiling=True)

# SEND RECV WITH TAG
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support send/recv")
def test_send_recv_with_tag(self):
def _test_send_recv_with_tag(self, enable_profiling):
rank = dist.get_rank()
world_size = dist.get_world_size()
send_recv_size = 10
tensor = _build_tensor(send_recv_size, value=rank)
with torch.autograd.profiler.profile(record_shapes=True) as prof:
profiler_ctx = (
torch.autograd.profiler.profile(record_shapes=True)
if enable_profiling else
suppress()
)
with profiler_ctx as prof:
for dst in range(0, world_size):
if dst == rank:
# Recv mode
Expand All @@ -1067,24 +1099,37 @@ def test_send_recv_with_tag(self):
# Send mode
dist.send(tensor, dst, tag=rank)

backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recv"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from all other ranks
event_count = sum(e.count for e in events)
expected_event_count = dist.get_world_size() - 1
self.assertEqual(event_count, expected_event_count)
for event in events:
self.assertEqual(event.name, event_name)
self.assertEqual(event.input_shapes, [[send_recv_size] * 3])
if enable_profiling:
backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
for event_name in [f"{backend}:send", f"{backend}:recv"]:
events = get_profiling_event(event_name, prof)
# Each rank sends/recvs from all other ranks
event_count = sum(e.count for e in events)
expected_event_count = dist.get_world_size() - 1
self.assertEqual(event_count, expected_event_count)
for event in events:
self.assertEqual(event.name, event_name)
self.assertEqual(event.input_shapes, [[send_recv_size] * 3])

@unittest.skipIf(BACKEND == "nccl", "NCCL send/recv tested by test_send_recv_nccl")
def test_send_recv_with_tag(self):
    """Exercise tagged send/recv with the autograd profiler disabled.

    Delegates to ``_test_send_recv_with_tag``; the profiler-enabled
    variant is ``test_send_recv_with_tag_autograd_profiler``.
    """
    self._test_send_recv_with_tag(enable_profiling=False)

@unittest.skipIf(BACKEND == "nccl", "NCCL send/recv tested by test_send_recv_nccl")
def test_send_recv_with_tag_autograd_profiler(self):
    """Exercise tagged send/recv under torch.autograd.profiler.

    Same pattern as ``test_send_recv_with_tag`` but with profiling
    enabled, so ``_test_send_recv_with_tag`` additionally checks the
    recorded send/recv events and their input shapes.
    """
    self._test_send_recv_with_tag(enable_profiling=True)

# ISEND
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
def test_isend(self):
def _test_isend(self, enable_profiling):
rank = dist.get_rank()
world_size = dist.get_world_size()
with torch.autograd.profiler.profile(record_shapes=True) as prof:
profiler_ctx = (
torch.autograd.profiler.profile(record_shapes=True)
if enable_profiling else
suppress()
)
with profiler_ctx as prof:
if rank == 0:
requests = [
dist.isend(_build_tensor(dest, 10), dest)
Expand All @@ -1100,24 +1145,33 @@ def test_isend(self):

self._barrier()

backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
expected_event_name = f"{backend}:send" if rank == 0 else f"{backend}:recv"
events = get_profiling_event(expected_event_name, prof)
event_count = sum(e.count for e in events)
expected_count = dist.get_world_size() - 1 if rank == 0 else 1
self.assertEqual(expected_count, event_count)
# Event ordering is not guaranteed, so simply ensure the shapes are
# found in the following map.
expected_shapes = {
r: [[r] * 3] for r in range(1, dist.get_world_size())
}
for event in events:
self.assertEqual(event.name, expected_event_name)
if rank == 0:
self.assertTrue(event.input_shapes in expected_shapes.values())
else:
self.assertEqual(event.input_shapes, expected_shapes[rank])
if enable_profiling:
backend = dist.get_backend()
if backend in SEND_RECV_PROFILING_SUPPORTED_BACKENDS:
expected_event_name = f"{backend}:send" if rank == 0 else f"{backend}:recv"
events = get_profiling_event(expected_event_name, prof)
event_count = sum(e.count for e in events)
expected_count = dist.get_world_size() - 1 if rank == 0 else 1
self.assertEqual(expected_count, event_count)
# Event ordering is not guaranteed, so simply ensure the shapes are
# found in the following map.
expected_shapes = {
r: [[r] * 3] for r in range(1, dist.get_world_size())
}
for event in events:
self.assertEqual(event.name, expected_event_name)
if rank == 0:
self.assertTrue(event.input_shapes in expected_shapes.values())
else:
self.assertEqual(event.input_shapes, expected_shapes[rank])

@unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
def test_isend(self):
    """Exercise non-blocking isend with the autograd profiler disabled.

    Delegates to ``_test_isend``; the profiler-enabled variant is
    ``test_isend_autograd_profiler``.
    """
    self._test_isend(enable_profiling=False)

@unittest.skipIf(BACKEND == "nccl", "Nccl does not support isend")
def test_isend_autograd_profiler(self):
    """Exercise non-blocking isend under torch.autograd.profiler.

    Same pattern as ``test_isend`` but with profiling enabled, so
    ``_test_isend`` additionally validates the recorded send/recv events
    per rank.
    """
    self._test_isend(enable_profiling=True)

# IRECV
@unittest.skipIf(BACKEND == "nccl", "Nccl does not support irecv")
Expand Down