Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6edd2c1
Debug slurm pmi options
jan-janssen Jul 27, 2025
756e55c
Update pipeline.yml
jan-janssen Jul 27, 2025
34c87d3
Merge remote-tracking branch 'origin/parallel_file_executor' into slu…
jan-janssen Jul 27, 2025
ec507ec
disable MPI parallel test
jan-janssen Jul 27, 2025
b3ae43e
sinfo
jan-janssen Jul 27, 2025
21afba9
update output
jan-janssen Jul 27, 2025
d38dda3
add mpi parallel test
jan-janssen Jul 27, 2025
848da88
core validation is only possible on compute node not on login node
jan-janssen Jul 27, 2025
48000d4
fix test
jan-janssen Jul 27, 2025
a86b5df
enforce pmix for testing
jan-janssen Jul 27, 2025
2990d11
Update slurmspawner.py
jan-janssen Jul 27, 2025
dbcd364
downgrade to openmpi
jan-janssen Jul 27, 2025
2084b99
Merge remote-tracking branch 'origin/slurm_mpi_interface' into slurm_…
jan-janssen Jul 27, 2025
4b374b4
remove pmi setting
jan-janssen Jul 27, 2025
1571c6a
use slurm environment
jan-janssen Jul 27, 2025
b3b393c
Try pmix again
jan-janssen Jul 27, 2025
0dc0ecc
Update slurmspawner.py
jan-janssen Jul 27, 2025
97125e5
Update slurmspawner.py
jan-janssen Jul 27, 2025
58d152f
Update pipeline.yml
jan-janssen Jul 27, 2025
fddf4a6
Update pipeline.yml
jan-janssen Jul 27, 2025
0d248b7
Update slurmspawner.py
jan-janssen Jul 27, 2025
0c2d3f7
Update pipeline.yml
jan-janssen Jul 27, 2025
1c89a16
use slrum args
jan-janssen Jul 27, 2025
fcf4394
extend test
jan-janssen Jul 27, 2025
1d33bb5
fix tests
jan-janssen Jul 27, 2025
3e08db9
Add executor_pmi_mode option
jan-janssen Jul 27, 2025
f4102d8
check all files
jan-janssen Jul 27, 2025
0b693cf
fixes
jan-janssen Jul 27, 2025
4e73041
extend tests
jan-janssen Jul 27, 2025
21e4465
rename to pmi_mode
jan-janssen Jul 27, 2025
029bd8a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 27, 2025
27735e9
fixes
jan-janssen Jul 27, 2025
98cc5f5
Merge remote-tracking branch 'origin/pmix' into pmix
jan-janssen Jul 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ jobs:
- uses: actions/checkout@v4
- uses: koesterlab/setup-slurm-action@v1
timeout-minutes: 5
- name: ubuntu install
run: sudo apt install -y mpich
- name: Conda config
shell: bash -l {0}
run: echo -e "channels:\n - conda-forge\n" > .condarc
Expand All @@ -295,8 +297,10 @@ jobs:
run: |
pip install . --no-deps --no-build-isolation
cd tests
python -m unittest test_slurmclusterexecutor.py
sinfo -o "%n %e %m %a %c %C"
srun --mpi=list
python -m unittest test_slurmjobexecutor.py
python -m unittest test_slurmclusterexecutor.py

unittest_mpich:
needs: [black]
Expand Down
2 changes: 1 addition & 1 deletion docs/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ For the version 5 of openmpi the backend changed to `pmix`, this requires the ad
```
conda install -c conda-forge flux-core flux-sched flux-pmix openmpi>=5 executorlib
```
In addition, the `flux_executor_pmi_mode="pmix"` parameter has to be set for the `FluxJobExecutor` or the
In addition, the `pmi_mode="pmix"` parameter has to be set for the `FluxJobExecutor` or the
`FluxClusterExecutor` to switch to `pmix` as backend.

### Test Flux Framework
Expand Down
28 changes: 14 additions & 14 deletions executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class FluxJobExecutor(BaseExecutor):
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
Expand Down Expand Up @@ -93,8 +93,8 @@ def __init__(
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
pmi_mode: Optional[str] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
hostname_localhost: Optional[bool] = None,
Expand Down Expand Up @@ -130,8 +130,8 @@ def __init__(
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
Expand Down Expand Up @@ -175,8 +175,8 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
Expand All @@ -199,8 +199,8 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
Expand Down Expand Up @@ -236,7 +236,7 @@ class FluxClusterExecutor(BaseExecutor):
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -284,7 +284,7 @@ def __init__(
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
pysqa_config_directory: Optional[str] = None,
flux_executor_pmi_mode: Optional[str] = None,
pmi_mode: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
Expand Down Expand Up @@ -319,7 +319,7 @@ def __init__(
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -369,7 +369,7 @@ def __init__(
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=None,
flux_executor_pmi_mode=flux_executor_pmi_mode,
pmi_mode=pmi_mode,
flux_executor_nesting=False,
flux_log_files=False,
pysqa_config_directory=pysqa_config_directory,
Expand All @@ -387,8 +387,8 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=None,
flux_executor=None,
flux_executor_pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
hostname_localhost=hostname_localhost,
Expand All @@ -408,8 +408,8 @@ def create_flux_executor(
max_cores: Optional[int] = None,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
pmi_mode: Optional[str] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
hostname_localhost: Optional[bool] = None,
Expand Down Expand Up @@ -437,8 +437,8 @@ def create_flux_executor(
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
Expand Down Expand Up @@ -470,7 +470,7 @@ def create_flux_executor(
resource_dict["hostname_localhost"] = hostname_localhost
resource_dict["log_obj_size"] = log_obj_size
check_init_function(block_allocation=block_allocation, init_function=init_function)
check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode)
check_pmi(backend="flux_allocation", pmi=pmi_mode)
check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))
check_command_line_argument_lst(
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
Expand All @@ -479,8 +479,8 @@ def create_flux_executor(
del resource_dict["openmpi_oversubscribe"]
if "slurm_cmd_args" in resource_dict:
del resource_dict["slurm_cmd_args"]
resource_dict["pmi_mode"] = pmi_mode
resource_dict["flux_executor"] = flux_executor
resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
resource_dict["flux_executor_nesting"] = flux_executor_nesting
resource_dict["flux_log_files"] = flux_log_files
if block_allocation:
Expand Down
2 changes: 1 addition & 1 deletion executorlib/executor/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def __init__(
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=None,
flux_executor_pmi_mode=None,
pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
pysqa_config_directory=None,
Expand Down
13 changes: 12 additions & 1 deletion executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class SlurmClusterExecutor(BaseExecutor):
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -91,6 +92,7 @@ def __init__(
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
pysqa_config_directory: Optional[str] = None,
pmi_mode: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
Expand Down Expand Up @@ -125,6 +127,7 @@ def __init__(
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -173,8 +176,8 @@ def __init__(
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
flux_executor=None,
flux_executor_pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
pysqa_config_directory=pysqa_config_directory,
Expand Down Expand Up @@ -232,6 +235,7 @@ class SlurmJobExecutor(BaseExecutor):
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -278,6 +282,7 @@ def __init__(
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
pmi_mode: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
Expand Down Expand Up @@ -315,6 +320,7 @@ def __init__(
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -356,6 +362,7 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
Expand All @@ -376,6 +383,7 @@ def __init__(
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
pmi_mode=pmi_mode,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
Expand All @@ -389,6 +397,7 @@ def create_slurm_executor(
max_cores: Optional[int] = None,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
pmi_mode: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
Expand Down Expand Up @@ -418,6 +427,7 @@ def create_slurm_executor(
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand All @@ -441,6 +451,7 @@ def create_slurm_executor(
resource_dict["cache_directory"] = cache_directory
resource_dict["hostname_localhost"] = hostname_localhost
resource_dict["log_obj_size"] = log_obj_size
resource_dict["pmi_mode"] = pmi_mode
check_init_function(block_allocation=block_allocation, init_function=init_function)
if block_allocation:
resource_dict["init_function"] = init_function
Expand Down
24 changes: 10 additions & 14 deletions executorlib/standalone/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def get_cache_execute_command(
file_name: str,
cores: int = 1,
backend: Optional[str] = None,
flux_executor_pmi_mode: Optional[str] = None,
pmi_mode: Optional[str] = None,
) -> list:
"""
Get command to call backend as a list of two strings
Expand All @@ -30,7 +30,7 @@ def get_cache_execute_command(
file_name (str): The name of the file.
cores (int, optional): Number of cores used to execute the task. Defaults to 1.
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None

Returns:
list[str]: List of strings containing the python executable path and the backend script to execute
Expand All @@ -44,25 +44,21 @@ def get_cache_execute_command(
+ [get_command_path(executable="cache_parallel.py"), file_name]
)
elif backend == "slurm":
command_prepend = ["srun", "-n", str(cores)]
if pmi_mode is not None:
command_prepend += ["--mpi=" + pmi_mode]
command_lst = (
["srun", "-n", str(cores)]
command_prepend
+ command_lst
+ [get_command_path(executable="cache_parallel.py"), file_name]
)
elif backend == "flux":
if flux_executor_pmi_mode is not None:
flux_command = [
"flux",
"run",
"-o",
"pmi=" + flux_executor_pmi_mode,
"-n",
str(cores),
]
else:
flux_command = ["flux", "run", "-n", str(cores)]
flux_command = ["flux", "run"]
if pmi_mode is not None:
flux_command += ["-o", "pmi=" + pmi_mode]
command_lst = (
flux_command
+ ["-n", str(cores)]
+ command_lst
+ [get_command_path(executable="cache_parallel.py"), file_name]
)
Expand Down
Loading
Loading