diff --git a/test/distributed/elastic/multiprocessing/api_test.py b/test/distributed/elastic/multiprocessing/api_test.py
index 811137a8d83b..6e998c5c271d 100644
--- a/test/distributed/elastic/multiprocessing/api_test.py
+++ b/test/distributed/elastic/multiprocessing/api_test.py
@@ -15,7 +15,7 @@ import time
 import unittest
 from itertools import product
-from typing import Dict, List, Union, Callable
+from typing import Callable, Dict, List, Union
 from unittest import mock
 from unittest.mock import patch
 
 
@@ -24,25 +24,25 @@
 from torch.distributed.elastic.multiprocessing import ProcessFailure, start_processes
 from torch.distributed.elastic.multiprocessing.api import (
     MultiprocessContext,
-    SignalException,
     RunProcsResult,
+    SignalException,
     Std,
     _validate_full_rank,
-    to_map,
     _wrap,
+    to_map,
 )
 from torch.distributed.elastic.multiprocessing.errors.error_handler import _write_error
 from torch.testing._internal.common_utils import (
+    IS_IN_CI,
+    IS_MACOS,
+    IS_WINDOWS,
     NO_MULTIPROCESSING_SPAWN,
     TEST_WITH_ASAN,
-    TEST_WITH_TSAN,
     TEST_WITH_DEV_DBG_ASAN,
-    IS_IN_CI,
-    IS_WINDOWS,
-    IS_MACOS,
+    TEST_WITH_TSAN,
+    run_tests,
     sandcastle_skip_if,
 )
-from torch.testing._internal.common_utils import run_tests
 
 
 class RunProcResultsTest(unittest.TestCase):
@@ -224,6 +224,7 @@ def start_processes_zombie_test(
 
 # tests incompatible with tsan or asan
 if not (TEST_WITH_DEV_DBG_ASAN or IS_WINDOWS or IS_MACOS):
+
     class StartProcessesTest(unittest.TestCase):
         def setUp(self):
             self.test_dir = tempfile.mkdtemp(prefix=f"{self.__class__.__name__}_")
@@ -251,12 +252,15 @@ def assert_pids_noexist(self, pids: Dict[int, int]):
 
         def test_to_map(self):
             local_world_size = 2
-            self.assertEqual({0: Std.OUT, 1: Std.OUT}, to_map(Std.OUT, local_world_size))
+            self.assertEqual(
+                {0: Std.OUT, 1: Std.OUT}, to_map(Std.OUT, local_world_size)
+            )
             self.assertEqual(
                 {0: Std.NONE, 1: Std.OUT}, to_map({1: Std.OUT}, local_world_size)
             )
             self.assertEqual(
-                {0: Std.ERR, 1: Std.OUT}, to_map({0: Std.ERR, 1: Std.OUT}, local_world_size)
+                {0: Std.ERR, 1: Std.OUT},
+                to_map({0: Std.ERR, 1: Std.OUT}, local_world_size),
             )
 
         def test_invalid_log_dir(self):
@@ -382,9 +386,7 @@ def test_void_function(self):
             results = pc.wait(period=0.1)
             self.assertEqual({0: None, 1: None}, results.return_values)
 
-        @sandcastle_skip_if(
-            TEST_WITH_DEV_DBG_ASAN, "tests incompatible with asan"
-        )
+        @sandcastle_skip_if(TEST_WITH_DEV_DBG_ASAN, "tests incompatible with asan")
         def test_function_large_ret_val(self):
             # python multiprocessing.queue module uses pipes and actually PipedQueues
             # This means that if a single object is greater than a pipe size
@@ -439,7 +441,9 @@ def test_function_raise(self):
             self.assertEqual(1, failure.exitcode)
             self.assertEqual("", failure.signal_name())
             self.assertEqual(pc.pids()[0], failure.pid)
-            self.assertEqual(os.path.join(log_dir, "0", "error.json"), error_file)
+            self.assertEqual(
+                os.path.join(log_dir, "0", "error.json"), error_file
+            )
             self.assertEqual(
                 int(error_file_data["message"]["extraInfo"]["timestamp"]),
                 int(failure.timestamp),
@@ -541,17 +545,22 @@ def test_multiprocessing_context_poll_raises_exception(self):
             run_result = mp_context._poll()
             self.assertEqual(1, len(run_result.failures))
             failure = run_result.failures[0]
-            self.assertEqual("Signal 1 (SIGHUP) received by PID 123", failure.message)
+            self.assertEqual(
+                "Signal 1 (SIGHUP) received by PID 123", failure.message
+            )
 
 
 # tests incompatible with tsan or asan, the redirect functionality does not work on macos or windows
 if not (TEST_WITH_DEV_DBG_ASAN or IS_WINDOWS or IS_MACOS):
+
     class StartProcessesListTest(StartProcessesTest):
         ########################################
         # start_processes as binary tests
        ########################################
         def test_function(self):
-            for start_method, redirs in product(self._start_methods, redirects_oss_test()):
+            for start_method, redirs in product(
+                self._start_methods, redirects_oss_test()
+            ):
                 with self.subTest(start_method=start_method, redirs=redirs):
                     pc = start_processes(
                         name="echo",
@@ -644,6 +653,7 @@ def test_binary_redirect_and_tee(self):
 
 # tests incompatible with tsan or asan, the redirect functionality does not work on macos or windows
 if not (TEST_WITH_DEV_DBG_ASAN or IS_WINDOWS or IS_MACOS or IS_IN_CI):
+
     class StartProcessesNotCITest(StartProcessesTest):
         def test_wrap_bad(self):
             none = ""
@@ -796,7 +806,8 @@ def test_function_exit(self):
             self.assertEqual(pc.pids()[0], failure.pid)
             self.assertEqual("", error_file)
             self.assertEqual(
-                f"Process failed with exitcode {FAIL}", failure.message
+                "To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html",
+                failure.message,
             )
             self.assertLessEqual(failure.timestamp, int(time.time()))
 
diff --git a/test/distributed/elastic/multiprocessing/errors/api_test.py b/test/distributed/elastic/multiprocessing/errors/api_test.py
index 859069004ae7..a24986c65981 100644
--- a/test/distributed/elastic/multiprocessing/errors/api_test.py
+++ b/test/distributed/elastic/multiprocessing/errors/api_test.py
@@ -115,7 +115,10 @@ def test_process_failure_no_error_file(self):
         pf = self.failure_without_error_file(exitcode=138)
         self.assertEqual("", pf.signal_name())
         self.assertEqual("", pf.error_file)
-        self.assertEqual("Process failed with exitcode 138", pf.message)
+        self.assertEqual(
+            "To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html",
+            pf.message,
+        )
 
     def test_child_failed_error(self):
         pf0 = self.failure_with_error_file(exception=SentinelError("rank 0"))
@@ -134,7 +137,7 @@ def test_child_failed_error(self):
           rank: 0 (local_rank: 0)
           exitcode: 1 (pid: 997)
           error_file: /tmp/ApiTesttbb37ier/error.json
-          msg: "SentinelError: rank 0"
+          traceback: "SentinelError: rank 0"
        =============================================
        Other Failures:
          [1]:
@@ -148,7 +151,7 @@
          rank: 2 (local_rank: 0)
          exitcode: 138 (pid: 997)
          error_file:
-          msg: "Process failed with exitcode 138"
+          traceback: To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html
        *********************************************
        """
        print(ex)
diff --git a/torch/distributed/elastic/agent/server/local_elastic_agent.py b/torch/distributed/elastic/agent/server/local_elastic_agent.py
index 4a8238f1259e..c84df1a8e434 100644
--- a/torch/distributed/elastic/agent/server/local_elastic_agent.py
+++ b/torch/distributed/elastic/agent/server/local_elastic_agent.py
@@ -21,10 +21,11 @@
     WorkerState,
 )
 from torch.distributed.elastic.metrics.api import prof
-from torch.distributed.elastic.multiprocessing import start_processes, PContext
+from torch.distributed.elastic.multiprocessing import PContext, start_processes
 from torch.distributed.elastic.utils import macros
 from torch.distributed.elastic.utils.logging import get_logger
 
+
 log = get_logger()
 
diff --git a/torch/distributed/elastic/multiprocessing/errors/__init__.py b/torch/distributed/elastic/multiprocessing/errors/__init__.py
index c1be1e093484..be955d28d470 100644
--- a/torch/distributed/elastic/multiprocessing/errors/__init__.py
+++ b/torch/distributed/elastic/multiprocessing/errors/__init__.py
@@ -51,6 +51,7 @@
 import json
 import os
 import signal
+import socket
 import time
 import warnings
 from dataclasses import dataclass, field
@@ -109,7 +110,7 @@ def __post_init__(self):
         try:
             with open(self.error_file, "r") as fp:
                 self.error_file_data = json.load(fp)
-                log.info(
+                log.debug(
                     f"User process failed with error data: {json.dumps(self.error_file_data, indent=2)}"
                 )
                 self.message, self.timestamp = self._get_error_data(
@@ -130,7 +131,7 @@
                 f" received by PID {self.pid}"
             )
         else:
-            self.message = f"Process failed with exitcode {self.exitcode}"
+            self.message = "To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html"
 
     def _get_error_data(self, error_file_data: Dict[str, Any]) -> Tuple[str, int]:
         message = error_file_data["message"]
@@ -162,24 +163,24 @@ def timestamp_isoformat(self):
 GlobalRank = int
 
 _FAILURE_FORMAT_TEMPLATE = """[${idx}]:
-  time: ${time}
-  rank: ${rank} (local_rank: ${local_rank})
-  exitcode: ${exitcode} (pid: ${pid})
+  time      : ${time}
+  host      : ${hostname}
+  rank      : ${rank} (local_rank: ${local_rank})
+  exitcode  : ${exitcode} (pid: ${pid})
   error_file: ${error_file}
-  msg: ${message}"""
+  traceback : ${message}"""
 
 # extra new lines before and after are intentional
 _MSG_FORMAT_TEMPLATE = """
 ${boarder}
 ${title}
 ${section}
-Root Cause:
-${root_failure}
-${section}
-Other Failures:
+Failures:
 ${other_failures}
-${boarder}
-"""
+${section}
+Root Cause (first observed failure):
+${root_failure}
+${boarder}"""
 
 
 class ChildFailedError(Exception):
@@ -230,8 +231,8 @@ def get_first_failure(self) -> Tuple[GlobalRank, ProcessFailure]:
         rank = min(self.failures.keys(), key=lambda r: self.failures[r].timestamp)
         return rank, self.failures[rank]
 
-    def format_msg(self, boarder_delim="*", section_delim="="):
-        title = f" {self.name} FAILED "
+    def format_msg(self, boarder_delim="=", section_delim="-"):
+        title = f"{self.name} FAILED"
 
         root_rank, root_failure = self.get_first_failure()
         root_failure_fmt: str = ""
@@ -246,11 +247,11 @@ def format_msg(self, boarder_delim="*", section_delim="="):
             other_failures_fmt.append(fmt)
 
         # upper boundary on width
-        width = min(width, 80)
+        width = min(width, 60)
 
         return Template(_MSG_FORMAT_TEMPLATE).substitute(
             boarder=boarder_delim * width,
-            title=title.center(width),
+            title=title,
             section=section_delim * width,
             root_failure=root_failure_fmt,
             other_failures="\n".join(other_failures_fmt or [" "]),
@@ -279,6 +280,7 @@ def _format_failure(
         fmt = Template(_FAILURE_FORMAT_TEMPLATE).substitute(
             idx=idx,
             time=failure.timestamp_isoformat(),
+            hostname=socket.getfqdn(),
             rank=rank,
             local_rank=failure.local_rank,
             exitcode=failure.exitcode,
@@ -292,32 +294,6 @@ def _format_failure(
         return fmt, width
 
 
-def _no_error_file_warning_msg(rank: int, failure: ProcessFailure) -> str:
-    msg = [
-        "CHILD PROCESS FAILED WITH NO ERROR_FILE",
-        f"Child process {failure.pid} (local_rank {rank}) FAILED (exitcode {failure.exitcode})",
-        f"Error msg: {failure.message}",
-        f"Without writing an error file to {failure.error_file}.",
-        "While this DOES NOT affect the correctness of your application,",
-        "no trace information about the error will be available for inspection.",
-        "Consider decorating your top level entrypoint function with",
-        "torch.distributed.elastic.multiprocessing.errors.record. Example:",
Example:", - "", - r" from torch.distributed.elastic.multiprocessing.errors import record", - "", - r" @record", - r" def trainer_main(args):", - r" # do train", - ] - width = 0 - for line in msg: - width = max(width, len(line)) - - boarder = "*" * width - header = "CHILD PROCESS FAILED WITH NO ERROR_FILE".center(width) - return "\n".join(["\n", boarder, header, boarder, *msg, boarder]) - - def record( fn: Callable[..., T], error_handler: Optional[ErrorHandler] = None ) -> Callable[..., T]: @@ -372,7 +348,13 @@ def wrapper(*args, **kwargs): if failure.error_file != _NOT_AVAILABLE: error_handler.dump_error_file(failure.error_file, failure.exitcode) else: - warnings.warn(_no_error_file_warning_msg(rank, failure)) + log.info( + ( + f"local_rank {rank} FAILED with no error file." + f" Decorate your entrypoint fn with @record for traceback info." + f" See: https://pytorch.org/docs/stable/elastic/errors.html" + ) + ) raise except Exception as e: error_handler.record_exception(e) diff --git a/torch/distributed/elastic/multiprocessing/errors/error_handler.py b/torch/distributed/elastic/multiprocessing/errors/error_handler.py index 74586e9fd852..da47eb77e8a5 100644 --- a/torch/distributed/elastic/multiprocessing/errors/error_handler.py +++ b/torch/distributed/elastic/multiprocessing/errors/error_handler.py @@ -107,7 +107,7 @@ def dump_error_file(self, rootcause_error_file: str, error_code: int = 0): else: rootcause_error["message"]["errorCode"] = error_code - log.info( + log.debug( f"child error file ({rootcause_error_file}) contents:\n" f"{json.dumps(rootcause_error, indent=2)}" ) diff --git a/torch/distributed/run.py b/torch/distributed/run.py index c6e84d6f65f4..127d91355a4f 100644 --- a/torch/distributed/run.py +++ b/torch/distributed/run.py @@ -304,6 +304,27 @@ def train(): if should_checkpoint: save_checkpoint(checkpoint_path) + +9. (Recommended) On worker errors, this tool will summarize the details of the error + (e.g. time, rank, host, pid, traceback, etc). On each node, the first error (by timestamp) + is heuristically reported as the "Root Cause" error. To get tracebacks as part of this + error summary print out, you must decorate your main entrypoint function in your + training script as shown in the example below. If not decorated, then the summary + will not include the traceback of the exception and will only contain the exitcode. + For details on torchelastic error handling see: https://pytorch.org/docs/stable/elastic/errors.html + +:: + + from torch.distributed.elastic.multiprocessing.errors import record + + @record + def main(): + # do train + pass + + if __name__ == "__main__": + main() + """ import logging import os @@ -597,7 +618,7 @@ def config_from_args(args) -> Tuple[LaunchConfig, Union[Callable, str], List[str if "OMP_NUM_THREADS" not in os.environ and nproc_per_node > 1: omp_num_threads = 1 log.warning( - f"*****************************************\n" + f"\n*****************************************\n" f"Setting OMP_NUM_THREADS environment variable for each process to be " f"{omp_num_threads} in default, to avoid your system being overloaded, " f"please further tune the variable for optimal performance in "