16 changes: 0 additions & 16 deletions pympipool/__init__.py
@@ -1,19 +1,3 @@
-from pympipool.shared.communication import (
-    SocketInterface,
-    interface_connect,
-    interface_bootup,
-    interface_send,
-    interface_shutdown,
-    interface_receive,
-)
-from pympipool.interfaces.taskbroker import HPCExecutor
-from pympipool.interfaces.fluxbroker import PyFluxExecutor
-from pympipool.interfaces.taskexecutor import Executor
-from pympipool.legacy.interfaces.executor import PoolExecutor
-from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool
-from pympipool.shared.thread import RaisingThread
-from pympipool.shared.taskexecutor import cancel_items_in_queue
-
 from ._version import get_versions
 
 __version__ = get_versions()["version"]
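
Note: after these deletions the top-level `pympipool/__init__.py` keeps only the three context lines above, i.e. the version lookup; the flux executor moves to the `pympipool.flux` subpackage (see the re-export below):

```python
# pympipool/__init__.py after this diff (reconstructed from the context lines)
from ._version import get_versions

__version__ = get_versions()["version"]
```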
1 change: 1 addition & 0 deletions pympipool/flux/__init__.py
@@ -0,0 +1 @@
+from pympipool.flux.fluxbroker import PyFluxExecutor
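
Note: with this re-export in place, downstream code can use the shorter subpackage import path; a one-line sketch (not part of this diff):

```python
# Importing the executor via the new pympipool.flux subpackage
from pympipool.flux import PyFluxExecutor
```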
64 changes: 64 additions & 0 deletions pympipool/flux/fluxbroker.py
@@ -0,0 +1,64 @@
+from pympipool.shared.executorbase import (
+    ExecutorBase,
+    executor_broker,
+    get_executor_dict,
+)
+from pympipool.shared.thread import RaisingThread
+from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor
+
+
+class PyFluxExecutor(ExecutorBase):
+    def __init__(
+        self,
+        max_workers,
+        cores_per_worker=1,
+        threads_per_core=1,
+        gpus_per_worker=0,
+        init_function=None,
+        cwd=None,
+        sleep_interval=0.1,
+        executor=None,
+    ):
+        super().__init__()
+        self._process = RaisingThread(
+            target=_flux_executor_broker,
+            kwargs={
+                "future_queue": self._future_queue,
+                "max_workers": max_workers,
+                "cores_per_worker": cores_per_worker,
+                "threads_per_core": threads_per_core,
+                "gpus_per_worker": gpus_per_worker,
+                "init_function": init_function,
+                "cwd": cwd,
+                "sleep_interval": sleep_interval,
+                "executor": executor,
+            },
+        )
+        self._process.start()
+
+
+def _flux_executor_broker(
+    future_queue,
+    max_workers,
+    cores_per_worker=1,
+    threads_per_core=1,
+    gpus_per_worker=0,
+    init_function=None,
+    cwd=None,
+    sleep_interval=0.1,
+    executor=None,
+):
+    executor_broker(
+        future_queue=future_queue,
+        meta_future_lst=get_executor_dict(
+            max_workers=max_workers,
+            executor_class=PyFluxSingleTaskExecutor,
+            cores=cores_per_worker,
+            threads_per_core=threads_per_core,
+            gpus_per_task=int(gpus_per_worker / cores_per_worker),
+            init_function=init_function,
+            cwd=cwd,
+            executor=executor,
+        ),
+        sleep_interval=sleep_interval,
+    )
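
Note: a minimal usage sketch, not part of this diff. It assumes a running Flux instance and that `ExecutorBase` provides the `concurrent.futures`-style `submit()` and context-manager protocol shown in the `PyFluxSingleTaskExecutor` docstring below; `add` is a hypothetical user function.

```python
from pympipool.flux import PyFluxExecutor

def add(i, j):
    return i + j

with PyFluxExecutor(
    max_workers=2,       # the broker spawns two PyFluxSingleTaskExecutor workers
    cores_per_worker=1,  # MPI ranks per worker
    gpus_per_worker=0,   # divided by cores_per_worker to give gpus_per_task
) as exe:
    future = exe.submit(add, 1, j=2)
    print(future.result())  # 3
```

Note that `gpus_per_task` is derived as `int(gpus_per_worker / cores_per_worker)`, so e.g. two GPUs shared by a two-core worker yield one GPU per MPI rank.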
166 changes: 166 additions & 0 deletions pympipool/flux/fluxtask.py
@@ -0,0 +1,166 @@
+import os
+
+import flux.job
+
+from pympipool.shared.executorbase import (
+    cloudpickle_register,
+    ExecutorBase,
+    execute_parallel_tasks_loop,
+    get_backend_path,
+)
+from pympipool.shared.interface import BaseInterface
+from pympipool.shared.communication import interface_bootup
+from pympipool.shared.thread import RaisingThread
+
+
+class PyFluxSingleTaskExecutor(ExecutorBase):
+    """
+    The pympipool.flux.fluxtask.PyFluxSingleTaskExecutor behaves like the concurrent.futures.Executor but executes
+    each submitted function as a parallel task inside a Flux job. In contrast to the mpi4py.futures.MPIPoolExecutor
+    it can be used from a serial python process and does not require the python script itself to be executed with
+    MPI. Internally it submits the parallel python backend through the flux.job.FluxExecutor, so it is primarily an
+    abstraction to improve usability, in particular in combination with Jupyter notebooks.
+
+    Args:
+        cores (int): defines the number of MPI ranks to use for each function call
+        threads_per_core (int): number of OpenMP threads to be used for each function call
+        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        cwd (str/None): current working directory where the parallel python task is executed
+        executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
+
+    Examples:
+        ```
+        >>> import numpy as np
+        >>> from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> def init_k():
+        >>>     return {"k": 3}
+        >>>
+        >>> with PyFluxSingleTaskExecutor(cores=2, init_function=init_k) as p:
+        >>>     fs = p.submit(calc, 2, j=4)
+        >>>     print(fs.result())
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+        ```
+    """
+
+    def __init__(
+        self,
+        cores=1,
+        threads_per_core=1,
+        gpus_per_task=0,
+        init_function=None,
+        cwd=None,
+        executor=None,
+    ):
+        super().__init__()
+        self._process = RaisingThread(
+            target=_flux_execute_parallel_tasks,
+            kwargs={
+                "future_queue": self._future_queue,
+                "cores": cores,
+                "threads_per_core": threads_per_core,
+                "gpus_per_task": gpus_per_task,
+                "cwd": cwd,
+                "executor": executor,
+            },
+        )
+        self._process.start()
+        if init_function is not None:
+            self._future_queue.put(
+                {"init": True, "fn": init_function, "args": (), "kwargs": {}}
+            )
+        cloudpickle_register(ind=3)
+
+
+class FluxPythonInterface(BaseInterface):
+    def __init__(
+        self,
+        cwd=None,
+        cores=1,
+        threads_per_core=1,
+        gpus_per_core=0,
+        oversubscribe=False,
+        executor=None,
+    ):
+        super().__init__(
+            cwd=cwd,
+            cores=cores,
+            gpus_per_core=gpus_per_core,
+            threads_per_core=threads_per_core,
+            oversubscribe=oversubscribe,
+        )
+        self._executor = executor
+        self._future = None
+
+    def bootup(self, command_lst):
+        if self._oversubscribe:
+            raise ValueError(
+                "Oversubscribing is currently not supported for the Flux adapter."
+            )
+        if self._executor is None:
+            self._executor = flux.job.FluxExecutor()
+        jobspec = flux.job.JobspecV1.from_command(
+            command=command_lst,
+            num_tasks=self._cores,
+            cores_per_task=self._threads_per_core,
+            gpus_per_task=self._gpus_per_core,
+            num_nodes=None,
+            exclusive=False,
+        )
+        jobspec.environment = dict(os.environ)
+        if self._cwd is not None:
+            jobspec.cwd = self._cwd
+        self._future = self._executor.submit(jobspec)
+
+    def shutdown(self, wait=True):
+        if self.poll():
+            self._future.cancel()
+            # The flux future objects are not instantly updated,
+            # still showing running after cancel was called,
+            # so we wait until the execution is completed.
+            self._future.result()
+
+    def poll(self):
+        return self._future is not None and not self._future.done()
+
+
+def _flux_execute_parallel_tasks(
+    future_queue,
+    cores,
+    threads_per_core=1,
+    gpus_per_task=0,
+    cwd=None,
+    executor=None,
+):
+    """
+    Execute a single task in parallel using the message passing interface (MPI).
+
+    Args:
+        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
+        cores (int): defines the total number of MPI ranks to use
+        threads_per_core (int): number of OpenMP threads to be used for each function call
+        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
+        cwd (str/None): current working directory where the parallel python task is executed
+        executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
+    """
+    execute_parallel_tasks_loop(
+        interface=interface_bootup(
+            command_lst=get_backend_path(cores=cores),
+            connections=FluxPythonInterface(
+                cwd=cwd,
+                cores=cores,
+                threads_per_core=threads_per_core,
+                gpus_per_core=gpus_per_task,
+                oversubscribe=False,
+                executor=executor,
+            ),
+        ),
+        future_queue=future_queue,
+    )
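
Note: a standalone sketch, not part of this diff, of the flux.job submission pattern that `FluxPythonInterface.bootup()` wraps. It assumes a running Flux instance; the command list is a hypothetical placeholder for the backend returned by `get_backend_path()`.

```python
import os

import flux.job

# Submit a parallel python backend the same way bootup() does.
executor = flux.job.FluxExecutor()
jobspec = flux.job.JobspecV1.from_command(
    command=["python", "/path/to/backend.py"],  # hypothetical placeholder
    num_tasks=2,       # one task per MPI rank (the cores argument)
    cores_per_task=1,  # pympipool passes threads_per_core here
    gpus_per_task=0,
    num_nodes=None,
    exclusive=False,
)
jobspec.environment = dict(os.environ)  # forward the current environment
future = executor.submit(jobspec)

print(future.done())   # poll() checks this flag
future.cancel()        # shutdown() cancels ...
future.result()        # ... then waits, since cancellation is not instantaneous
```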
Empty file removed pympipool/interfaces/__init__.py