[BE] Fix flaky ProcessGroupGloo tests (#61396)
Summary:
A hypothesis as to why tests such as #57469 are flaky is that `pg = c10d.ProcessGroupGloo(...)` is not actually guaranteed to be a synchronization point: some ranks may create the process group, run all of the error checking (which does not call into the Gloo APIs and therefore requires no synchronization), and then exit, all before other ranks have created their Gloo process group.

This can result in the following error:
```
File "distributed/test_c10d_gloo.py", line 1037, in test_reduce_checks
May 03 06:42:34     pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
May 03 06:42:34 RuntimeError: [/var/lib/jenkins/workspace/third_party/gloo/gloo/transport/tcp/pair.cc:598] Connection closed by peer [127.0.0.1]:35521
```

which indicates that the remote end has hung up. Furthermore, all of the flaky tests in this file only do error checking and never call into the Gloo APIs, which further suggests this race is the root cause. I am not 100% sure this PR will fix it, because I have not been able to reproduce the issue even after 10,000+ runs, but it happens regularly in CI.

To fix this, we add a `dist.barrier(group=pg)` call after creating the process group to enforce synchronization. It would be good to land this and observe whether it helps with the flakiness.
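For reference, the change boils down to the helper pattern sketched below. This is a minimal sketch assuming `c10d` and `dist` are both aliases for `torch.distributed`, as in the test file; the free-standing function name `create_process_group_gloo` is illustrative, whereas the diff below adds the equivalent `_create_process_group_gloo` method to the test case.

```python
import torch.distributed as c10d  # the test file aliases torch.distributed as c10d
import torch.distributed as dist


def create_process_group_gloo(store, rank, world_size, opts):
    # Constructing the process group is not by itself guaranteed to
    # synchronize all ranks: a fast rank can finish construction, run
    # pure error-checking tests, and exit before slower ranks connect.
    pg = c10d.ProcessGroupGloo(store, rank, world_size, opts)
    # The barrier forces every rank to reach this point before any rank
    # proceeds, so no peer sees "Connection closed by peer" during setup.
    dist.barrier(group=pg)
    return pg
```

Every test in the file then creates its process group through this helper instead of calling the constructor directly.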

Pull Request resolved: #61396

Reviewed By: mrshenli

Differential Revision: D29664189

Pulled By: rohan-varma

fbshipit-source-id: bc046d5d816fe6cb426522b85312383bfa3f90b7
rohan-varma authored and facebook-github-bot committed Jul 13, 2021
1 parent 3e5d2b5 commit d520406
Showing 1 changed file: test/distributed/test_c10d_gloo.py (36 additions, 30 deletions)
@@ -208,6 +208,12 @@ def test_default_store_timeout_gloo(self):
"TSAN is not fork-safe since we're forking in a multi-threaded environment",
)
class ProcessGroupGlooTest(MultiProcessTestCase):
+def _create_process_group_gloo(self, store, rank, world_size, opts):
+    pg = c10d.ProcessGroupGloo(store, rank, world_size, opts)
+    dist.barrier(group=pg)
+    return pg
+
def setUp(self):
super(ProcessGroupGlooTest, self).setUp()

@@ -232,15 +238,15 @@ def test_multi_device_constructor(self):
create_device(interface=LOOPBACK),
create_device(interface=LOOPBACK),
]
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, opts)
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, opts)

# Execute 2x the number of operations to ensure we use every device.
for fut in [pg.allreduce(torch.ones(i + 1)).get_future() for i in range(4)]:
fut.wait()

def test_empty_tensors(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

xs = [torch.FloatTensor([])]
fut = pg.broadcast(xs).get_future()
@@ -251,7 +257,7 @@ def test_empty_tensors(self):

def test_broadcast_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

t1 = torch.zeros([1], dtype=torch.float32)
t2 = torch.zeros([1], dtype=torch.float64)
@@ -301,7 +307,7 @@ def test_broadcast_checks(self):

def _test_broadcast_basics(self, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

def broadcast(xs, rootRank, rootTensor):
opts = c10d.BroadcastOptions()
@@ -349,7 +355,7 @@ def test_broadcast_basics_cuda(self):

def _test_broadcast_stress(self, inputs):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(
+pg = self._create_process_group_gloo(
store, self.rank, self.world_size, self.opts(threads=8)
)
work_handles = [
@@ -377,7 +383,7 @@ def test_broadcast_stress_cuda(self):

def test_allreduce_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

t1 = torch.zeros([1], dtype=torch.float32)
t2 = torch.zeros([1], dtype=torch.float64)
@@ -397,7 +403,7 @@ def test_allreduce_checks(self):

def _test_allreduce_basics(self, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

# Single input tests
tests = simple_reduce_tests(self.rank, self.world_size)
@@ -444,7 +450,7 @@ def test_allreduce_basics_cuda(self):
# This should go away as we deprecate it.
def _test_allreduce_basics_using_work_api(self, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

# Single input tests
tests = simple_reduce_tests(self.rank, self.world_size)
@@ -489,7 +495,7 @@ def test_allreduce_basics_cuda_using_work_api(self):

def _test_allreduce_stress(self, inputs):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(
+pg = self._create_process_group_gloo(
store, self.rank, self.world_size, self.opts(threads=8)
)
future_handles = [pg.allreduce(inputs[i]).get_future() for i in range(len(inputs))]
@@ -519,7 +525,7 @@ def test_allreduce_stress_cuda(self):

def test_allreduce_coalesced_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

t1 = torch.zeros(1, dtype=torch.float32)
t2 = torch.zeros(1, dtype=torch.float64)
@@ -544,7 +550,7 @@ def test_allreduce_coalesced_checks(self):
@skip_if_lt_x_gpu(1)
def test_allreduce_coalesced_checks_cuda(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

t1 = torch.zeros(1, dtype=torch.float32)

@@ -554,7 +560,7 @@ def test_allreduce_coalesced_checks_cuda(self):

def _test_allreduce_coalesced_basics(self, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

test_cases = simple_coalesced_reduce_tests(self.rank, self.world_size)
for op, inputs, outputs in test_cases:
@@ -573,7 +579,7 @@ def test_allreduce_coalesced_basics(self):

def _test_allreduce_coalesced_stress(self, inputs):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(
+pg = self._create_process_group_gloo(
store, self.rank, self.world_size, self.opts(threads=8)
)
future_handles = [pg.allreduce_coalesced(input).get_future() for input in inputs]
@@ -601,7 +607,7 @@ def test_allreduce_coalesced_stress(self):

def test_sparse_allreduce_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

t1 = torch.zeros([1])
t2 = torch.sparse_coo_tensor([[0]], [1], size=(2,))
@@ -628,7 +634,7 @@ def test_sparse_allreduce_checks(self):

def _test_sparse_allreduce_basics(self, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

for num_inputs_per_rank in [1, 2]:
tests = simple_sparse_reduce_tests(
@@ -652,7 +658,7 @@ def test_sparse_allreduce_basics_cuda(self):

def test_scatter_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

t1 = torch.zeros([1], dtype=torch.float32)
t2 = torch.zeros([1], dtype=torch.float64)
@@ -727,7 +733,7 @@ def test_scatter_checks(self):

def _test_scatter_basics(self, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

# Preallocate tensors for input/output
input = [fn(torch.tensor([self.rank])) for _ in range(self.world_size)]
@@ -758,7 +764,7 @@ def test_scatter_basics_cuda(self):

def _test_scatter_stress(self, inputs, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(
+pg = self._create_process_group_gloo(
store, self.rank, self.world_size, self.opts(threads=8)
)
outputs = [
@@ -808,7 +814,7 @@ def test_scatter_stress_cuda(self):

def test_gather_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

t1 = torch.zeros([1], dtype=torch.float32)
t2 = torch.zeros([1], dtype=torch.float64)
@@ -887,7 +893,7 @@ def test_gather_checks(self):

def _test_gather_basics(self, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

# Preallocate tensors for input/output
input = [fn(torch.tensor([self.rank]))]
@@ -920,7 +926,7 @@ def test_gather_basics_cuda(self):

def _test_gather_stress(self, inputs, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(
+pg = self._create_process_group_gloo(
store, self.rank, self.world_size, self.opts(threads=8)
)
future_handles = []
@@ -966,7 +972,7 @@ def test_gather_stress_cuda(self):

def test_allgather_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

t1 = torch.zeros([1], dtype=torch.float32)
t2 = torch.zeros([1], dtype=torch.float64)
@@ -1009,7 +1015,7 @@ def test_allgather_checks(self):

def _test_allgather_basics(self, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

# Run with N input tensor per rank
for n in [1, 2, 3]:
@@ -1038,7 +1044,7 @@ def test_allgather_basics_cuda(self):

def _test_allgather_stress(self, inputs, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(
+pg = self._create_process_group_gloo(
store, self.rank, self.world_size, self.opts(threads=8)
)
future_handles = []
@@ -1075,7 +1081,7 @@ def test_allgather_stress_cuda(self):

def test_allgather_coalesced_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())
dummy_input = [torch.zeros([1], dtype=torch.float32)]
dummy_output_lists = [
[torch.zeros([1], dtype=torch.float32)] for _ in range(self.world_size)
@@ -1111,7 +1117,7 @@ def test_allgather_coalesced_checks(self):

def test_reduce_checks(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

t1 = torch.zeros([1], dtype=torch.float32)

@@ -1143,7 +1149,7 @@ def test_reduce_checks(self):

def _test_reduce_basics(self, fn):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())
for (op, input, output) in simple_reduce_tests(self.rank, self.world_size):
for root in range(self.world_size):
opts = c10d.ReduceOptions()
@@ -1166,7 +1172,7 @@ def test_reduce_basics_cuda(self):

def _test_reduce_stress(self, inputs):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(
+pg = self._create_process_group_gloo(
store, self.rank, self.world_size, self.opts(threads=8)
)
future_handles = []
@@ -1210,7 +1216,7 @@ def test_reduce_stress_cuda(self):

def test_send_recv_all_to_all(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

# Preallocate tensors for input/output
inputs = [torch.tensor([self.rank]) for _ in range(self.world_size)]
@@ -1248,7 +1254,7 @@ def test_send_recv_all_to_all(self):

def test_barrier_implies_wait(self):
store = c10d.FileStore(self.file_name, self.world_size)
-pg = c10d.ProcessGroupGloo(store, self.rank, self.world_size, self.opts())
+pg = self._create_process_group_gloo(store, self.rank, self.world_size, self.opts())

# Kick off allreduce operations
size = (100, 100)

