From f2d57e074519f2294b6c6c16eb86be5e5050caa1 Mon Sep 17 00:00:00 2001 From: Nisheeth Lahoti Date: Thu, 27 Jul 2023 09:36:51 +0530 Subject: [PATCH] Hydra + DDP improvements * Create different hydra output subdirectories for processes started by DDP * Support experimental-rerun * If rerun is not enabled but multi-run used, raise explicit error Reverts parts of #15737 --- .../strategies/launchers/subprocess_script.py | 22 ++++- .../launchers/test_subprocess_script.py | 96 ++++++++++++++++++- 2 files changed, 114 insertions(+), 4 deletions(-) diff --git a/src/lightning/fabric/strategies/launchers/subprocess_script.py b/src/lightning/fabric/strategies/launchers/subprocess_script.py index 62b3a718b90f92..1001600364c8cc 100644 --- a/src/lightning/fabric/strategies/launchers/subprocess_script.py +++ b/src/lightning/fabric/strategies/launchers/subprocess_script.py @@ -14,6 +14,7 @@ import os import subprocess import sys +from pathlib import Path from typing import Any, Callable, Optional, Sequence, Tuple from lightning_utilities.core.imports import RequirementCache @@ -143,6 +144,8 @@ def _basic_subprocess_cmd() -> Sequence[str]: def _hydra_subprocess_cmd(local_rank: int) -> Tuple[Sequence[str], str]: import __main__ # local import to avoid https://github.com/Lightning-AI/lightning/issues/15218 + from hydra.core.hydra_config import HydraConfig + from hydra.types import RunMode from hydra.utils import get_original_cwd, to_absolute_path # when user is using hydra find the absolute path @@ -151,9 +154,22 @@ def _hydra_subprocess_cmd(local_rank: int) -> Tuple[Sequence[str], str]: else: command = [sys.executable, "-m", __main__.__spec__.name] - command += sys.argv[1:] + # extract the hydra configuration + hydra_cfg = HydraConfig.get() + hydra_output = Path(hydra_cfg.runtime.output_dir) + + if hydra_cfg.output_subdir is None: # config isn't saved, so re-run original command + if hydra_cfg.mode == RunMode.MULTIRUN: + raise RuntimeError(f"DDP with multirun requires either re-run callback or saved config file") + command += sys.argv[1:] + [f"hydra.run.dir={hydra_output}"] # Keep output directory same + else: + hydra_subdir = hydra_output / hydra_cfg.output_subdir + pickled_config_path = hydra_subdir / "config.pickle" + if pickled_config_path.exists(): + command += ["--experimental-rerun", str(pickled_config_path)] + else: + command += ["-cp", str(hydra_subdir), "-cn", "config.yaml"] # Used saved config for new run + command += [f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", f"hydra.run.dir={hydra_output}"] cwd = get_original_cwd() - os_cwd = f'"{os.getcwd()}"' - command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"] return command, cwd diff --git a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py index 9f2fb371dc967b..a27c0a2a5be327 100644 --- a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py +++ b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py @@ -1,5 +1,6 @@ import subprocess import sys +from pathlib import Path from unittest.mock import Mock import pytest @@ -13,6 +14,7 @@ if _HYDRA_WITH_RUN_PROCESS: from hydra.test_utils.test_utils import run_process + from omegaconf import OmegaConf # Script to run from command line @@ -48,7 +50,7 @@ def task_fn(cfg): @RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) @pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS)) -@pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) +@pytest.mark.parametrize("subdir", [None, "null", "dksa", ".hello"]) def test_ddp_with_hydra_runjob(subdir, tmpdir, monkeypatch): monkeypatch.chdir(tmpdir) @@ -63,6 +65,98 @@ def test_ddp_with_hydra_runjob(subdir, tmpdir, monkeypatch): cmd += [f"hydra.output_subdir={subdir}"] run_process(cmd) + if subdir == "null": # There should be no subdirectory created + # Make sure there's no config.yaml + logs = list(Path.cwd().glob("**/config.yaml")) + assert len(logs) == 0 + else: + # Make sure config.yaml was created for additional processes. + logs = list(Path.cwd().glob("**/config.yaml")) + assert len(logs) == devices + + # Make sure the parameter was set and used + cfg = OmegaConf.load(logs[0]) + assert cfg.devices == devices + + # Make sure PL spawned a job that is logged by Hydra + logs = list(Path.cwd().glob("**/*.log")) + assert len(logs) == 1 + + +@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) +@pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS)) +@pytest.mark.parametrize("num_jobs", [1, 2]) +def test_ddp_with_hydra_multirunjob(tmpdir, num_jobs, monkeypatch): + monkeypatch.chdir(tmpdir) + + # Save script locally + with open("temp.py", "w") as fn: + fn.write(script) + + # create fake multirun params based on `num_jobs` + fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs)) + + # Run CLI + run_process([sys.executable, "temp.py", "+devices=2", '+strategy="ddp"', fake_param, "--multirun"]) + + # Make sure config.yaml was created for each job + configs = sorted(Path.cwd().glob("**/.pl_ddp_hydra_*/config.yaml")) + assert len(configs) == num_jobs + + # Make sure the parameter was set and used for each job + for i, config in enumerate(configs): + cfg = OmegaConf.load(config) + local_rank = int(config.parent.parent.parts[-1]) + assert cfg.devices == 2 + assert cfg.foo == local_rank + + logs = list(Path.cwd().glob("**/*.log")) + assert len(logs) == num_jobs + + +yaml_file = """ +hydra: + callbacks: + save_job_info: + _target_: hydra.experimental.callbacks.PickleJobInfoCallback +""" + + +@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) +@pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason=str(_HYDRA_WITH_RERUN)) +@pytest.mark.parametrize("num_jobs", [1, 2]) +def test_ddp_with_hydra_multirunjob_rerun(tmpdir, num_jobs, monkeypatch): + monkeypatch.chdir(tmpdir) + + # Save script locally + with open("temp.py", "w") as fn: + fn.write(script) + + with open("config.yaml", "w") as fn: + fn.write(yaml_file) + + # create fake multirun params based on `num_jobs` + fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs)) + + # Run CLI + run_process( + [ + sys.executable, + "temp.py", + "-cp", + ".", + "-cn", + "config.yaml", + "+devices=2", + '+strategy="ddp"', + fake_param, + "--multirun", + ] + ) + + pickles = sorted(Path.cwd().glob("**/.hydra/config.pickle")) + assert len(pickles) == num_jobs + def test_kill(): launcher = _SubprocessScriptLauncher(Mock(), 1, 1)