1 change: 0 additions & 1 deletion pympipool/flux/__init__.py

This file was deleted.

51 changes: 0 additions & 51 deletions pympipool/flux/fluxbroker.py

This file was deleted.

48 changes: 47 additions & 1 deletion pympipool/flux/fluxtask.py → pympipool/flux_executor.py
@@ -6,11 +6,57 @@
cloudpickle_register,
ExecutorBase,
execute_parallel_tasks,
executor_broker,
)
from pympipool.shared.interface import BaseInterface
from pympipool.shared.thread import RaisingThread


class PyFluxExecutor(ExecutorBase):
"""
Args:
max_workers (int): defines the number of workers which can execute functions in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
"""

def __init__(
self,
max_workers,
cores_per_worker=1,
threads_per_core=1,
gpus_per_worker=0,
init_function=None,
cwd=None,
sleep_interval=0.1,
executor=None,
):
super().__init__()
self._process = RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": PyFluxSingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"init_function": init_function,
"cwd": cwd,
"executor": executor,
},
)
self._process.start()


class PyFluxSingleTaskExecutor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks.
@@ -29,7 +75,7 @@ class PyFluxSingleTaskExecutor(ExecutorBase):
Examples:
```
>>> import numpy as np
>>> from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor
>>> from pympipool.flux_executor import PyFluxSingleTaskExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
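For orientation, a minimal usage sketch of the consolidated pympipool.flux_executor module follows. It assumes PyFluxExecutor exposes the concurrent.futures-style submit()/result()/shutdown() interface of ExecutorBase and that a flux.job.FluxExecutor instance is passed in via the executor argument; the calc function and its argument values are illustrative only.

```
import flux.job
from pympipool.flux_executor import PyFluxExecutor

def calc(i):
    # runs inside a worker spawned by the executor_broker thread
    return 2 * i

with flux.job.FluxExecutor() as flux_exe:
    # two workers, each backed by a single core of the Flux allocation
    exe = PyFluxExecutor(max_workers=2, cores_per_worker=1, executor=flux_exe)
    future = exe.submit(calc, 21)
    print(future.result())  # 42, once a worker has processed the task
    exe.shutdown(wait=True)
```
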
1 change: 0 additions & 1 deletion pympipool/mpi/__init__.py

This file was deleted.

65 changes: 0 additions & 65 deletions pympipool/mpi/mpibroker.py

This file was deleted.

62 changes: 61 additions & 1 deletion pympipool/mpi/mpitask.py → pympipool/mpi_executor.py
@@ -2,11 +2,71 @@
cloudpickle_register,
execute_parallel_tasks,
ExecutorBase,
executor_broker,
)
from pympipool.shared.thread import RaisingThread
from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface


class PyMPIExecutor(ExecutorBase):
"""
Args:
max_workers (int): defines the number of workers which can execute functions in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
sleep_interval (float): synchronization interval - default 0.1
enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
"""

def __init__(
self,
max_workers,
cores_per_worker=1,
threads_per_core=1,
gpus_per_worker=0,
oversubscribe=False,
init_function=None,
cwd=None,
sleep_interval=0.1,
enable_slurm_backend=False,
):
super().__init__()
if not enable_slurm_backend:
if threads_per_core != 1:
raise ValueError(
"The MPI backend only supports threads_per_core=1, "
+ "to manage threads use the SLURM queuing system enable_slurm_backend=True ."
)
elif gpus_per_worker != 0:
raise ValueError(
"The MPI backend only supports gpus_per_core=0, "
+ "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ."
)
self._process = RaisingThread(
target=executor_broker,
kwargs={
# Broker Arguments
"future_queue": self._future_queue,
"max_workers": max_workers,
"sleep_interval": sleep_interval,
"executor_class": PyMPISingleTaskExecutor,
# Executor Arguments
"cores": cores_per_worker,
"threads_per_core": threads_per_core,
"gpus_per_task": int(gpus_per_worker / cores_per_worker),
"oversubscribe": oversubscribe,
"init_function": init_function,
"cwd": cwd,
"enable_slurm_backend": enable_slurm_backend,
},
)
self._process.start()


class PyMPISingleTaskExecutor(ExecutorBase):
"""
The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks.
@@ -27,7 +87,7 @@ class PyMPISingleTaskExecutor(ExecutorBase):
Examples:
```
>>> import numpy as np
>>> from pympipool.mpi.mpitask import PyMPISingleTaskExecutor
>>> from pympipool.mpi_executor import PyMPISingleTaskExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
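A corresponding sketch for pympipool.mpi_executor, under the same assumption that submit()/result()/shutdown() come from ExecutorBase; calc and its argument values are illustrative. Note that gpus_per_worker is divided by cores_per_worker before being forwarded to each worker as gpus_per_task, and that non-default thread or GPU settings require enable_slurm_backend=True.

```
from pympipool.mpi_executor import PyMPIExecutor

def calc(i, j):
    return i + j

# two serial workers; with the plain MPI backend, threads_per_core must stay 1
# and gpus_per_worker must stay 0, otherwise the constructor raises ValueError
exe = PyMPIExecutor(max_workers=2, cores_per_worker=1)
future = exe.submit(calc, 1, 2)
print(future.result())  # 3
exe.shutdown(wait=True)
```
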
7 changes: 3 additions & 4 deletions tests/test_flux.py
@@ -9,8 +9,7 @@

try:
import flux.job
from pympipool.flux.fluxbroker import PyFluxExecutor
from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor, FluxPythonInterface
from pympipool.flux_executor import PyFluxExecutor, PyFluxSingleTaskExecutor, FluxPythonInterface

skip_flux_test = False
except ImportError:
@@ -89,8 +88,8 @@ def test_execute_task(self):
execute_parallel_tasks(
future_queue=q,
cores=1,
executor=self.executor,
interface_class=FluxPythonInterface,
executor=self.executor,
)
self.assertEqual(f.result(), 2)
q.join()
@@ -105,8 +104,8 @@ def test_execute_task_threads(self):
future_queue=q,
cores=1,
threads_per_core=1,
executor=self.executor,
interface_class=FluxPythonInterface,
executor=self.executor,
)
self.assertEqual(f.result(), 2)
q.join()
2 changes: 1 addition & 1 deletion tests/test_future.py
@@ -1,7 +1,7 @@
import numpy as np
import unittest
from time import sleep
from pympipool.mpi.mpitask import PyMPISingleTaskExecutor
from pympipool.mpi_executor import PyMPISingleTaskExecutor
from concurrent.futures import Future


3 changes: 1 addition & 2 deletions tests/test_meta.py
@@ -7,8 +7,7 @@
_get_executor_dict,
_get_future_done,
)
from pympipool.mpi.mpitask import PyMPISingleTaskExecutor
from pympipool.mpi.mpibroker import PyMPIExecutor
from pympipool.mpi_executor import PyMPISingleTaskExecutor, PyMPIExecutor


def calc(i):
2 changes: 1 addition & 1 deletion tests/test_task.py
@@ -1,5 +1,5 @@
import unittest
from pympipool.mpi.mpitask import PyMPISingleTaskExecutor
from pympipool.mpi_executor import PyMPISingleTaskExecutor


def echo_funct(i):
2 changes: 1 addition & 1 deletion tests/test_worker.py
@@ -3,7 +3,7 @@
from queue import Queue
from time import sleep
from concurrent.futures import CancelledError
from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, get_interface
from pympipool.mpi_executor import PyMPISingleTaskExecutor, get_interface
from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks
from concurrent.futures import Future

2 changes: 1 addition & 1 deletion tests/test_worker_memory.py
@@ -3,7 +3,7 @@
from queue import Queue
from pympipool.shared.backend import call_funct
from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks
from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, get_interface
from pympipool.mpi_executor import PyMPISingleTaskExecutor, get_interface
from concurrent.futures import Future

