diff --git a/pympipool/flux/__init__.py b/pympipool/flux/__init__.py index 26f0a677..44e9e4f1 100644 --- a/pympipool/flux/__init__.py +++ b/pympipool/flux/__init__.py @@ -1 +1 @@ -from pympipool.flux.fluxbroker import PyFluxExecutor +from pympipool.flux.executor import PyFluxExecutor diff --git a/pympipool/flux/fluxtask.py b/pympipool/flux/executor.py similarity index 70% rename from pympipool/flux/fluxtask.py rename to pympipool/flux/executor.py index b7a59331..4c8930da 100644 --- a/pympipool/flux/fluxtask.py +++ b/pympipool/flux/executor.py @@ -5,12 +5,58 @@ from pympipool.shared.executorbase import ( cloudpickle_register, ExecutorBase, + executor_broker, execute_parallel_tasks, ) from pympipool.shared.interface import BaseInterface from pympipool.shared.thread import RaisingThread +class PyFluxExecutor(ExecutorBase): + """ + Args: + max_workers (int): defines the number workers which can execute functions in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_worker (int): number of GPUs per worker - defaults to 0 + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + sleep_interval (float): synchronization interval - default 0.1 + executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux + """ + + def __init__( + self, + max_workers, + cores_per_worker=1, + threads_per_core=1, + gpus_per_worker=0, + init_function=None, + cwd=None, + sleep_interval=0.1, + executor=None, + ): + super().__init__() + self._process = RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "sleep_interval": sleep_interval, + "executor_class": PyFluxSingleTaskExecutor, + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "init_function": init_function, + "cwd": cwd, + "executor": executor, + }, + ) + self._process.start() + + class PyFluxSingleTaskExecutor(ExecutorBase): """ The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. @@ -29,7 +75,7 @@ class PyFluxSingleTaskExecutor(ExecutorBase): Examples: ``` >>> import numpy as np - >>> from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor + >>> from pympipool.flux.executor import PyFluxSingleTaskExecutor >>> >>> def calc(i, j, k): >>> from mpi4py import MPI diff --git a/pympipool/flux/fluxbroker.py b/pympipool/flux/fluxbroker.py deleted file mode 100644 index b0edafd2..00000000 --- a/pympipool/flux/fluxbroker.py +++ /dev/null @@ -1,51 +0,0 @@ -from pympipool.shared.executorbase import ( - ExecutorBase, - executor_broker, -) -from pympipool.shared.thread import RaisingThread -from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor - - -class PyFluxExecutor(ExecutorBase): - """ - Args: - max_workers (int): defines the number workers which can execute functions in parallel - cores_per_worker (int): number of MPI cores to be used for each function call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 - executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - """ - - def __init__( - self, - max_workers, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - init_function=None, - cwd=None, - sleep_interval=0.1, - executor=None, - ): - super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "sleep_interval": sleep_interval, - "executor_class": PyFluxSingleTaskExecutor, - # Executor Arguments - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "init_function": init_function, - "cwd": cwd, - "executor": executor, - }, - ) - self._process.start() diff --git a/pympipool/mpi/__init__.py b/pympipool/mpi/__init__.py index 5d4bea6a..24aa8369 100644 --- a/pympipool/mpi/__init__.py +++ b/pympipool/mpi/__init__.py @@ -1 +1 @@ -from pympipool.mpi.mpibroker import PyMPIExecutor +from pympipool.mpi.executor import PyMPIExecutor diff --git a/pympipool/mpi/mpitask.py b/pympipool/mpi/executor.py similarity index 58% rename from pympipool/mpi/mpitask.py rename to pympipool/mpi/executor.py index fdcc17b7..9aac7289 100644 --- a/pympipool/mpi/mpitask.py +++ b/pympipool/mpi/executor.py @@ -2,9 +2,69 @@ cloudpickle_register, execute_parallel_tasks, ExecutorBase, + executor_broker, ) -from pympipool.shared.thread import RaisingThread from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface +from pympipool.shared.thread import RaisingThread + + +class PyMPIExecutor(ExecutorBase): + """ + Args: + max_workers (int): defines the number workers which can execute functions in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_worker (int): number of GPUs per worker - defaults to 0 + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + sleep_interval (float): synchronization interval - default 0.1 + enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False + """ + + def __init__( + self, + max_workers, + cores_per_worker=1, + threads_per_core=1, + gpus_per_worker=0, + oversubscribe=False, + init_function=None, + cwd=None, + sleep_interval=0.1, + enable_slurm_backend=False, + ): + super().__init__() + if not enable_slurm_backend: + if threads_per_core != 1: + raise ValueError( + "The MPI backend only supports threads_per_core=1, " + + "to manage threads use the SLURM queuing system enable_slurm_backend=True ." + ) + elif gpus_per_worker != 0: + raise ValueError( + "The MPI backend only supports gpus_per_core=0, " + + "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ." + ) + self._process = RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "sleep_interval": sleep_interval, + "executor_class": PyMPISingleTaskExecutor, + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "oversubscribe": oversubscribe, + "init_function": init_function, + "cwd": cwd, + "enable_slurm_backend": enable_slurm_backend, + }, + ) + self._process.start() class PyMPISingleTaskExecutor(ExecutorBase): @@ -27,7 +87,7 @@ class PyMPISingleTaskExecutor(ExecutorBase): Examples: ``` >>> import numpy as np - >>> from pympipool.mpi.mpitask import PyMPISingleTaskExecutor + >>> from pympipool.mpi.executor import PyMPISingleTaskExecutor >>> >>> def calc(i, j, k): >>> from mpi4py import MPI diff --git a/pympipool/mpi/mpibroker.py b/pympipool/mpi/mpibroker.py deleted file mode 100644 index 0b660a21..00000000 --- a/pympipool/mpi/mpibroker.py +++ /dev/null @@ -1,65 +0,0 @@ -from pympipool.shared.executorbase import ( - ExecutorBase, - executor_broker, -) -from pympipool.shared.thread import RaisingThread -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor - - -class PyMPIExecutor(ExecutorBase): - """ - Args: - max_workers (int): defines the number workers which can execute functions in parallel - cores_per_worker (int): number of MPI cores to be used for each function call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - """ - - def __init__( - self, - max_workers, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, - init_function=None, - cwd=None, - sleep_interval=0.1, - enable_slurm_backend=False, - ): - super().__init__() - if not enable_slurm_backend: - if threads_per_core != 1: - raise ValueError( - "The MPI backend only supports threads_per_core=1, " - + "to manage threads use the SLURM queuing system enable_slurm_backend=True ." - ) - elif gpus_per_worker != 0: - raise ValueError( - "The MPI backend only supports gpus_per_core=0, " - + "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ." - ) - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "sleep_interval": sleep_interval, - "executor_class": PyMPISingleTaskExecutor, - # Executor Arguments - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "oversubscribe": oversubscribe, - "init_function": init_function, - "cwd": cwd, - "enable_slurm_backend": enable_slurm_backend, - }, - ) - self._process.start() diff --git a/tests/test_flux.py b/tests/test_flux.py index 4cfdf5c9..4d84511d 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -9,8 +9,11 @@ try: import flux.job - from pympipool.flux.fluxbroker import PyFluxExecutor - from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor, FluxPythonInterface + from pympipool.flux.executor import ( + PyFluxExecutor, + PyFluxSingleTaskExecutor, + FluxPythonInterface, + ) skip_flux_test = False except ImportError: diff --git a/tests/test_future.py b/tests/test_future.py index cd96441e..1d59d4ee 100644 --- a/tests/test_future.py +++ b/tests/test_future.py @@ -1,7 +1,7 @@ import numpy as np import unittest from time import sleep -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor +from pympipool.mpi.executor import PyMPISingleTaskExecutor from concurrent.futures import Future diff --git a/tests/test_meta.py b/tests/test_meta.py index 2981204d..643883c8 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -7,8 +7,7 @@ _get_executor_dict, _get_future_done, ) -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor -from pympipool.mpi.mpibroker import PyMPIExecutor +from pympipool.mpi.executor import PyMPIExecutor, PyMPISingleTaskExecutor def calc(i): diff --git a/tests/test_task.py b/tests/test_task.py index bf1e166b..1585e6d3 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -1,5 +1,5 @@ import unittest -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor +from pympipool.mpi.executor import PyMPISingleTaskExecutor def echo_funct(i): diff --git a/tests/test_worker.py b/tests/test_worker.py index 7a396701..5bc26747 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,7 +3,7 @@ from queue import Queue from time import sleep from concurrent.futures import CancelledError -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, get_interface +from pympipool.mpi.executor import PyMPISingleTaskExecutor, get_interface from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks from concurrent.futures import Future diff --git a/tests/test_worker_memory.py b/tests/test_worker_memory.py index 3c668dff..f8aaa7cb 100644 --- a/tests/test_worker_memory.py +++ b/tests/test_worker_memory.py @@ -3,7 +3,7 @@ from queue import Queue from pympipool.shared.backend import call_funct from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, get_interface +from pympipool.mpi.executor import PyMPISingleTaskExecutor, get_interface from concurrent.futures import Future