[Oncall][MTPG] Fix flaky test multi_threaded - test_broadcast_object_list (#103568)

This test (https://github.com/pytorch/pytorch/blob/8340762211e3b55caa178bac748bd902249f6fc0/test/distributed/test_multi_threaded_pg.py#L133) is failing on the internal sandbox with the following error message:
```
  File "/data/sandcastle/boxes/eden-trunk-hg-fbcode-fbsource/buck-out/v2/gen/fbcode/8c7462494077df89/caffe2/test/distributed/__multi_threaded__/multi_threaded#link-tree/torch/testing/_internal/distributed/multi_threaded_pg.py", line 255, in _start_coll
    raise Exception(
Exception: world not ready, only 3 PG's registered but world has 4 ranks
 exiting thread 1
ERROR
```
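
To make the failure mode concrete: in the threaded process group, every rank runs in its own thread and must register its PG before any rank starts a collective. The following is a simplified, hypothetical model of that race (`registered`, `start_coll`, and `run_rank` are illustrative names, not the actual `multi_threaded_pg` internals):

```python
import threading

WORLD_SIZE = 4
registered = []          # shared table of ranks that have set up their PG
lock = threading.Lock()

def start_coll(rank):
    # Models the readiness check in multi_threaded_pg: a collective may
    # only begin once every rank in the world has registered.
    with lock:
        if len(registered) < WORLD_SIZE:
            raise Exception(
                f"world not ready, only {len(registered)} PG's registered "
                f"but world has {WORLD_SIZE} ranks"
            )

def run_rank(rank):
    with lock:
        registered.append(rank)
    # Without a barrier between registration and the first collective, a
    # fast thread can reach start_coll() before a slow thread registers.
    start_coll(rank)

threads = [threading.Thread(target=run_rank, args=(r,)) for r in range(WORLD_SIZE)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Running this repeatedly sometimes raises and sometimes passes, which is exactly the flakiness a post-init barrier eliminates.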

Internal error report: https://www.internalfb.com/intern/test/562950031915334?ref_report_id=0

We believe this is because we no longer perform a barrier after init (see #99937).
This PR temporarily turns `TORCH_DIST_INIT_BARRIER` back on to avoid the flaky test for the time being, but we should look into a way to do this properly.
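
For intuition, here is a rough sketch of what the flag gates (an assumption about the behavior, not the actual c10d code path; `init_pg_with_optional_barrier` is a hypothetical wrapper): with `TORCH_DIST_INIT_BARRIER=1`, every rank blocks right after init, so no rank can start a collective until the whole world is registered.

```python
import os
import torch.distributed as dist

def init_pg_with_optional_barrier(backend, rank, world_size, store):
    # Hypothetical wrapper illustrating the gated barrier; the real gating
    # lives inside torch.distributed, not in user code.
    dist.init_process_group(
        backend=backend, rank=rank, world_size=world_size, store=store
    )
    if os.environ.get("TORCH_DIST_INIT_BARRIER", "0") == "1":
        # All ranks synchronize here, closing the registration race.
        dist.barrier()
```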

cc. @kumpera @kwen2501
Pull Request resolved: #103568
Approved by: https://github.com/H-Huang
wz337 authored and pytorchmergebot committed Jun 18, 2023
1 parent 59a01c4 commit 15eed5b
Showing 2 changed files with 6 additions and 1 deletion.
test/distributed/test_multi_threaded_pg.py (6 additions, 0 deletions)
```diff
@@ -1,5 +1,6 @@
 # Owner(s): ["oncall: distributed"]
 
+import os
 import sys
 import torch
 import torch.distributed as dist
@@ -95,9 +96,14 @@ def world_size(self):
         return 4
 
     def setUp(self):
+        os.environ["TORCH_DIST_INIT_BARRIER"] = "1"
         super().setUp()
         self._spawn_threads()
 
+    def tearDown(self):
+        super().tearDown()
+        os.environ["TORCH_DIST_INIT_BARRIER"] = "0"
+
     def test_allgather(self):
         input_tensor = torch.ones(3, 3) * dist.get_rank()
         output_tensors = [torch.empty_like(input_tensor) for _ in range(self.world_size)]
```
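
One side note on the test change: `tearDown` resets `TORCH_DIST_INIT_BARRIER` to `"0"` rather than restoring its previous value. A hypothetical alternative (not part of this PR) is to scope the variable with `unittest.mock.patch.dict`, which snapshots and restores the environment automatically:

```python
import os
import unittest
from unittest import mock

class EnvScopedExample(unittest.TestCase):
    # Hypothetical pattern, not the code in this PR: patch.dict saves
    # os.environ and restores it when the patcher stops.
    def setUp(self):
        patcher = mock.patch.dict(os.environ, {"TORCH_DIST_INIT_BARRIER": "1"})
        patcher.start()
        self.addCleanup(patcher.stop)

    def test_env_is_set(self):
        self.assertEqual(os.environ["TORCH_DIST_INIT_BARRIER"], "1")
```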
torch/testing/_internal/common_distributed.py (0 additions, 1 deletion)
```diff
@@ -1029,7 +1029,6 @@ def run_test_with_threaded_pg(self, test_name, rank, world_size):
         """
         Run the current test associated with `test_name` using the threaded process group.
         """
-
         c10d.init_process_group(
             backend="threaded", rank=rank, world_size=world_size, store=self.__class__.global_store
         )
```
