From 6edd2c13236d0f19168e8a6aedd0a66bd105ffb2 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 27 Jul 2025 12:32:47 +0200 Subject: [PATCH 01/30] Debug slurm pmi options --- .github/workflows/pipeline.yml | 35 ++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index 753b8d45..4a8feeef 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -295,6 +295,41 @@ jobs: run: | pip install . --no-deps --no-build-isolation cd tests + srun --mpi=list + python -m unittest test_slurmclusterexecutor.py + python -m unittest test_slurmjobexecutor.py + + unittest_slurm_openmpi: + needs: [black] + runs-on: ubuntu-latest + services: + mysql: + image: mysql:8.0 + env: + MYSQL_ROOT_PASSWORD: root + ports: + - "8888:3306" + options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + steps: + - uses: actions/checkout@v4 + - uses: koesterlab/setup-slurm-action@v1 + timeout-minutes: 5 + - name: Conda config + shell: bash -l {0} + run: echo -e "channels:\n - conda-forge\n" > .condarc + - uses: conda-incubator/setup-miniconda@v3 + with: + python-version: '3.13' + miniforge-version: latest + condarc-file: .condarc + environment-file: .ci_support/environment-mpich.yml + - name: Test + shell: bash -l {0} + timeout-minutes: 5 + run: | + pip install . --no-deps --no-build-isolation + cd tests + srun --mpi=list python -m unittest test_slurmclusterexecutor.py python -m unittest test_slurmjobexecutor.py From 756e55c07b420ec5c8270669b1a47acb28c800f0 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 27 Jul 2025 12:57:14 +0200 Subject: [PATCH 02/30] Update pipeline.yml --- .github/workflows/pipeline.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index 4a8feeef..754a3273 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -322,7 +322,7 @@ jobs: python-version: '3.13' miniforge-version: latest condarc-file: .condarc - environment-file: .ci_support/environment-mpich.yml + environment-file: .ci_support/environment-openmpi.yml - name: Test shell: bash -l {0} timeout-minutes: 5 From ec507ec64b67111e370766a5ef84bed31c3ae7cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Jul 2025 14:14:34 +0200 Subject: [PATCH 03/30] disable MPI parallel test --- .github/workflows/pipeline.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index 754a3273..22f8056d 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -330,7 +330,6 @@ jobs: pip install . --no-deps --no-build-isolation cd tests srun --mpi=list - python -m unittest test_slurmclusterexecutor.py python -m unittest test_slurmjobexecutor.py unittest_mpich: From b3ae43e5f92d9d2dcd8f8fd29489edd5a2017ed5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Jul 2025 14:23:10 +0200 Subject: [PATCH 04/30] sinfo --- .github/workflows/pipeline.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index 22f8056d..195c2e07 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -295,6 +295,7 @@ jobs: run: | pip install . 
--no-deps --no-build-isolation
         cd tests
+        sinfo
         srun --mpi=list
         python -m unittest test_slurmclusterexecutor.py
         python -m unittest test_slurmjobexecutor.py
@@ -329,6 +330,7 @@ jobs:
         pip install . --no-deps --no-build-isolation
         cd tests
+        sinfo
         srun --mpi=list
         python -m unittest test_slurmjobexecutor.py

From 21afba9d3f3afc98bfa329d9328c1113f2857ccd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 14:29:36 +0200
Subject: [PATCH 05/30] update output

---
 .github/workflows/pipeline.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml
index 195c2e07..f4a1ff21 100644
--- a/.github/workflows/pipeline.yml
+++ b/.github/workflows/pipeline.yml
@@ -295,7 +295,7 @@ jobs:
       run: |
         pip install . --no-deps --no-build-isolation
         cd tests
-        sinfo
+        sinfo -o "%n %e %m %a %c %C"
         srun --mpi=list
         python -m unittest test_slurmclusterexecutor.py
         python -m unittest test_slurmjobexecutor.py
@@ -330,7 +330,7 @@ jobs:
       run: |
         pip install . --no-deps --no-build-isolation
         cd tests
-        sinfo
+        sinfo -o "%n %e %m %a %c %C"
         srun --mpi=list
         python -m unittest test_slurmjobexecutor.py

From d38dda3f5b39df8962391070dc5f07534f8487e3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 14:40:24 +0200
Subject: [PATCH 06/30] add mpi parallel test

---
 tests/test_slurmjobexecutor.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/tests/test_slurmjobexecutor.py b/tests/test_slurmjobexecutor.py
index 5ef889d2..15814a4d 100644
--- a/tests/test_slurmjobexecutor.py
+++ b/tests/test_slurmjobexecutor.py
@@ -14,6 +14,14 @@ def calc(i):
     return i
 
 
+def mpi_funct(i):
+    from mpi4py import MPI
+
+    size = MPI.COMM_WORLD.Get_size()
+    rank = MPI.COMM_WORLD.Get_rank()
+    return i, size, rank
+
+
 @unittest.skipIf(
     skip_slurm_test, "Slurm is not installed, so the Slurm tests are skipped."
 )
@@ -26,3 +34,13 @@ def test_slurm_executor_serial(self):
         self.assertEqual(fs_2.result(), 2)
         self.assertTrue(fs_1.done())
         self.assertTrue(fs_2.done())
+
+    def test_slurm_executor_parallel(self):
+        with SlurmJobExecutor(
+            max_cores=2,
+            resource_dict={"cores": 2},
+            block_allocation=True,
+        ) as exe:
+            fs_1 = exe.submit(mpi_funct, 1)
+            self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)])
+            self.assertTrue(fs_1.done())
\ No newline at end of file
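The parallel test added in PATCH 06 expects one result tuple per MPI rank, gathered into a single list by the block-allocation worker. A minimal standalone sketch of the same pattern, assuming executorlib with an MPI-enabled mpi4py inside an active SLURM allocation that provides at least two cores:

```python
from executorlib import SlurmJobExecutor


def mpi_funct(i):
    # Imported inside the function so the import happens on the worker ranks.
    from mpi4py import MPI

    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    return i, size, rank


with SlurmJobExecutor(
    max_cores=2, resource_dict={"cores": 2}, block_allocation=True
) as exe:
    future = exe.submit(mpi_funct, 1)
    # Expected with two ranks: [(1, 2, 0), (1, 2, 1)] -- one tuple per rank.
    print(future.result())
```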
From 848da8895f93934e6bb9052915edabca8ad00357 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 14:48:01 +0200
Subject: [PATCH 07/30] core validation is only possible on compute node not on login node

---
 .../interactive/slurmspawner.py | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py
index 8426012d..2c90f1ed 100644
--- a/executorlib/task_scheduler/interactive/slurmspawner.py
+++ b/executorlib/task_scheduler/interactive/slurmspawner.py
@@ -7,17 +7,17 @@ def validate_max_workers(max_workers: int, cores: int, threads_per_core: int):
-    cores_total = int(os.environ["SLURM_NTASKS"]) * int(
-        os.environ["SLURM_CPUS_PER_TASK"]
-    )
-    cores_requested = max_workers * cores * threads_per_core
-    if cores_total < cores_requested:
-        raise ValueError(
-            "The number of requested cores is larger than the available cores "
-            + str(cores_total)
-            + " < "
-            + str(cores_requested)
-        )
+    env = os.environ
+    if "SLURM_NTASKS" in env and "SLRUM_CPUS_PER_TASK" in env:
+        cores_total = int(env["SLURM_NTASKS"]) * int(env["SLURM_CPUS_PER_TASK"])
+        cores_requested = max_workers * cores * threads_per_core
+        if cores_total < cores_requested:
+            raise ValueError(
+                "The number of requested cores is larger than the available cores "
+                + str(cores_total)
+                + " < "
+                + str(cores_requested)
+            )
 
 
 class SrunSpawner(SubprocessSpawner):

From 48000d4316a50669220872f44358dd1979b8c97e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 14:56:11 +0200
Subject: [PATCH 08/30] fix test

---
 executorlib/task_scheduler/interactive/slurmspawner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py
index 2c90f1ed..87b4850b 100644
--- a/executorlib/task_scheduler/interactive/slurmspawner.py
+++ b/executorlib/task_scheduler/interactive/slurmspawner.py
@@ -8,7 +8,7 @@ def validate_max_workers(max_workers: int, cores: int, threads_per_core: int):
     env = os.environ
-    if "SLURM_NTASKS" in env and "SLRUM_CPUS_PER_TASK" in env:
+    if "SLURM_NTASKS" in env and "SLURM_CPUS_PER_TASK" in env:
         cores_total = int(env["SLURM_NTASKS"]) * int(env["SLURM_CPUS_PER_TASK"])
         cores_requested = max_workers * cores * threads_per_core
         if cores_total < cores_requested:
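PATCH 07 makes the core-count validation conditional on the SLURM environment variables, which only exist inside a job allocation and not on the login node, and PATCH 08 corrects the SLRUM typo in that condition. A short sketch of the resulting behavior, assuming the patched module path:

```python
import os

from executorlib.task_scheduler.interactive.slurmspawner import validate_max_workers

# On a login node neither variable is set, so the check is skipped silently.
os.environ.pop("SLURM_NTASKS", None)
os.environ.pop("SLURM_CPUS_PER_TASK", None)
validate_max_workers(max_workers=4, cores=2, threads_per_core=1)  # no error

# Inside an allocation both variables exist, so over-subscription raises.
os.environ["SLURM_NTASKS"] = "2"
os.environ["SLURM_CPUS_PER_TASK"] = "1"
try:
    validate_max_workers(max_workers=2, cores=2, threads_per_core=1)
except ValueError as err:
    print(err)  # 2 available cores < 4 requested cores
```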
From a86b5df962304ed32a35e248fe579fa00e77c2b0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 15:04:57 +0200
Subject: [PATCH 09/30] enforce pmix for testing

---
 executorlib/task_scheduler/interactive/slurmspawner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py
index 87b4850b..9a5e0f20 100644
--- a/executorlib/task_scheduler/interactive/slurmspawner.py
+++ b/executorlib/task_scheduler/interactive/slurmspawner.py
@@ -107,7 +107,7 @@ def generate_slurm_command(
     Returns:
         list[str]: The generated command list.
     """
-    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
+    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmix"]
     if cwd is not None:
         command_prepend_lst += ["-D", cwd]
     if num_nodes is not None:

From 2990d11c61d819fdfdbda1d433ca4a69c21ec1a6 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Sun, 27 Jul 2025 15:16:01 +0200
Subject: [PATCH 10/30] Update slurmspawner.py

---
 executorlib/task_scheduler/interactive/slurmspawner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py
index 9a5e0f20..326d3ff5 100644
--- a/executorlib/task_scheduler/interactive/slurmspawner.py
+++ b/executorlib/task_scheduler/interactive/slurmspawner.py
@@ -107,7 +107,7 @@ def generate_slurm_command(
     Returns:
         list[str]: The generated command list.
     """
-    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmix"]
+    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmi2"]
     if cwd is not None:
         command_prepend_lst += ["-D", cwd]
     if num_nodes is not None:

From dbcd364667a77031b37bcfa9c1c970f9fd161641 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 15:28:29 +0200
Subject: [PATCH 11/30] downgrade to openmpi

---
 .ci_support/environment-slurm.yml | 16 ++++++++++++++++
 .github/workflows/dependabot.yml  |  1 +
 2 files changed, 17 insertions(+)
 create mode 100644 .ci_support/environment-slurm.yml

diff --git a/.ci_support/environment-slurm.yml b/.ci_support/environment-slurm.yml
new file mode 100644
index 00000000..8cbc7023
--- /dev/null
+++ b/.ci_support/environment-slurm.yml
@@ -0,0 +1,16 @@
+channels:
+- conda-forge
+dependencies:
+- python
+- numpy
+- openmpi =4.1.6
+- cloudpickle =3.1.1
+- mpi4py =4.0.1
+- pyzmq =27.0.0
+- h5py =3.14.0
+- networkx =3.4.2
+- pygraphviz =1.14
+- pysqa =0.2.7
+- ipython =9.0.2
+- hatchling =1.27.0
+- hatch-vcs =0.5.0
\ No newline at end of file
diff --git a/.github/workflows/dependabot.yml b/.github/workflows/dependabot.yml
index 0641ddfe..ed622f68 100644
--- a/.github/workflows/dependabot.yml
+++ b/.github/workflows/dependabot.yml
@@ -28,6 +28,7 @@ jobs:
           sed -i "/${package}/s/${from}/${to}/g" .ci_support/environment-docs.yml
           sed -i "/${package}/s/${from}/${to}/g" .ci_support/environment-mini.yml
           sed -i "/${package}/s/${from}/${to}/g" .ci_support/environment-integration.yml
+          sed -i "/${package}/s/${from}/${to}/g" .ci_support/environment-slurm.yml
           sed -i "/${package}/s/${from}/${to}/g" binder/environment.yml
       - name: UpdateDependabotPR commit
         run: |

From 4b374b46f1be6f2aaa5fd1874f54d26e456408a5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 15:30:56 +0200
Subject: [PATCH 12/30] remove pmi setting

---
 executorlib/task_scheduler/interactive/slurmspawner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py
index 326d3ff5..87b4850b 100644
--- a/executorlib/task_scheduler/interactive/slurmspawner.py
+++ b/executorlib/task_scheduler/interactive/slurmspawner.py
@@ -107,7 +107,7 @@ def generate_slurm_command(
     Returns:
         list[str]: The generated command list.
""" - command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmi2"] + command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)] if cwd is not None: command_prepend_lst += ["-D", cwd] if num_nodes is not None: From 1571c6a7c7d744180cf9542375c7b39cf9fc4395 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Jul 2025 15:37:28 +0200 Subject: [PATCH 13/30] use slurm environment --- .github/workflows/pipeline.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index f4a1ff21..8ef8dc87 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -323,7 +323,7 @@ jobs: python-version: '3.13' miniforge-version: latest condarc-file: .condarc - environment-file: .ci_support/environment-openmpi.yml + environment-file: .ci_support/environment-slurm.yml - name: Test shell: bash -l {0} timeout-minutes: 5 From b3b393c00c44c1c7532d0ae299949dfd90af5e7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Jul 2025 15:39:13 +0200 Subject: [PATCH 14/30] Try pmix again --- .ci_support/environment-slurm.yml | 16 ---------------- .github/workflows/dependabot.yml | 1 - .github/workflows/pipeline.yml | 2 +- .../task_scheduler/interactive/slurmspawner.py | 2 +- 4 files changed, 2 insertions(+), 19 deletions(-) delete mode 100644 .ci_support/environment-slurm.yml diff --git a/.ci_support/environment-slurm.yml b/.ci_support/environment-slurm.yml deleted file mode 100644 index 8cbc7023..00000000 --- a/.ci_support/environment-slurm.yml +++ /dev/null @@ -1,16 +0,0 @@ -channels: -- conda-forge -dependencies: -- python -- numpy -- openmpi =4.1.6 -- cloudpickle =3.1.1 -- mpi4py =4.0.1 -- pyzmq =27.0.0 -- h5py =3.14.0 -- networkx =3.4.2 -- pygraphviz =1.14 -- pysqa =0.2.7 -- ipython =9.0.2 -- hatchling =1.27.0 -- hatch-vcs =0.5.0 \ No newline at end of file diff --git a/.github/workflows/dependabot.yml b/.github/workflows/dependabot.yml index ed622f68..0641ddfe 100644 --- a/.github/workflows/dependabot.yml +++ b/.github/workflows/dependabot.yml @@ -28,7 +28,6 @@ jobs: sed -i "/${package}/s/${from}/${to}/g" .ci_support/environment-docs.yml sed -i "/${package}/s/${from}/${to}/g" .ci_support/environment-mini.yml sed -i "/${package}/s/${from}/${to}/g" .ci_support/environment-integration.yml - sed -i "/${package}/s/${from}/${to}/g" .ci_support/environment-slurm.yml sed -i "/${package}/s/${from}/${to}/g" binder/environment.yml - name: UpdateDependabotPR commit run: | diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index 8ef8dc87..f4a1ff21 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -323,7 +323,7 @@ jobs: python-version: '3.13' miniforge-version: latest condarc-file: .condarc - environment-file: .ci_support/environment-slurm.yml + environment-file: .ci_support/environment-openmpi.yml - name: Test shell: bash -l {0} timeout-minutes: 5 diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py index 87b4850b..57784dd1 100644 --- a/executorlib/task_scheduler/interactive/slurmspawner.py +++ b/executorlib/task_scheduler/interactive/slurmspawner.py @@ -107,7 +107,7 @@ def generate_slurm_command( Returns: list[str]: The generated command list. 
""" - command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)] + command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmix_v5"] if cwd is not None: command_prepend_lst += ["-D", cwd] if num_nodes is not None: From 0dc0eccb7edd965d0178c949484a6d2849faa904 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 27 Jul 2025 15:53:55 +0200 Subject: [PATCH 15/30] Update slurmspawner.py --- executorlib/task_scheduler/interactive/slurmspawner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py index 57784dd1..326d3ff5 100644 --- a/executorlib/task_scheduler/interactive/slurmspawner.py +++ b/executorlib/task_scheduler/interactive/slurmspawner.py @@ -107,7 +107,7 @@ def generate_slurm_command( Returns: list[str]: The generated command list. """ - command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmix_v5"] + command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmi2"] if cwd is not None: command_prepend_lst += ["-D", cwd] if num_nodes is not None: From 97125e53a38f229efa3dde030ff52fcd954cd174 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 27 Jul 2025 17:32:08 +0200 Subject: [PATCH 16/30] Update slurmspawner.py --- executorlib/task_scheduler/interactive/slurmspawner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py index 326d3ff5..9a5e0f20 100644 --- a/executorlib/task_scheduler/interactive/slurmspawner.py +++ b/executorlib/task_scheduler/interactive/slurmspawner.py @@ -107,7 +107,7 @@ def generate_slurm_command( Returns: list[str]: The generated command list. """ - command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmi2"] + command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmix"] if cwd is not None: command_prepend_lst += ["-D", cwd] if num_nodes is not None: From 58d152fad67cef109c29cdc5a76bc0204ab1295f Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 27 Jul 2025 17:41:45 +0200 Subject: [PATCH 17/30] Update pipeline.yml --- .github/workflows/pipeline.yml | 61 ++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index f4a1ff21..0c760cee 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -280,6 +280,8 @@ jobs: - uses: actions/checkout@v4 - uses: koesterlab/setup-slurm-action@v1 timeout-minutes: 5 + - name: ubnuntu install + run: sudo apt install -y mpich - name: Conda config shell: bash -l {0} run: echo -e "channels:\n - conda-forge\n" > .condarc @@ -300,6 +302,35 @@ jobs: python -m unittest test_slurmclusterexecutor.py python -m unittest test_slurmjobexecutor.py + unittest_slurm_mpich_pip: + needs: [black] + runs-on: ubuntu-latest + services: + mysql: + image: mysql:8.0 + env: + MYSQL_ROOT_PASSWORD: root + ports: + - "8888:3306" + options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + steps: + - uses: actions/checkout@v4 + - uses: koesterlab/setup-slurm-action@v1 + timeout-minutes: 5 + - name: ubnuntu install + run: sudo apt install -y mpich + - name: Test + shell: bash -l {0} + timeout-minutes: 5 + run: | + pip install mpi4py + pip install . 
+ cd tests + sinfo -o "%n %e %m %a %c %C" + srun --mpi=list + python -m unittest test_slurmclusterexecutor.py + python -m unittest test_slurmjobexecutor.py + unittest_slurm_openmpi: needs: [black] runs-on: ubuntu-latest @@ -315,6 +346,8 @@ jobs: - uses: actions/checkout@v4 - uses: koesterlab/setup-slurm-action@v1 timeout-minutes: 5 + - name: ubnuntu install + run: sudo apt install -y openmpi-bin openmpi-common libopenmpi-dev - name: Conda config shell: bash -l {0} run: echo -e "channels:\n - conda-forge\n" > .condarc @@ -334,6 +367,34 @@ jobs: srun --mpi=list python -m unittest test_slurmjobexecutor.py + unittest_slurm_openmpi_pip: + needs: [black] + runs-on: ubuntu-latest + services: + mysql: + image: mysql:8.0 + env: + MYSQL_ROOT_PASSWORD: root + ports: + - "8888:3306" + options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + steps: + - uses: actions/checkout@v4 + - uses: koesterlab/setup-slurm-action@v1 + timeout-minutes: 5 + - name: ubnuntu install + run: sudo apt install -y openmpi-bin openmpi-common libopenmpi-dev + - name: Test + shell: bash -l {0} + timeout-minutes: 5 + run: | + pip install mpi4py + pip install . + cd tests + sinfo -o "%n %e %m %a %c %C" + srun --mpi=list + python -m unittest test_slurmjobexecutor.py + unittest_mpich: needs: [black] runs-on: ${{ matrix.operating-system }} From fddf4a647897699fc437c88c74e827f75521bc9a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 27 Jul 2025 17:51:46 +0200 Subject: [PATCH 18/30] Update pipeline.yml --- .github/workflows/pipeline.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index 0c760cee..b3c90e67 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -299,7 +299,6 @@ jobs: cd tests sinfo -o "%n %e %m %a %c %C" srun --mpi=list - python -m unittest test_slurmclusterexecutor.py python -m unittest test_slurmjobexecutor.py unittest_slurm_mpich_pip: @@ -328,7 +327,6 @@ jobs: cd tests sinfo -o "%n %e %m %a %c %C" srun --mpi=list - python -m unittest test_slurmclusterexecutor.py python -m unittest test_slurmjobexecutor.py unittest_slurm_openmpi: From 0d248b7c49a1f8b433b488a49d71296b8795f9df Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 27 Jul 2025 17:58:47 +0200 Subject: [PATCH 19/30] Update slurmspawner.py --- executorlib/task_scheduler/interactive/slurmspawner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py index 9a5e0f20..326d3ff5 100644 --- a/executorlib/task_scheduler/interactive/slurmspawner.py +++ b/executorlib/task_scheduler/interactive/slurmspawner.py @@ -107,7 +107,7 @@ def generate_slurm_command( Returns: list[str]: The generated command list. 
""" - command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmix"] + command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmi2"] if cwd is not None: command_prepend_lst += ["-D", cwd] if num_nodes is not None: From 0c2d3f7cc07f7cc288e6eb93350a29cc35802aa7 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 27 Jul 2025 18:08:50 +0200 Subject: [PATCH 20/30] Update pipeline.yml --- .github/workflows/pipeline.yml | 92 ---------------------------------- 1 file changed, 92 deletions(-) diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index b3c90e67..14cc68f5 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -301,98 +301,6 @@ jobs: srun --mpi=list python -m unittest test_slurmjobexecutor.py - unittest_slurm_mpich_pip: - needs: [black] - runs-on: ubuntu-latest - services: - mysql: - image: mysql:8.0 - env: - MYSQL_ROOT_PASSWORD: root - ports: - - "8888:3306" - options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 - steps: - - uses: actions/checkout@v4 - - uses: koesterlab/setup-slurm-action@v1 - timeout-minutes: 5 - - name: ubnuntu install - run: sudo apt install -y mpich - - name: Test - shell: bash -l {0} - timeout-minutes: 5 - run: | - pip install mpi4py - pip install . - cd tests - sinfo -o "%n %e %m %a %c %C" - srun --mpi=list - python -m unittest test_slurmjobexecutor.py - - unittest_slurm_openmpi: - needs: [black] - runs-on: ubuntu-latest - services: - mysql: - image: mysql:8.0 - env: - MYSQL_ROOT_PASSWORD: root - ports: - - "8888:3306" - options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 - steps: - - uses: actions/checkout@v4 - - uses: koesterlab/setup-slurm-action@v1 - timeout-minutes: 5 - - name: ubnuntu install - run: sudo apt install -y openmpi-bin openmpi-common libopenmpi-dev - - name: Conda config - shell: bash -l {0} - run: echo -e "channels:\n - conda-forge\n" > .condarc - - uses: conda-incubator/setup-miniconda@v3 - with: - python-version: '3.13' - miniforge-version: latest - condarc-file: .condarc - environment-file: .ci_support/environment-openmpi.yml - - name: Test - shell: bash -l {0} - timeout-minutes: 5 - run: | - pip install . --no-deps --no-build-isolation - cd tests - sinfo -o "%n %e %m %a %c %C" - srun --mpi=list - python -m unittest test_slurmjobexecutor.py - - unittest_slurm_openmpi_pip: - needs: [black] - runs-on: ubuntu-latest - services: - mysql: - image: mysql:8.0 - env: - MYSQL_ROOT_PASSWORD: root - ports: - - "8888:3306" - options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 - steps: - - uses: actions/checkout@v4 - - uses: koesterlab/setup-slurm-action@v1 - timeout-minutes: 5 - - name: ubnuntu install - run: sudo apt install -y openmpi-bin openmpi-common libopenmpi-dev - - name: Test - shell: bash -l {0} - timeout-minutes: 5 - run: | - pip install mpi4py - pip install . 
-        cd tests
-        sinfo -o "%n %e %m %a %c %C"
-        srun --mpi=list
-        python -m unittest test_slurmjobexecutor.py
-
   unittest_mpich:
     needs: [black]
     runs-on: ${{ matrix.operating-system }}

From 1c89a16c87d2a24c7b16dd181f5a17ec5217d862 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 18:33:51 +0200
Subject: [PATCH 21/30] use slurm args

---
 executorlib/task_scheduler/interactive/slurmspawner.py | 2 +-
 tests/test_slurmjobexecutor.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py
index 326d3ff5..87b4850b 100644
--- a/executorlib/task_scheduler/interactive/slurmspawner.py
+++ b/executorlib/task_scheduler/interactive/slurmspawner.py
@@ -107,7 +107,7 @@ def generate_slurm_command(
     Returns:
         list[str]: The generated command list.
     """
-    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores), "--mpi=pmi2"]
+    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
     if cwd is not None:
         command_prepend_lst += ["-D", cwd]
     if num_nodes is not None:
diff --git a/tests/test_slurmjobexecutor.py b/tests/test_slurmjobexecutor.py
index 15814a4d..155b8b6d 100644
--- a/tests/test_slurmjobexecutor.py
+++ b/tests/test_slurmjobexecutor.py
@@ -27,7 +27,7 @@ def mpi_funct(i):
 )
 class TestSlurmBackend(unittest.TestCase):
     def test_slurm_executor_serial(self):
-        with SlurmJobExecutor() as exe:
+        with SlurmJobExecutor(resource_dict={"slurm_cmd_args": ["--mpi=pmi2"]}) as exe:
             fs_1 = exe.submit(calc, 1)
             fs_2 = exe.submit(calc, 2)
             self.assertEqual(fs_1.result(), 1)
@@ -38,7 +38,7 @@ def test_slurm_executor_serial(self):
     def test_slurm_executor_parallel(self):
         with SlurmJobExecutor(
             max_cores=2,
-            resource_dict={"cores": 2},
+            resource_dict={"cores": 2, "slurm_cmd_args": ["--mpi=pmi2"]},
             block_allocation=True,
         ) as exe:
             fs_1 = exe.submit(mpi_funct, 1)

From fcf4394ee94ccf482c44d290e60904f0bb9f25a8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 18:46:41 +0200
Subject: [PATCH 22/30] extend test

---
 .github/workflows/pipeline.yml | 1 +
 executorlib/standalone/command.py | 2 +-
 tests/test_slurmclusterexecutor.py | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml
index 14cc68f5..59420739 100644
--- a/.github/workflows/pipeline.yml
+++ b/.github/workflows/pipeline.yml
@@ -300,6 +300,7 @@ jobs:
         sinfo -o "%n %e %m %a %c %C"
         srun --mpi=list
         python -m unittest test_slurmjobexecutor.py
+        python -m unittest test_slurmclusterexecutor.py
 
   unittest_mpich:
     needs: [black]
     runs-on: ${{ matrix.operating-system }}
diff --git a/executorlib/standalone/command.py b/executorlib/standalone/command.py
index de5567ba..926cfef6 100644
--- a/executorlib/standalone/command.py
+++ b/executorlib/standalone/command.py
@@ -45,7 +45,7 @@ def get_cache_execute_command(
         )
     elif backend == "slurm":
         command_lst = (
-            ["srun", "-n", str(cores)]
+            ["srun", "-n", str(cores), "--mpi=pmi2"]
             + command_lst
             + [get_command_path(executable="cache_parallel.py"), file_name]
         )
diff --git a/tests/test_slurmclusterexecutor.py b/tests/test_slurmclusterexecutor.py
index 41d0f94b..7b07c414 100644
--- a/tests/test_slurmclusterexecutor.py
+++ b/tests/test_slurmclusterexecutor.py
@@ -26,7 +26,7 @@
 #SBATCH --job-name={{job_name}}
 #SBATCH --chdir={{working_directory}}
 #SBATCH --get-user-env=L
-#SBATCH --cpus-per-task={{cores}}
+#SBATCH --ntasks={{cores}}
 
 {{command}}
 """
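PATCH 21 routes the PMI selection through the user-facing `slurm_cmd_args` resource, and PATCH 22 pins `--mpi=pmi2` into the srun call used for cached parallel tasks. A quick sketch of the command list the patched `get_cache_execute_command` should now produce; the file name `task.h5` is only a placeholder:

```python
import sys

from executorlib.standalone.command import get_cache_execute_command

cmd = get_cache_execute_command(file_name="task.h5", cores=2, backend="slurm")
print(cmd)
# Expected shape, matching the assertions updated in the next patch:
# ['srun', '-n', '2', '--mpi=pmi2', sys.executable, '.../cache_parallel.py', 'task.h5']
```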
From 1d33bb5a5049f489ae83c324fd7313771287eff2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 18:48:09 +0200
Subject: [PATCH 23/30] fix tests

---
 tests/test_standalone_command.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/tests/test_standalone_command.py b/tests/test_standalone_command.py
index eeb8288e..12ae84d7 100644
--- a/tests/test_standalone_command.py
+++ b/tests/test_standalone_command.py
@@ -55,9 +55,10 @@ def test_get_cache_execute_command_parallel(self):
         self.assertEqual(output[0], "srun")
         self.assertEqual(output[1], "-n")
         self.assertEqual(output[2], str(2))
-        self.assertEqual(output[3], sys.executable)
-        self.assertEqual(output[4].split(os.sep)[-1], "cache_parallel.py")
-        self.assertEqual(output[5], file_name)
+        self.assertEqual(output[3], "--mpi=pmi2")
+        self.assertEqual(output[4], sys.executable)
+        self.assertEqual(output[5].split(os.sep)[-1], "cache_parallel.py")
+        self.assertEqual(output[6], file_name)
         output = get_cache_execute_command(cores=2, file_name=file_name, backend="flux")
         self.assertEqual(output[0], "flux")
         self.assertEqual(output[1], "run")

From 3e08db9ea042f1ddbfcdc22554e1efd43fd2effa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 19:16:02 +0200
Subject: [PATCH 24/30] Add executor_pmi_mode option

---
 executorlib/executor/flux.py | 2 +-
 executorlib/executor/single.py | 2 +-
 executorlib/executor/slurm.py | 5 +++-
 executorlib/standalone/command.py | 24 ++++++++-----------
 executorlib/task_scheduler/file/shared.py | 6 ++---
 .../task_scheduler/file/task_scheduler.py | 12 +++++-----
 .../interactive/slurmspawner.py | 8 +++++++
 tests/test_standalone_command.py | 2 +-
 8 files changed, 34 insertions(+), 27 deletions(-)

diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py
index f236850d..b0c6b207 100644
--- a/executorlib/executor/flux.py
+++ b/executorlib/executor/flux.py
@@ -369,7 +369,7 @@ def __init__(
             cache_directory=cache_directory,
             resource_dict=resource_dict,
             flux_executor=None,
-            flux_executor_pmi_mode=flux_executor_pmi_mode,
+            executor_pmi_mode=flux_executor_pmi_mode,
             flux_executor_nesting=False,
             flux_log_files=False,
             pysqa_config_directory=pysqa_config_directory,
diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py
index 9ad40c13..bd407fab 100644
--- a/executorlib/executor/single.py
+++ b/executorlib/executor/single.py
@@ -329,7 +329,7 @@ def __init__(
             cache_directory=cache_directory,
             resource_dict=resource_dict,
             flux_executor=None,
-            flux_executor_pmi_mode=None,
+            executor_pmi_mode=None,
             flux_executor_nesting=False,
             flux_log_files=False,
             pysqa_config_directory=None,
diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py
index c0353a97..b5da706e 100644
--- a/executorlib/executor/slurm.py
+++ b/executorlib/executor/slurm.py
@@ -174,7 +174,7 @@ def __init__(
             cache_directory=cache_directory,
             resource_dict=resource_dict,
             flux_executor=None,
-            flux_executor_pmi_mode=None,
+            executor_pmi_mode=None,
             flux_executor_nesting=False,
             flux_log_files=False,
             pysqa_config_directory=pysqa_config_directory,
@@ -393,6 +393,7 @@ def create_slurm_executor(
     block_allocation: bool = False,
     init_function: Optional[Callable] = None,
     log_obj_size: bool = False,
+    executor_pmi_mode: Optional[str] = None,
 ) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
     """
     Create a SLURM executor
@@ -431,6 +432,7 @@ def create_slurm_executor(
            of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None Returns: InteractiveStepExecutor/ InteractiveExecutor @@ -441,6 +443,7 @@ def create_slurm_executor( resource_dict["cache_directory"] = cache_directory resource_dict["hostname_localhost"] = hostname_localhost resource_dict["log_obj_size"] = log_obj_size + resource_dict["executor_pmi_mode"] = executor_pmi_mode check_init_function(block_allocation=block_allocation, init_function=init_function) if block_allocation: resource_dict["init_function"] = init_function diff --git a/executorlib/standalone/command.py b/executorlib/standalone/command.py index 926cfef6..fdb0e6ea 100644 --- a/executorlib/standalone/command.py +++ b/executorlib/standalone/command.py @@ -21,7 +21,7 @@ def get_cache_execute_command( file_name: str, cores: int = 1, backend: Optional[str] = None, - flux_executor_pmi_mode: Optional[str] = None, + executor_pmi_mode: Optional[str] = None, ) -> list: """ Get command to call backend as a list of two strings @@ -30,7 +30,7 @@ def get_cache_execute_command( file_name (str): The name of the file. cores (int, optional): Number of cores used to execute the task. Defaults to 1. backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"]. - flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) Returns: list[str]: List of strings containing the python executable path and the backend script to execute @@ -44,25 +44,21 @@ def get_cache_execute_command( + [get_command_path(executable="cache_parallel.py"), file_name] ) elif backend == "slurm": + command_prepend = ["srun", "-n", str(cores)] + if executor_pmi_mode is not None: + command_prepend += ["--mpi=pmi2"] command_lst = ( - ["srun", "-n", str(cores), "--mpi=pmi2"] + command_prepend + command_lst + [get_command_path(executable="cache_parallel.py"), file_name] ) elif backend == "flux": - if flux_executor_pmi_mode is not None: - flux_command = [ - "flux", - "run", - "-o", - "pmi=" + flux_executor_pmi_mode, - "-n", - str(cores), - ] - else: - flux_command = ["flux", "run", "-n", str(cores)] + flux_command = ["flux", "run"] + if executor_pmi_mode is not None: + flux_command += ["pmi=" + executor_pmi_mode] command_lst = ( flux_command + + ["-n", str(cores)] + command_lst + [get_command_path(executable="cache_parallel.py"), file_name] ) diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index c4f82527..81922733 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -57,7 +57,7 @@ def execute_tasks_h5( pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, disable_dependencies: bool = False, - flux_executor_pmi_mode: Optional[str] = None, + executor_pmi_mode: Optional[str] = None, ) -> None: """ Execute tasks stored in a queue using HDF5 files. @@ -72,7 +72,7 @@ def execute_tasks_h5( pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. 
- flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) Returns: None @@ -157,7 +157,7 @@ def execute_tasks_h5( file_name=file_name, cores=task_resource_dict["cores"], backend=backend, - flux_executor_pmi_mode=flux_executor_pmi_mode, + executor_pmi_mode=executor_pmi_mode, ), file_name=file_name, data_dict=data_dict, diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index d836211c..7204036a 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -34,7 +34,7 @@ def __init__( pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, disable_dependencies: bool = False, - flux_executor_pmi_mode: Optional[str] = None, + executor_pmi_mode: Optional[str] = None, ): """ Initialize the FileExecutor. @@ -49,7 +49,7 @@ def __init__( pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. - flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None """ super().__init__(max_cores=None) default_resource_dict = { @@ -70,7 +70,7 @@ def __init__( "pysqa_config_directory": pysqa_config_directory, "backend": backend, "disable_dependencies": disable_dependencies, - "flux_executor_pmi_mode": flux_executor_pmi_mode, + "executor_pmi_mode": executor_pmi_mode, } self._set_process( Thread( @@ -87,7 +87,7 @@ def create_file_executor( max_cores: Optional[int] = None, cache_directory: Optional[str] = None, flux_executor=None, - flux_executor_pmi_mode: Optional[str] = None, + executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, flux_log_files: bool = False, pysqa_config_directory: Optional[str] = None, @@ -108,7 +108,7 @@ def create_file_executor( if cache_directory is not None: resource_dict["cache_directory"] = cache_directory if backend != "flux": - check_flux_executor_pmi_mode(flux_executor_pmi_mode=flux_executor_pmi_mode) + check_flux_executor_pmi_mode(flux_executor_pmi_mode=executor_pmi_mode) check_max_workers_and_cores(max_cores=max_cores, max_workers=max_workers) check_hostname_localhost(hostname_localhost=hostname_localhost) check_executor(executor=flux_executor) @@ -125,5 +125,5 @@ def create_file_executor( disable_dependencies=disable_dependencies, execute_function=execute_function, terminate_function=terminate_function, - flux_executor_pmi_mode=flux_executor_pmi_mode, + executor_pmi_mode=executor_pmi_mode, ) diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py index 87b4850b..9bbc5f1f 100644 --- a/executorlib/task_scheduler/interactive/slurmspawner.py +++ b/executorlib/task_scheduler/interactive/slurmspawner.py @@ -31,6 +31,7 @@ def __init__( exclusive: bool = False, openmpi_oversubscribe: bool = False, slurm_cmd_args: Optional[list[str]] = None, + executor_pmi_mode: Optional[str] = None, ): """ Srun interface implementation. @@ -44,6 +45,7 @@ def __init__( exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to False. 
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
            slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
+            executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
         """
         super().__init__(
             cwd=cwd,
@@ -55,6 +57,7 @@ def __init__(
         self._slurm_cmd_args = slurm_cmd_args
         self._num_nodes = num_nodes
         self._exclusive = exclusive
+        self._executor_pmi_mode = executor_pmi_mode
 
     def generate_command(self, command_lst: list[str]) -> list[str]:
         """
@@ -75,6 +78,7 @@ def generate_command(self, command_lst: list[str]) -> list[str]:
             exclusive=self._exclusive,
             openmpi_oversubscribe=self._openmpi_oversubscribe,
             slurm_cmd_args=self._slurm_cmd_args,
+            executor_pmi_mode=self._executor_pmi_mode,
         )
         return super().generate_command(
             command_lst=command_prepend_lst + command_lst,
@@ -90,6 +94,7 @@ def generate_slurm_command(
     exclusive: bool = False,
     openmpi_oversubscribe: bool = False,
     slurm_cmd_args: Optional[list[str]] = None,
+    executor_pmi_mode: Optional[str] = None,
 ) -> list[str]:
     """
     Generate the command list for the SLURM interface.
@@ -103,6 +108,7 @@ def generate_slurm_command(
         exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to False.
         openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
         slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
+        executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
 
     Returns:
         list[str]: The generated command list.
@@ -110,6 +116,8 @@ def generate_slurm_command(
     command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
     if cwd is not None:
         command_prepend_lst += ["-D", cwd]
+    if executor_pmi_mode is not None:
+        command_prepend_lst += ["--mpi=" + executor_pmi_mode]
     if num_nodes is not None:
         command_prepend_lst += ["-N", str(num_nodes)]
     if threads_per_core > 1:
diff --git a/tests/test_standalone_command.py b/tests/test_standalone_command.py
index 12ae84d7..bf6b24ef 100644
--- a/tests/test_standalone_command.py
+++ b/tests/test_standalone_command.py
@@ -67,7 +67,7 @@ def test_get_cache_execute_command_parallel(self):
         self.assertEqual(output[4], sys.executable)
         self.assertEqual(output[5].split(os.sep)[-1], "cache_parallel.py")
         self.assertEqual(output[6], file_name)
-        output = get_cache_execute_command(cores=2, file_name=file_name, backend="flux", flux_executor_pmi_mode="pmix")
+        output = get_cache_execute_command(cores=2, file_name=file_name, backend="flux", executor_pmi_mode="pmix")
         self.assertEqual(output[0], "flux")
         self.assertEqual(output[1], "run")
         self.assertEqual(output[2], "-o")
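PATCH 24 turns the hard-coded PMI flag into an `executor_pmi_mode` option that is threaded from the executors down to the srun command line (and to the flux shell option). A sketch of the effect on the generated command, assuming the patched `generate_slurm_command`; the working directory is a placeholder:

```python
from executorlib.task_scheduler.interactive.slurmspawner import generate_slurm_command

cmd = generate_slurm_command(cores=2, cwd="/scratch/demo", executor_pmi_mode="pmix")
print(cmd)
# Expected: ['srun', '-n', '2', '-D', '/scratch/demo', '--mpi=pmix']
# With executor_pmi_mode=None the --mpi flag is omitted, restoring the old default.
```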
From f4102d8ac04029c52ef9dfdf953664210ba0ca09 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 19:49:31 +0200
Subject: [PATCH 25/30] check all files

---
 docs/installation.md | 2 +-
 executorlib/executor/flux.py | 28 +-
 executorlib/executor/slurm.py | 14 +-
 executorlib/standalone/inputcheck.py | 6 +-
 .../task_scheduler/file/task_scheduler.py | 6 +-
 .../task_scheduler/interactive/fluxspawner.py | 10 +-
 notebooks/3-hpc-job.ipynb | 502 +++++++++++++++++-
 tests/test_fluxclusterexecutor.py | 8 +-
 tests/test_fluxjobexecutor.py | 4 +-
 tests/test_fluxpythonspawner.py | 4 +-
 tests/test_slurmclusterexecutor.py | 4 +
 tests/test_standalone_inputcheck.py | 6 +-
 12 files changed, 553 insertions(+), 41 deletions(-)

diff --git a/docs/installation.md b/docs/installation.md
index 5eec393a..14a3948d 100644
--- a/docs/installation.md
+++ b/docs/installation.md
@@ -120,7 +120,7 @@ For the version 5 of openmpi the backend changed to `pmix`, this requires the ad
 ```
 conda install -c conda-forge flux-core flux-sched flux-pmix openmpi>=5 executorlib
 ```
-In addition, the `flux_executor_pmi_mode="pmix"` parameter has to be set for the `FluxJobExecutor` or the
+In addition, the `executor_pmi_mode="pmix"` parameter has to be set for the `FluxJobExecutor` or the
 `FluxClusterExecutor` to switch to `pmix` as backend.
 
 ### Test Flux Framework
diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py
index b0c6b207..e32ecb0b 100644
--- a/executorlib/executor/flux.py
+++ b/executorlib/executor/flux.py
@@ -44,7 +44,7 @@ class FluxJobExecutor(BaseExecutor):
        - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor.
         flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
-        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
+        executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
         flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
         flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
@@ -94,7 +94,7 @@ def __init__(
         max_cores: Optional[int] = None,
         resource_dict: Optional[dict] = None,
         flux_executor=None,
-        flux_executor_pmi_mode: Optional[str] = None,
+        executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
         flux_log_files: bool = False,
         hostname_localhost: Optional[bool] = None,
@@ -131,7 +131,7 @@ def __init__(
            - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor.
             flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
-            flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
+            executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
             flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
             flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
             hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
@@ -176,7 +176,7 @@ def __init__(
                 max_cores=max_cores,
                 resource_dict=resource_dict,
                 flux_executor=flux_executor,
-                flux_executor_pmi_mode=flux_executor_pmi_mode,
+                executor_pmi_mode=executor_pmi_mode,
                 flux_executor_nesting=flux_executor_nesting,
                 flux_log_files=flux_log_files,
                 hostname_localhost=hostname_localhost,
@@ -200,7 +200,7 @@ def __init__(
                 max_cores=max_cores,
                 resource_dict=resource_dict,
                 flux_executor=flux_executor,
-                flux_executor_pmi_mode=flux_executor_pmi_mode,
+                executor_pmi_mode=executor_pmi_mode,
                 flux_executor_nesting=flux_executor_nesting,
                 flux_log_files=flux_log_files,
                 hostname_localhost=hostname_localhost,
@@ -236,7 +236,7 @@ class FluxClusterExecutor(BaseExecutor):
        - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor.
         pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
- flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -284,7 +284,7 @@ def __init__( max_cores: Optional[int] = None, resource_dict: Optional[dict] = None, pysqa_config_directory: Optional[str] = None, - flux_executor_pmi_mode: Optional[str] = None, + executor_pmi_mode: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[Callable] = None, @@ -319,7 +319,7 @@ def __init__( - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). - flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -369,7 +369,7 @@ def __init__( cache_directory=cache_directory, resource_dict=resource_dict, flux_executor=None, - executor_pmi_mode=flux_executor_pmi_mode, + executor_pmi_mode=executor_pmi_mode, flux_executor_nesting=False, flux_log_files=False, pysqa_config_directory=pysqa_config_directory, @@ -388,7 +388,7 @@ def __init__( max_cores=max_cores, resource_dict=resource_dict, flux_executor=None, - flux_executor_pmi_mode=None, + executor_pmi_mode=None, flux_executor_nesting=False, flux_log_files=False, hostname_localhost=hostname_localhost, @@ -409,7 +409,7 @@ def create_flux_executor( cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None, flux_executor=None, - flux_executor_pmi_mode: Optional[str] = None, + executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, flux_log_files: bool = False, hostname_localhost: Optional[bool] = None, @@ -438,7 +438,7 @@ def create_flux_executor( - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. 
In the @@ -470,7 +470,7 @@ def create_flux_executor( resource_dict["hostname_localhost"] = hostname_localhost resource_dict["log_obj_size"] = log_obj_size check_init_function(block_allocation=block_allocation, init_function=init_function) - check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode) + check_pmi(backend="flux_allocation", pmi=executor_pmi_mode) check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False)) check_command_line_argument_lst( command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) @@ -480,7 +480,7 @@ def create_flux_executor( if "slurm_cmd_args" in resource_dict: del resource_dict["slurm_cmd_args"] resource_dict["flux_executor"] = flux_executor - resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode + resource_dict["executor_pmi_mode"] = executor_pmi_mode resource_dict["flux_executor_nesting"] = flux_executor_nesting resource_dict["flux_log_files"] = flux_log_files if block_allocation: diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py index b5da706e..831a7ff0 100644 --- a/executorlib/executor/slurm.py +++ b/executorlib/executor/slurm.py @@ -44,6 +44,7 @@ class SlurmClusterExecutor(BaseExecutor): - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -91,6 +92,7 @@ def __init__( max_cores: Optional[int] = None, resource_dict: Optional[dict] = None, pysqa_config_directory: Optional[str] = None, + executor_pmi_mode: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[Callable] = None, @@ -125,6 +127,7 @@ def __init__( - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -174,7 +177,7 @@ def __init__( cache_directory=cache_directory, resource_dict=resource_dict, flux_executor=None, - executor_pmi_mode=None, + executor_pmi_mode=executor_pmi_mode, flux_executor_nesting=False, flux_log_files=False, pysqa_config_directory=pysqa_config_directory, @@ -232,6 +235,7 @@ class SlurmJobExecutor(BaseExecutor): compute notes. Defaults to False. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. 
In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -278,6 +282,7 @@ def __init__( cache_directory: Optional[str] = None, max_cores: Optional[int] = None, resource_dict: Optional[dict] = None, + executor_pmi_mode: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[Callable] = None, @@ -315,6 +320,7 @@ def __init__( compute notes. Defaults to False. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -356,6 +362,7 @@ def __init__( cache_directory=cache_directory, max_cores=max_cores, resource_dict=resource_dict, + executor_pmi_mode=executor_pmi_mode, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, @@ -376,6 +383,7 @@ def __init__( cache_directory=cache_directory, max_cores=max_cores, resource_dict=resource_dict, + executor_pmi_mode=executor_pmi_mode, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, @@ -389,11 +397,11 @@ def create_slurm_executor( max_cores: Optional[int] = None, cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None, + executor_pmi_mode: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[Callable] = None, log_obj_size: bool = False, - executor_pmi_mode: Optional[str] = None, ) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]: """ Create a SLURM executor @@ -419,6 +427,7 @@ def create_slurm_executor( compute notes. Defaults to False. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -432,7 +441,6 @@ def create_slurm_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. 
- executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None Returns: InteractiveStepExecutor/ InteractiveExecutor diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index 56f39a5d..95b306b5 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -146,10 +146,10 @@ def check_hostname_localhost(hostname_localhost: Optional[bool]) -> None: ) -def check_flux_executor_pmi_mode(flux_executor_pmi_mode: Optional[str]) -> None: - if flux_executor_pmi_mode is not None: +def check_executor_pmi_mode(executor_pmi_mode: Optional[str]) -> None: + if executor_pmi_mode is not None: raise ValueError( - "The option to specify the flux pmi mode is not available with the pysqa based backend." + "The option to specify the pmi mode is not available on a local workstation, it requires SLURM or flux." ) diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 7204036a..413c5f81 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -3,7 +3,7 @@ from executorlib.standalone.inputcheck import ( check_executor, - check_flux_executor_pmi_mode, + check_executor_pmi_mode, check_flux_log_files, check_hostname_localhost, check_max_workers_and_cores, @@ -107,8 +107,8 @@ def create_file_executor( ) if cache_directory is not None: resource_dict["cache_directory"] = cache_directory - if backend != "flux": - check_flux_executor_pmi_mode(flux_executor_pmi_mode=executor_pmi_mode) + if backend is None: + check_executor_pmi_mode(executor_pmi_mode=executor_pmi_mode) check_max_workers_and_cores(max_cores=max_cores, max_workers=max_workers) check_hostname_localhost(hostname_localhost=hostname_localhost) check_executor(executor=flux_executor) diff --git a/executorlib/task_scheduler/interactive/fluxspawner.py b/executorlib/task_scheduler/interactive/fluxspawner.py index 9cb4ed55..9f82cf33 100644 --- a/executorlib/task_scheduler/interactive/fluxspawner.py +++ b/executorlib/task_scheduler/interactive/fluxspawner.py @@ -36,7 +36,7 @@ class FluxPythonSpawner(BaseSpawner): priority (int, optional): job urgency 0 (lowest) through 31 (highest) (default is 16). Priorities 0 through 15 are restricted to the instance owner. flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None. - flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None. + executor_pmi_mode (str, optional): The PMI option. Defaults to None. flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False. flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. 
""" @@ -52,7 +52,7 @@ def __init__( priority: Optional[int] = None, openmpi_oversubscribe: bool = False, flux_executor: Optional[flux.job.FluxExecutor] = None, - flux_executor_pmi_mode: Optional[str] = None, + executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, flux_log_files: bool = False, ): @@ -66,7 +66,7 @@ def __init__( self._num_nodes = num_nodes self._exclusive = exclusive self._flux_executor = flux_executor - self._flux_executor_pmi_mode = flux_executor_pmi_mode + self._executor_pmi_mode = executor_pmi_mode self._flux_executor_nesting = flux_executor_nesting self._flux_log_files = flux_log_files self._priority = priority @@ -109,8 +109,8 @@ def bootup( exclusive=self._exclusive, ) jobspec.environment = dict(os.environ) - if self._flux_executor_pmi_mode is not None: - jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode) + if self._executor_pmi_mode is not None: + jobspec.setattr_shell_option("pmi", self._executor_pmi_mode) if self._cwd is not None: jobspec.cwd = self._cwd if self._flux_log_files and self._cwd is not None: diff --git a/notebooks/3-hpc-job.ipynb b/notebooks/3-hpc-job.ipynb index c0552c68..ecabecc4 100644 --- a/notebooks/3-hpc-job.ipynb +++ b/notebooks/3-hpc-job.ipynb @@ -1 +1,501 @@ -{"metadata":{"kernelspec":{"display_name":"Flux","language":"python","name":"flux"},"language_info":{"codemirror_mode":{"name":"ipython","version":3},"file_extension":".py","mimetype":"text/x-python","name":"python","nbconvert_exporter":"python","pygments_lexer":"ipython3","version":"3.12.11"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"87c3425d-5abe-4e0b-a948-e371808c322c","cell_type":"markdown","source":"# HPC Job Executor\nIn contrast to the [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) which submits individual Python functions to HPC job schedulers, the HPC Job Executors take a given job allocation of the HPC job scheduler and executes Python functions with the resources available in this job allocation. In this regard it is similar to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) as it communicates with the individual Python processes using the [zero message queue](https://zeromq.org/), still it is more advanced as it can access the computational resources of all compute nodes of the given HPC job allocation and also provides the option to assign GPUs as accelerators for parallel execution.\n\nAvailable Functionality: \n* Submit Python functions with the [submit() function or the map() function](https://executorlib.readthedocs.io/en/latest/1-single-node.html#basic-functionality).\n* Support for parallel execution, either using the [message passing interface (MPI)](https://executorlib.readthedocs.io/en/latest/1-single-node.html#mpi-parallel-functions), [thread based parallelism](https://executorlib.readthedocs.io/en/latest/1-single-node.html#thread-parallel-functions) or by [assigning dedicated GPUs](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html#resource-assignment) to selected Python functions. 
All these resources assignments are handled via the [resource dictionary parameter resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary).\n* Performance optimization features, like [block allocation](https://executorlib.readthedocs.io/en/latest/1-single-node.html#block-allocation), [dependency resolution](https://executorlib.readthedocs.io/en/latest/1-single-node.html#dependencies) and [caching](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache).\n\nThe only parameter the user has to change is the `backend` parameter. ","metadata":{}},{"id":"8c788b9f-6b54-4ce0-a864-4526b7f6f170","cell_type":"markdown","source":"## SLURM\nWith the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com/) currently being the most commonly used job scheduler, executorlib provides an interface to submit Python functions to SLURM. Internally, this is based on the [srun](https://slurm.schedmd.com/srun.html) command of the SLURM scheduler, which creates job steps in a given allocation. Given that all resource requests in SLURM are communicated via a central database a large number of submitted Python functions and resulting job steps can slow down the performance of SLURM. To address this limitation it is recommended to install the hierarchical job scheduler [flux](https://flux-framework.org/) in addition to SLURM, to use flux for distributing the resources within a given allocation. This configuration is discussed in more detail below in the section [SLURM with flux](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm-with-flux).","metadata":{}},{"id":"133b751f-0925-4d11-99f0-3f8dd9360b54","cell_type":"code","source":"from executorlib import SlurmJobExecutor","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"9b74944e-2ccd-4cb0-860a-d876310ea870","cell_type":"markdown","source":"```python\nwith SlurmAllocationExecutor() as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())\n```","metadata":{}},{"id":"36e2d68a-f093-4082-933a-d95bfe7a60c6","cell_type":"markdown","source":"## SLURM with Flux \nAs discussed in the installation section it is important to select the [flux](https://flux-framework.org/) version compatible to the installation of a given HPC cluster. Which GPUs are available? Who manufactured these GPUs? Does the HPC use [mpich](https://www.mpich.org/) or [OpenMPI](https://www.open-mpi.org/) or one of their commercial counter parts like cray MPI or intel MPI? Depending on the configuration different installation options can be choosen, as explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#hpc-job-executor).\n\nAfterwards flux can be started in an [sbatch](https://slurm.schedmd.com/sbatch.html) submission script using:\n```\nsrun flux start python \n```\nIn this Python script `` the `\"flux_allocation\"` backend can be used.","metadata":{}},{"id":"68be70c3-af18-4165-862d-7022d35bf9e4","cell_type":"markdown","source":"### Resource Assignment\nIndependent of the selected Executor [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) or HPC job executor the assignment of the computational resources remains the same. 
They can either be specified in the `submit()` function by adding the resource dictionary parameter [resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) or alternatively during the initialization of the `Executor` class by adding the resource dictionary parameter [resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) there.\n\nThis functionality of executorlib is commonly used to rewrite individual Python functions to use MPI while the rest of the Python program remains serial.","metadata":{}},{"id":"8a2c08df-cfea-4783-ace6-68fcd8ebd330","cell_type":"code","source":"def calc_mpi(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, rank","metadata":{"trusted":true},"outputs":[],"execution_count":2},{"id":"715e0c00-7b17-40bb-bd55-b0e097bfef07","cell_type":"markdown","source":"Depending on the choice of MPI version, it is recommended to specify the pmi standard which [flux](https://flux-framework.org/) should use internally for the resource assignment. For example for OpenMPI >=5 `\"pmix\"` is the recommended pmi standard.","metadata":{}},{"id":"5802c7d7-9560-4909-9d30-a915a91ac0a1","cell_type":"code","source":"from executorlib import FluxJobExecutor\n\nwith FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[(3, 2, 0), (3, 2, 1)]\n"}],"execution_count":3},{"id":"da862425-08b6-4ced-999f-89a74e85f410","cell_type":"markdown","source":"### Block Allocation\nThe block allocation for the HPC allocation mode follows the same implementation as the [block allocation for the Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#block-allocation). It starts by defining the initialization function `init_function()` which returns a dictionary which is internally used to look up input parameters for Python functions submitted to the `FluxJobExecutor` class. Commonly this functionality is used to store large data objects inside the Python process created for the block allocation, rather than reloading these Python objects for each submitted function.","metadata":{}},{"id":"cdc742c0-35f7-47ff-88c0-1b0dbeabe51b","cell_type":"code","source":"def init_function():\n return {\"j\": 4, \"k\": 3, \"l\": 2}","metadata":{"trusted":true},"outputs":[],"execution_count":4},{"id":"5ddf8343-ab2c-4469-ac9f-ee568823d4ad","cell_type":"code","source":"def calc_with_preload(i, j, k):\n return i + j + k","metadata":{"trusted":true},"outputs":[],"execution_count":5},{"id":"0da13efa-1941-416f-b9e6-bba15b5cdfa2","cell_type":"code","source":"with FluxJobExecutor(\n flux_executor_pmi_mode=\"pmix\",\n max_workers=2,\n init_function=init_function,\n block_allocation=True,\n) as exe:\n fs = exe.submit(calc_with_preload, 2, j=5)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"10\n"}],"execution_count":6},{"id":"82f3b947-e662-4a0d-b590-9475e0b4f7dd","cell_type":"markdown","source":"In this example the parameter `k` is used from the dataset created by the initialization function while the parameters `i` and `j` are specified by the call of the `submit()` function. 
\n\nWhen using the block allocation mode, it is recommended to set either the maxium number of workers using the `max_workers` parameter or the maximum number of CPU cores using the `max_cores` parameter to prevent oversubscribing the available resources. ","metadata":{}},{"id":"8ced8359-8ecb-480b-966b-b85d8446d85c","cell_type":"markdown","source":"### Dependencies\nPython functions with rather different computational resource requirements should not be merged into a single function. So to able to execute a series of Python functions which each depend on the output of the previous Python function executorlib internally handles the dependencies based on the [concurrent futures future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects from the Python standard library. This implementation is independent of the selected backend and works for HPC allocation mode just like explained in the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#dependencies) section.","metadata":{}},{"id":"bd26d97b-46fd-4786-9ad1-1e534b31bf36","cell_type":"code","source":"def add_funct(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":7},{"id":"1a2d440f-3cfc-4ff2-b74d-e21823c65f69","cell_type":"code","source":"with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n future = 0\n for i in range(1, 4):\n future = exe.submit(add_funct, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"6\n"}],"execution_count":8},{"id":"f526c2bf-fdf5-463b-a955-020753138415","cell_type":"markdown","source":"### Caching\nFinally, also the caching is available for HPC allocation mode, in analogy to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache). Again this functionality is not designed to identify function calls with the same parameters, but rather provides the option to reload previously cached results even after the Python processes which contained the executorlib `Executor` class is closed. As the cache is stored on the file system, this option can decrease the performance of executorlib. Consequently the caching option should primarily be used during the prototyping phase.","metadata":{}},{"id":"dcba63e0-72f5-49d1-ab04-2092fccc1c47","cell_type":"code","source":"with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\", cache_directory=\"./file\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"[2, 4, 6]\n"}],"execution_count":9},{"id":"c3958a14-075b-4c10-9729-d1c559a9231c","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['sum89afbdf9da5eb1794f6976a3f01697c2_o.h5', 'sum0f7710227cda6456e5d07187702313f3_o.h5', 'sumf5ad27b855231a293ddd735a8554c9ea_o.h5']\n"}],"execution_count":10},{"id":"c24ca82d-60bd-4fb9-a082-bf9a81e838bf","cell_type":"markdown","source":"### Nested executors\nThe hierarchical nature of the [flux](https://flux-framework.org/) job scheduler allows the creation of additional executorlib Executors inside the functions submitted to the Executor. 
This hierarchy can be beneficial to separate the logic to saturate the available computational resources. ","metadata":{}},{"id":"06fb2d1f-65fc-4df6-9402-5e9837835484","cell_type":"code","source":"def calc_nested():\n from executorlib import FluxJobExecutor\n\n with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\") as exe:\n fs = exe.submit(sum, [1, 1])\n return fs.result()","metadata":{"trusted":true},"outputs":[],"execution_count":11},{"id":"89b7d0fd-5978-4913-a79a-f26cc8047445","cell_type":"code","source":"with FluxJobExecutor(flux_executor_pmi_mode=\"pmix\", flux_executor_nesting=True) as exe:\n fs = exe.submit(calc_nested)\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":12},{"id":"9f209925-1ce4-42e4-bbe5-becbb1f3cd79","cell_type":"markdown","source":"### Executor from Flux\nThe [flux framework](http://flux-framework.org/) provides its own [FluxExecutor](https://flux-framework.readthedocs.io/projects/flux-core/en/latest/python/autogenerated/flux.job.executor.html#flux.job.executor.FluxExecutor) which can be used to submit shell scripts to the [flux framework](http://flux-framework.org/) for execution. The [FluxExecutor](https://flux-framework.readthedocs.io/projects/flux-core/en/latest/python/autogenerated/flux.job.executor.html#flux.job.executor.FluxExecutor) returns its own representation of future objects which is incompatible with the [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html) which is used by executorlib. Combining both provides the opportunity to link Python fucntions and external executables. For this purpose executorlib provides the option to use a [FluxExecutor](https://flux-framework.readthedocs.io/projects/flux-core/en/latest/python/autogenerated/flux.job.executor.html#flux.job.executor.FluxExecutor) as an input for the `FluxJobExecutor`:","metadata":{}},{"id":"3df0357e-d936-4989-a271-d0b03c6d0b48","cell_type":"code","source":"from executorlib import FluxJobExecutor\nimport flux.job\n\nwith flux.job.FluxExecutor() as flux_executor:\n with FluxJobExecutor(flux_executor=flux_executor) as exe:\n future = exe.submit(sum, [1, 1])\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"2\n"}],"execution_count":13},{"id":"34a8c690-ca5a-41d1-b38f-c67eff085750","cell_type":"markdown","source":"### Resource Monitoring\nFor debugging it is commonly helpful to keep track of the computational resources. [flux](https://flux-framework.org/) provides a number of features to analyse the resource utilization, so here only the two most commonly used ones are introduced. Starting with the option to list all the resources available in a given allocation with the `flux resource list` command:","metadata":{}},{"id":"7481eb0a-a41b-4d46-bb48-b4db299fcd86","cell_type":"code","source":"! flux resource list","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":" STATE NNODES NCORES NGPUS NODELIST\n free 1 24 0 jupyter-pyiron-executorlib-wx8wv67z\n allocated 0 0 0 \n down 0 0 0 \n"}],"execution_count":14},{"id":"08d98134-a0e0-4841-be82-e09e1af29e7f","cell_type":"markdown","source":"Followed by the list of jobs which were executed in a given flux session. This can be retrieved using the `flux jobs -a` command:","metadata":{}},{"id":"1ee6e147-f53a-4526-8ed0-fd036f2ee6bf","cell_type":"code","source":"! 
flux jobs -a","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":" JOBID USER NAME ST NTASKS NNODES TIME INFO\n\u001b[01;32m ƒ66TjsQs jovyan python CD 1 1 0.149s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒ4R3m4Sj jovyan flux CD 1 1 3.509s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒ3N4Qc3y jovyan python CD 1 1 1.922s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒ3DuUZ9y jovyan python CD 1 1 2.291s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒ3DrWabH jovyan python CD 1 1 2.204s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒ2z9sDYT jovyan python CD 1 1 0.271s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒ2m9FX6w jovyan python CD 1 1 0.404s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒ2dGdLJj jovyan python CD 1 1 0.346s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒ29qrcvj jovyan python CD 1 1 0.848s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒ29tpbVR jovyan python CD 1 1 0.539s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m\u001b[01;32m ƒZsZ5QT jovyan python CD 2 1 0.966s jupyter-pyiron-executorlib-wx8wv67z\n\u001b[0;0m"}],"execution_count":15},{"id":"021f165b-27cc-4676-968b-cbcfd1f0210a","cell_type":"markdown","source":"## Flux\nWhile the number of HPC clusters which use [flux](https://flux-framework.org/) as primary job scheduler is currently still limited the setup and functionality provided by executorlib for running [SLURM with flux](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm-with-flux) also applies to HPCs which use [flux](https://flux-framework.org/) as primary job scheduler.","metadata":{}},{"id":"04f03ebb-3f9e-4738-b9d2-5cb0db9b63c3","cell_type":"code","source":"","metadata":{"trusted":true},"outputs":[],"execution_count":null}]} \ No newline at end of file +{ + "cells": [ + { + "cell_type": "markdown", + "id": "87c3425d-5abe-4e0b-a948-e371808c322c", + "metadata": {}, + "source": [ + "# HPC Job Executor\n", + "In contrast to the [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) which submits individual Python functions to HPC job schedulers, the HPC Job Executors take a given job allocation of the HPC job scheduler and executes Python functions with the resources available in this job allocation. In this regard it is similar to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) as it communicates with the individual Python processes using the [zero message queue](https://zeromq.org/), still it is more advanced as it can access the computational resources of all compute nodes of the given HPC job allocation and also provides the option to assign GPUs as accelerators for parallel execution.\n", + "\n", + "Available Functionality: \n", + "* Submit Python functions with the [submit() function or the map() function](https://executorlib.readthedocs.io/en/latest/1-single-node.html#basic-functionality).\n", + "* Support for parallel execution, either using the [message passing interface (MPI)](https://executorlib.readthedocs.io/en/latest/1-single-node.html#mpi-parallel-functions), [thread based parallelism](https://executorlib.readthedocs.io/en/latest/1-single-node.html#thread-parallel-functions) or by [assigning dedicated GPUs](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html#resource-assignment) to selected Python functions. 
All these resource assignments are handled via the [resource dictionary parameter resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary).\n", + "* Performance optimization features, like [block allocation](https://executorlib.readthedocs.io/en/latest/1-single-node.html#block-allocation), [dependency resolution](https://executorlib.readthedocs.io/en/latest/1-single-node.html#dependencies) and [caching](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache).\n", + "\n", + "The only parameter the user has to change is the `backend` parameter. " + ] + }, + { + "cell_type": "markdown", + "id": "8c788b9f-6b54-4ce0-a864-4526b7f6f170", + "metadata": {}, + "source": [ + "## SLURM\n", + "With the [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com/) currently being the most commonly used job scheduler, executorlib provides an interface to submit Python functions to SLURM. Internally, this is based on the [srun](https://slurm.schedmd.com/srun.html) command of the SLURM scheduler, which creates job steps in a given allocation. Given that all resource requests in SLURM are communicated via a central database, a large number of submitted Python functions and resulting job steps can slow down the performance of SLURM. To address this limitation, it is recommended to install the hierarchical job scheduler [flux](https://flux-framework.org/) in addition to SLURM, to use flux for distributing the resources within a given allocation. This configuration is discussed in more detail below in the section [SLURM with flux](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm-with-flux)." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "133b751f-0925-4d11-99f0-3f8dd9360b54", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [ + "from executorlib import SlurmJobExecutor" + ] + }, + { + "cell_type": "markdown", + "id": "9b74944e-2ccd-4cb0-860a-d876310ea870", + "metadata": {}, + "source": [ + "```python\n", + "with SlurmJobExecutor() as exe:\n", + " future = exe.submit(sum, [1, 1])\n", + " print(future.result())\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "36e2d68a-f093-4082-933a-d95bfe7a60c6", + "metadata": {}, + "source": [ + "## SLURM with Flux \n", + "As discussed in the installation section, it is important to select the [flux](https://flux-framework.org/) version compatible with the installation of a given HPC cluster. Which GPUs are available? Who manufactured these GPUs? Does the HPC use [mpich](https://www.mpich.org/) or [OpenMPI](https://www.open-mpi.org/) or one of their commercial counterparts like Cray MPI or Intel MPI? Depending on the configuration, different installation options can be chosen, as explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#hpc-job-executor).\n", + "\n", + "Afterwards, flux can be started in an [sbatch](https://slurm.schedmd.com/sbatch.html) submission script using:\n", + "```\n", + "srun flux start python <script.py>\n", + "```\n", + "In this Python script `<script.py>` the `\"flux_allocation\"` backend can be used."
+ ] + }, + { + "cell_type": "markdown", + "id": "68be70c3-af18-4165-862d-7022d35bf9e4", + "metadata": {}, + "source": [ + "### Resource Assignment\n", + "Independent of the selected Executor [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), [HPC Cluster Executor](https://executorlib.readthedocs.io/en/latest/2-hpc-cluster.html) or HPC job executor the assignment of the computational resources remains the same. They can either be specified in the `submit()` function by adding the resource dictionary parameter [resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) or alternatively during the initialization of the `Executor` class by adding the resource dictionary parameter [resource_dict](https://executorlib.readthedocs.io/en/latest/trouble_shooting.html#resource-dictionary) there.\n", + "\n", + "This functionality of executorlib is commonly used to rewrite individual Python functions to use MPI while the rest of the Python program remains serial." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "8a2c08df-cfea-4783-ace6-68fcd8ebd330", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [ + "def calc_mpi(i):\n", + " from mpi4py import MPI\n", + "\n", + " size = MPI.COMM_WORLD.Get_size()\n", + " rank = MPI.COMM_WORLD.Get_rank()\n", + " return i, size, rank" + ] + }, + { + "cell_type": "markdown", + "id": "715e0c00-7b17-40bb-bd55-b0e097bfef07", + "metadata": {}, + "source": [ + "Depending on the choice of MPI version, it is recommended to specify the pmi standard which [flux](https://flux-framework.org/) should use internally for the resource assignment. For example for OpenMPI >=5 `\"pmix\"` is the recommended pmi standard." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5802c7d7-9560-4909-9d30-a915a91ac0a1", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[(3, 2, 0), (3, 2, 1)]\n" + ] + } + ], + "source": [ + "from executorlib import FluxJobExecutor\n", + "\n", + "with FluxJobExecutor(executor_pmi_mode=\"pmix\") as exe:\n", + " fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "da862425-08b6-4ced-999f-89a74e85f410", + "metadata": {}, + "source": [ + "### Block Allocation\n", + "The block allocation for the HPC allocation mode follows the same implementation as the [block allocation for the Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#block-allocation). It starts by defining the initialization function `init_function()` which returns a dictionary which is internally used to look up input parameters for Python functions submitted to the `FluxJobExecutor` class. Commonly this functionality is used to store large data objects inside the Python process created for the block allocation, rather than reloading these Python objects for each submitted function." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "cdc742c0-35f7-47ff-88c0-1b0dbeabe51b", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [ + "def init_function():\n", + " return {\"j\": 4, \"k\": 3, \"l\": 2}" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "5ddf8343-ab2c-4469-ac9f-ee568823d4ad", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [ + "def calc_with_preload(i, j, k):\n", + " return i + j + k" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0da13efa-1941-416f-b9e6-bba15b5cdfa2", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "10\n" + ] + } + ], + "source": [ + "with FluxJobExecutor(\n", + " executor_pmi_mode=\"pmix\",\n", + " max_workers=2,\n", + " init_function=init_function,\n", + " block_allocation=True,\n", + ") as exe:\n", + " fs = exe.submit(calc_with_preload, 2, j=5)\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "82f3b947-e662-4a0d-b590-9475e0b4f7dd", + "metadata": {}, + "source": [ + "In this example the parameter `k` is used from the dataset created by the initialization function, while the parameters `i` and `j` are specified by the call of the `submit()` function. \n", + "\n", + "When using the block allocation mode, it is recommended to set either the maximum number of workers using the `max_workers` parameter or the maximum number of CPU cores using the `max_cores` parameter to prevent oversubscribing the available resources. " + ] + }, + { + "cell_type": "markdown", + "id": "8ced8359-8ecb-480b-966b-b85d8446d85c", + "metadata": {}, + "source": [ + "### Dependencies\n", + "Python functions with rather different computational resource requirements should not be merged into a single function. So to be able to execute a series of Python functions which each depend on the output of the previous Python function, executorlib internally handles the dependencies based on the [concurrent futures future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects from the Python standard library. This implementation is independent of the selected backend and works for HPC allocation mode just as explained in the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#dependencies) section." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "bd26d97b-46fd-4786-9ad1-1e534b31bf36", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [ + "def add_funct(a, b):\n", + " return a + b" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a2d440f-3cfc-4ff2-b74d-e21823c65f69", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "6\n" + ] + } + ], + "source": [ + "with FluxJobExecutor(executor_pmi_mode=\"pmix\") as exe:\n", + " future = 0\n", + " for i in range(1, 4):\n", + " future = exe.submit(add_funct, i, future)\n", + " print(future.result())" + ] + }, + { + "cell_type": "markdown", + "id": "f526c2bf-fdf5-463b-a955-020753138415", + "metadata": {}, + "source": [ + "### Caching\n", + "Finally, caching is also available for HPC allocation mode, in analogy to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache). 
Again, this functionality is not designed to identify function calls with the same parameters, but rather provides the option to reload previously cached results even after the Python process which contained the executorlib `Executor` class is closed. As the cache is stored on the file system, this option can decrease the performance of executorlib. Consequently, the caching option should primarily be used during the prototyping phase." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dcba63e0-72f5-49d1-ab04-2092fccc1c47", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2, 4, 6]\n" + ] + } + ], + "source": [ + "with FluxJobExecutor(executor_pmi_mode=\"pmix\", cache_directory=\"./file\") as exe:\n", + " future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n", + " print([f.result() for f in future_lst])" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "c3958a14-075b-4c10-9729-d1c559a9231c", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['sum89afbdf9da5eb1794f6976a3f01697c2_o.h5', 'sum0f7710227cda6456e5d07187702313f3_o.h5', 'sumf5ad27b855231a293ddd735a8554c9ea_o.h5']\n" + ] + } + ], + "source": [ + "import os\n", + "import shutil\n", + "\n", + "cache_dir = \"./file\"\n", + "if os.path.exists(cache_dir):\n", + " print(os.listdir(cache_dir))\n", + " try:\n", + " shutil.rmtree(cache_dir)\n", + " except OSError:\n", + " pass" + ] + }, + { + "cell_type": "markdown", + "id": "c24ca82d-60bd-4fb9-a082-bf9a81e838bf", + "metadata": {}, + "source": [ + "### Nested executors\n", + "The hierarchical nature of the [flux](https://flux-framework.org/) job scheduler allows the creation of additional executorlib Executors inside the functions submitted to the Executor. This hierarchy can be beneficial to separate the logic to saturate the available computational resources. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "06fb2d1f-65fc-4df6-9402-5e9837835484", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [ + "def calc_nested():\n", + " from executorlib import FluxJobExecutor\n", + "\n", + " with FluxJobExecutor(executor_pmi_mode=\"pmix\") as exe:\n", + " fs = exe.submit(sum, [1, 1])\n", + " return fs.result()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "89b7d0fd-5978-4913-a79a-f26cc8047445", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2\n" + ] + } + ], + "source": [ + "with FluxJobExecutor(executor_pmi_mode=\"pmix\", flux_executor_nesting=True) as exe:\n", + " fs = exe.submit(calc_nested)\n", + " print(fs.result())" + ] + }, + { + "cell_type": "markdown", + "id": "9f209925-1ce4-42e4-bbe5-becbb1f3cd79", + "metadata": {}, + "source": [ + "### Executor from Flux\n", + "The [flux framework](http://flux-framework.org/) provides its own [FluxExecutor](https://flux-framework.readthedocs.io/projects/flux-core/en/latest/python/autogenerated/flux.job.executor.html#flux.job.executor.FluxExecutor) which can be used to submit shell scripts to the [flux framework](http://flux-framework.org/) for execution. 
The [FluxExecutor](https://flux-framework.readthedocs.io/projects/flux-core/en/latest/python/autogenerated/flux.job.executor.html#flux.job.executor.FluxExecutor) returns its own representation of future objects which is incompatible with the [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html) which is used by executorlib. Combining both provides the opportunity to link Python functions and external executables. For this purpose executorlib provides the option to use a [FluxExecutor](https://flux-framework.readthedocs.io/projects/flux-core/en/latest/python/autogenerated/flux.job.executor.html#flux.job.executor.FluxExecutor) as an input for the `FluxJobExecutor`:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "3df0357e-d936-4989-a271-d0b03c6d0b48", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2\n" + ] + } + ], + "source": [ + "from executorlib import FluxJobExecutor\n", + "import flux.job\n", + "\n", + "with flux.job.FluxExecutor() as flux_executor:\n", + " with FluxJobExecutor(flux_executor=flux_executor) as exe:\n", + " future = exe.submit(sum, [1, 1])\n", + " print(future.result())" + ] + }, + { + "cell_type": "markdown", + "id": "34a8c690-ca5a-41d1-b38f-c67eff085750", + "metadata": {}, + "source": [ + "### Resource Monitoring\n", + "For debugging, it is commonly helpful to keep track of the computational resources. [flux](https://flux-framework.org/) provides a number of features to analyse the resource utilization, so here only the two most commonly used ones are introduced. Starting with the option to list all the resources available in a given allocation with the `flux resource list` command:" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "7481eb0a-a41b-4d46-bb48-b4db299fcd86", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " STATE NNODES NCORES NGPUS NODELIST\n", + " free 1 24 0 jupyter-pyiron-executorlib-wx8wv67z\n", + " allocated 0 0 0 \n", + " down 0 0 0 \n" + ] + } + ], + "source": [ + "! flux resource list" + ] + }, + { + "cell_type": "markdown", + "id": "08d98134-a0e0-4841-be82-e09e1af29e7f", + "metadata": {}, + "source": [ + "This is followed by the list of jobs which were executed in a given flux session. 
This can be retrieved using the `flux jobs -a` command:" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "1ee6e147-f53a-4526-8ed0-fd036f2ee6bf", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " JOBID USER NAME ST NTASKS NNODES TIME INFO\n", + "\u001b[01;32m ƒ66TjsQs jovyan python CD 1 1 0.149s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒ4R3m4Sj jovyan flux CD 1 1 3.509s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒ3N4Qc3y jovyan python CD 1 1 1.922s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒ3DuUZ9y jovyan python CD 1 1 2.291s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒ3DrWabH jovyan python CD 1 1 2.204s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒ2z9sDYT jovyan python CD 1 1 0.271s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒ2m9FX6w jovyan python CD 1 1 0.404s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒ2dGdLJj jovyan python CD 1 1 0.346s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒ29qrcvj jovyan python CD 1 1 0.848s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒ29tpbVR jovyan python CD 1 1 0.539s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m\u001b[01;32m ƒZsZ5QT jovyan python CD 2 1 0.966s jupyter-pyiron-executorlib-wx8wv67z\n", + "\u001b[0;0m" + ] + } + ], + "source": [ + "! flux jobs -a" + ] + }, + { + "cell_type": "markdown", + "id": "021f165b-27cc-4676-968b-cbcfd1f0210a", + "metadata": {}, + "source": [ + "## Flux\n", + "While the number of HPC clusters which use [flux](https://flux-framework.org/) as primary job scheduler is currently still limited the setup and functionality provided by executorlib for running [SLURM with flux](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html#slurm-with-flux) also applies to HPCs which use [flux](https://flux-framework.org/) as primary job scheduler." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "04f03ebb-3f9e-4738-b9d2-5cb0db9b63c3", + "metadata": { + "trusted": true + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Flux", + "language": "python", + "name": "flux" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 582d9f8c..9c721b96 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -41,7 +41,7 @@ def test_executor(self): resource_dict={"cores": 2, "cwd": "executorlib_cache"}, block_allocation=False, cache_directory="executorlib_cache", - flux_executor_pmi_mode=pmi, + executor_pmi_mode=pmi, ) as exe: cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) @@ -55,7 +55,7 @@ def test_executor_no_cwd(self): resource_dict={"cores": 2}, block_allocation=False, cache_directory="executorlib_cache", - flux_executor_pmi_mode=pmi, + executor_pmi_mode=pmi, ) as exe: cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) @@ -83,7 +83,7 @@ def test_executor_existing_files(self): resource_dict={"cores": 2, "cwd": "executorlib_cache"}, block_allocation=False, cache_directory="executorlib_cache", - flux_executor_pmi_mode=pmi, + executor_pmi_mode=pmi, ) as exe: cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) @@ -102,7 +102,7 @@ def test_executor_existing_files(self): resource_dict={"cores": 2, "cwd": "executorlib_cache"}, block_allocation=False, cache_directory="executorlib_cache", - flux_executor_pmi_mode=pmi, + executor_pmi_mode=pmi, ) as exe: cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) diff --git a/tests/test_fluxjobexecutor.py b/tests/test_fluxjobexecutor.py index 8cfa8e9a..2aeaef2d 100644 --- a/tests/test_fluxjobexecutor.py +++ b/tests/test_fluxjobexecutor.py @@ -90,7 +90,7 @@ def test_flux_executor_parallel(self): resource_dict={"cores": 2}, flux_executor=self.executor, block_allocation=True, - flux_executor_pmi_mode=pmi, + executor_pmi_mode=pmi, ) as exe: fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) @@ -102,7 +102,7 @@ def test_single_task(self): resource_dict={"cores": 2}, flux_executor=self.executor, block_allocation=True, - flux_executor_pmi_mode=pmi, + executor_pmi_mode=pmi, ) as p: output = p.map(mpi_funct, [1, 2, 3]) self.assertEqual( diff --git a/tests/test_fluxpythonspawner.py b/tests/test_fluxpythonspawner.py index bf8eb939..a29e6c3a 100644 --- a/tests/test_fluxpythonspawner.py +++ b/tests/test_fluxpythonspawner.py @@ -82,7 +82,7 @@ def test_flux_executor_parallel(self): executor_kwargs={ "flux_executor": self.flux_executor, "cores": 2, - "flux_executor_pmi_mode": pmi, + "executor_pmi_mode": pmi, }, spawner=FluxPythonSpawner, ) as exe: @@ -96,7 +96,7 @@ def test_single_task(self): executor_kwargs={ "flux_executor": self.flux_executor, "cores": 2, - "flux_executor_pmi_mode": pmi, + "executor_pmi_mode": pmi, }, spawner=FluxPythonSpawner, ) as p: diff --git a/tests/test_slurmclusterexecutor.py b/tests/test_slurmclusterexecutor.py index 7b07c414..998711b2 100644 --- a/tests/test_slurmclusterexecutor.py +++ b/tests/test_slurmclusterexecutor.py @@ -50,6 +50,7 @@ def test_executor(self): resource_dict={"cores": 2, 
"cwd": "executorlib_cache", "submission_template": submission_template}, block_allocation=False, cache_directory="executorlib_cache", + executor_pmi_mode="pmi2", ) as exe: cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) @@ -63,6 +64,7 @@ def test_executor_no_cwd(self): resource_dict={"cores": 2, "submission_template": submission_template}, block_allocation=False, cache_directory="executorlib_cache", + executor_pmi_mode="pmi2", ) as exe: cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) @@ -76,6 +78,7 @@ def test_executor_existing_files(self): resource_dict={"cores": 2, "cwd": "executorlib_cache", "submission_template": submission_template}, block_allocation=False, cache_directory="executorlib_cache", + executor_pmi_mode="pmi2", ) as exe: cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) @@ -94,6 +97,7 @@ def test_executor_existing_files(self): resource_dict={"cores": 2, "cwd": "executorlib_cache", "submission_template": submission_template}, block_allocation=False, cache_directory="executorlib_cache", + executor_pmi_mode="pmi2", ) as exe: cloudpickle_register(ind=1) fs1 = exe.submit(mpi_funct, 1) diff --git a/tests/test_standalone_inputcheck.py b/tests/test_standalone_inputcheck.py index d1d74df1..5d7413fc 100644 --- a/tests/test_standalone_inputcheck.py +++ b/tests/test_standalone_inputcheck.py @@ -13,7 +13,7 @@ check_refresh_rate, check_resource_dict, check_resource_dict_is_empty, - check_flux_executor_pmi_mode, + check_executor_pmi_mode, check_max_workers_and_cores, check_hostname_localhost, check_pysqa_config_directory, @@ -77,9 +77,9 @@ def test_check_plot_dependency_graph(self): with self.assertRaises(ValueError): check_plot_dependency_graph(plot_dependency_graph=True) - def test_check_flux_executor_pmi_mode(self): + def test_check_executor_pmi_mode(self): with self.assertRaises(ValueError): - check_flux_executor_pmi_mode(flux_executor_pmi_mode="test") + check_executor_pmi_mode(executor_pmi_mode="test") def test_check_max_workers_and_cores(self): with self.assertRaises(ValueError): From 0b693cf613fb329c0ac8ea14c447081e3876a579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Jul 2025 19:53:55 +0200 Subject: [PATCH 26/30] fixes --- executorlib/standalone/command.py | 2 +- tests/test_standalone_command.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/executorlib/standalone/command.py b/executorlib/standalone/command.py index fdb0e6ea..532af39b 100644 --- a/executorlib/standalone/command.py +++ b/executorlib/standalone/command.py @@ -55,7 +55,7 @@ def get_cache_execute_command( elif backend == "flux": flux_command = ["flux", "run"] if executor_pmi_mode is not None: - flux_command += ["pmi=" + executor_pmi_mode] + flux_command += ["-o", "pmi=" + executor_pmi_mode] command_lst = ( flux_command + ["-n", str(cores)] diff --git a/tests/test_standalone_command.py b/tests/test_standalone_command.py index bf6b24ef..f0b50a2c 100644 --- a/tests/test_standalone_command.py +++ b/tests/test_standalone_command.py @@ -51,7 +51,7 @@ def test_get_cache_execute_command_parallel(self): self.assertEqual(output[3], sys.executable) self.assertEqual(output[4].split(os.sep)[-1], "cache_parallel.py") self.assertEqual(output[5], file_name) - output = get_cache_execute_command(cores=2, file_name=file_name, backend="slurm") + output = get_cache_execute_command(cores=2, file_name=file_name, backend="slurm", executor_pmi_mode="pmi2") self.assertEqual(output[0], "srun") self.assertEqual(output[1], "-n") self.assertEqual(output[2], 
str(2)) @@ -59,6 +59,13 @@ def test_get_cache_execute_command_parallel(self): self.assertEqual(output[4], sys.executable) self.assertEqual(output[5].split(os.sep)[-1], "cache_parallel.py") self.assertEqual(output[6], file_name) + output = get_cache_execute_command(cores=2, file_name=file_name, backend="slurm") + self.assertEqual(output[0], "srun") + self.assertEqual(output[1], "-n") + self.assertEqual(output[2], str(2)) + self.assertEqual(output[3], sys.executable) + self.assertEqual(output[4].split(os.sep)[-1], "cache_parallel.py") + self.assertEqual(output[5], file_name) output = get_cache_execute_command(cores=2, file_name=file_name, backend="flux") self.assertEqual(output[0], "flux") self.assertEqual(output[1], "run") From 4e7304155fc540f9c6b2653b98af010008249789 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Jul 2025 19:59:27 +0200 Subject: [PATCH 27/30] extend tests --- tests/test_standalone_interactive_backend.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_standalone_interactive_backend.py b/tests/test_standalone_interactive_backend.py index cfa961af..bada59bd 100644 --- a/tests/test_standalone_interactive_backend.py +++ b/tests/test_standalone_interactive_backend.py @@ -84,6 +84,7 @@ def test_command_slurm_user_command(self): "2", "-D", os.path.abspath("."), + "--mpi=pmi2", "--gpus-per-task=1", "--oversubscribe", "--account=test", @@ -101,6 +102,7 @@ def test_command_slurm_user_command(self): gpus_per_core=1, openmpi_oversubscribe=True, slurm_cmd_args=["--account=test", "--job-name=executorlib"], + executor_pmi_mode="pmi2", ) self.assertEqual( command_lst, From 21e446567e75374676f1a06fafcb3138fecaa413 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 27 Jul 2025 20:25:29 +0200 Subject: [PATCH 28/30] rename to pmi_mode --- docs/installation.md | 2 +- executorlib/executor/flux.py | 28 +++++++++---------- executorlib/executor/single.py | 2 +- executorlib/executor/slurm.py | 22 +++++++-------- executorlib/standalone/command.py | 8 +++--- executorlib/standalone/inputcheck.py | 4 +-- executorlib/task_scheduler/file/shared.py | 6 ++-- .../task_scheduler/file/task_scheduler.py | 14 +++++----- .../task_scheduler/interactive/fluxspawner.py | 10 +++---- .../interactive/slurmspawner.py | 16 +++++------ notebooks/3-hpc-job.ipynb | 12 ++++---- tests/test_fluxclusterexecutor.py | 8 +++--- tests/test_fluxjobexecutor.py | 4 +-- tests/test_fluxpythonspawner.py | 4 +-- tests/test_slurmclusterexecutor.py | 8 +++--- tests/test_standalone_command.py | 4 +-- tests/test_standalone_inputcheck.py | 6 ++-- tests/test_standalone_interactive_backend.py | 2 +- 18 files changed, 80 insertions(+), 80 deletions(-) diff --git a/docs/installation.md b/docs/installation.md index 14a3948d..3380bcff 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -120,7 +120,7 @@ For the version 5 of openmpi the backend changed to `pmix`, this requires the ad ``` conda install -c conda-forge flux-core flux-sched flux-pmix openmpi>=5 executorlib ``` -In addition, the `executor_pmi_mode="pmix"` parameter has to be set for the `FluxJobExecutor` or the +In addition, the `pmi_mode="pmix"` parameter has to be set for the `FluxJobExecutor` or the `FluxClusterExecutor` to switch to `pmix` as backend. 
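A minimal sketch of what this looks like in user code, assuming a Flux allocation with OpenMPI >= 5 and `mpi4py` installed; the MPI function mirrors the `mpi_funct` helper used in the tests:

```python
from executorlib import FluxJobExecutor


def mpi_funct(i):
    from mpi4py import MPI

    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    return i, size, rank


# pmi_mode="pmix" selects the PMI standard flux uses to bootstrap OpenMPI v5.
with FluxJobExecutor(pmi_mode="pmix") as exe:
    fs = exe.submit(mpi_funct, 1, resource_dict={"cores": 2})
    print(fs.result())  # one tuple per MPI rank, e.g. [(1, 2, 0), (1, 2, 1)]
```
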
### Test Flux Framework diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py index e32ecb0b..864548d6 100644 --- a/executorlib/executor/flux.py +++ b/executorlib/executor/flux.py @@ -43,8 +43,8 @@ class FluxJobExecutor(BaseExecutor): compute notes. Defaults to False. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the @@ -93,8 +93,8 @@ def __init__( cache_directory: Optional[str] = None, max_cores: Optional[int] = None, resource_dict: Optional[dict] = None, + pmi_mode: Optional[str] = None, flux_executor=None, - executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, flux_log_files: bool = False, hostname_localhost: Optional[bool] = None, @@ -130,8 +130,8 @@ def __init__( compute notes. Defaults to False. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the @@ -175,8 +175,8 @@ def __init__( cache_directory=cache_directory, max_cores=max_cores, resource_dict=resource_dict, + pmi_mode=pmi_mode, flux_executor=flux_executor, - executor_pmi_mode=executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, flux_log_files=flux_log_files, hostname_localhost=hostname_localhost, @@ -199,8 +199,8 @@ def __init__( cache_directory=cache_directory, max_cores=max_cores, resource_dict=resource_dict, + pmi_mode=pmi_mode, flux_executor=flux_executor, - executor_pmi_mode=executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, flux_log_files=flux_log_files, hostname_localhost=hostname_localhost, @@ -236,7 +236,7 @@ class FluxClusterExecutor(BaseExecutor): - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). - executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. 
And @@ -284,7 +284,7 @@ def __init__( max_cores: Optional[int] = None, resource_dict: Optional[dict] = None, pysqa_config_directory: Optional[str] = None, - executor_pmi_mode: Optional[str] = None, + pmi_mode: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[Callable] = None, @@ -319,7 +319,7 @@ def __init__( - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). - executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None + pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -369,7 +369,7 @@ def __init__( cache_directory=cache_directory, resource_dict=resource_dict, flux_executor=None, - executor_pmi_mode=executor_pmi_mode, + pmi_mode=pmi_mode, flux_executor_nesting=False, flux_log_files=False, pysqa_config_directory=pysqa_config_directory, @@ -387,8 +387,8 @@ def __init__( cache_directory=cache_directory, max_cores=max_cores, resource_dict=resource_dict, + pmi_mode=None, flux_executor=None, - executor_pmi_mode=None, flux_executor_nesting=False, flux_log_files=False, hostname_localhost=hostname_localhost, @@ -408,8 +408,8 @@ def create_flux_executor( max_cores: Optional[int] = None, cache_directory: Optional[str] = None, resource_dict: Optional[dict] = None, + pmi_mode: Optional[str] = None, flux_executor=None, - executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, flux_log_files: bool = False, hostname_localhost: Optional[bool] = None, @@ -437,8 +437,8 @@ def create_flux_executor( compute notes. Defaults to False. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. 
In the @@ -470,7 +470,7 @@ def create_flux_executor( resource_dict["hostname_localhost"] = hostname_localhost resource_dict["log_obj_size"] = log_obj_size check_init_function(block_allocation=block_allocation, init_function=init_function) - check_pmi(backend="flux_allocation", pmi=executor_pmi_mode) + check_pmi(backend="flux_allocation", pmi=pmi_mode) check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False)) check_command_line_argument_lst( command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) @@ -479,8 +479,8 @@ def create_flux_executor( del resource_dict["openmpi_oversubscribe"] if "slurm_cmd_args" in resource_dict: del resource_dict["slurm_cmd_args"] + resource_dict["pmi_mode"] = pmi_mode resource_dict["flux_executor"] = flux_executor - resource_dict["executor_pmi_mode"] = executor_pmi_mode resource_dict["flux_executor_nesting"] = flux_executor_nesting resource_dict["flux_log_files"] = flux_log_files if block_allocation: diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index bd407fab..677782a6 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -329,7 +329,7 @@ def __init__( cache_directory=cache_directory, resource_dict=resource_dict, flux_executor=None, - executor_pmi_mode=None, + pmi_mode=None, flux_executor_nesting=False, flux_log_files=False, pysqa_config_directory=None, diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py index 831a7ff0..64693036 100644 --- a/executorlib/executor/slurm.py +++ b/executorlib/executor/slurm.py @@ -44,7 +44,7 @@ class SlurmClusterExecutor(BaseExecutor): - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). - executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None + pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -92,7 +92,7 @@ def __init__( max_cores: Optional[int] = None, resource_dict: Optional[dict] = None, pysqa_config_directory: Optional[str] = None, - executor_pmi_mode: Optional[str] = None, + pmi_mode: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[Callable] = None, @@ -127,7 +127,7 @@ def __init__( - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). - executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None + pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. 
diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py
index 831a7ff0..64693036 100644
--- a/executorlib/executor/slurm.py
+++ b/executorlib/executor/slurm.py
@@ -44,7 +44,7 @@ class SlurmClusterExecutor(BaseExecutor):
             - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python
                                     functions submitted to the Executor.
         pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
-        executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
+        pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                       context of an HPC cluster this essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
@@ -92,7 +92,7 @@ def __init__(
         max_cores: Optional[int] = None,
         resource_dict: Optional[dict] = None,
         pysqa_config_directory: Optional[str] = None,
-        executor_pmi_mode: Optional[str] = None,
+        pmi_mode: Optional[str] = None,
         hostname_localhost: Optional[bool] = None,
         block_allocation: bool = False,
         init_function: Optional[Callable] = None,
@@ -127,7 +127,7 @@ def __init__(
                 - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python
                                         functions submitted to the Executor.
             pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
-            executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
+            pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
             hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                           context of an HPC cluster this essential to be able to communicate to an
                                           Executor running on a different compute node within the same allocation. And
@@ -176,8 +176,8 @@ def __init__(
                 max_cores=max_cores,
                 cache_directory=cache_directory,
                 resource_dict=resource_dict,
+                pmi_mode=pmi_mode,
                 flux_executor=None,
-                executor_pmi_mode=executor_pmi_mode,
                 flux_executor_nesting=False,
                 flux_log_files=False,
                 pysqa_config_directory=pysqa_config_directory,
@@ -282,7 +282,7 @@ def __init__(
         cache_directory: Optional[str] = None,
         max_cores: Optional[int] = None,
         resource_dict: Optional[dict] = None,
-        executor_pmi_mode: Optional[str] = None,
+        pmi_mode: Optional[str] = None,
         hostname_localhost: Optional[bool] = None,
         block_allocation: bool = False,
         init_function: Optional[Callable] = None,
@@ -320,7 +320,7 @@ def __init__(
                               compute notes. Defaults to False.
                 - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python
                                         functions submitted to the Executor.
-            executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
+            pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
             hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                           context of an HPC cluster this essential to be able to communicate to an
                                           Executor running on a different compute node within the same allocation. And
@@ -362,7 +362,7 @@ def __init__(
                 cache_directory=cache_directory,
                 max_cores=max_cores,
                 resource_dict=resource_dict,
-                executor_pmi_mode=executor_pmi_mode,
+                pmi_mode=pmi_mode,
                 hostname_localhost=hostname_localhost,
                 block_allocation=block_allocation,
                 init_function=init_function,
@@ -383,7 +383,7 @@ def __init__(
                 cache_directory=cache_directory,
                 max_cores=max_cores,
                 resource_dict=resource_dict,
-                executor_pmi_mode=executor_pmi_mode,
+                pmi_mode=pmi_mode,
                 hostname_localhost=hostname_localhost,
                 block_allocation=block_allocation,
                 init_function=init_function,
@@ -397,7 +397,7 @@ def create_slurm_executor(
     max_cores: Optional[int] = None,
     cache_directory: Optional[str] = None,
     resource_dict: Optional[dict] = None,
-    executor_pmi_mode: Optional[str] = None,
+    pmi_mode: Optional[str] = None,
     hostname_localhost: Optional[bool] = None,
     block_allocation: bool = False,
     init_function: Optional[Callable] = None,
@@ -427,7 +427,7 @@ def create_slurm_executor(
                           compute notes. Defaults to False.
         - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python
                                 functions submitted to the Executor.
-        executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
+        pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                       context of an HPC cluster this essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
@@ -451,7 +451,7 @@ def create_slurm_executor(
     resource_dict["cache_directory"] = cache_directory
     resource_dict["hostname_localhost"] = hostname_localhost
     resource_dict["log_obj_size"] = log_obj_size
-    resource_dict["executor_pmi_mode"] = executor_pmi_mode
+    resource_dict["pmi_mode"] = pmi_mode
     check_init_function(block_allocation=block_allocation, init_function=init_function)
     if block_allocation:
         resource_dict["init_function"] = init_function
diff --git a/executorlib/standalone/command.py b/executorlib/standalone/command.py
index 532af39b..ec59ac77 100644
--- a/executorlib/standalone/command.py
+++ b/executorlib/standalone/command.py
@@ -21,7 +21,7 @@ def get_cache_execute_command(
     file_name: str,
     cores: int = 1,
     backend: Optional[str] = None,
-    executor_pmi_mode: Optional[str] = None,
+    pmi_mode: Optional[str] = None,
 ) -> list:
     """
     Get command to call backend as a list of two strings
@@ -30,7 +30,7 @@ def get_cache_execute_command(
         file_name (str): The name of the file.
         cores (int, optional): Number of cores used to execute the task. Defaults to 1.
         backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
-        executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
+        pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)

     Returns:
         list[str]: List of strings containing the python executable path and the backend script to execute
@@ -45,8 +45,8 @@ def get_cache_execute_command(
         )
     elif backend == "slurm":
         command_prepend = ["srun", "-n", str(cores)]
-        if executor_pmi_mode is not None:
-            command_prepend += ["--mpi=pmi2"]
+        if pmi_mode is not None:
+            command_prepend += ["--mpi=" + pmi_mode]
         command_lst = (
             command_prepend
             + command_lst
diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py
index 95b306b5..6f6ab763 100644
--- a/executorlib/standalone/inputcheck.py
+++ b/executorlib/standalone/inputcheck.py
@@ -146,8 +146,8 @@ def check_hostname_localhost(hostname_localhost: Optional[bool]) -> None:
     )


-def check_executor_pmi_mode(executor_pmi_mode: Optional[str]) -> None:
-    if executor_pmi_mode is not None:
+def check_pmi_mode(pmi_mode: Optional[str]) -> None:
+    if pmi_mode is not None:
         raise ValueError(
             "The option to specify the pmi mode is not available on a local workstation, it requires SLURM or flux."
         )
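With the guard renamed alongside the keyword, requesting a PMI interface without a matching job scheduler still fails fast. A short sketch of check_pmi_mode as exercised by the updated test_standalone_inputcheck.py:

    from executorlib.standalone.inputcheck import check_pmi_mode

    check_pmi_mode(pmi_mode=None)  # no PMI selection: passes silently
    try:
        check_pmi_mode(pmi_mode="pmix")  # any explicit value is rejected locally
    except ValueError as err:
        print(err)  # "... not available on a local workstation, it requires SLURM or flux."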
diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py
index 81922733..f2662e40 100644
--- a/executorlib/task_scheduler/file/shared.py
+++ b/executorlib/task_scheduler/file/shared.py
@@ -57,7 +57,7 @@ def execute_tasks_h5(
     pysqa_config_directory: Optional[str] = None,
     backend: Optional[str] = None,
     disable_dependencies: bool = False,
-    executor_pmi_mode: Optional[str] = None,
+    pmi_mode: Optional[str] = None,
 ) -> None:
     """
     Execute tasks stored in a queue using HDF5 files.
@@ -72,7 +72,7 @@ def execute_tasks_h5(
         pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
         backend (str, optional): name of the backend used to spawn tasks.
         disable_dependencies (boolean): Disable resolving future objects during the submission.
-        executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
+        pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)

     Returns:
         None
@@ -157,7 +157,7 @@ def execute_tasks_h5(
                         file_name=file_name,
                         cores=task_resource_dict["cores"],
                         backend=backend,
-                        executor_pmi_mode=executor_pmi_mode,
+                        pmi_mode=pmi_mode,
                     ),
                     file_name=file_name,
                     data_dict=data_dict,
diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py
index 413c5f81..18c1cb01 100644
--- a/executorlib/task_scheduler/file/task_scheduler.py
+++ b/executorlib/task_scheduler/file/task_scheduler.py
@@ -3,7 +3,7 @@
 from executorlib.standalone.inputcheck import (
     check_executor,
-    check_executor_pmi_mode,
+    check_pmi_mode,
     check_flux_log_files,
     check_hostname_localhost,
     check_max_workers_and_cores,
@@ -34,7 +34,7 @@ def __init__(
         pysqa_config_directory: Optional[str] = None,
         backend: Optional[str] = None,
         disable_dependencies: bool = False,
-        executor_pmi_mode: Optional[str] = None,
+        pmi_mode: Optional[str] = None,
     ):
         """
         Initialize the FileExecutor.
@@ -49,7 +49,7 @@ def __init__(
             pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
             backend (str, optional): name of the backend used to spawn tasks.
             disable_dependencies (boolean): Disable resolving future objects during the submission.
-            executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
+            pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
         """
         super().__init__(max_cores=None)
         default_resource_dict = {
@@ -70,7 +70,7 @@ def __init__(
             "pysqa_config_directory": pysqa_config_directory,
             "backend": backend,
             "disable_dependencies": disable_dependencies,
-            "executor_pmi_mode": executor_pmi_mode,
+            "pmi_mode": pmi_mode,
         }
         self._set_process(
             Thread(
@@ -86,8 +86,8 @@ def create_file_executor(
     backend: Optional[str] = None,
     max_cores: Optional[int] = None,
     cache_directory: Optional[str] = None,
+    pmi_mode: Optional[str] = None,
     flux_executor=None,
-    executor_pmi_mode: Optional[str] = None,
     flux_executor_nesting: bool = False,
     flux_log_files: bool = False,
     pysqa_config_directory: Optional[str] = None,
@@ -108,7 +108,7 @@ def create_file_executor(
     if cache_directory is not None:
         resource_dict["cache_directory"] = cache_directory
     if backend is None:
-        check_executor_pmi_mode(executor_pmi_mode=executor_pmi_mode)
+        check_pmi_mode(pmi_mode=pmi_mode)
         check_max_workers_and_cores(max_cores=max_cores, max_workers=max_workers)
         check_hostname_localhost(hostname_localhost=hostname_localhost)
         check_executor(executor=flux_executor)
@@ -125,5 +125,5 @@ def create_file_executor(
         disable_dependencies=disable_dependencies,
         execute_function=execute_function,
         terminate_function=terminate_function,
-        executor_pmi_mode=executor_pmi_mode,
+        pmi_mode=pmi_mode,
     )
diff --git a/executorlib/task_scheduler/interactive/fluxspawner.py b/executorlib/task_scheduler/interactive/fluxspawner.py
index 9f82cf33..848e7a8f 100644
--- a/executorlib/task_scheduler/interactive/fluxspawner.py
+++ b/executorlib/task_scheduler/interactive/fluxspawner.py
@@ -35,8 +35,8 @@ class FluxPythonSpawner(BaseSpawner):
         openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
         priority (int, optional): job urgency 0 (lowest) through 31 (highest) (default is 16).
                                   Priorities 0 through 15 are restricted to the instance owner.
+        pmi_mode (str, optional): The PMI option. Defaults to None.
         flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
-        executor_pmi_mode (str, optional): The PMI option. Defaults to None.
         flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
         flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
     """
@@ -51,8 +51,8 @@ def __init__(
         exclusive: bool = False,
         priority: Optional[int] = None,
         openmpi_oversubscribe: bool = False,
+        pmi_mode: Optional[str] = None,
         flux_executor: Optional[flux.job.FluxExecutor] = None,
-        executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
         flux_log_files: bool = False,
     ):
@@ -66,7 +66,7 @@ def __init__(
         self._num_nodes = num_nodes
         self._exclusive = exclusive
         self._flux_executor = flux_executor
-        self._executor_pmi_mode = executor_pmi_mode
+        self._pmi_mode = pmi_mode
         self._flux_executor_nesting = flux_executor_nesting
         self._flux_log_files = flux_log_files
         self._priority = priority
@@ -109,8 +109,8 @@ def bootup(
             exclusive=self._exclusive,
         )
         jobspec.environment = dict(os.environ)
-        if self._executor_pmi_mode is not None:
-            jobspec.setattr_shell_option("pmi", self._executor_pmi_mode)
+        if self._pmi_mode is not None:
+            jobspec.setattr_shell_option("pmi", self._pmi_mode)
         if self._cwd is not None:
             jobspec.cwd = self._cwd
         if self._flux_log_files and self._cwd is not None:
diff --git a/executorlib/task_scheduler/interactive/slurmspawner.py b/executorlib/task_scheduler/interactive/slurmspawner.py
index 9bbc5f1f..b6490657 100644
--- a/executorlib/task_scheduler/interactive/slurmspawner.py
+++ b/executorlib/task_scheduler/interactive/slurmspawner.py
@@ -31,7 +31,7 @@ def __init__(
         exclusive: bool = False,
         openmpi_oversubscribe: bool = False,
         slurm_cmd_args: Optional[list[str]] = None,
-        executor_pmi_mode: Optional[str] = None,
+        pmi_mode: Optional[str] = None,
     ):
         """
         Srun interface implementation.
@@ -45,7 +45,7 @@ def __init__(
             exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
                               compute notes. Defaults to False.
             openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
             slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
-            executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
+            pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
         """
         super().__init__(
             cwd=cwd,
@@ -57,7 +57,7 @@ def __init__(
         self._slurm_cmd_args = slurm_cmd_args
         self._num_nodes = num_nodes
         self._exclusive = exclusive
-        self._executor_pmi_mode = executor_pmi_mode
+        self._pmi_mode = pmi_mode

     def generate_command(self, command_lst: list[str]) -> list[str]:
         """
@@ -78,7 +78,7 @@ def generate_command(self, command_lst: list[str]) -> list[str]:
             exclusive=self._exclusive,
             openmpi_oversubscribe=self._openmpi_oversubscribe,
             slurm_cmd_args=self._slurm_cmd_args,
-            executor_pmi_mode=self._executor_pmi_mode,
+            pmi_mode=self._pmi_mode,
         )
         return super().generate_command(
             command_lst=command_prepend_lst + command_lst,
@@ -94,7 +94,7 @@ def generate_slurm_command(
     exclusive: bool = False,
     openmpi_oversubscribe: bool = False,
     slurm_cmd_args: Optional[list[str]] = None,
-    executor_pmi_mode: Optional[str] = None,
+    pmi_mode: Optional[str] = None,
 ) -> list[str]:
     """
     Generate the command list for the SLURM interface.
@@ -108,7 +108,7 @@ def generate_slurm_command(
         exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing
                           compute notes. Defaults to False.
         openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
         slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
-        executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
+        pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None

     Returns:
         list[str]: The generated command list.
@@ -116,8 +116,8 @@ def generate_slurm_command(
     command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
     if cwd is not None:
         command_prepend_lst += ["-D", cwd]
-    if executor_pmi_mode is not None:
-        command_prepend_lst += ["--mpi=" + executor_pmi_mode]
+    if pmi_mode is not None:
+        command_prepend_lst += ["--mpi=" + pmi_mode]
     if num_nodes is not None:
         command_prepend_lst += ["-N", str(num_nodes)]
     if threads_per_core > 1:
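Since generate_slurm_command is a plain module-level helper, the effect of the rename is easy to see in isolation. A minimal sketch, assuming the defaults visible in the hunk above (no working directory, one thread per core, no extra nodes or SLURM arguments):

    from executorlib.task_scheduler.interactive.slurmspawner import generate_slurm_command

    command_lst = generate_slurm_command(cores=2, cwd=None, pmi_mode="pmi2")
    print(command_lst)  # expected under these defaults: ['srun', '-n', '2', '--mpi=pmi2']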
diff --git a/notebooks/3-hpc-job.ipynb b/notebooks/3-hpc-job.ipynb
index ecabecc4..0c266722 100644
--- a/notebooks/3-hpc-job.ipynb
+++ b/notebooks/3-hpc-job.ipynb
@@ -119,7 +119,7 @@
    "source": [
     "from executorlib import FluxJobExecutor\n",
     "\n",
-    "with FluxJobExecutor(executor_pmi_mode=\"pmix\") as exe:\n",
+    "with FluxJobExecutor(pmi_mode=\"pmix\") as exe:\n",
     "    fs = exe.submit(calc_mpi, 3, resource_dict={\"cores\": 2})\n",
     "    print(fs.result())"
    ]
@@ -177,7 +177,7 @@
    ],
    "source": [
     "with FluxJobExecutor(\n",
-    "    executor_pmi_mode=\"pmix\",\n",
+    "    pmi_mode=\"pmix\",\n",
     "    max_workers=2,\n",
     "    init_function=init_function,\n",
     "    block_allocation=True,\n",
@@ -235,7 +235,7 @@
     }
    ],
    "source": [
-    "with FluxJobExecutor(executor_pmi_mode=\"pmix\") as exe:\n",
+    "with FluxJobExecutor(pmi_mode=\"pmix\") as exe:\n",
     "    future = 0\n",
     "    for i in range(1, 4):\n",
     "        future = exe.submit(add_funct, i, future)\n",
@@ -268,7 +268,7 @@
     }
    ],
    "source": [
-    "with FluxJobExecutor(executor_pmi_mode=\"pmix\", cache_directory=\"./file\") as exe:\n",
+    "with FluxJobExecutor(pmi_mode=\"pmix\", cache_directory=\"./file\") as exe:\n",
     "    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n",
     "    print([f.result() for f in future_lst])"
    ]
@@ -323,7 +323,7 @@
    "def calc_nested():\n",
    "    from executorlib import FluxJobExecutor\n",
    "\n",
-    "    with FluxJobExecutor(executor_pmi_mode=\"pmix\") as exe:\n",
+    "    with FluxJobExecutor(pmi_mode=\"pmix\") as exe:\n",
    "        fs = exe.submit(sum, [1, 1])\n",
    "        return fs.result()"
    ]
@@ -345,7 +345,7 @@
     }
    ],
    "source": [
-    "with FluxJobExecutor(executor_pmi_mode=\"pmix\", flux_executor_nesting=True) as exe:\n",
+    "with FluxJobExecutor(pmi_mode=\"pmix\", flux_executor_nesting=True) as exe:\n",
     "    fs = exe.submit(calc_nested)\n",
     "    print(fs.result())"
    ]
diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py
index 9c721b96..107d5add 100644
--- a/tests/test_fluxclusterexecutor.py
+++ b/tests/test_fluxclusterexecutor.py
@@ -41,7 +41,7 @@ def test_executor(self):
             resource_dict={"cores": 2, "cwd": "executorlib_cache"},
             block_allocation=False,
             cache_directory="executorlib_cache",
-            executor_pmi_mode=pmi,
+            pmi_mode=pmi,
         ) as exe:
             cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
@@ -55,7 +55,7 @@ def test_executor_no_cwd(self):
             resource_dict={"cores": 2},
             block_allocation=False,
             cache_directory="executorlib_cache",
-            executor_pmi_mode=pmi,
+            pmi_mode=pmi,
         ) as exe:
             cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
@@ -83,7 +83,7 @@ def test_executor_existing_files(self):
             resource_dict={"cores": 2, "cwd": "executorlib_cache"},
             block_allocation=False,
             cache_directory="executorlib_cache",
-            executor_pmi_mode=pmi,
+            pmi_mode=pmi,
         ) as exe:
             cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
@@ -102,7 +102,7 @@ def test_executor_existing_files(self):
             resource_dict={"cores": 2, "cwd": "executorlib_cache"},
             block_allocation=False,
             cache_directory="executorlib_cache",
-            executor_pmi_mode=pmi,
+            pmi_mode=pmi,
         ) as exe:
             cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
diff --git a/tests/test_fluxjobexecutor.py b/tests/test_fluxjobexecutor.py
index 2aeaef2d..03d56831 100644
--- a/tests/test_fluxjobexecutor.py
+++ b/tests/test_fluxjobexecutor.py
@@ -90,7 +90,7 @@ def test_flux_executor_parallel(self):
             resource_dict={"cores": 2},
             flux_executor=self.executor,
             block_allocation=True,
-            executor_pmi_mode=pmi,
+            pmi_mode=pmi,
         ) as exe:
             fs_1 = exe.submit(mpi_funct, 1)
             self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)])
@@ -102,7 +102,7 @@ def test_single_task(self):
             resource_dict={"cores": 2},
             flux_executor=self.executor,
             block_allocation=True,
-            executor_pmi_mode=pmi,
+            pmi_mode=pmi,
         ) as p:
             output = p.map(mpi_funct, [1, 2, 3])
         self.assertEqual(
diff --git a/tests/test_fluxpythonspawner.py b/tests/test_fluxpythonspawner.py
index a29e6c3a..01f1d160 100644
--- a/tests/test_fluxpythonspawner.py
+++ b/tests/test_fluxpythonspawner.py
@@ -82,7 +82,7 @@ def test_flux_executor_parallel(self):
             executor_kwargs={
                 "flux_executor": self.flux_executor,
                 "cores": 2,
-                "executor_pmi_mode": pmi,
+                "pmi_mode": pmi,
             },
             spawner=FluxPythonSpawner,
         ) as exe:
@@ -96,7 +96,7 @@ def test_single_task(self):
             executor_kwargs={
                 "flux_executor": self.flux_executor,
                 "cores": 2,
-                "executor_pmi_mode": pmi,
+                "pmi_mode": pmi,
             },
             spawner=FluxPythonSpawner,
         ) as p:
diff --git a/tests/test_slurmclusterexecutor.py b/tests/test_slurmclusterexecutor.py
index 998711b2..4973037d 100644
--- a/tests/test_slurmclusterexecutor.py
+++ b/tests/test_slurmclusterexecutor.py
@@ -50,7 +50,7 @@ def test_executor(self):
             resource_dict={"cores": 2, "cwd": "executorlib_cache", "submission_template": submission_template},
             block_allocation=False,
             cache_directory="executorlib_cache",
-            executor_pmi_mode="pmi2",
+            pmi_mode="pmi2",
         ) as exe:
             cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
@@ -64,7 +64,7 @@ def test_executor_no_cwd(self):
             resource_dict={"cores": 2, "submission_template": submission_template},
             block_allocation=False,
             cache_directory="executorlib_cache",
-            executor_pmi_mode="pmi2",
+            pmi_mode="pmi2",
         ) as exe:
             cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
@@ -78,7 +78,7 @@ def test_executor_existing_files(self):
             resource_dict={"cores": 2, "cwd": "executorlib_cache", "submission_template": submission_template},
             block_allocation=False,
             cache_directory="executorlib_cache",
-            executor_pmi_mode="pmi2",
+            pmi_mode="pmi2",
         ) as exe:
             cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
@@ -97,7 +97,7 @@ def test_executor_existing_files(self):
             resource_dict={"cores": 2, "cwd": "executorlib_cache", "submission_template": submission_template},
             block_allocation=False,
             cache_directory="executorlib_cache",
-            executor_pmi_mode="pmi2",
+            pmi_mode="pmi2",
         ) as exe:
             cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
diff --git a/tests/test_standalone_command.py b/tests/test_standalone_command.py
index f0b50a2c..d1bb55f1 100644
--- a/tests/test_standalone_command.py
+++ b/tests/test_standalone_command.py
@@ -51,7 +51,7 @@ def test_get_cache_execute_command_parallel(self):
         self.assertEqual(output[3], sys.executable)
         self.assertEqual(output[4].split(os.sep)[-1], "cache_parallel.py")
         self.assertEqual(output[5], file_name)
-        output = get_cache_execute_command(cores=2, file_name=file_name, backend="slurm", executor_pmi_mode="pmi2")
+        output = get_cache_execute_command(cores=2, file_name=file_name, backend="slurm", pmi_mode="pmi2")
         self.assertEqual(output[0], "srun")
         self.assertEqual(output[1], "-n")
         self.assertEqual(output[2], str(2))
@@ -74,7 +74,7 @@ def test_get_cache_execute_command_parallel(self):
         self.assertEqual(output[4], sys.executable)
         self.assertEqual(output[5].split(os.sep)[-1], "cache_parallel.py")
         self.assertEqual(output[6], file_name)
-        output = get_cache_execute_command(cores=2, file_name=file_name, backend="flux", executor_pmi_mode="pmix")
+        output = get_cache_execute_command(cores=2, file_name=file_name, backend="flux", pmi_mode="pmix")
         self.assertEqual(output[0], "flux")
         self.assertEqual(output[1], "run")
         self.assertEqual(output[2], "-o")
diff --git a/tests/test_standalone_inputcheck.py b/tests/test_standalone_inputcheck.py
index 5d7413fc..38fa896c 100644
--- a/tests/test_standalone_inputcheck.py
+++ b/tests/test_standalone_inputcheck.py
@@ -13,7 +13,7 @@
     check_refresh_rate,
     check_resource_dict,
     check_resource_dict_is_empty,
-    check_executor_pmi_mode,
+    check_pmi_mode,
     check_max_workers_and_cores,
     check_hostname_localhost,
     check_pysqa_config_directory,
@@ -77,9 +77,9 @@ def test_check_plot_dependency_graph(self):
         with self.assertRaises(ValueError):
             check_plot_dependency_graph(plot_dependency_graph=True)

-    def test_check_executor_pmi_mode(self):
+    def test_check_pmi_mode(self):
         with self.assertRaises(ValueError):
-            check_executor_pmi_mode(executor_pmi_mode="test")
+            check_pmi_mode(pmi_mode="test")

     def test_check_max_workers_and_cores(self):
         with self.assertRaises(ValueError):
diff --git a/tests/test_standalone_interactive_backend.py b/tests/test_standalone_interactive_backend.py
index bada59bd..c2306cae 100644
--- a/tests/test_standalone_interactive_backend.py
+++ b/tests/test_standalone_interactive_backend.py
@@ -102,7 +102,7 @@ def test_command_slurm_user_command(self):
             gpus_per_core=1,
             openmpi_oversubscribe=True,
             slurm_cmd_args=["--account=test", "--job-name=executorlib"],
-            executor_pmi_mode="pmi2",
+            pmi_mode="pmi2",
         )
         self.assertEqual(
             command_lst,

From 029bd8a8a3af2ad1ddcea36bcb45d5b97e95eafd Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Sun, 27 Jul 2025 18:25:40 +0000
Subject: [PATCH 29/30] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 executorlib/task_scheduler/file/task_scheduler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py
index 18c1cb01..587b0e0a 100644
--- a/executorlib/task_scheduler/file/task_scheduler.py
+++ b/executorlib/task_scheduler/file/task_scheduler.py
@@ -3,11 +3,11 @@
 from executorlib.standalone.inputcheck import (
     check_executor,
-    check_pmi_mode,
     check_flux_log_files,
     check_hostname_localhost,
     check_max_workers_and_cores,
     check_nested_flux_executor,
+    check_pmi_mode,
 )
 from executorlib.task_scheduler.base import TaskSchedulerBase
 from executorlib.task_scheduler.file.shared import execute_tasks_h5
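After patches 28 and 29 the cluster-level API reads uniformly. A hedged sketch of a SLURM submission mirroring test_slurmclusterexecutor.py; the package-level import is assumed to match the FluxJobExecutor import used in the notebook, the submission_template passed by the tests is elided here, and mpi_funct is the helper sketched earlier:

    from executorlib import SlurmClusterExecutor

    with SlurmClusterExecutor(
        resource_dict={"cores": 2, "cwd": "executorlib_cache"},
        block_allocation=False,
        cache_directory="executorlib_cache",
        pmi_mode="pmi2",  # forwarded to srun as --mpi=pmi2
    ) as exe:
        fs1 = exe.submit(mpi_funct, 1)
        print(fs1.result())  # two ranks expected: [(1, 2, 0), (1, 2, 1)]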
From 27735e90c7dcc861c48764fc0666bdb8eee115ee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Jan=C3=9Fen?=
Date: Sun, 27 Jul 2025 20:26:52 +0200
Subject: [PATCH 30/30] fixes

---
 executorlib/executor/slurm.py     | 2 +-
 executorlib/standalone/command.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py
index 64693036..3a4e202b 100644
--- a/executorlib/executor/slurm.py
+++ b/executorlib/executor/slurm.py
@@ -235,7 +235,7 @@ class SlurmJobExecutor(BaseExecutor):
                           compute notes. Defaults to False.
         - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python
                                 functions submitted to the Executor.
-        executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
+        pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                       context of an HPC cluster this essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
diff --git a/executorlib/standalone/command.py b/executorlib/standalone/command.py
index ec59ac77..68af9abc 100644
--- a/executorlib/standalone/command.py
+++ b/executorlib/standalone/command.py
@@ -54,8 +54,8 @@ def get_cache_execute_command(
         )
     elif backend == "flux":
         flux_command = ["flux", "run"]
-        if executor_pmi_mode is not None:
-            flux_command += ["-o", "pmi=" + executor_pmi_mode]
+        if pmi_mode is not None:
+            flux_command += ["-o", "pmi=" + pmi_mode]
         command_lst = (
             flux_command
             + ["-n", str(cores)]
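The flux branch now mirrors the slurm branch, so both spawn commands honor the requested PMI interface instead of the previously hard-coded --mpi=pmi2. A sketch of the resulting command prefixes; the file name task.h5 is purely illustrative, and the trailing elements (Python executable plus backend script) are cut off by the slices:

    from executorlib.standalone.command import get_cache_execute_command

    slurm_cmd = get_cache_execute_command(file_name="task.h5", cores=2, backend="slurm", pmi_mode="pmi2")
    print(slurm_cmd[:4])  # ['srun', '-n', '2', '--mpi=pmi2']

    flux_cmd = get_cache_execute_command(file_name="task.h5", cores=2, backend="flux", pmi_mode="pmix")
    print(flux_cmd[:6])  # ['flux', 'run', '-o', 'pmi=pmix', '-n', '2']

These prefixes match the assertions in the updated test_standalone_command.py above.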