From f242172e5f202ffbd873db8579e3c07195326bd0 Mon Sep 17 00:00:00 2001 From: pritam Date: Tue, 29 Dec 2020 10:37:26 -0800 Subject: [PATCH 1/3] Fix store based barrier to only use 'add'. Certain store implementations don't work well when we use get() and add() on the same key. To avoid this issue, we only use add() in the store based barrier. The buggy store implementations can't be properly fixed due to legacy reasons. Differential Revision: [D25725386](https://our.internmc.facebook.com/intern/diff/D25725386/) **NOTE FOR REVIEWERS**: This PR has internal Facebook specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D25725386/)! [ghstack-poisoned] --- torch/distributed/distributed_c10d.py | 31 +++++++++------------------ 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/torch/distributed/distributed_c10d.py b/torch/distributed/distributed_c10d.py index a8517a4bb394..830178b33051 100644 --- a/torch/distributed/distributed_c10d.py +++ b/torch/distributed/distributed_c10d.py @@ -18,7 +18,6 @@ AllreduceCoalescedOptions, AllToAllOptions, BroadcastOptions, - FileStore, GatherOptions, PrefixStore, ProcessGroup, @@ -27,15 +26,8 @@ ReduceScatterOptions, ScatterOptions, Store, - TCPStore, ) -if sys.platform != 'win32': - from torch._C._distributed_c10d import ( - HashStore, - ) - - _MPI_AVAILABLE = True _NCCL_AVAILABLE = True _GLOO_AVAILABLE = True @@ -194,11 +186,16 @@ def _store_based_barrier(rank, store, timeout): # Now wait for all workers to check in with the store. world_size = get_world_size() - worker_count = int(store.get(store_key)) + # Use 'add' instead of 'get' since for some store implementations 'add' + # doesn't work well with 'get'. Ideally the store implementations should + # be fixed, but for backward compatibility reasons it is risky to change + # the store implementations. Once we completely migrate away from these + # legacy stores, we can use 'get' here instead. 
+ worker_count = store.add(store_key, 0) start = time.time() while worker_count != world_size: time.sleep(0.01) - worker_count = int(store.get(store_key)) + worker_count = store.add(store_key, 0) if timedelta(seconds=(time.time() - start)) > timeout: raise RuntimeError("Timed out initializing process group") @@ -504,12 +501,8 @@ def init_process_group(backend, # barrier at the end to ensure that once we return from this method, all # process groups including global variables are updated correctly on all # ranks. - if backend == Backend.MPI or not ( - isinstance(store, TCPStore) or - isinstance(store, FileStore) or - (sys.platform != 'win32' and isinstance(store, HashStore)) - ): - # MPI doesn't have store. + if backend == Backend.MPI: + # MPI backend doesn't use store. barrier() else: # Use store based barrier here since barrier() used a bunch of @@ -2491,11 +2484,7 @@ def new_group(ranks=None, timeout=default_pg_timeout, backend=None): # barrier at the end to ensure that once we return from this method, all # process groups including global variables are updated correctly on all # ranks. - if backend == Backend.MPI or not ( - isinstance(default_store, TCPStore) or - isinstance(default_store, FileStore) or - (sys.platform != 'win32' and isinstance(default_store, HashStore)) - ): + if backend == Backend.MPI: # MPI doesn't have store. barrier() else: From dd928ffc0f83fcdd3ab1cb405e4bafe23c5d5090 Mon Sep 17 00:00:00 2001 From: pritam Date: Tue, 29 Dec 2020 12:55:27 -0800 Subject: [PATCH 2/3] Update on "Fix store based barrier to only use 'add'." Certain store implementations don't work well when we use get() and add() on the same key. To avoid this issue, we only use add() in the store based barrier. The buggy store implementations can't be properly fixed due to legacy reasons. 
Differential Revision: [D25725386](https://our.internmc.facebook.com/intern/diff/D25725386/) **NOTE FOR REVIEWERS**: This PR has internal Facebook specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D25725386/)! [ghstack-poisoned] --- torch/distributed/distributed_c10d.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/torch/distributed/distributed_c10d.py b/torch/distributed/distributed_c10d.py index 830178b33051..a7d352450327 100644 --- a/torch/distributed/distributed_c10d.py +++ b/torch/distributed/distributed_c10d.py @@ -1,7 +1,8 @@ +import contextlib +import logging import pickle import torch import warnings -import contextlib import sys import time from torch._six import string_classes @@ -183,6 +184,7 @@ def _store_based_barrier(rank, store, timeout): """ store_key = "{}:{}".format(STORE_BASED_BARRIER_PREFIX, _group_count) store.add(store_key, 1) + logging.info('Added key: {} to store for rank: {}'.format(store_key, rank)) # Now wait for all workers to check in with the store. world_size = get_world_size() @@ -197,7 +199,10 @@ def _store_based_barrier(rank, store, timeout): time.sleep(0.01) worker_count = store.add(store_key, 0) if timedelta(seconds=(time.time() - start)) > timeout: - raise RuntimeError("Timed out initializing process group") + raise RuntimeError( + "Timed out initializing process group in store based barrier on " + "rank: {}, for key: {} (world_size={}, worker_count={})".format( + rank, store_key, world_size, worker_count)) def _rank_not_in_group(group: ProcessGroup): """ @@ -2490,6 +2495,6 @@ def new_group(ranks=None, timeout=default_pg_timeout, backend=None): else: # Use store based barrier here since barrier() used a bunch of # default devices and messes up NCCL internal state. 
- _store_based_barrier(group_rank, default_store, timeout) + _store_based_barrier(global_rank, default_store, timeout) return pg From 9405fafadb5d359aeadc27c5996b9d404d781bdf Mon Sep 17 00:00:00 2001 From: pritam Date: Tue, 29 Dec 2020 12:57:34 -0800 Subject: [PATCH 3/3] Update on "Fix store based barrier to only use 'add'." Certain store implementations don't work well when we use get() and add() on the same key. To avoid this issue, we only use add() in the store based barrier. The buggy store implementations can't be properly fixed due to legacy reasons. Differential Revision: [D25725386](https://our.internmc.facebook.com/intern/diff/D25725386/) **NOTE FOR REVIEWERS**: This PR has internal Facebook specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D25725386/)! [ghstack-poisoned] --- torch/distributed/distributed_c10d.py | 1 - 1 file changed, 1 deletion(-) diff --git a/torch/distributed/distributed_c10d.py b/torch/distributed/distributed_c10d.py index a7d352450327..c0da643e8554 100644 --- a/torch/distributed/distributed_c10d.py +++ b/torch/distributed/distributed_c10d.py @@ -3,7 +3,6 @@ import pickle import torch import warnings -import sys import time from torch._six import string_classes from datetime import timedelta