diff --git a/torchelastic/rendezvous/etcd_rendezvous.py b/torchelastic/rendezvous/etcd_rendezvous.py index 01215b65..1dad55b1 100644 --- a/torchelastic/rendezvous/etcd_rendezvous.py +++ b/torchelastic/rendezvous/etcd_rendezvous.py @@ -94,23 +94,8 @@ def __del__(self): def next_rendezvous(self): rdzv_version, rank, world_size = self._rdzv_impl.rendezvous_barrier() - # TODO: https://github.com/pytorch/elastic/issues/11 - # make EtcdStore the default and remove TCPStore code - # Setup a c10d store for this specific rendezvous version, - # by piggybacking on the etcd handler used during rendezvous. - # Switch back to EtcdStore once issue with - # pybind11-trampoline for c10d Store is resolved. - # store = self._rdzv_impl.setup_kv_store(rdzv_version) - # path once the pybind11-trampoline fix for c10d::Store is included in - # the next pytorch release. Then, remove this hack. - import torchelastic.rendezvous # noqa - - if "_TORCHELASTIC_USE_ETCDSTORE" in torchelastic.rendezvous.__dict__: - log.info("Using EtcdStore for c10d::Store implementation") - store = self._rdzv_impl.setup_kv_store(rdzv_version) - else: - log.info("Using TCPStore for c10d::Store implementation") - store = setup_tcpstore(rank, world_size, rdzv_version, self._rdzv_impl) + log.info("Creating EtcdStore as the c10d::Store implementation") + store = self._rdzv_impl.setup_kv_store(rdzv_version) return store, rank, world_size @@ -1065,40 +1050,6 @@ def _get_socket_with_port(): raise RuntimeError("Failed to create a socket") -# Helper function to setup a TCPStore-based c10d::Store implementation. -def setup_tcpstore(rank, world_size, rdzv_version, rdzv_impl): - if rank == 0: - import socket - from contextlib import closing - - # FIXME: ideally, TCPStore should have an API that - # accepts a pre-constructed socket. - with closing(_get_socket_with_port()) as sock: - host = socket.gethostname() - port = sock.getsockname()[1] - - rdzv_impl.store_extra_data( - rdzv_version, key="tcpstore_server", value="{}:{}".format(host, port) - ) - - log.info(f"Setting up TCPStore server on {host}:{port}") - start_daemon = True - sock.close() # FIXME: get rid of race-condition by improving TCPStore API - store = TCPStore(host, port, world_size, start_daemon) - log.info(f"TCPStore server initialized on {host}:{port}") - else: - hostport = rdzv_impl.load_extra_data(rdzv_version, key="tcpstore_server") - log.info(f"Rank {rank} will conenct to TCPStore server at {hostport}") - - import re - - host, port = re.match(r"(.+):(\d+)$", hostport).groups() - start_daemon = False - store = TCPStore(host, int(port), world_size, start_daemon) - - return store - - # Helper for _etcd_rendezvous_handler(url) def _parse_etcd_client_params(params): kwargs = {}