Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
59e6bc5
Update environment-openmpi.yml
jan-janssen May 28, 2024
5a9fc5f
Update environment.yml
jan-janssen May 28, 2024
82e84d6
Update benchmark.yml
jan-janssen May 28, 2024
6eb85b2
Merge branch 'main' into release_openmpi_constraint
jan-janssen May 28, 2024
ba235de
Update benchmark.yml
jan-janssen May 28, 2024
c8b104b
Update unittest-flux.yml
jan-janssen May 28, 2024
d2c3acd
Update environment.yml
jan-janssen May 28, 2024
a4c50f3
Update unittest-flux.yml
jan-janssen May 28, 2024
4949934
Update unittest-flux.yml
jan-janssen May 28, 2024
59edd5e
Update unittest-flux.yml
jan-janssen May 28, 2024
fa2f4a6
Update unittest-flux.yml
jan-janssen May 28, 2024
e75b03d
Update unittest-flux.yml
jan-janssen May 28, 2024
10bb826
Update flux.py
jan-janssen May 28, 2024
885c030
Update flux.py
jan-janssen May 28, 2024
0051f63
Docstring clean up
jan-janssen May 29, 2024
9f3bb2a
Add type hints for input check
jan-janssen May 29, 2024
00b1011
update initialization
jan-janssen May 29, 2024
c535816
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 29, 2024
d6e69dd
Merge remote-tracking branch 'refs/remotes/origin/main' into release_…
jan-janssen May 29, 2024
37e0554
check for None
jan-janssen May 29, 2024
eaa2e74
add validate_backend()
jan-janssen May 29, 2024
056653c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 29, 2024
5446731
Add files via upload
jan-janssen May 29, 2024
1caa277
update documentation
jan-janssen May 29, 2024
9377818
Merge remote-tracking branch 'origin/release_openmpi_constraint' into…
jan-janssen May 29, 2024
ed541e0
Update continous integration environment
jan-janssen May 29, 2024
4af4e97
Merge remote-tracking branch 'refs/remotes/origin/main' into release_…
jan-janssen May 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci_support/environment-openmpi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ channels:
dependencies:
- python
- numpy
- openmpi =4.1.6
- openmpi
- cloudpickle =3.0.0
- mpi4py =3.1.6
- tqdm =4.66.4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ jobs:
cat timing.log
python -m unittest tests/benchmark/test_results.py
env:
OMPI_MCA_rmaps_base_oversubscribe: 'yes'
PRTE_MCA_rmaps_default_mapping_policy: ':oversubscribe'
40 changes: 40 additions & 0 deletions .github/workflows/unittest-flux-mpich.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: Unittests-flux-mpich

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Extend environment
shell: bash -l {0}
timeout-minutes: 5
run: |
echo -e '- coverage\n- flux-core =0.59.0\n- versioneer =0.28'>> .ci_support/environment-mpich.yml
cat .ci_support/environment-mpich.yml
- uses: conda-incubator/setup-miniconda@v2.2.0
with:
python-version: '3.12'
mamba-version: "*"
channels: conda-forge
miniforge-variant: Mambaforge
channel-priority: strict
auto-update-conda: true
environment-file: .ci_support/environment-mpich.yml
- name: Test
shell: bash -l {0}
timeout-minutes: 5
run: |
pip install . --no-deps --no-build-isolation
python -m unittest discover tests
- name: Test Flux
shell: bash -l {0}
timeout-minutes: 5
run: >
flux start
python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
45 changes: 45 additions & 0 deletions .github/workflows/unittest-flux-openmpi.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: Unittests-flux-openmpi

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Extend environment
shell: bash -l {0}
timeout-minutes: 5
run: |
echo -e '- coverage\n- flux-core =0.59.0\n- flux-pmix=0.5.0\n- versioneer =0.28' >> .ci_support/environment-openmpi.yml
cat .ci_support/environment-openmpi.yml
- uses: conda-incubator/setup-miniconda@v2.2.0
with:
python-version: '3.12'
mamba-version: "*"
channels: conda-forge
miniforge-variant: Mambaforge
channel-priority: strict
auto-update-conda: true
environment-file: .ci_support/environment-openmpi.yml
- name: Test
shell: bash -l {0}
timeout-minutes: 5
run: |
pip install . --no-deps --no-build-isolation
coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest discover tests
- name: Test Flux with OpenMPI
shell: bash -l {0}
timeout-minutes: 5
run: >
flux start
coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
coverage xml
env:
PYMPIPOOL_PMIX: "pmix"
- name: Coveralls
uses: coverallsapp/github-action@v2
58 changes: 0 additions & 58 deletions .github/workflows/unittest-flux.yml

This file was deleted.

3 changes: 2 additions & 1 deletion binder/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ channels:
dependencies:
- python
- numpy
- openmpi =4.1.6
- openmpi
- cloudpickle =3.0.0
- mpi4py =3.1.6
- tqdm =4.66.2
- pyzmq =26.0.0
- flux-core =0.59.0
- flux-pmix =0.5.0
- versioneer =0.28
7 changes: 6 additions & 1 deletion docs/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ conda install -c conda-forge flux-core flux-sched mpich=>4 pympipool
Flux is not limited to mpich / cray mpi, it can also be installed in compatibility with openmpi or intel mpi using the
openmpi package:
```
conda install -c conda-forge flux-core flux-sched openmpi pympipool
conda install -c conda-forge flux-core flux-sched openmpi=4.1.6 pympipool
```
For the version 5 of openmpi the backend changed to `pmix`, this requires the additional `flux-pmix` plugin:
```
conda install -c conda-forge flux-core flux-sched flux-pmix openmpi>=5 pympipool
```
In addition, the `pmi="pmix"` parameter has to be set for the `pympipool.Executor` to switch to `pmix` as backend.

## Test Flux Framework
To validate the installation of flux and confirm the GPUs are correctly recognized, you can start a flux session on the
Expand Down
2 changes: 1 addition & 1 deletion notebooks/examples.ipynb

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pympipool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Executor:
function.
init_function (None): optional function to preset arguments for functions which are submitted later
command_line_argument_lst (list): Additional command line arguments for the srun call (SLURM only)
pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)

Examples:
```
Expand Down Expand Up @@ -85,6 +86,7 @@ def __init__(
block_allocation: bool = True,
init_function: Optional[callable] = None,
command_line_argument_lst: list[str] = [],
pmi: Optional[str] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
):
Expand All @@ -106,6 +108,7 @@ def __new__(
block_allocation: bool = False,
init_function: Optional[callable] = None,
command_line_argument_lst: list[str] = [],
pmi: Optional[str] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
):
Expand Down Expand Up @@ -143,6 +146,7 @@ def __new__(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
command_line_argument_lst (list): Additional command line arguments for the srun call (SLURM only)
pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
disable_dependencies (boolean): Disable resolving future objects during the submission.
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.

Expand All @@ -162,6 +166,7 @@ def __new__(
block_allocation=block_allocation,
init_function=init_function,
command_line_argument_lst=command_line_argument_lst,
pmi=pmi,
refresh_rate=refresh_rate,
)
else:
Expand All @@ -180,4 +185,5 @@ def __new__(
block_allocation=block_allocation,
init_function=init_function,
command_line_argument_lst=command_line_argument_lst,
pmi=pmi,
)
6 changes: 6 additions & 0 deletions pympipool/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
check_oversubscribe,
check_executor,
check_init_function,
check_pmi,
validate_backend,
validate_number_of_cores,
)
Expand Down Expand Up @@ -50,6 +51,7 @@ def create_executor(
block_allocation: bool = False,
init_function: Optional[callable] = None,
command_line_argument_lst: list[str] = [],
pmi: Optional[str] = None,
):
"""
Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor,
Expand Down Expand Up @@ -83,13 +85,15 @@ def create_executor(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
command_line_argument_lst (list): Additional command line arguments for the srun call (SLURM only)
pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)

"""
max_cores = validate_number_of_cores(max_cores=max_cores, max_workers=max_workers)
check_init_function(block_allocation=block_allocation, init_function=init_function)
backend = validate_backend(
backend=backend, flux_installed=flux_installed, slurm_installed=slurm_installed
)
check_pmi(backend=backend, pmi=pmi)
if backend == "flux":
check_oversubscribe(oversubscribe=oversubscribe)
check_command_line_argument_lst(
Expand All @@ -105,6 +109,7 @@ def create_executor(
cwd=cwd,
executor=executor,
hostname_localhost=hostname_localhost,
pmi=pmi,
)
else:
return PyFluxStepExecutor(
Expand All @@ -115,6 +120,7 @@ def create_executor(
cwd=cwd,
executor=executor,
hostname_localhost=hostname_localhost,
pmi=pmi,
)
elif backend == "slurm":
check_executor(executor=executor)
Expand Down
10 changes: 10 additions & 0 deletions pympipool/scheduler/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class PyFluxExecutor(ExecutorBroker):
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -66,6 +67,7 @@ def __init__(
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
executor: Optional[flux.job.FluxExecutor] = None,
pmi: Optional[str] = None,
hostname_localhost: Optional[bool] = False,
):
super().__init__()
Expand All @@ -85,6 +87,7 @@ def __init__(
"gpus_per_core": int(gpus_per_worker / cores_per_worker),
"cwd": cwd,
"executor": executor,
"pmi": pmi,
},
)
for _ in range(max_workers)
Expand All @@ -106,6 +109,7 @@ class PyFluxStepExecutor(ExecutorSteps):
gpus_per_worker (int): number of GPUs per worker - defaults to 0
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down Expand Up @@ -141,6 +145,7 @@ def __init__(
gpus_per_worker: int = 0,
cwd: Optional[str] = None,
executor: Optional[flux.job.FluxExecutor] = None,
pmi: Optional[str] = None,
hostname_localhost: Optional[bool] = False,
):
super().__init__()
Expand All @@ -159,6 +164,7 @@ def __init__(
"gpus_per_core": int(gpus_per_worker / cores_per_worker),
"cwd": cwd,
"executor": executor,
"pmi": pmi,
},
)
)
Expand All @@ -173,6 +179,7 @@ def __init__(
gpus_per_core: int = 0,
oversubscribe: bool = False,
executor: Optional[flux.job.FluxExecutor] = None,
pmi: Optional[str] = None,
):
super().__init__(
cwd=cwd,
Expand All @@ -182,6 +189,7 @@ def __init__(
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._executor = executor
self._pmi = pmi
self._future = None

def bootup(self, command_lst: list[str]):
Expand All @@ -200,6 +208,8 @@ def bootup(self, command_lst: list[str]):
exclusive=False,
)
jobspec.environment = dict(os.environ)
if self._pmi is not None:
jobspec.setattr_shell_option("pmi", self._pmi)
if self._cwd is not None:
jobspec.cwd = self._cwd
self._future = self._executor.submit(jobspec)
Expand Down
11 changes: 10 additions & 1 deletion pympipool/shared/inputcheck.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import inspect
from typing import List
from typing import List, Optional
from concurrent.futures import Executor


Expand Down Expand Up @@ -85,6 +85,15 @@ def validate_backend(
return "mpi"


def check_pmi(backend: str, pmi: Optional[str]):
if backend != "flux" and pmi is not None:
raise ValueError("The pmi parameter is currently only implemented for flux.")
elif backend == "flux" and pmi not in ["pmix", "pmi1", "pmi2", None]:
raise ValueError(
"The pmi parameter supports [pmix, pmi1, pmi2], but not: " + pmi
)


def check_init_function(block_allocation: bool, init_function: callable):
if not block_allocation and init_function is not None:
raise ValueError("")
Expand Down
Loading