Revert "Fix default timeouts for python entrypoints (e.g. init_proces…
pytorchmergebot committed Nov 6, 2023
1 parent eefe327 commit 75adb9f
Showing 5 changed files with 97 additions and 86 deletions.
test/distributed/test_c10d_nccl.py (2 additions, 2 deletions)

@@ -1220,7 +1220,7 @@ def _check_nccl_timeout(expected_timeout):
 
         # test the default value coming from the `init_process_group` kwarg default
         dist.init_process_group(**base_opts)
-        _check_nccl_timeout(torch.distributed.constants.default_pg_nccl_timeout)
+        _check_nccl_timeout(torch.distributed.distributed_c10d.default_pg_timeout)
         dist.destroy_process_group()
 
         # test that `kwarg` timeout takes effect
@@ -1238,7 +1238,7 @@ def _check_nccl_timeout(expected_timeout):
             # TODO(whc) I verified that we are indeed emitting this warning, and I can't figure out why I can't catch it.
             # self.assertEqual(len(w), 1)
             # self.assertTrue("pg_options._timeout was specified" in str(w[-1].message))
-            _check_nccl_timeout(torch.distributed.constants.default_pg_nccl_timeout)
+            _check_nccl_timeout(torch.distributed.distributed_c10d.default_pg_timeout)
         dist.destroy_process_group()
 
         # test that timeout value provided via `pg_options` kwarg is ignored and issues warning,
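A hedged aside, not part of the diff: the restored assertion can be reproduced in a single-process gloo run, where omitting the ``timeout`` kwarg leaves the group-wide Python default in force (the rendezvous values below are illustrative):

    import os
    from datetime import timedelta

    import torch
    import torch.distributed as dist

    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")  # illustrative rendezvous values
    os.environ.setdefault("MASTER_PORT", "29500")

    dist.init_process_group("gloo", rank=0, world_size=1)
    # after the revert there is a single Python-level default for every backend
    assert torch.distributed.distributed_c10d.default_pg_timeout == timedelta(minutes=30)
    dist.destroy_process_group()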
torch/_C/_distributed_c10d.pyi (0 additions, 1 deletion)

@@ -11,7 +11,6 @@ from torch.futures import Future
 _DEFAULT_FIRST_BUCKET_BYTES: int
 _DEFAULT_NO_TIMEOUT: timedelta
 _DEFAULT_PG_TIMEOUT: timedelta
-_DEFAULT_PG_NCCL_TIMEOUT: timedelta
 
 class BuiltinCommHookType(Enum):
     ALLREDUCE = ...
torch/csrc/distributed/c10d/init.cpp (0 additions, 4 deletions)

@@ -2659,10 +2659,6 @@ Example::
 
   module.attr("_DEFAULT_FIRST_BUCKET_BYTES") = ::c10d::kDefaultFirstBucketBytes;
   module.attr("_DEFAULT_PG_TIMEOUT") = py::cast(kProcessGroupDefaultTimeout);
-#ifdef USE_C10D_NCCL
-  module.attr("_DEFAULT_PG_NCCL_TIMEOUT") =
-      py::cast(::c10d::kProcessGroupNCCLDefaultTimeout);
-#endif
   module.attr("_DEFAULT_NO_TIMEOUT") = py::cast(kNoTimeout);
 
   module.def(
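Because the revert unbinds ``_DEFAULT_PG_NCCL_TIMEOUT``, any caller that depended on it has to feature-detect; a minimal sketch (the hasattr probe is our assumption, not a documented API):

    import torch._C._distributed_c10d as c10d_C

    # False on builds carrying this revert; True where the NCCL binding exists
    has_nccl_default = hasattr(c10d_C, "_DEFAULT_PG_NCCL_TIMEOUT")
    print(c10d_C._DEFAULT_PG_TIMEOUT, has_nccl_default)  # _DEFAULT_PG_TIMEOUT stays bound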
torch/distributed/constants.py (3 additions, 19 deletions)

@@ -1,23 +1,7 @@
 from torch._C._distributed_c10d import _DEFAULT_PG_TIMEOUT
-from datetime import timedelta
-from typing import Optional
-
-__all__ = ['default_pg_timeout', 'default_pg_nccl_timeout']
 
 # Default process group wide timeout, if applicable.
-# This only applies to the non-nccl backends
+# This only applies to the gloo and nccl backends
 # (only if NCCL_BLOCKING_WAIT or NCCL_ASYNC_ERROR_HANDLING is set to 1).
 # To make an attempt at backwards compatibility with THD, we use an
 # extraordinarily high default timeout, given that THD did not have timeouts.
-default_pg_timeout: timedelta = _DEFAULT_PG_TIMEOUT
-# Separate timeout for PGNCCL mainly because it's always been that way in the C++ layer, but until recently
-# there was one default that applied across all backends in the python layer.
-# Later, we could consider merging them back together at the c++ layer if we can align on a same value.
-# (only if NCCL_BLOCKING_WAIT or NCCL_ASYNC_ERROR_HANDLING is set to 1).
-
-try:
-    from torch._C._distributed_c10d import _DEFAULT_PG_NCCL_TIMEOUT
-    default_pg_nccl_timeout: Optional[timedelta] = _DEFAULT_PG_NCCL_TIMEOUT
-except ImportError:
-    # if C++ NCCL support is not compiled, we don't have access to the default nccl value.
-    # if anyone is actually trying to use nccl in this state, it should error.
-    default_pg_nccl_timeout = None
+default_pg_timeout = _DEFAULT_PG_TIMEOUT
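Post-revert, ``constants.py`` again exposes a single value; a short consumption sketch of the restored contract:

    from datetime import timedelta

    from torch.distributed.constants import default_pg_timeout

    # one group-wide default for every backend (30 minutes, per the docstrings below)
    assert isinstance(default_pg_timeout, timedelta)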
torch/distributed/distributed_c10d.py (92 additions, 60 deletions)

@@ -35,7 +35,7 @@
     get_debug_level,
     Work
 )
-from .constants import default_pg_timeout, default_pg_nccl_timeout
+from .constants import default_pg_timeout
 from .c10d_logger import _exception_logger, _time_logger
 from .rendezvous import register_rendezvous_handler, rendezvous  # noqa: F401
 DistStoreError = torch._C._DistStoreError
@@ -598,20 +598,6 @@ class GroupMember(metaclass=_WorldMeta):
     NON_GROUP_MEMBER = -100
 
 
-def _get_default_timeout(backend: Backend) -> timedelta:
-    # see note on nccl vs other backend timeout (constants.py)
-    if backend == Backend.NCCL:
-        assert isinstance(default_pg_nccl_timeout, timedelta), "no NCCL default timeout, is NCCL support compiled?"
-        return default_pg_nccl_timeout
-    else:
-        return default_pg_timeout
-
-def _check_valid_timeout(timeout: Any) -> None:
-    if not isinstance(timeout, timedelta):
-        raise TypeError(
-            f"Expected timeout argument to be of type datetime.timedelta, got {timeout}"
-        )
-
 # Default process group state
 _default_pg_init_method = None
 
@@ -1045,7 +1031,7 @@ def get_backend(group: Optional[ProcessGroup] = None) -> str:
 def init_process_group(
     backend: Union[str, Backend] = None,
     init_method: Optional[str] = None,
-    timeout: Optional[timedelta] = None,
+    timeout: timedelta = default_pg_timeout,
     world_size: int = -1,
     rank: int = -1,
     store: Optional[Store] = None,
@@ -1091,14 +1077,26 @@ def init_process_group(
             to exchange connection/address information.
             Mutually exclusive with ``init_method``.
         timeout (timedelta, optional): Timeout for operations executed against
-            the process group. Default value is 10 minutes for NCCL and 30 minutes for other backends.
-            This is the duration after which collectives will be aborted asynchronously and the process will crash.
-            When ``NCCL_ASYNC_ERROR_HANDLING`` is set, this is the duration after which collectives will be aborted
-            asynchronously and the process will crash.
-            This is done since CUDA execution is async and it is no longer safe to continue executing user code since
-            failed async NCCL operations might result in subsequent CUDA operations running on corrupted data.
-            When ``NCCL_BLOCKING_WAIT`` is set, the process will block and wait for this timeout.
+            the process group. Default value equals 30 minutes.
+            This is applicable for the ``gloo`` backend. For ``nccl``, this is
+            applicable only if the environment variable ``NCCL_BLOCKING_WAIT``
+            or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When
+            ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the
+            process will block and wait for collectives to complete before
+            throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set,
+            this is the duration after which collectives will be aborted
+            asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT``
+            will provide errors to the user which can be caught and handled,
+            but due to its blocking nature, it has a performance overhead. On
+            the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little
+            performance overhead, but crashes the process on errors. This is
+            done since CUDA execution is async and it is no longer safe to
+            continue executing user code since failed async NCCL operations
+            might result in subsequent CUDA operations running on corrupted
+            data. Only one of these two environment variables should be set.
+            For ``ucc``, blocking wait is supported similar to NCCL. However,
+            async error handling is done differently since with UCC we have
+            progress thread and not watch-dog thread.
         group_name (str, optional, deprecated): Group name. This argument is ignored
         pg_options (ProcessGroupOptions, optional): process group options
             specifying what additional options need to be passed in during
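The two environment variables the restored docstring contrasts are set before process launch; a hedged sketch with illustrative values (set one or the other, never both):

    import os

    # block inside collectives for up to `timeout`, then raise a catchable error
    os.environ["NCCL_BLOCKING_WAIT"] = "1"

    # or: abort timed-out collectives asynchronously and crash the process
    # os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"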
@@ -1124,6 +1122,11 @@ def init_process_group(
     global _backend
     global _default_pg_init_method
 
+    if not isinstance(timeout, timedelta):
+        raise TypeError(
+            "Expected timeout argument to be of type datetime.timedelta"
+        )
+
     if GroupMember.WORLD is not None:
         raise ValueError("trying to initialize the default process group twice!")
 
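With the inline check restored, a non-``timedelta`` value fails fast instead of being defaulted per backend; an illustrative call pattern (assumes the single-process env:// setup from the earlier sketch):

    from datetime import timedelta

    import torch.distributed as dist

    # OK: an explicit timedelta overrides the 30-minute default
    dist.init_process_group("gloo", rank=0, world_size=1,
                            timeout=timedelta(minutes=5))

    # raises TypeError: plain seconds are not converted
    # dist.init_process_group("gloo", rank=0, world_size=1, timeout=300)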
@@ -1142,11 +1145,6 @@ def init_process_group(
     else:
         backend = Backend("undefined")
 
-    if timeout is None:
-        timeout = _get_default_timeout(backend)
-
-    _check_valid_timeout(timeout)
-
     """
     Group name is not visible to users unless they access
     internals of c10d. This means we can ignore the value
@@ -1224,7 +1222,7 @@ def _new_process_group_helper(
     store,
     group_name,
     pg_options=None,
-    timeout=None,
+    timeout=default_pg_timeout,
     pg_tag=None
 ):
     """
@@ -1244,8 +1242,10 @@
             "created, please use a different group name"
         )
 
-    # Note: _new_process_group_helper is only called from init_process_group, which always provides a timeout value
-    _check_valid_timeout(timeout)
+    if not isinstance(timeout, timedelta):
+        raise TypeError(
+            "Expected timeout argument to be of type datetime.timedelta"
+        )
 
     if pg_tag not in [None, ""]:
         # creating with the same tag and rank set results in the same underlying PG
@@ -3809,16 +3809,7 @@ def monitored_barrier(group=GroupMember.WORLD, timeout=None, wait_all_ranks=False):
         raise ValueError("monitored_barrier is only implemented for GLOO backend.")
 
     if timeout is None:
-        timeout = _get_default_timeout(get_backend(group))
-    elif isinstance(timeout, float):
-        # TODO(whc) apparently some existing test case for monitored_barrier passes in a timeout in float format?
-        warnings.warn(
-            "Please specify timeout arg as a timedelta. "
-            f"Converting current value of {timeout} assuming it represents seconds",
-        )
-        timeout = timedelta(seconds=timeout)
-
-    _check_valid_timeout(timeout)
+        timeout = default_pg_timeout
 
     group_to_use = _get_default_group() if group is None else group
     return group_to_use.monitored_barrier(timeout, wait_all_ranks=wait_all_ranks)
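After the revert, ``monitored_barrier`` falls back to the same group-wide default and no longer converts float seconds; an illustrative call on an already-initialized gloo group:

    from datetime import timedelta

    import torch.distributed as dist

    # pass an explicit timedelta; a float like 30.0 is no longer converted here
    dist.monitored_barrier(timeout=timedelta(seconds=30), wait_all_ranks=True)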
@@ -3832,8 +3823,6 @@ def _create_process_group_wrapper(
     world_size: int,
     timeout: timedelta = default_pg_timeout,
 ):
-    # (whc) this appears to be just for the gloo backend? if so, `default_pg_timeout` is appropriate...
-
     # Create a separate prefix store for the helper process group.
     prefix = f"{PG_WRAPPER_STORE_PREFIX}:{store_prefix}"
     store = PrefixStore(prefix, store)
@@ -3863,7 +3852,7 @@ def _get_backend_from_str(backend: Optional[str] = None) -> Backend:
 
 
 @_time_logger
-def new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False):
+def new_group(ranks=None, timeout=default_pg_timeout, backend=None, pg_options=None, use_local_synchronization=False):
     """
     Create a new distributed group.
Create a new distributed group.
@@ -3886,7 +3875,7 @@ def new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False):
     Args:
         ranks (list[int]): List of ranks of group members. If ``None``, will be
             set to all ranks. Default is ``None``.
-        timeout (timedelta, optional): see `init_process_group` for details and default value.
+        timeout (timedelta, optional): Timeout for operations executed against
+            the process group. Default value equals 30 minutes.
+            This is applicable for the ``gloo`` backend. For ``nccl``, this is
+            applicable only if the environment variable ``NCCL_BLOCKING_WAIT``
+            or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When
+            ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the
+            process will block and wait for collectives to complete before
+            throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set,
+            this is the duration after which collectives will be aborted
+            asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT``
+            will provide errors to the user which can be caught and handled,
+            but due to its blocking nature, it has a performance overhead. On
+            the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little
+            performance overhead, but crashes the process on errors. This is
+            done since CUDA execution is async and it is no longer safe to
+            continue executing user code since failed async NCCL operations
+            might result in subsequent CUDA operations running on corrupted
+            data. Only one of these two environment variables should be set.
         backend (str or Backend, optional): The backend to use. Depending on
             build-time configurations, valid values are ``gloo`` and ``nccl``.
             By default uses the same backend as the global group. This field
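An illustrative subgroup under the restored default, with an assumed shorter timeout (the ranks are examples and require a world size of at least two):

    from datetime import timedelta

    import torch.distributed as dist

    # every rank in the default group must call this, even ranks outside [0, 1]
    pair_group = dist.new_group(ranks=[0, 1], timeout=timedelta(minutes=5))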
@@ -3922,7 +3928,7 @@
 
 def _new_group_with_tag(
     ranks=None,
-    timeout=None,
+    timeout=default_pg_timeout,
     backend=None,
     pg_options=None,
     pg_tag=None,
@@ -3941,19 +3947,11 @@ def _new_group_with_tag(
     global_rank = default_pg.rank()
     global_world_size = default_pg.size()
 
-
     # Default to the same backend as the global process group
     # if the backend is not specified.
     if not backend:
         backend = default_backend
     backend = Backend(backend)
 
-    # this timeout defaulting/validation is used for all the new_groups/new_subgroups variants,
-    # which may just pass their timeout value (or None)
-    if timeout is None:
-        timeout = _get_default_timeout(backend)
-    _check_valid_timeout(timeout)
-
     if use_local_synchronization:
         # MPI backend doesn't have a way for us to perform a partial sync
         if backend == Backend.MPI:
@@ -4035,7 +4033,7 @@ def _new_group_with_tag(
 def new_subgroups(
     group_size=None,
     group=None,
-    timeout=None,
+    timeout=default_pg_timeout,
     backend=None,
     pg_options=None,
 ):
@@ -4076,7 +4074,24 @@ def new_subgroups(
         the default subgroup size is equal to the number of devices on each machine,
         based on the assumption that each machine has exactly the same
         number of devices. Default is ``None``.
-        timeout (timedelta, optional): see `init_process_group` for details and default value.
+        timeout (timedelta, optional): Timeout for operations executed against
+            the process group. Default value equals 30 minutes.
+            This is applicable for the ``gloo`` backend. For ``nccl``, this is
+            applicable only if the environment variable ``NCCL_BLOCKING_WAIT``
+            or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When
+            ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the
+            process will block and wait for collectives to complete before
+            throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set,
+            this is the duration after which collectives will be aborted
+            asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT``
+            will provide errors to the user which can be caught and handled,
+            but due to its blocking nature, it has a performance overhead. On
+            the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little
+            performance overhead, but crashes the process on errors. This is
+            done since CUDA execution is async and it is no longer safe to
+            continue executing user code since failed async NCCL operations
+            might result in subsequent CUDA operations running on corrupted
+            data. Only one of these two environment variables should be set.
         backend (str or Backend, optional): The backend to use. Depending on
             build-time configurations, valid values are ``gloo`` and ``nccl``.
             By default uses the same backend as the global group. This field
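An illustrative ``new_subgroups`` call under the restored default (assumes one GPU per rank and equally sized machines, per the docstring above):

    import torch.distributed as dist

    # split the world into intra-machine subgroups; returns this rank's
    # subgroup plus the list of all subgroups
    cur_subgroup, subgroups = dist.new_subgroups()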
@@ -4151,7 +4166,7 @@ def new_subgroups(
 
 def new_subgroups_by_enumeration(
     ranks_per_subgroup_list,
-    timeout=None,
+    timeout=default_pg_timeout,
     backend=None,
     pg_options=None,
 ):
@@ -4180,8 +4195,25 @@
     Args:
         ranks_per_subgroup_list (list[list[int]]): A nested list of ranks of
            group members.
-        timeout (timedelta, optional): see `init_process_group` for details and default value.
-        backend (str or Backend, optional): The backend to use. Depending on
+        timeout (timedelta, optional): Timeout for operations executed against
+            the process group. Default value equals 30 minutes.
+            This is applicable for the ``gloo`` backend. For ``nccl``, this is
+            applicable only if the environment variable ``NCCL_BLOCKING_WAIT``
+            or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When
+            ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the
+            process will block and wait for collectives to complete before
+            throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set,
+            this is the duration after which collectives will be aborted
+            asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT``
+            will provide errors to the user which can be caught and handled,
+            but due to its blocking nature, it has a performance overhead. On
+            the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little
+            performance overhead, but crashes the process on errors. This is
+            done since CUDA execution is async and it is no longer safe to
+            continue executing user code since failed async NCCL operations
+            might result in subsequent CUDA operations running on corrupted
+            data. Only one of these two environment variables should be set.
+        backend (str or Backend, optional): The backend to use. Depending on
             build-time configurations, valid values are ``gloo`` and ``nccl``.
             By default uses the same backend as the global group. This field
             should be given as a lowercase string (e.g., ``"gloo"``), which can
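Finally, an illustrative enumeration-based split (the rank lists are examples for a world of four):

    import torch.distributed as dist

    # even ranks form one subgroup, odd ranks the other; the lists must not overlap
    cur_subgroup, subgroups = dist.new_subgroups_by_enumeration(
        ranks_per_subgroup_list=[[0, 2], [1, 3]],
    )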
