Merged
Changes from all commits
30 commits
7f27436
Clean up CI
jan-janssen Jul 30, 2023
cb195d9
Add flux specific Executor
jan-janssen Jul 30, 2023
96824fd
Merge remote-tracking branch 'origin/main' into flux_python
jan-janssen Jul 30, 2023
06c9a1a
push coverage only on flux
jan-janssen Jul 30, 2023
b3ed7f4
install coverage only for flux
jan-janssen Jul 30, 2023
b8567d5
fix flux CI
jan-janssen Jul 30, 2023
20e6d5f
black formatting
jan-janssen Jul 30, 2023
7f4f46b
Add the first set of flux specific tests
jan-janssen Jul 30, 2023
e86c50c
update CI to only use flux for the flux tests
jan-janssen Jul 30, 2023
95865e2
skip flux tests when flux is not installed
jan-janssen Jul 30, 2023
5e8cff2
fix syntax error
jan-janssen Jul 30, 2023
4f6d1eb
more fixes
jan-janssen Jul 30, 2023
d61e60f
separate flux test
jan-janssen Jul 30, 2023
3d5f9f8
Enable tty
jan-janssen Jul 30, 2023
2419009
try bash -l
jan-janssen Jul 30, 2023
0f70684
Update unittest-flux.yml
jan-janssen Jul 30, 2023
dcbd6ef
fix flux
jan-janssen Jul 30, 2023
032d6c2
try commands as list of strings
jan-janssen Jul 30, 2023
b6de76e
Try debugging
jan-janssen Jul 30, 2023
5327a3b
Merge remote-tracking branch 'origin/main' into flux_python
jan-janssen Jul 30, 2023
975de10
Link python executable as sys.executable rather than python
jan-janssen Jul 30, 2023
de1554f
skip flux test when flux is not installed
jan-janssen Jul 30, 2023
f257e6e
Enable all tests again
jan-janssen Jul 30, 2023
ff6d943
fix working directory is none
jan-janssen Jul 30, 2023
2829030
fix environment
jan-janssen Jul 30, 2023
77e266a
some fixes
jan-janssen Jul 30, 2023
84c45ef
fix poll function
jan-janssen Jul 30, 2023
c431b88
disable full test
jan-janssen Jul 30, 2023
c7610fd
fix last test
jan-janssen Jul 31, 2023
6efcc9d
add test for internal memory
jan-janssen Jul 31, 2023
2 changes: 0 additions & 2 deletions .ci_support/environment-openmpi.yml
@@ -2,8 +2,6 @@ channels:
- conda-forge
dependencies:
- python
-  - coverage
-  - coveralls =3.3.1
- numpy
- openmpi
- cloudpickle =2.2.1
78 changes: 78 additions & 0 deletions .github/workflows/unittest-flux.yml
@@ -0,0 +1,78 @@
# This workflow is used to run the unittests of pympipool with the flux backend

name: Unittests-flux

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
build:

runs-on: ${{ matrix.operating-system }}
strategy:
matrix:
include:
- operating-system: ubuntu-latest
python-version: '3.11'
label: linux-64-py-3-11-openmpi
prefix: /usr/share/miniconda3/envs/my-env
environment-file: .ci_support/environment-openmpi.yml

- operating-system: ubuntu-latest
python-version: '3.11'
label: linux-64-py-3-11-mpich
prefix: /usr/share/miniconda3/envs/my-env
environment-file: .ci_support/environment-mpich.yml

steps:
- uses: actions/checkout@v2
- uses: conda-incubator/setup-miniconda@v2.2.0
with:
python-version: ${{ matrix.python-version }}
mamba-version: "*"
channels: conda-forge
miniforge-variant: Mambaforge
channel-priority: strict
auto-update-conda: true
environment-file: ${{ matrix.environment-file }}
- name: Install flux
shell: bash -l {0}
run: mamba install -y flux-core coverage coveralls=3.3.1
- name: Setup
shell: bash -l {0}
run: pip install --no-deps .
- name: Test
shell: bash -l {0}
timeout-minutes: 5
run: >
for f in $(ls tests/test_*.py); do
echo $f;
if [ $f != "tests/test_flux.py" ]; then
coverage run --omit pympipool/_version.py -m unittest $f;
fi
done
env:
OMPI_MCA_plm: 'isolated'
OMPI_MCA_rmaps_base_oversubscribe: 'yes'
OMPI_MCA_btl_vader_single_copy_mechanism: 'none'
- name: Test Flux
shell: bash -l {0}
timeout-minutes: 5
run: >
flux start
coverage run --omit pympipool/_version.py -m unittest tests/test_flux.py;
env:
OMPI_MCA_plm: 'isolated'
OMPI_MCA_rmaps_base_oversubscribe: 'yes'
OMPI_MCA_btl_vader_single_copy_mechanism: 'none'
- name: Coverage
if: matrix.label == 'linux-64-py-3-11-openmpi'
continue-on-error: True
shell: bash -l {0}
run: |
coverage combine
coveralls
coverage xml
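
The workflow above runs tests/test_flux.py only inside a `flux start` session, and several commits mention skipping the flux tests when flux is not installed. The content of tests/test_flux.py is not part of this diff, so the following is only an illustrative sketch of such a guard, with hypothetical names:

```python
import unittest

try:
    import flux.job  # noqa: F401

    skip_flux_test = False
except ImportError:
    skip_flux_test = True


@unittest.skipIf(skip_flux_test, "flux could not be imported, skipping flux-specific tests")
class TestFluxAvailable(unittest.TestCase):
    def test_flux_executor_available(self):
        # Only runs when the flux python bindings are importable.
        import flux.job

        self.assertTrue(hasattr(flux.job, "FluxExecutor"))


if __name__ == "__main__":
    unittest.main()
```
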
10 changes: 1 addition & 9 deletions .github/workflows/unittest-openmpi.yml
@@ -57,16 +57,8 @@ jobs:
- name: Test
shell: bash -l {0}
timeout-minutes: 5
- run: for f in $(ls tests/test_*.py); do echo $f; coverage run --omit pympipool/_version.py -m unittest $f; done
+ run: for f in $(ls tests/test_*.py); do echo $f; python -m unittest $f; done
env:
OMPI_MCA_plm: 'isolated'
OMPI_MCA_rmaps_base_oversubscribe: 'yes'
OMPI_MCA_btl_vader_single_copy_mechanism: 'none'
- - name: Coverage
- if: matrix.label == 'linux-64-py-3-10-openmpi'
- continue-on-error: True
- shell: bash -l {0}
- run: |
- coverage combine
- coveralls
- coverage xml
1 change: 1 addition & 0 deletions pympipool/__init__.py
@@ -7,6 +7,7 @@
interface_receive,
)
from pympipool.interfaces.taskbroker import HPCExecutor
+ from pympipool.interfaces.fluxbroker import PyFluxExecutor
from pympipool.interfaces.taskexecutor import Executor
from pympipool.legacy.interfaces.executor import PoolExecutor
from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool
227 changes: 227 additions & 0 deletions pympipool/interfaces/fluxbroker.py
@@ -0,0 +1,227 @@
import os
import queue
import socket
import sys
from time import sleep

from pympipool.shared.broker import (
MetaExecutorFuture,
_get_future_done,
_execute_task_dict,
)
from pympipool.interfaces.base import ExecutorBase
from pympipool.shared.thread import RaisingThread
from pympipool.shared.taskexecutor import (
cloudpickle_register,
_execute_parallel_tasks_loop,
)
from pympipool.shared.connections import FluxPythonInterface
from pympipool.shared.communication import SocketInterface


class SingleTaskExecutor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor, but it uses mpi4py to execute parallel tasks.
In contrast to the mpi4py.futures.MPIPoolExecutor, the pympipool.Executor can be run from a serial python process
and does not require the python script itself to be executed with MPI. Internally it still relies on the
mpi4py.futures.MPIPoolExecutor, so it is primarily an abstraction of that functionality to improve usability,
in particular in combination with Jupyter notebooks.

Args:
cores (int): defines the number of MPI ranks to use for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional

Simple example:
```
import numpy as np
from pympipool import Executor

def calc(i, j, k):
from mpi4py import MPI
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
return np.array([i, j, k]), size, rank

def init_k():
return {"k": 3}

with Executor(cores=2, init_function=init_k) as p:
fs = p.submit(calc, 2, j=4)
print(fs.result())

>>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

def __init__(
self,
cores,
gpus_per_task=0,
init_function=None,
cwd=None,
executor=None,
):
super().__init__()
self._process = RaisingThread(
target=execute_parallel_tasks,
kwargs={
"future_queue": self._future_queue,
"cores": cores,
"gpus_per_task": gpus_per_task,
"cwd": cwd,
"executor": executor,
},
)
self._process.start()
if init_function is not None:
self._future_queue.put(
{"init": True, "fn": init_function, "args": (), "kwargs": {}}
)
cloudpickle_register(ind=3)


class PyFluxExecutor(ExecutorBase):
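"""
The pympipool.interfaces.fluxbroker.PyFluxExecutor distributes submitted functions over a set of
SingleTaskExecutor workers, each of which launches its tasks through the flux framework.

Args:
max_workers (int): number of SingleTaskExecutor workers spawned by the broker
cores_per_worker (int): number of MPI ranks per worker - defaults to 1
gpus_per_worker (int): number of GPUs per worker - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): interval in seconds to wait before checking the task queue again when it is empty - defaults to 0.1
executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
"""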
def __init__(
self,
max_workers,
cores_per_worker=1,
gpus_per_worker=0,
init_function=None,
cwd=None,
sleep_interval=0.1,
executor=None,
):
super().__init__()
self._process = RaisingThread(
target=executor_broker,
kwargs={
"future_queue": self._future_queue,
"max_workers": max_workers,
"cores_per_worker": cores_per_worker,
"gpus_per_worker": gpus_per_worker,
"init_function": init_function,
"cwd": cwd,
"sleep_interval": sleep_interval,
"executor": executor,
},
)
self._process.start()


def execute_parallel_tasks(
future_queue,
cores,
gpus_per_task=0,
cwd=None,
executor=None,
):
"""
Execute a single task in parallel using the message passing interface (MPI).

Args:
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
cores (int): defines the total number of MPI ranks to use
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
"""
command_lst = [
sys.executable,
os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")),
]
interface = interface_bootup(
command_lst=command_lst,
cwd=cwd,
cores=cores,
gpus_per_core=gpus_per_task,
executor=executor,
)
_execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)


def interface_bootup(
command_lst,
cwd=None,
cores=1,
gpus_per_core=0,
executor=None,
):
command_lst += [
"--host",
socket.gethostname(),
]
connections = FluxPythonInterface(
cwd=cwd,
cores=cores,
gpus_per_core=gpus_per_core,
oversubscribe=False,
executor=executor,
)
interface = SocketInterface(interface=connections)
command_lst += [
"--zmqport",
str(interface.bind_to_random_port()),
]
interface.bootup(command_lst=command_lst)
return interface


def executor_broker(
future_queue,
max_workers,
cores_per_worker=1,
gpus_per_worker=0,
init_function=None,
cwd=None,
sleep_interval=0.1,
executor=None,
):
meta_future_lst = _get_executor_list(
max_workers=max_workers,
cores_per_worker=cores_per_worker,
gpus_per_worker=gpus_per_worker,
init_function=init_function,
cwd=cwd,
executor=executor,
)
while True:
try:
task_dict = future_queue.get_nowait()
except queue.Empty:
sleep(sleep_interval)
else:
if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
future_queue.task_done()
else:
future_queue.task_done()
break


def _get_executor_list(
max_workers,
cores_per_worker=1,
gpus_per_worker=0,
init_function=None,
cwd=None,
executor=None,
):
return [
MetaExecutorFuture(
future=_get_future_done(),
executor=SingleTaskExecutor(
cores=cores_per_worker,
gpus_per_task=int(gpus_per_worker / cores_per_worker),
init_function=init_function,
cwd=cwd,
executor=executor,
),
)
for _ in range(max_workers)
]
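
A minimal usage sketch for the new PyFluxExecutor (not part of this diff): it assumes the script runs inside a flux instance, for example under `flux start`, and that ExecutorBase provides the same submit()/context-manager interface used in the SingleTaskExecutor docstring above.

```python
from pympipool import PyFluxExecutor


def calc(i, j):
    return i + j


# Two workers with one core each; the broker thread distributes submitted
# functions over the underlying SingleTaskExecutor instances via flux.
with PyFluxExecutor(max_workers=2, cores_per_worker=1) as exe:
    future = exe.submit(calc, 1, j=2)
    print(future.result())  # >>> 3
```
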
20 changes: 14 additions & 6 deletions pympipool/shared/connections.py
@@ -1,4 +1,5 @@
from abc import ABC
+ import os
import subprocess


@@ -171,21 +172,28 @@ def bootup(self, command_lst):
if self._executor is None:
self._executor = flux.job.FluxExecutor()
jobspec = flux.job.JobspecV1.from_command(
- command=" ".join(command_lst),
- num_tasks=1,
- cores_per_task=self._cores,
+ command=command_lst,
+ num_tasks=self._cores,
+ cores_per_task=1,
gpus_per_task=self._gpus_per_core,
num_nodes=None,
exclusive=False,
)
- jobspec.cwd = self._cwd
+ jobspec.environment = dict(os.environ)
+ if self._cwd is not None:
+ jobspec.cwd = self._cwd
self._future = self._executor.submit(jobspec)

def shutdown(self, wait=True):
- self._executor.shutdown(wait=wait)
+ if self.poll():
+ self._future.cancel()
+ # The flux future objects are not instantly updated,
+ # still showing running after cancel was called,
+ # so we wait until the execution is completed.
+ self._future.result()

def poll(self):
- return self._executor is not None
+ return self._future is not None and not self._future.done()


def generate_slurm_command(cores, cwd, gpus_per_core=0, oversubscribe=False):
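
For reference, a standalone sketch of what the changed FluxPythonInterface.bootup() call corresponds to in the plain flux python bindings; this is illustrative only (it assumes a running flux instance) and is not part of the diff above.

```python
import os

import flux.job

# The command is now passed as a list of strings, one task per requested core
# instead of a single multi-core task, and the caller's environment is
# propagated explicitly to the flux job.
jobspec = flux.job.JobspecV1.from_command(
    command=["hostname"],
    num_tasks=2,
    cores_per_task=1,
    gpus_per_task=0,
    num_nodes=None,
    exclusive=False,
)
jobspec.environment = dict(os.environ)

with flux.job.FluxExecutor() as executor:
    future = executor.submit(jobspec)
    future.result()  # blocks until the job has finished, mirroring shutdown() above
```
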