From c606808b20c88d6e2cc388bd650abf34ccae17cf Mon Sep 17 00:00:00 2001 From: Anton Schwaighofer Date: Mon, 13 Nov 2023 10:48:44 +0000 Subject: [PATCH] ENH: Enable single-node MPI jobs with SDK v1 (#914) Extend the `use_mpi_run_for_single_node_jobs` flag to SDK v1 jobs --- hi-ml-azure/src/health_azure/himl.py | 19 ++++---- hi-ml-azure/testazure/testazure/test_himl.py | 48 +++++++++++++++++++- 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/hi-ml-azure/src/health_azure/himl.py b/hi-ml-azure/src/health_azure/himl.py index 172dc3508..fe784fcc3 100644 --- a/hi-ml-azure/src/health_azure/himl.py +++ b/hi-ml-azure/src/health_azure/himl.py @@ -160,6 +160,7 @@ def create_run_configuration( max_run_duration: str = "", input_datasets: Optional[List[DatasetConfig]] = None, output_datasets: Optional[List[DatasetConfig]] = None, + use_mpi_run_for_single_node_jobs: bool = False, ) -> RunConfiguration: """ Creates an AzureML run configuration, that contains information about environment, multi node execution, and @@ -188,6 +189,8 @@ def create_run_configuration( :param output_datasets: The script will create a temporary folder when running in AzureML, and while the job writes data to that folder, upload it to blob storage, in the data store. :param num_nodes: The number of nodes to use in distributed training on AzureML. + :param use_mpi_run_for_single_node_jobs: If True, even single node jobs will be run as distributed MPI jobs. + If False, single node jobs will not be run as distributed jobs. :return: """ run_config = RunConfiguration() @@ -225,9 +228,8 @@ def create_run_configuration( if max_run_duration: run_config.max_run_duration_seconds = run_duration_string_to_seconds(max_run_duration) - # Create MPI configuration for distributed jobs (unless num_splits > 1, in which case - # an AML HyperdriveConfig is instantiated instead - if num_nodes > 1: + # Create MPI configuration for distributed jobs + if num_nodes > 1 or use_mpi_run_for_single_node_jobs: distributed_job_config = MpiConfiguration(node_count=num_nodes) run_config.mpi = distributed_job_config run_config.framework = "Python" @@ -483,7 +485,7 @@ def create_command_job(cmd: str) -> Command: # On AML managed compute, we can set distribution to None for single node jobs. # However, on Kubernetes compute, single node jobs don't see any GPUs. GPUs are visible for MpiDistribution # jobs, so we set MpiDistribution even for single node jobs. - if use_mpi_run_for_single_node_jobs: + if num_nodes > 1 or use_mpi_run_for_single_node_jobs: distribution = MpiDistribution(process_count_per_instance=1) else: distribution = PyTorchDistribution(process_count_per_instance=pytorch_processes_per_node) @@ -741,7 +743,7 @@ def submit_to_azure_if_needed( # type: ignore strictly_aml_v1: bool = False, identity_based_auth: bool = False, pytorch_processes_per_node_v2: Optional[int] = None, - use_mpi_run_for_single_node_jobs: bool = True, + use_mpi_run_for_single_node_jobs: bool = False, display_name: Optional[str] = None, entry_command: Optional[PathOrString] = None, ) -> AzureRunInfo: # pragma: no cover @@ -814,9 +816,9 @@ def submit_to_azure_if_needed( # type: ignore :param pytorch_processes_per_node_v2: For plain PyTorch multi-GPU processing: The number of processes per node. This is only supported with AML SDK v2, and ignored in v1. If supplied, the job will be submitted as using the "pytorch" framework (rather than "Python"), and using "nccl" as the communication backend. - :param use_mpi_run_for_single_node_jobs: If True, even single node jobs with SDK v2 will be run as distributed MPI - jobs. This is required for Kubernetes compute. If False, single node jobs will not be run as distributed jobs. - This setting only affects jobs submitted with SDK v2 (when `strictly_aml_v1=False`) + :param use_mpi_run_for_single_node_jobs: If True, even single node jobs will be run as distributed MPI + jobs. If False, single node jobs will not be run as distributed jobs. + Setting this flag to True is required Kubernetes compute. :param display_name: The name for the run that will be displayed in the AML UI. If not provided, a random display name will be generated by AzureML. :return: If the script is submitted to AzureML then we terminate python as the script should be executed in AzureML, @@ -926,6 +928,7 @@ def submit_to_azure_if_needed( # type: ignore max_run_duration=max_run_duration, input_datasets=cleaned_input_datasets, output_datasets=cleaned_output_datasets, + use_mpi_run_for_single_node_jobs=use_mpi_run_for_single_node_jobs, ) script_run_config = create_script_run( diff --git a/hi-ml-azure/testazure/testazure/test_himl.py b/hi-ml-azure/testazure/testazure/test_himl.py index 2d1585e4c..bd0289ef0 100644 --- a/hi-ml-azure/testazure/testazure/test_himl.py +++ b/hi-ml-azure/testazure/testazure/test_himl.py @@ -33,6 +33,7 @@ from azureml.core import ComputeTarget, Environment, RunConfiguration, ScriptRunConfig, Workspace from azureml.data.azure_storage_datastore import AzureBlobDatastore from azureml.data.dataset_consumption_config import DatasetConsumptionConfig +from azureml.core.runconfig import MpiConfiguration from azureml.dataprep.fuse.daemon import MountContext from azureml.train.hyperdrive import HyperDriveConfig @@ -2100,8 +2101,7 @@ def test_submit_to_azure_v2_distributed() -> None: assert call_kwargs.get("num_nodes") == num_nodes assert call_kwargs.get("pytorch_processes_per_node") == processes_per_node - # Single node job: The "distribution" argument of "command" should be set to MpiRun, to ensure that it - # runs fine on Kubernetes compute. + # Single node job: The "distribution" argument of "command" should be set to None with patch("health_azure.himl.command") as mock_command: _ = himl.submit_to_azure_if_needed( workspace_config_file="mockconfig.json", @@ -2113,6 +2113,22 @@ def test_submit_to_azure_v2_distributed() -> None: mock_command.assert_called_once() _, call_kwargs = mock_command.call_args assert call_kwargs.get("instance_count") == 1 + assert call_kwargs.get("distribution") is None + + # Single node job where we set the flag to use MPI runs nevertheless: The "distribution" argument of + # "command" should be set to MpiRun, to ensure that it runs fine on Kubernetes compute. + with patch("health_azure.himl.command") as mock_command: + _ = himl.submit_to_azure_if_needed( + workspace_config_file="mockconfig.json", + entry_script=Path(__file__), + snapshot_root_directory=Path.cwd(), + submit_to_azureml=True, + strictly_aml_v1=False, + use_mpi_run_for_single_node_jobs=True, + ) + mock_command.assert_called_once() + _, call_kwargs = mock_command.call_args + assert call_kwargs.get("instance_count") == 1 assert call_kwargs.get("distribution") == MpiDistribution(process_count_per_instance=1) # Single node job: The "distribution" argument of "command" should be set to None if we are passing a flag @@ -2217,3 +2233,31 @@ def test_extract_v2_data_asset_from_env_vars() -> None: himl._extract_v2_data_asset_from_env_vars(i, "INPUT_") for i in range(len(valid_mock_environment)) ] assert input_datasets == [Path("input_0"), Path("input_1"), Path("input_2"), Path("input_3")] + + +@pytest.mark.fast +@pytest.mark.parametrize("use_mpi_run_for_single_node_jobs", [True, False]) +def test_mpi_for_single_node_jobs_v1(use_mpi_run_for_single_node_jobs: bool) -> None: + """Test if we can create an MPI job with a single node using the v1 SDK.""" + with ( + patch("health_azure.himl.submit_run") as mock_submit_run, + patch("health_azure.himl.register_environment"), + patch("health_azure.himl.validate_compute_cluster"), + ): + with pytest.raises(SystemExit): + himl.submit_to_azure_if_needed( + aml_workspace=MagicMock(name="workspace"), + submit_to_azureml=True, + strictly_aml_v1=True, + use_mpi_run_for_single_node_jobs=use_mpi_run_for_single_node_jobs, + entry_script="foo", + ) + mock_submit_run.assert_called_once() + run_config = mock_submit_run.call_args[1]["script_run_config"].run_config + assert run_config.node_count == 1 + assert isinstance(run_config.mpi, MpiConfiguration) + assert run_config.mpi.node_count == 1 + if use_mpi_run_for_single_node_jobs: + assert run_config.communicator == "IntelMpi" + else: + assert run_config.communicator == "None"