From a438dc1e516cb5d8fd33044a68f89e98d9c9c515 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Tue, 16 Apr 2024 20:54:54 -0500 Subject: [PATCH 01/17] Add option to submit one function per executor --- pympipool/flux/__init__.py | 2 +- pympipool/flux/executor.py | 77 +++++++++++++++++++- pympipool/mpi/__init__.py | 2 +- pympipool/mpi/executor.py | 69 ++++++++++++++++++ pympipool/shared/executorbase.py | 116 +++++++++++++++++++++++++++++++ pympipool/slurm/__init__.py | 2 +- pympipool/slurm/executor.py | 74 ++++++++++++++++++++ 7 files changed, 338 insertions(+), 4 deletions(-) diff --git a/pympipool/flux/__init__.py b/pympipool/flux/__init__.py index 44e9e4f1..ba4b53b9 100644 --- a/pympipool/flux/__init__.py +++ b/pympipool/flux/__init__.py @@ -1 +1 @@ -from pympipool.flux.executor import PyFluxExecutor +from pympipool.flux.executor import PyFluxExecutor, PyFluxStepExecutor diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 5856133a..3115d1e8 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -4,8 +4,10 @@ import flux.job from pympipool.shared.executorbase import ( - ExecutorBroker, execute_parallel_tasks, + execute_separate_tasks, + ExecutorBroker, + ExecutorSteps, ) from pympipool.shared.interface import BaseInterface from pympipool.shared.thread import RaisingThread @@ -90,6 +92,79 @@ def __init__( ) +class PyFluxStepExecutor(ExecutorSteps): + """ + The pympipool.flux.PyFluxStepExecutor leverages the flux framework to distribute python tasks within a queuing + system allocation. In analogy to the pympipool.slurm.PySlurmExecutur it provides the option to specify the number + of threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per + worker. + + Args: + max_cores (int): defines the number workers which can execute functions in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_worker (int): number of GPUs per worker - defaults to 0 + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. 
So on MacOS it is required to set this + option to true + + Examples: + + >>> import numpy as np + >>> from pympipool.flux import PyFluxStepExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> with PyFluxStepExecutor(max_cores=2) as p: + >>> fs = p.submit(fn=calc, fn_args=[2], fn_kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) + >>> print(fs.result()) + + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + + """ + + def __init__( + self, + max_cores: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + cwd: Optional[str] = None, + executor: Optional[flux.job.FluxExecutor] = None, + hostname_localhost: Optional[bool] = False, + ): + super().__init__() + self._set_process( + RaisingThread( + target=execute_separate_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores_per_worker, + "interface_class": FluxPythonInterface, + "max_cores": max_cores, + "hostname_localhost": hostname_localhost, + # Interface Arguments + "threads_per_core": threads_per_core, + "gpus_per_core": int(gpus_per_worker / cores_per_worker), + "cwd": cwd, + "executor": executor, + }, + ) + ) + + class FluxPythonInterface(BaseInterface): def __init__( self, diff --git a/pympipool/mpi/__init__.py b/pympipool/mpi/__init__.py index 24aa8369..3ec1c1ff 100644 --- a/pympipool/mpi/__init__.py +++ b/pympipool/mpi/__init__.py @@ -1 +1 @@ -from pympipool.mpi.executor import PyMPIExecutor +from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index b1e39048..7fc01a21 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -2,7 +2,9 @@ from pympipool.shared.executorbase import ( execute_parallel_tasks, + execute_separate_tasks, ExecutorBroker, + ExecutorSteps, ) from pympipool.shared.interface import MpiExecInterface from pympipool.shared.thread import RaisingThread @@ -80,3 +82,70 @@ def __init__( for _ in range(max_workers) ], ) + + +class PyMPIStepExecutor(ExecutorSteps): + """ + The pympipool.mpi.PyMPIStepExecutor leverages the message passing interface MPI to distribute python tasks within an + MPI allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyMPIStepExecutor can be executed + in a serial python process and does not require the python script to be executed with MPI. Consequently, it is + primarily an abstraction of its functionality to improve the usability in particular when used in combination with \ + Jupyter notebooks. + + Args: + max_cores (int): defines the number cores which can be used in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + cwd (str/None): current working directory where the parallel python task is executed + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. 
So on MacOS it is required to set this + option to true + + Examples: + + >>> import numpy as np + >>> from pympipool.mpi import PyMPIStepExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> with PyMPIStepExecutor(max_cores=2) as p: + >>> fs = p.submit(fn=calc, fn_args=[2], fn_kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) + >>> print(fs.result()) + + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + + """ + + def __init__( + self, + max_cores: int = 1, + cores_per_worker: int = 1, + oversubscribe: bool = False, + cwd: Optional[str] = None, + hostname_localhost: bool = False, + ): + super().__init__() + self._set_process( + RaisingThread( + target=execute_separate_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores_per_worker, + "interface_class": MpiExecInterface, + "max_cores": max_cores, + "hostname_localhost": hostname_localhost, + # Interface Arguments + "cwd": cwd, + "oversubscribe": oversubscribe, + }, + ) + ) \ No newline at end of file diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 602475a4..a5137436 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -128,6 +128,59 @@ def _set_process(self, process: List[RaisingThread]): process.start() +class ExecutorSteps(ExecutorBase): + def submit( + self, + fn: callable, + fn_args: list, + fn_kwargs: dict, + executor_kwargs: dict, + ): + """Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Returns: + A Future representing the given call. + """ + f = Future() + self._future_queue.put( + { + "fn": fn, + "args": fn_args, + "kwargs": fn_kwargs, + "future": f, + "executor_kwargs": executor_kwargs, + } + ) + return f + + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + parallel_executors have been reclaimed. + cancel_futures: If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. + """ + if cancel_futures: + cancel_items_in_queue(que=self._future_queue) + if self._process is not None: + self._future_queue.put({"shutdown": True, "wait": wait}) + if wait: + self._process.join() + self._future_queue.join() + self._process = None + self._future_queue = None + + def cancel_items_in_queue(que: queue.Queue): """ Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the future @@ -222,6 +275,63 @@ def execute_parallel_tasks( future_queue.task_done() +def execute_separate_tasks( + future_queue: queue.Queue, + interface_class: BaseInterface, + max_cores: int, + hostname_localhost: bool = False, + **kwargs, +): + """ + Execute a single tasks in parallel using the message passing interface (MPI). 
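
To make the hand-off concrete: `ExecutorSteps.submit()` above and the scheduling loop below communicate through a plain `queue.Queue` of dictionaries. The following is a rough, illustrative sketch of that protocol — the field names follow this patch, while the values (`sum`, the argument tuple, `cores: 1`) are placeholders:

    import queue
    from concurrent.futures import Future

    future_queue = queue.Queue()

    # one entry per submitted function call, as assembled by ExecutorSteps.submit()
    future_queue.put(
        {
            "fn": sum,                        # callable to execute
            "args": ([1, 2, 3],),             # positional arguments
            "kwargs": {},                     # keyword arguments
            "future": Future(),               # handed back to the caller for the result
            "executor_kwargs": {"cores": 1},  # per-task resource requirements
        }
    )

    # sentinel dictionary that terminates the scheduling loop
    future_queue.put({"shutdown": True, "wait": True})
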
+ + Args: + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + interface_class (BaseInterface): Interface to start process on selected compute resources + max_cores (int): defines the number cores which can be used in parallel + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + """ + active_task_dict = {} + while True: + task_dict = future_queue.get() + if "shutdown" in task_dict.keys() and task_dict["shutdown"]: + future_queue.task_done() + future_queue.join() + break + elif "fn" in task_dict.keys() and "future" in task_dict.keys(): + active_task_dict = _wait_for_free_slots( + active_task_dict=active_task_dict, + max_cores=max_cores, + ) + executor_dict = task_dict.pop("executor_kwargs") + qtask = queue.Queue() + qtask.put(task_dict) + qtask.put({"shutdown": True, "wait": True}) + active_task_dict[task_dict["future"]] = executor_dict["cores"] + task_kwargs = kwargs.copy() + task_kwargs.update(executor_dict) + task_kwargs.update( + { + "future_queue": qtask, + "interface_class": interface_class, + "hostname_localhost": hostname_localhost, + "init_function": None, + } + ) + process = RaisingThread( + target=execute_parallel_tasks, + kwargs=task_kwargs, + ) + process.start() + future_queue.task_done() + + def _get_backend_path(cores: int): command_lst = [sys.executable] if cores > 1: @@ -233,3 +343,9 @@ def _get_backend_path(cores: int): def _get_command_path(executable: str): return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable)) + + +def _wait_for_free_slots(active_task_dict, max_cores): + while sum(active_task_dict.values()) >= max_cores: + active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + return active_task_dict diff --git a/pympipool/slurm/__init__.py b/pympipool/slurm/__init__.py index ed4aeab3..0bdf170f 100644 --- a/pympipool/slurm/__init__.py +++ b/pympipool/slurm/__init__.py @@ -1 +1 @@ -from pympipool.slurm.executor import PySlurmExecutor +from pympipool.slurm.executor import PySlurmExecutor, PySlurmStepExecutor diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index a587b73e..22627871 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -1,7 +1,9 @@ from typing import Optional from pympipool.shared.executorbase import ( execute_parallel_tasks, + execute_separate_tasks, ExecutorBroker, + ExecutorSteps, ) from pympipool.shared.interface import SrunInterface from pympipool.shared.thread import RaisingThread @@ -86,3 +88,75 @@ def __init__( for _ in range(max_workers) ], ) + + +class PySlurmStepExecutor(ExecutorSteps): + """ + The pympipool.slurm.PySlurmStepExecutor leverages the srun command to distribute python tasks within a SLURM queuing + system allocation. In analogy to the pympipool.flux.PyFluxExecutor it provides the option to specify the number of + threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per + worker. 
+ + Args: + max_cores (int): defines the number cores which can be used in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_worker (int): number of GPUs per worker - defaults to 0 + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + cwd (str/None): current working directory where the parallel python task is executed + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + + Examples: + + >>> import numpy as np + >>> from pympipool.slurm import PySlurmStepExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> with PySlurmStepExecutor(max_cores=2) as p: + >>> fs = p.submit(fn=calc, fn_args=[2], fn_kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) + >>> print(fs.result()) + + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + """ + def __init__( + self, + max_cores: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + oversubscribe: bool = False, + cwd: Optional[str] = None, + hostname_localhost: bool = False, + command_line_argument_lst: list[str] = [], + ): + super().__init__() + self._set_process( + RaisingThread( + target=execute_separate_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores_per_worker, + "interface_class": SrunInterface, + "max_cores": max_cores, + "hostname_localhost": hostname_localhost, + # Interface Arguments + "threads_per_core": threads_per_core, + "gpus_per_core": int(gpus_per_worker / cores_per_worker), + "cwd": cwd, + "oversubscribe": oversubscribe, + "command_line_argument_lst": command_line_argument_lst, + }, + ) + ) From 0dbf4994d86bcb67503ddcf335d43b0e2a5db859 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Tue, 16 Apr 2024 21:04:17 -0500 Subject: [PATCH 02/17] clean up the interface --- pympipool/flux/executor.py | 2 +- pympipool/mpi/executor.py | 4 ++-- pympipool/shared/executorbase.py | 10 +++++----- pympipool/slurm/executor.py | 3 ++- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 3115d1e8..baeab671 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -127,7 +127,7 @@ class PyFluxStepExecutor(ExecutorSteps): >>> return np.array([i, j, k]), size, rank >>> >>> with PyFluxStepExecutor(max_cores=2) as p: - >>> fs = p.submit(fn=calc, fn_args=[2], fn_kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) + >>> fs = p.submit(fn=calc, args=[2], kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index 7fc01a21..7e212da8 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -117,7 +117,7 @@ class PyMPIStepExecutor(ExecutorSteps): 
>>> return np.array([i, j, k]), size, rank >>> >>> with PyMPIStepExecutor(max_cores=2) as p: - >>> fs = p.submit(fn=calc, fn_args=[2], fn_kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) + >>> fs = p.submit(fn=calc, args=[2], kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] @@ -148,4 +148,4 @@ def __init__( "oversubscribe": oversubscribe, }, ) - ) \ No newline at end of file + ) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index a5137436..857aa087 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -132,9 +132,9 @@ class ExecutorSteps(ExecutorBase): def submit( self, fn: callable, - fn_args: list, - fn_kwargs: dict, - executor_kwargs: dict, + args: tuple = (), + kwargs: dict = {}, + executor_kwargs: dict = {}, ): """Submits a callable to be executed with the given arguments. @@ -148,8 +148,8 @@ def submit( self._future_queue.put( { "fn": fn, - "args": fn_args, - "kwargs": fn_kwargs, + "args": args, + "kwargs": kwargs, "future": f, "executor_kwargs": executor_kwargs, } diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index 22627871..66737648 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -124,11 +124,12 @@ class PySlurmStepExecutor(ExecutorSteps): >>> return np.array([i, j, k]), size, rank >>> >>> with PySlurmStepExecutor(max_cores=2) as p: - >>> fs = p.submit(fn=calc, fn_args=[2], fn_kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) + >>> fs = p.submit(fn=calc, args=[2], kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] """ + def __init__( self, max_cores: int = 1, From fbe95b83e67f373b5151b941696a3f7b66e2dddd Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 17 Apr 2024 15:09:48 -0500 Subject: [PATCH 03/17] support args and kwargs --- pympipool/shared/executorbase.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 857aa087..b2ceccdb 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -132,9 +132,9 @@ class ExecutorSteps(ExecutorBase): def submit( self, fn: callable, - args: tuple = (), - kwargs: dict = {}, + *args, executor_kwargs: dict = {}, + **kwargs, ): """Submits a callable to be executed with the given arguments. 
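
With the reworked signature, positional and keyword arguments are now forwarded to the submitted function unchanged, and only `executor_kwargs` is reserved for configuring the worker. A minimal sketch of the intended call style — `calc` is a placeholder and an environment where the MPI backend can start a worker is assumed:

    from pympipool.mpi import PyMPIStepExecutor

    def calc(i, j, k):
        return i + j + k

    with PyMPIStepExecutor(max_cores=2) as exe:
        # 1 is passed via *args, j and k via **kwargs; executor_kwargs only sizes the worker
        future = exe.submit(calc, 1, j=2, k=3, executor_kwargs={"cores": 1})
        print(future.result())  # expected to print 6
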
From 6761c4e0835c7481681b44c55b175fcbc449fc0a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 17 Apr 2024 15:11:11 -0500 Subject: [PATCH 04/17] Update Docstring example --- pympipool/flux/executor.py | 2 +- pympipool/mpi/executor.py | 2 +- pympipool/slurm/executor.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index baeab671..24f7ac89 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -127,7 +127,7 @@ class PyFluxStepExecutor(ExecutorSteps): >>> return np.array([i, j, k]), size, rank >>> >>> with PyFluxStepExecutor(max_cores=2) as p: - >>> fs = p.submit(fn=calc, args=[2], kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) + >>> fs = p.submit(calc, 2, j=4, k=3, executor_kwargs={"cores": 2}) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index 7e212da8..3e46d8c7 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -117,7 +117,7 @@ class PyMPIStepExecutor(ExecutorSteps): >>> return np.array([i, j, k]), size, rank >>> >>> with PyMPIStepExecutor(max_cores=2) as p: - >>> fs = p.submit(fn=calc, args=[2], kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) + >>> fs = p.submit(calc, 2, j=4, k=3, executor_kwargs={"cores": 2}) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index 66737648..3400fb53 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -124,7 +124,7 @@ class PySlurmStepExecutor(ExecutorSteps): >>> return np.array([i, j, k]), size, rank >>> >>> with PySlurmStepExecutor(max_cores=2) as p: - >>> fs = p.submit(fn=calc, args=[2], kwargs={"j": 4, "k": 3}, executor_kwargs={"cores": 2}) + >>> fs = p.submit(calc, 2, j=4, k=3, executor_kwargs={"cores": 2}) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] From e4577f61c3bcdbb53a6b33fe34a5c6133051a5d9 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 17 Apr 2024 15:57:58 -0500 Subject: [PATCH 05/17] Rename executor_kwargs to resource_dict --- pympipool/__init__.py | 114 ++++++++++++++++++++++--------- pympipool/flux/executor.py | 2 +- pympipool/mpi/executor.py | 2 +- pympipool/shared/executorbase.py | 69 +++++++++++++------ pympipool/slurm/executor.py | 2 +- 5 files changed, 133 insertions(+), 56 deletions(-) diff --git a/pympipool/__init__.py b/pympipool/__init__.py index dcf2b13a..45fe300c 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -2,14 +2,14 @@ import shutil from typing import Optional from ._version import get_versions -from pympipool.mpi.executor import PyMPIExecutor +from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor from pympipool.shared.interface import SLURM_COMMAND from pympipool.shell.executor import SubprocessExecutor from pympipool.shell.interactive import ShellExecutor -from pympipool.slurm.executor import PySlurmExecutor +from pympipool.slurm.executor import PySlurmExecutor, PySlurmStepExecutor try: # The PyFluxExecutor requires flux-core to be installed. - from pympipool.flux.executor import PyFluxExecutor + from pympipool.flux.executor import PyFluxExecutor, PyFluxStepExecutor flux_installed = "FLUX_URI" in os.environ except ImportError: @@ -32,12 +32,11 @@ class Executor: an interactive Jupyter notebook. 
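
The central addition of this patch is the `block_allocation` flag documented in the arguments below: with `block_allocation=True` the resources (and an optional `init_function`) are fixed on the executor, while with `block_allocation=False` each submission can carry its own `resource_dict`. A rough sketch of the two call styles, assuming the MPI backend and a placeholder function `calc`:

    from pympipool import Executor

    def calc(i):
        return i

    # block allocation: resources are defined once on the executor
    with Executor(max_cores=2, cores_per_worker=1, backend="mpi", block_allocation=True) as exe:
        print(exe.submit(calc, 1).result())  # 1

    # per-task allocation: resources are requested at submission time
    with Executor(max_cores=2, backend="mpi", block_allocation=False) as exe:
        print(exe.submit(calc, 2, resource_dict={"cores": 1}).result())  # 2
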
Args: - max_workers (int): defines the number workers which can execute functions in parallel + max_cores (int): defines the number cores which can be used in parallel cores_per_worker (int): number of MPI cores to be used for each function call threads_per_core (int): number of OpenMP threads to be used for each function call gpus_per_worker (int): number of GPUs per worker - defaults to 0 oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an @@ -46,6 +45,13 @@ class Executor: points to the same address as localhost. Still MacOS >= 12 seems to disable this look up for security reasons. So on MacOS it is required to set this option to true + backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto" + is selected (the default) the available backend is determined automatically. + block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource + requirements, pympipool supports block allocation. In this case all resources have + to be defined on the executor, rather than during the submission of the individual + function. + init_function (None): optional function to preset arguments for functions which are submitted later Examples: ``` @@ -70,32 +76,34 @@ class Executor: def __init__( self, - max_workers: int = 1, + max_cores: int = 1, cores_per_worker: int = 1, threads_per_core: int = 1, gpus_per_worker: int = 0, oversubscribe: bool = False, - init_function: Optional[callable] = None, cwd: Optional[str] = None, executor=None, hostname_localhost: bool = False, backend="auto", + block_allocation: bool = True, + init_function: Optional[callable] = None, ): # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. pass def __new__( cls, - max_workers: int = 1, + max_cores: int = 1, cores_per_worker: int = 1, threads_per_core: int = 1, gpus_per_worker: int = 0, oversubscribe: bool = False, - init_function: Optional[callable] = None, cwd: Optional[str] = None, executor=None, hostname_localhost: bool = False, backend: str = "auto", + block_allocation: bool = False, + init_function: Optional[callable] = None, ): """ Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor, @@ -106,12 +114,11 @@ def __new__( requires the SLURM workload manager to be installed on the system. 
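
Since `__new__` acts as a factory, the object handed back is an instance of one of the concrete executor classes selected by `backend` and `block_allocation`, never of `Executor` itself. A small illustrative sketch for the MPI backend (passing `backend="mpi"` explicitly so no queuing system needs to be present):

    from pympipool import Executor
    from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor

    exe = Executor(max_cores=1, backend="mpi", block_allocation=True)
    print(isinstance(exe, PyMPIExecutor))      # True
    exe.shutdown()

    exe = Executor(max_cores=1, backend="mpi", block_allocation=False)
    print(isinstance(exe, PyMPIStepExecutor))  # True
    exe.shutdown()
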
Args: - max_workers (int): defines the number workers which can execute functions in parallel + max_cores (int): defines the number cores which can be used in parallel cores_per_worker (int): number of MPI cores to be used for each function call threads_per_core (int): number of OpenMP threads to be used for each function call gpus_per_worker (int): number of GPUs per worker - defaults to 0 oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - init_function (None): optional function to preset arguments for functions which are submitted later cwd (str/None): current working directory where the parallel python task is executed hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an @@ -122,8 +129,15 @@ def __new__( option to true backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto" is selected (the default) the available backend is determined automatically. + block_allocation (boolean): To accelerate the submission of a series of python functions with the same + resource requirements, pympipool supports block allocation. In this case all + resources have to be defined on the executor, rather than during the submission + of the individual function. + init_function (None): optional function to preset arguments for functions which are submitted later """ + if not block_allocation and init_function is not None: + raise ValueError("") if backend not in ["auto", "mpi", "slurm", "flux"]: raise ValueError( 'The currently implemented backends are ["flux", "mpi", "slurm"]. ' @@ -137,23 +151,47 @@ def __new__( "Oversubscribing is not supported for the pympipool.flux.PyFLuxExecutor backend." "Please use oversubscribe=False instead of oversubscribe=True." 
) - return PyFluxExecutor( - max_workers=max_workers, - cores_per_worker=cores_per_worker, - threads_per_core=threads_per_core, - gpus_per_worker=gpus_per_worker, - init_function=init_function, - cwd=cwd, - hostname_localhost=hostname_localhost, - ) + if block_allocation: + return PyFluxExecutor( + max_workers=int(max_cores / cores_per_worker), + cores_per_worker=cores_per_worker, + threads_per_core=threads_per_core, + gpus_per_worker=gpus_per_worker, + init_function=init_function, + cwd=cwd, + hostname_localhost=hostname_localhost, + ) + else: + return PyFluxStepExecutor( + max_cores=max_cores, + cores_per_worker=cores_per_worker, + threads_per_core=threads_per_core, + gpus_per_worker=gpus_per_worker, + cwd=cwd, + hostname_localhost=hostname_localhost, + ) elif backend == "slurm" or (backend == "auto" and slurm_installed): - return PySlurmExecutor( - max_workers=max_workers, - cores_per_worker=cores_per_worker, - init_function=init_function, - cwd=cwd, - hostname_localhost=hostname_localhost, - ) + if block_allocation: + return PySlurmExecutor( + max_workers=int(max_cores / cores_per_worker), + cores_per_worker=cores_per_worker, + threads_per_core=threads_per_core, + gpus_per_worker=gpus_per_worker, + oversubscribe=oversubscribe, + init_function=init_function, + cwd=cwd, + hostname_localhost=hostname_localhost, + ) + else: + return PySlurmStepExecutor( + max_cores=max_cores, + cores_per_worker=cores_per_worker, + threads_per_core=threads_per_core, + gpus_per_worker=gpus_per_worker, + oversubscribe=oversubscribe, + cwd=cwd, + hostname_localhost=hostname_localhost, + ) else: # backend="mpi" if threads_per_core != 1: raise ValueError( @@ -169,10 +207,18 @@ def __new__( + str(gpus_per_worker) + "." ) - return PyMPIExecutor( - max_workers=max_workers, - cores_per_worker=cores_per_worker, - init_function=init_function, - cwd=cwd, - hostname_localhost=hostname_localhost, - ) + if block_allocation: + return PyMPIExecutor( + max_workers=int(max_cores / cores_per_worker), + cores_per_worker=cores_per_worker, + init_function=init_function, + cwd=cwd, + hostname_localhost=hostname_localhost, + ) + else: + return PyMPIStepExecutor( + max_cores=max_cores, + cores_per_worker=cores_per_worker, + cwd=cwd, + hostname_localhost=hostname_localhost, + ) diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py index 24f7ac89..32389de4 100644 --- a/pympipool/flux/executor.py +++ b/pympipool/flux/executor.py @@ -127,7 +127,7 @@ class PyFluxStepExecutor(ExecutorSteps): >>> return np.array([i, j, k]), size, rank >>> >>> with PyFluxStepExecutor(max_cores=2) as p: - >>> fs = p.submit(calc, 2, j=4, k=3, executor_kwargs={"cores": 2}) + >>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2}) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index 3e46d8c7..0cbec602 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -117,7 +117,7 @@ class PyMPIStepExecutor(ExecutorSteps): >>> return np.array([i, j, k]), size, rank >>> >>> with PyMPIStepExecutor(max_cores=2) as p: - >>> fs = p.submit(calc, 2, j=4, k=3, executor_kwargs={"cores": 2}) + >>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2}) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index b2ceccdb..b063e00b 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -41,21 +41,42 @@ def 
info(self): def future_queue(self): return self._future_queue - def submit(self, fn: callable, *args, **kwargs): - """Submits a callable to be executed with the given arguments. + def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): + """ + Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable. + Args: + fn (callable): function to submit for execution + args: arguments for the submitted function + kwargs: keyword arguments for the submitted function + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the + function. Example resource dictionary: { + cores: 1, + threads_per_core: 1, + gpus_per_worker: 0, + oversubscribe: False, + cwd: None, + executor: None, + hostname_localhost: False, + } + Returns: A Future representing the given call. """ + if len(resource_dict) > 0: + raise ValueError( + "When block_allocation is enabled, the resource requirements have to be defined on the executor level." + ) f = Future() self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) return f def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): - """Clean-up the resources associated with the Executor. + """ + Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other methods can be called after this one. @@ -90,10 +111,6 @@ def __del__(self): except (AttributeError, RuntimeError): pass - def _set_process(self, process: RaisingThread): - self._process = process - self._process.start() - class ExecutorBroker(ExecutorBase): def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): @@ -129,18 +146,28 @@ def _set_process(self, process: List[RaisingThread]): class ExecutorSteps(ExecutorBase): - def submit( - self, - fn: callable, - *args, - executor_kwargs: dict = {}, - **kwargs, - ): - """Submits a callable to be executed with the given arguments. + def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): + """ + Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable. + Args: + fn (callable): function to submit for execution + args: arguments for the submitted function + kwargs: keyword arguments for the submitted function + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the + function. Example resource dictionary: { + cores: 1, + threads_per_core: 1, + gpus_per_worker: 0, + oversubscribe: False, + cwd: None, + executor: None, + hostname_localhost: False, + } + Returns: A Future representing the given call. 
""" @@ -151,7 +178,7 @@ def submit( "args": args, "kwargs": kwargs, "future": f, - "executor_kwargs": executor_kwargs, + "resource_dict": resource_dict, } ) return f @@ -298,9 +325,12 @@ def execute_separate_tasks( option to true """ active_task_dict = {} + process_lst = [] while True: task_dict = future_queue.get() if "shutdown" in task_dict.keys() and task_dict["shutdown"]: + if task_dict["wait"]: + _ = [process.join() for process in process_lst] future_queue.task_done() future_queue.join() break @@ -309,13 +339,13 @@ def execute_separate_tasks( active_task_dict=active_task_dict, max_cores=max_cores, ) - executor_dict = task_dict.pop("executor_kwargs") + resource_dict = task_dict.pop("resource_dict") qtask = queue.Queue() qtask.put(task_dict) qtask.put({"shutdown": True, "wait": True}) - active_task_dict[task_dict["future"]] = executor_dict["cores"] + active_task_dict[task_dict["future"]] = resource_dict["cores"] task_kwargs = kwargs.copy() - task_kwargs.update(executor_dict) + task_kwargs.update(resource_dict) task_kwargs.update( { "future_queue": qtask, @@ -329,6 +359,7 @@ def execute_separate_tasks( kwargs=task_kwargs, ) process.start() + process_lst.append(process) future_queue.task_done() diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index 3400fb53..ed6ddc23 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -124,7 +124,7 @@ class PySlurmStepExecutor(ExecutorSteps): >>> return np.array([i, j, k]), size, rank >>> >>> with PySlurmStepExecutor(max_cores=2) as p: - >>> fs = p.submit(calc, 2, j=4, k=3, executor_kwargs={"cores": 2}) + >>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2}) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] From 92b01be2f1f41a6cb996e63843f93906c5b2caac Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 13:51:06 -0500 Subject: [PATCH 06/17] fix tests --- tests/test_with_dynamic_objects.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_with_dynamic_objects.py b/tests/test_with_dynamic_objects.py index 1f9277ad..6e757cd9 100644 --- a/tests/test_with_dynamic_objects.py +++ b/tests/test_with_dynamic_objects.py @@ -76,7 +76,7 @@ def slowly_returns_dynamic(dynamic_arg): return dynamic_arg dynamic_dynamic = slowly_returns_dynamic() - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) dynamic_object = does_nothing() fs = executor.submit(dynamic_dynamic.run, dynamic_object) @@ -109,7 +109,7 @@ def slowly_returns_42(): dynamic_42.result, msg="Just a sanity check that the test is set up right" ) - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(dynamic_42.run) fs.add_done_callback(dynamic_42.process_result) @@ -142,7 +142,7 @@ def returns_42(): dynamic_42.running, msg="Sanity check that the test starts in the expected condition" ) - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(dynamic_42.run) fs.add_done_callback(dynamic_42.process_result) @@ -166,7 +166,7 @@ def raise_error(): raise RuntimeError re = raise_error() - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(re.run) with 
self.assertRaises( @@ -196,7 +196,7 @@ def slowly_returns_dynamic(): return inside_variable dynamic_dynamic = slowly_returns_dynamic() - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(dynamic_dynamic.run) self.assertIsInstance( @@ -225,7 +225,7 @@ def slow(): return fortytwo f = slow() - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(f.run) self.assertEqual( From aaadf69052441b8b69996324cf45e44cc603d585 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 15:09:13 -0500 Subject: [PATCH 07/17] block_allocation=True --- tests/test_mpi_executor.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 03548711..3205c8a1 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -53,7 +53,7 @@ def test_errors(self): class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial(self): - with Executor(max_workers=2, hostname_localhost=True, backend="mpi") as exe: + with Executor(max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -63,7 +63,7 @@ def test_meta_executor_serial(self): self.assertTrue(fs_2.done()) def test_meta_executor_single(self): - with Executor(max_workers=1, hostname_localhost=True, backend="mpi") as exe: + with Executor(max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -73,7 +73,7 @@ def test_meta_executor_single(self): self.assertTrue(fs_2.done()) def test_meta_executor_parallel(self): - with Executor(max_workers=1, cores_per_worker=2, hostname_localhost=True, backend="mpi") as exe: + with Executor(max_cores=2, cores_per_worker=2, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) @@ -81,6 +81,6 @@ def test_meta_executor_parallel(self): def test_errors(self): with self.assertRaises(TypeError): - Executor(max_workers=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True, backend="mpi") + Executor(max_cores=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True, backend="mpi") with self.assertRaises(TypeError): - Executor(max_workers=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True, backend="mpi") + Executor(max_cores=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True, backend="mpi") From ffd028779fb1e1c30968caa3604573890e1c2061 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 15:14:40 -0500 Subject: [PATCH 08/17] fix flux bug --- tests/test_flux_executor.py | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 2d8f34ed..01b7b83b 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -132,7 +132,12 @@ def setUp(self): self.executor = flux.job.FluxExecutor() def test_flux_executor_serial(self): - with Executor(max_workers=2, executor=self.executor, backend="flux") as exe: + with Executor( + max_cores=2, + executor=self.executor, + backend="flux", + 
block_allocation=True, + ) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) self.assertEqual(fs_1.result(), 1) @@ -142,7 +147,11 @@ def test_flux_executor_serial(self): def test_flux_executor_threads(self): with Executor( - max_workers=1, threads_per_core=2, executor=self.executor, backend="flux" + max_cores=2, + threads_per_core=2, + executor=self.executor, + backend="flux", + block_allocation=True, ) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -153,14 +162,24 @@ def test_flux_executor_threads(self): def test_flux_executor_parallel(self): with Executor( - max_workers=1, cores_per_worker=2, executor=self.executor, backend="flux" + max_cores=2, + cores_per_worker=2, + executor=self.executor, + backend="flux", + block_allocation=True, ) as exe: fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs_1.done()) def test_single_task(self): - with Executor(max_workers=1, cores_per_worker=2, executor=self.executor, backend="flux") as p: + with Executor( + max_cores=2, + cores_per_worker=2, + executor=self.executor, + backend="flux", + block_allocation=True, + ) as p: output = p.map(mpi_funct, [1, 2, 3]) self.assertEqual( list(output), @@ -169,7 +188,12 @@ def test_single_task(self): def test_internal_memory(self): with Executor( - max_workers=1, cores_per_worker=1, init_function=set_global, executor=self.executor, backend="flux" + max_cores=1, + cores_per_worker=1, + init_function=set_global, + executor=self.executor, + backend="flux", + block_allocation=True, ) as p: f = p.submit(get_global) self.assertFalse(f.done()) From 79d06915b02741c6e3226d1b7a591659cb6dd8eb Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 15:26:03 -0500 Subject: [PATCH 09/17] test fix --- tests/test_flux_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 01b7b83b..17ce020e 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -147,7 +147,7 @@ def test_flux_executor_serial(self): def test_flux_executor_threads(self): with Executor( - max_cores=2, + max_cores=1, threads_per_core=2, executor=self.executor, backend="flux", From 7d9ca11a81cad4dc4ad1845dcbf021ff416bd050 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 15:33:21 -0500 Subject: [PATCH 10/17] Merge tests --- tests/test_mpi_executor.py | 34 ++++++++++++++++++++++ tests/test_mpi_executor_parallel.py | 45 ----------------------------- 2 files changed, 34 insertions(+), 45 deletions(-) delete mode 100644 tests/test_mpi_executor_parallel.py diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 3205c8a1..2ed04939 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -8,6 +8,10 @@ def calc(i): return i +def echo_funct(i): + return i + + def mpi_funct(i): from mpi4py import MPI @@ -84,3 +88,33 @@ def test_errors(self): Executor(max_cores=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True, backend="mpi") with self.assertRaises(TypeError): Executor(max_cores=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True, backend="mpi") + + +class TestTask(unittest.TestCase): + def test_echo(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + cloudpickle_register(ind=1) + output = p.submit(echo_funct, 2).result() + self.assertEqual(output, [2, 2]) + + def test_mpi(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=2, 
hostname_localhost=True) as p: + cloudpickle_register(ind=1) + output = p.submit(mpi_funct, 2).result() + self.assertEqual(output, [(2, 2, 0), (2, 2, 1)]) + + def test_mpi_multiple(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + cloudpickle_register(ind=1) + fs1 = p.submit(mpi_funct, 1) + fs2 = p.submit(mpi_funct, 2) + fs3 = p.submit(mpi_funct, 3) + output = [ + fs1.result(), + fs2.result(), + fs3.result(), + ] + self.assertEqual( + output, + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) diff --git a/tests/test_mpi_executor_parallel.py b/tests/test_mpi_executor_parallel.py deleted file mode 100644 index 2dc037c8..00000000 --- a/tests/test_mpi_executor_parallel.py +++ /dev/null @@ -1,45 +0,0 @@ -import unittest -from pympipool.mpi.executor import PyMPIExecutor -from pympipool.shared.executorbase import cloudpickle_register - - -def echo_funct(i): - return i - - -def mpi_funct(i): - from mpi4py import MPI - - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return i, size, rank - - -class TestTask(unittest.TestCase): - def test_echo(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: - cloudpickle_register(ind=1) - output = p.submit(echo_funct, 2).result() - self.assertEqual(output, [2, 2]) - - def test_mpi(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: - cloudpickle_register(ind=1) - output = p.submit(mpi_funct, 2).result() - self.assertEqual(output, [(2, 2, 0), (2, 2, 1)]) - - def test_mpi_multiple(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: - cloudpickle_register(ind=1) - fs1 = p.submit(mpi_funct, 1) - fs2 = p.submit(mpi_funct, 2) - fs3 = p.submit(mpi_funct, 3) - output = [ - fs1.result(), - fs2.result(), - fs3.result(), - ] - self.assertEqual( - output, - [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], - ) From e7b0b5e6f1fe13193d19607bbeb731edbf5c17ee Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 15:42:25 -0500 Subject: [PATCH 11/17] Resort imports --- tests/test_backend_serial.py | 4 +++- tests/test_flux_executor.py | 2 +- tests/test_mpi_executor.py | 1 + tests/test_mpi_executor_future.py | 8 +++++--- tests/test_mpi_executor_memory.py | 8 +++++--- tests/test_mpi_executor_worker.py | 12 +++++------- tests/test_shared_backend.py | 1 + tests/test_shared_communication.py | 2 +- tests/test_shared_executorbase.py | 3 ++- tests/test_shared_thread.py | 1 + tests/test_shell_executor.py | 5 ++--- tests/test_shell_interactive.py | 10 +++++++--- 12 files changed, 34 insertions(+), 23 deletions(-) diff --git a/tests/test_backend_serial.py b/tests/test_backend_serial.py index 2f4a7075..07a70861 100644 --- a/tests/test_backend_serial.py +++ b/tests/test_backend_serial.py @@ -1,8 +1,10 @@ +from threading import Thread import unittest + import cloudpickle import zmq + from pympipool.backend.serial import main -from threading import Thread def calc(i, j): diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 17ce020e..1f18cc29 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -1,9 +1,9 @@ from concurrent.futures import Future import os from queue import Queue +import unittest import numpy as np -import unittest from pympipool import Executor from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks diff --git a/tests/test_mpi_executor.py 
b/tests/test_mpi_executor.py index 2ed04939..0d0263ca 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -1,4 +1,5 @@ import unittest + from pympipool import Executor from pympipool.mpi.executor import PyMPIExecutor from pympipool.shared.executorbase import cloudpickle_register diff --git a/tests/test_mpi_executor_future.py b/tests/test_mpi_executor_future.py index 1854e084..f5d62582 100644 --- a/tests/test_mpi_executor_future.py +++ b/tests/test_mpi_executor_future.py @@ -1,8 +1,10 @@ -import numpy as np -import unittest +from concurrent.futures import Future from time import sleep +import unittest + +import numpy as np + from pympipool.mpi.executor import PyMPIExecutor -from concurrent.futures import Future def calc(i): diff --git a/tests/test_mpi_executor_memory.py b/tests/test_mpi_executor_memory.py index 372c34f5..c8f139e5 100644 --- a/tests/test_mpi_executor_memory.py +++ b/tests/test_mpi_executor_memory.py @@ -1,10 +1,12 @@ +from concurrent.futures import Future +from queue import Queue import unittest + import numpy as np -from queue import Queue + +from pympipool.mpi.executor import PyMPIExecutor, MpiExecInterface from pympipool.shared.backend import call_funct from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks -from pympipool.mpi.executor import PyMPIExecutor, MpiExecInterface -from concurrent.futures import Future def get_global(memory=None): diff --git a/tests/test_mpi_executor_worker.py b/tests/test_mpi_executor_worker.py index 68cf497c..4fd97cc3 100644 --- a/tests/test_mpi_executor_worker.py +++ b/tests/test_mpi_executor_worker.py @@ -1,12 +1,12 @@ -import numpy as np -import unittest +from concurrent.futures import CancelledError, Future from queue import Queue from time import sleep -from concurrent.futures import CancelledError +import unittest + +import numpy as np + from pympipool.mpi.executor import PyMPIExecutor, MpiExecInterface from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks, ExecutorBase -from pympipool.shell import ShellExecutor -from concurrent.futures import Future def calc(i): @@ -97,8 +97,6 @@ def test_meta(self): self.assertEqual(str(exe.info[k]), v) with ExecutorBase() as exe: self.assertIsNone(exe.info) - with ShellExecutor(["sleep"]) as exe: - self.assertEqual(exe.info, {}) def test_pool_multi_core(self): with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: diff --git a/tests/test_shared_backend.py b/tests/test_shared_backend.py index 3dd2336d..89a7c4d2 100644 --- a/tests/test_shared_backend.py +++ b/tests/test_shared_backend.py @@ -1,6 +1,7 @@ import os import sys import unittest + from pympipool.shared.backend import parse_arguments from pympipool.shared.interface import SrunInterface, MpiExecInterface diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 476026f2..8eed9a65 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -1,8 +1,8 @@ import os import sys +import unittest import numpy as np -import unittest import zmq from pympipool.shared.communication import ( diff --git a/tests/test_shared_executorbase.py b/tests/test_shared_executorbase.py index 0fd49d09..348fe59e 100644 --- a/tests/test_shared_executorbase.py +++ b/tests/test_shared_executorbase.py @@ -1,6 +1,7 @@ -import unittest from concurrent.futures import Future, CancelledError from queue import Queue +import unittest + from pympipool.shared.executorbase import cancel_items_in_queue 
diff --git a/tests/test_shared_thread.py b/tests/test_shared_thread.py index 8fb1fddd..5f58fda2 100644 --- a/tests/test_shared_thread.py +++ b/tests/test_shared_thread.py @@ -1,4 +1,5 @@ import unittest + from pympipool.shared.thread import RaisingThread diff --git a/tests/test_shell_executor.py b/tests/test_shell_executor.py index 28d28561..5aa19c3c 100644 --- a/tests/test_shell_executor.py +++ b/tests/test_shell_executor.py @@ -1,12 +1,11 @@ from concurrent.futures import Future import queue - -from unittest import TestCase +import unittest from pympipool.shell.executor import SubprocessExecutor, execute_single_task -class SubprocessExecutorTest(TestCase): +class SubprocessExecutorTest(unittest.TestCase): def test_execute_single_task(self): test_queue = queue.Queue() f = Future() diff --git a/tests/test_shell_interactive.py b/tests/test_shell_interactive.py index 3c98cd6a..ea756f55 100644 --- a/tests/test_shell_interactive.py +++ b/tests/test_shell_interactive.py @@ -1,13 +1,12 @@ from concurrent.futures import Future import os import queue - -from unittest import TestCase +import unittest from pympipool.shell.interactive import ShellExecutor, execute_single_task -class ShellInteractiveExecutorTest(TestCase): +class ShellInteractiveExecutorTest(unittest.TestCase): def setUp(self): self.executable_path = os.path.join(os.path.dirname(__file__), "executables", "count.py") @@ -37,3 +36,8 @@ def test_shell_interactive_executor(self): self.assertEqual("0\n1\n2\n3\ndone\n", future_pattern.result()) self.assertTrue(future_lines.done()) self.assertTrue(future_pattern.done()) + + def test_meta(self): + with ShellExecutor(["sleep"]) as exe: + self.assertEqual(exe.info, {}) + From d49d74f233c5984b30045136e2d9668a4bf5fa82 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 15:51:38 -0500 Subject: [PATCH 12/17] refactoring --- .github/workflows/unittest-flux.yml | 2 +- tests/test_executor_backend_flux.py | 117 ++++++++++++++++++++++++++++ tests/test_executor_backend_mpi.py | 51 ++++++++++++ tests/test_flux_executor.py | 77 ------------------ tests/test_mpi_executor.py | 36 --------- 5 files changed, 169 insertions(+), 114 deletions(-) create mode 100644 tests/test_executor_backend_flux.py create mode 100644 tests/test_executor_backend_mpi.py diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml index 221a5357..410a60df 100644 --- a/.github/workflows/unittest-flux.yml +++ b/.github/workflows/unittest-flux.yml @@ -55,7 +55,7 @@ jobs: timeout-minutes: 5 run: > flux start - coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest tests/test_flux_executor.py; + coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py; env: OMPI_MCA_plm: 'isolated' OMPI_MCA_rmaps_base_oversubscribe: 'yes' diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py new file mode 100644 index 00000000..13b92b0d --- /dev/null +++ b/tests/test_executor_backend_flux.py @@ -0,0 +1,117 @@ +from concurrent.futures import Future +import os +from queue import Queue +import unittest + +import numpy as np + +from pympipool import Executor + + +try: + import flux.job + from pympipool.flux.executor import ( + PyFluxExecutor, + FluxPythonInterface, + ) + + skip_flux_test = "FLUX_URI" not in os.environ +except ImportError: + skip_flux_test = True + + +def calc(i): + return i + + +def mpi_funct(i): + from mpi4py import MPI + + size = MPI.COMM_WORLD.Get_size() + rank = 
MPI.COMM_WORLD.Get_rank() + return i, size, rank + + +def get_global(memory=None): + return memory + + +def set_global(): + return {"memory": np.array([5])} + + +@unittest.skipIf( + skip_flux_test, "Flux is not installed, so the flux tests are skipped." +) +class TestFluxBackend(unittest.TestCase): + def setUp(self): + self.executor = flux.job.FluxExecutor() + + def test_flux_executor_serial(self): + with Executor( + max_cores=2, + executor=self.executor, + backend="flux", + block_allocation=True, + ) as exe: + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_flux_executor_threads(self): + with Executor( + max_cores=1, + threads_per_core=2, + executor=self.executor, + backend="flux", + block_allocation=True, + ) as exe: + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_flux_executor_parallel(self): + with Executor( + max_cores=2, + cores_per_worker=2, + executor=self.executor, + backend="flux", + block_allocation=True, + ) as exe: + fs_1 = exe.submit(mpi_funct, 1) + self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs_1.done()) + + def test_single_task(self): + with Executor( + max_cores=2, + cores_per_worker=2, + executor=self.executor, + backend="flux", + block_allocation=True, + ) as p: + output = p.map(mpi_funct, [1, 2, 3]) + self.assertEqual( + list(output), + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) + + def test_internal_memory(self): + with Executor( + max_cores=1, + cores_per_worker=1, + init_function=set_global, + executor=self.executor, + backend="flux", + block_allocation=True, + ) as p: + f = p.submit(get_global) + self.assertFalse(f.done()) + self.assertEqual(f.result(), np.array([5])) + self.assertTrue(f.done()) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py new file mode 100644 index 00000000..3ceb5db5 --- /dev/null +++ b/tests/test_executor_backend_mpi.py @@ -0,0 +1,51 @@ +import unittest + +from pympipool import Executor +from pympipool.shared.executorbase import cloudpickle_register + + +def calc(i): + return i + + +def mpi_funct(i): + from mpi4py import MPI + + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return i, size, rank + + +class TestExecutorBackend(unittest.TestCase): + def test_meta_executor_serial(self): + with Executor(max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_meta_executor_single(self): + with Executor(max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_meta_executor_parallel(self): + with Executor(max_cores=2, cores_per_worker=2, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(mpi_funct, 1) + 
self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs_1.done()) + + def test_errors(self): + with self.assertRaises(TypeError): + Executor(max_cores=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True, backend="mpi") + with self.assertRaises(TypeError): + Executor(max_cores=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True, backend="mpi") diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 1f18cc29..50406974 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -5,7 +5,6 @@ import numpy as np -from pympipool import Executor from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks @@ -123,79 +122,3 @@ def test_internal_memory(self): self.assertFalse(f.done()) self.assertEqual(f.result(), np.array([5])) self.assertTrue(f.done()) - -@unittest.skipIf( - skip_flux_test, "Flux is not installed, so the flux tests are skipped." -) -class TestFluxBackend(unittest.TestCase): - def setUp(self): - self.executor = flux.job.FluxExecutor() - - def test_flux_executor_serial(self): - with Executor( - max_cores=2, - executor=self.executor, - backend="flux", - block_allocation=True, - ) as exe: - fs_1 = exe.submit(calc, 1) - fs_2 = exe.submit(calc, 2) - self.assertEqual(fs_1.result(), 1) - self.assertEqual(fs_2.result(), 2) - self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) - - def test_flux_executor_threads(self): - with Executor( - max_cores=1, - threads_per_core=2, - executor=self.executor, - backend="flux", - block_allocation=True, - ) as exe: - fs_1 = exe.submit(calc, 1) - fs_2 = exe.submit(calc, 2) - self.assertEqual(fs_1.result(), 1) - self.assertEqual(fs_2.result(), 2) - self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) - - def test_flux_executor_parallel(self): - with Executor( - max_cores=2, - cores_per_worker=2, - executor=self.executor, - backend="flux", - block_allocation=True, - ) as exe: - fs_1 = exe.submit(mpi_funct, 1) - self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) - self.assertTrue(fs_1.done()) - - def test_single_task(self): - with Executor( - max_cores=2, - cores_per_worker=2, - executor=self.executor, - backend="flux", - block_allocation=True, - ) as p: - output = p.map(mpi_funct, [1, 2, 3]) - self.assertEqual( - list(output), - [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], - ) - - def test_internal_memory(self): - with Executor( - max_cores=1, - cores_per_worker=1, - init_function=set_global, - executor=self.executor, - backend="flux", - block_allocation=True, - ) as p: - f = p.submit(get_global) - self.assertFalse(f.done()) - self.assertEqual(f.result(), np.array([5])) - self.assertTrue(f.done()) diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 0d0263ca..ec1d7423 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -1,6 +1,5 @@ import unittest -from pympipool import Executor from pympipool.mpi.executor import PyMPIExecutor from pympipool.shared.executorbase import cloudpickle_register @@ -56,41 +55,6 @@ def test_errors(self): PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True) -class TestExecutorBackend(unittest.TestCase): - def test_meta_executor_serial(self): - with Executor(max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: - cloudpickle_register(ind=1) - fs_1 = exe.submit(calc, 1) - fs_2 = exe.submit(calc, 2) - self.assertEqual(fs_1.result(), 1) - 
self.assertEqual(fs_2.result(), 2) - self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) - - def test_meta_executor_single(self): - with Executor(max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: - cloudpickle_register(ind=1) - fs_1 = exe.submit(calc, 1) - fs_2 = exe.submit(calc, 2) - self.assertEqual(fs_1.result(), 1) - self.assertEqual(fs_2.result(), 2) - self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) - - def test_meta_executor_parallel(self): - with Executor(max_cores=2, cores_per_worker=2, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: - cloudpickle_register(ind=1) - fs_1 = exe.submit(mpi_funct, 1) - self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) - self.assertTrue(fs_1.done()) - - def test_errors(self): - with self.assertRaises(TypeError): - Executor(max_cores=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True, backend="mpi") - with self.assertRaises(TypeError): - Executor(max_cores=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True, backend="mpi") - - class TestTask(unittest.TestCase): def test_echo(self): with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: From 4c2bdbc85fee726d4e53e39b664f8ea363c0f57e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 16:08:33 -0500 Subject: [PATCH 13/17] merge executor tests into a single file --- tests/test_mpi_executor.py | 252 +++++++++++++++++++++++++++--- tests/test_mpi_executor_memory.py | 52 ------ tests/test_mpi_executor_worker.py | 180 --------------------- 3 files changed, 227 insertions(+), 257 deletions(-) delete mode 100644 tests/test_mpi_executor_memory.py delete mode 100644 tests/test_mpi_executor_worker.py diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index ec1d7423..1bc3525b 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -1,7 +1,13 @@ +from concurrent.futures import CancelledError, Future +from queue import Queue +from time import sleep import unittest -from pympipool.mpi.executor import PyMPIExecutor -from pympipool.shared.executorbase import cloudpickle_register +import numpy as np + +from pympipool.mpi.executor import PyMPIExecutor, MpiExecInterface +from pympipool.shared.backend import call_funct +from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks, ExecutorBase def calc(i): @@ -12,6 +18,14 @@ def echo_funct(i): return i +def get_global(memory=None): + return memory + + +def set_global(): + return {"memory": np.array([5])} + + def mpi_funct(i): from mpi4py import MPI @@ -20,8 +34,17 @@ def mpi_funct(i): return i, size, rank -class TestMetaExecutor(unittest.TestCase): - def test_meta_executor_serial(self): +def raise_error(): + raise RuntimeError + + +def sleep_one(i): + sleep(1) + return i + + +class TestPyMpiExecutorSerial(unittest.TestCase): + def test_pympiexecutor_two_workers(self): with PyMPIExecutor(max_workers=2, hostname_localhost=True) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -31,7 +54,7 @@ def test_meta_executor_serial(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) - def test_meta_executor_single(self): + def test_pympiexecutor_one_worker(self): with PyMPIExecutor(max_workers=1, hostname_localhost=True) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -41,34 +64,22 @@ def test_meta_executor_single(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) - def test_meta_executor_parallel(self): - with 
PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as exe: - cloudpickle_register(ind=1) - fs_1 = exe.submit(mpi_funct, 1) - self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) - self.assertTrue(fs_1.done()) - - def test_errors(self): + def test_pympiexecutor_errors(self): with self.assertRaises(TypeError): PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True) with self.assertRaises(TypeError): PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True) -class TestTask(unittest.TestCase): - def test_echo(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: - cloudpickle_register(ind=1) - output = p.submit(echo_funct, 2).result() - self.assertEqual(output, [2, 2]) - - def test_mpi(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: +class TestPyMpiExecutorMPI(unittest.TestCase): + def test_pympiexecutor_one_worker_with_mpi(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as exe: cloudpickle_register(ind=1) - output = p.submit(mpi_funct, 2).result() - self.assertEqual(output, [(2, 2, 0), (2, 2, 1)]) + fs_1 = exe.submit(mpi_funct, 1) + self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs_1.done()) - def test_mpi_multiple(self): + def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: cloudpickle_register(ind=1) fs1 = p.submit(mpi_funct, 1) @@ -83,3 +94,194 @@ def test_mpi_multiple(self): output, [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], ) + + def test_pympiexecutor_one_worker_with_mpi_echo(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + cloudpickle_register(ind=1) + output = p.submit(echo_funct, 2).result() + self.assertEqual(output, [2, 2]) + + +class TestPyMpiExecutorInitFunction(unittest.TestCase): + def test_internal_memory(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=1, init_function=set_global, hostname_localhost=True) as p: + f = p.submit(get_global) + self.assertFalse(f.done()) + self.assertEqual(f.result(), np.array([5])) + self.assertTrue(f.done()) + + def test_call_funct(self): + self.assertEqual( + call_funct( + input_dict={"fn": get_global, "args": (), "kwargs": {}}, + memory={"memory": 4}, + ), + 4, + ) + + def test_execute_task(self): + f = Future() + q = Queue() + q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f}) + q.put({"shutdown": True, "wait": True}) + cloudpickle_register(ind=1) + execute_parallel_tasks( + future_queue=q, + cores=1, + oversubscribe=False, + interface_class=MpiExecInterface, + hostname_localhost=True, + init_function=set_global, + ) + self.assertEqual(f.result(), np.array([5])) + q.join() + + +class TestFuturePool(unittest.TestCase): + def test_pool_serial(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + output = p.submit(calc, i=2) + self.assertEqual(len(p), 1) + self.assertTrue(isinstance(output, Future)) + self.assertFalse(output.done()) + sleep(1) + self.assertTrue(output.done()) + self.assertEqual(len(p), 0) + self.assertEqual(output.result(), np.array(4)) + + def test_executor_multi_submission(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + fs_1 = p.submit(calc, i=2) + fs_2 = p.submit(calc, i=2) + 
self.assertEqual(fs_1.result(), np.array(4)) + self.assertEqual(fs_2.result(), np.array(4)) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_shutdown(self): + p = PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) + fs1 = p.submit(sleep_one, i=2) + fs2 = p.submit(sleep_one, i=4) + sleep(1) + p.shutdown(wait=True, cancel_futures=True) + self.assertTrue(fs1.done()) + self.assertTrue(fs2.done()) + self.assertEqual(fs1.result(), 2) + with self.assertRaises(CancelledError): + fs2.result() + + def test_pool_serial_map(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + output = p.map(calc, [1, 2, 3]) + self.assertEqual(list(output), [np.array(1), np.array(4), np.array(9)]) + + def test_executor_exception(self): + with self.assertRaises(RuntimeError): + with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + p.submit(raise_error) + + def test_executor_exception_future(self): + with self.assertRaises(RuntimeError): + with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + fs = p.submit(raise_error) + fs.result() + + def test_meta(self): + meta_data_exe_dict = { + 'cores': 2, + 'interface_class': "", + 'hostname_localhost': True, + 'init_function': None, + 'cwd': None, + 'oversubscribe': False, + 'max_workers': 1 + } + with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as exe: + for k, v in meta_data_exe_dict.items(): + if k != 'interface_class': + self.assertEqual(exe.info[k], v) + else: + self.assertEqual(str(exe.info[k]), v) + with ExecutorBase() as exe: + self.assertIsNone(exe.info) + + def test_pool_multi_core(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + output = p.submit(mpi_funct, i=2) + self.assertEqual(len(p), 1) + self.assertTrue(isinstance(output, Future)) + self.assertFalse(output.done()) + sleep(1) + self.assertTrue(output.done()) + self.assertEqual(len(p), 0) + self.assertEqual(output.result(), [(2, 2, 0), (2, 2, 1)]) + + def test_pool_multi_core_map(self): + with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + output = p.map(mpi_funct, [1, 2, 3]) + self.assertEqual( + list(output), + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) + + def test_execute_task_failed_no_argument(self): + f = Future() + q = Queue() + q.put({"fn": calc, "args": (), "kwargs": {}, "future": f}) + cloudpickle_register(ind=1) + with self.assertRaises(TypeError): + execute_parallel_tasks( + future_queue=q, + cores=1, + oversubscribe=False, + interface_class=MpiExecInterface, + hostname_localhost=True, + ) + q.join() + + def test_execute_task_failed_wrong_argument(self): + f = Future() + q = Queue() + q.put({"fn": calc, "args": (), "kwargs": {"j": 4}, "future": f}) + cloudpickle_register(ind=1) + with self.assertRaises(TypeError): + execute_parallel_tasks( + future_queue=q, + cores=1, + oversubscribe=False, + interface_class=MpiExecInterface, + hostname_localhost=True, + ) + q.join() + + def test_execute_task(self): + f = Future() + q = Queue() + q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) + q.put({"shutdown": True, "wait": True}) + cloudpickle_register(ind=1) + execute_parallel_tasks( + future_queue=q, + cores=1, + oversubscribe=False, + interface_class=MpiExecInterface, + hostname_localhost=True, + ) + self.assertEqual(f.result(), np.array(4)) + q.join() + + def test_execute_task_parallel(self): + 
f = Future() + q = Queue() + q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) + q.put({"shutdown": True, "wait": True}) + cloudpickle_register(ind=1) + execute_parallel_tasks( + future_queue=q, + cores=2, + oversubscribe=False, + interface_class=MpiExecInterface, + hostname_localhost=True, + ) + self.assertEqual(f.result(), [np.array(4), np.array(4)]) + q.join() diff --git a/tests/test_mpi_executor_memory.py b/tests/test_mpi_executor_memory.py deleted file mode 100644 index c8f139e5..00000000 --- a/tests/test_mpi_executor_memory.py +++ /dev/null @@ -1,52 +0,0 @@ -from concurrent.futures import Future -from queue import Queue -import unittest - -import numpy as np - -from pympipool.mpi.executor import PyMPIExecutor, MpiExecInterface -from pympipool.shared.backend import call_funct -from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks - - -def get_global(memory=None): - return memory - - -def set_global(): - return {"memory": np.array([5])} - - -class TestWorkerMemory(unittest.TestCase): - def test_internal_memory(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, init_function=set_global, hostname_localhost=True) as p: - f = p.submit(get_global) - self.assertFalse(f.done()) - self.assertEqual(f.result(), np.array([5])) - self.assertTrue(f.done()) - - def test_call_funct(self): - self.assertEqual( - call_funct( - input_dict={"fn": get_global, "args": (), "kwargs": {}}, - memory={"memory": 4}, - ), - 4, - ) - - def test_execute_task(self): - f = Future() - q = Queue() - q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f}) - q.put({"shutdown": True, "wait": True}) - cloudpickle_register(ind=1) - execute_parallel_tasks( - future_queue=q, - cores=1, - oversubscribe=False, - interface_class=MpiExecInterface, - hostname_localhost=True, - init_function=set_global, - ) - self.assertEqual(f.result(), np.array([5])) - q.join() diff --git a/tests/test_mpi_executor_worker.py b/tests/test_mpi_executor_worker.py deleted file mode 100644 index 4fd97cc3..00000000 --- a/tests/test_mpi_executor_worker.py +++ /dev/null @@ -1,180 +0,0 @@ -from concurrent.futures import CancelledError, Future -from queue import Queue -from time import sleep -import unittest - -import numpy as np - -from pympipool.mpi.executor import PyMPIExecutor, MpiExecInterface -from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks, ExecutorBase - - -def calc(i): - return np.array(i**2) - - -def sleep_one(i): - sleep(1) - return i - - -def mpi_funct(i): - from mpi4py import MPI - - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return i, size, rank - - -def raise_error(): - raise RuntimeError - - -class TestFuturePool(unittest.TestCase): - def test_pool_serial(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: - output = p.submit(calc, i=2) - self.assertEqual(len(p), 1) - self.assertTrue(isinstance(output, Future)) - self.assertFalse(output.done()) - sleep(1) - self.assertTrue(output.done()) - self.assertEqual(len(p), 0) - self.assertEqual(output.result(), np.array(4)) - - def test_executor_multi_submission(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: - fs_1 = p.submit(calc, i=2) - fs_2 = p.submit(calc, i=2) - self.assertEqual(fs_1.result(), np.array(4)) - self.assertEqual(fs_2.result(), np.array(4)) - self.assertTrue(fs_1.done()) - self.assertTrue(fs_2.done()) - - def test_shutdown(self): - p = PyMPIExecutor(max_workers=1, 
cores_per_worker=1, hostname_localhost=True) - fs1 = p.submit(sleep_one, i=2) - fs2 = p.submit(sleep_one, i=4) - sleep(1) - p.shutdown(wait=True, cancel_futures=True) - self.assertTrue(fs1.done()) - self.assertTrue(fs2.done()) - self.assertEqual(fs1.result(), 2) - with self.assertRaises(CancelledError): - fs2.result() - - def test_pool_serial_map(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: - output = p.map(calc, [1, 2, 3]) - self.assertEqual(list(output), [np.array(1), np.array(4), np.array(9)]) - - def test_executor_exception(self): - with self.assertRaises(RuntimeError): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: - p.submit(raise_error) - - def test_executor_exception_future(self): - with self.assertRaises(RuntimeError): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: - fs = p.submit(raise_error) - fs.result() - - def test_meta(self): - meta_data_exe_dict = { - 'cores': 2, - 'interface_class': "", - 'hostname_localhost': True, - 'init_function': None, - 'cwd': None, - 'oversubscribe': False, - 'max_workers': 1 - } - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as exe: - for k, v in meta_data_exe_dict.items(): - if k != 'interface_class': - self.assertEqual(exe.info[k], v) - else: - self.assertEqual(str(exe.info[k]), v) - with ExecutorBase() as exe: - self.assertIsNone(exe.info) - - def test_pool_multi_core(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: - output = p.submit(mpi_funct, i=2) - self.assertEqual(len(p), 1) - self.assertTrue(isinstance(output, Future)) - self.assertFalse(output.done()) - sleep(1) - self.assertTrue(output.done()) - self.assertEqual(len(p), 0) - self.assertEqual(output.result(), [(2, 2, 0), (2, 2, 1)]) - - def test_pool_multi_core_map(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: - output = p.map(mpi_funct, [1, 2, 3]) - self.assertEqual( - list(output), - [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], - ) - - def test_execute_task_failed_no_argument(self): - f = Future() - q = Queue() - q.put({"fn": calc, "args": (), "kwargs": {}, "future": f}) - cloudpickle_register(ind=1) - with self.assertRaises(TypeError): - execute_parallel_tasks( - future_queue=q, - cores=1, - oversubscribe=False, - interface_class=MpiExecInterface, - hostname_localhost=True, - ) - q.join() - - def test_execute_task_failed_wrong_argument(self): - f = Future() - q = Queue() - q.put({"fn": calc, "args": (), "kwargs": {"j": 4}, "future": f}) - cloudpickle_register(ind=1) - with self.assertRaises(TypeError): - execute_parallel_tasks( - future_queue=q, - cores=1, - oversubscribe=False, - interface_class=MpiExecInterface, - hostname_localhost=True, - ) - q.join() - - def test_execute_task(self): - f = Future() - q = Queue() - q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) - q.put({"shutdown": True, "wait": True}) - cloudpickle_register(ind=1) - execute_parallel_tasks( - future_queue=q, - cores=1, - oversubscribe=False, - interface_class=MpiExecInterface, - hostname_localhost=True, - ) - self.assertEqual(f.result(), np.array(4)) - q.join() - - def test_execute_task_parallel(self): - f = Future() - q = Queue() - q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) - q.put({"shutdown": True, "wait": True}) - cloudpickle_register(ind=1) - execute_parallel_tasks( - future_queue=q, - 
cores=2, - oversubscribe=False, - interface_class=MpiExecInterface, - hostname_localhost=True, - ) - self.assertEqual(f.result(), [np.array(4), np.array(4)]) - q.join() From a73b81b23d8b369462d9a249e3cddb5ecd934b9a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 16:15:20 -0500 Subject: [PATCH 14/17] fix tests --- tests/test_mpi_executor.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 1bc3525b..721c3aca 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -14,6 +14,10 @@ def calc(i): return i +def calc_array(i): + return np.array(i**2) + + def echo_funct(i): return i @@ -140,7 +144,7 @@ def test_execute_task(self): class TestFuturePool(unittest.TestCase): def test_pool_serial(self): with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: - output = p.submit(calc, i=2) + output = p.submit(calc_array, i=2) self.assertEqual(len(p), 1) self.assertTrue(isinstance(output, Future)) self.assertFalse(output.done()) @@ -151,8 +155,8 @@ def test_pool_serial(self): def test_executor_multi_submission(self): with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: - fs_1 = p.submit(calc, i=2) - fs_2 = p.submit(calc, i=2) + fs_1 = p.submit(calc_array, i=2) + fs_2 = p.submit(calc_array, i=2) self.assertEqual(fs_1.result(), np.array(4)) self.assertEqual(fs_2.result(), np.array(4)) self.assertTrue(fs_1.done()) @@ -172,7 +176,7 @@ def test_shutdown(self): def test_pool_serial_map(self): with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: - output = p.map(calc, [1, 2, 3]) + output = p.map(calc_array, [1, 2, 3]) self.assertEqual(list(output), [np.array(1), np.array(4), np.array(9)]) def test_executor_exception(self): @@ -227,7 +231,7 @@ def test_pool_multi_core_map(self): def test_execute_task_failed_no_argument(self): f = Future() q = Queue() - q.put({"fn": calc, "args": (), "kwargs": {}, "future": f}) + q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): execute_parallel_tasks( @@ -242,7 +246,7 @@ def test_execute_task_failed_no_argument(self): def test_execute_task_failed_wrong_argument(self): f = Future() q = Queue() - q.put({"fn": calc, "args": (), "kwargs": {"j": 4}, "future": f}) + q.put({"fn": calc_array, "args": (), "kwargs": {"j": 4}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): execute_parallel_tasks( @@ -257,7 +261,7 @@ def test_execute_task_failed_wrong_argument(self): def test_execute_task(self): f = Future() q = Queue() - q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) + q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) execute_parallel_tasks( @@ -273,7 +277,7 @@ def test_execute_task(self): def test_execute_task_parallel(self): f = Future() q = Queue() - q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) + q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) execute_parallel_tasks( From 78de28b76f0e6899abcae1776f85afebddb558b5 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 16:17:25 -0500 Subject: [PATCH 15/17] black formatting for tests --- tests/benchmark/llh.py | 38 +++++++--- tests/executables/count.py | 2 +- 
tests/test_executor_backend_mpi.py | 32 +++++++-- tests/test_flux_executor.py | 9 ++- tests/test_integration_pyiron_workflow.py | 43 +++++------ tests/test_mpi_executor.py | 87 +++++++++++++++++------ tests/test_mpi_executor_future.py | 31 +++++--- tests/test_shared_backend.py | 4 +- tests/test_shared_communication.py | 4 +- tests/test_shell_executor.py | 16 ++++- tests/test_shell_interactive.py | 43 ++++++++--- 11 files changed, 216 insertions(+), 93 deletions(-) diff --git a/tests/benchmark/llh.py b/tests/benchmark/llh.py index 6973e2ac..c18fed0a 100644 --- a/tests/benchmark/llh.py +++ b/tests/benchmark/llh.py @@ -4,16 +4,19 @@ def llh_numpy(mean, sigma): import numpy - data = numpy.random.normal(size=100000000).astype('float64') - s = (data - mean) ** 2 / (2 * (sigma ** 2)) - pdfs = numpy.exp(- s) + + data = numpy.random.normal(size=100000000).astype("float64") + s = (data - mean) ** 2 / (2 * (sigma**2)) + pdfs = numpy.exp(-s) pdfs /= numpy.sqrt(2 * numpy.pi) * sigma return numpy.log(pdfs).sum() def run_with_executor(executor=None, mean=0.1, sigma=1.1, runs=32, **kwargs): with executor(**kwargs) as exe: - future_lst = [exe.submit(llh_numpy, mean=mean, sigma=sigma) for i in range(runs)] + future_lst = [ + exe.submit(llh_numpy, mean=mean, sigma=sigma) for i in range(runs) + ] return [f.result() for f in future_lst] @@ -28,20 +31,35 @@ def run_static(mean=0.1, sigma=1.1, runs=32): run_static(mean=0.1, sigma=1.1, runs=32) elif run_mode == "process": from concurrent.futures import ProcessPoolExecutor - run_with_executor(executor=ProcessPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4) + + run_with_executor( + executor=ProcessPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4 + ) elif run_mode == "thread": from concurrent.futures import ThreadPoolExecutor - run_with_executor(executor=ThreadPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4) + + run_with_executor( + executor=ThreadPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4 + ) elif run_mode == "pympipool": from pympipool.mpi.executor import PyMPIExecutor - run_with_executor(executor=PyMPIExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4) + + run_with_executor( + executor=PyMPIExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4 + ) elif run_mode == "flux": from pympipool.flux.executor import PyFluxExecutor - run_with_executor(executor=PyFluxExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4) + + run_with_executor( + executor=PyFluxExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4 + ) elif run_mode == "mpi4py": from mpi4py.futures import MPIPoolExecutor - run_with_executor(executor=MPIPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4) + + run_with_executor( + executor=MPIPoolExecutor, mean=0.1, sigma=1.1, runs=32, max_workers=4 + ) else: raise ValueError(run_mode) stop_time = time() - print(run_mode, stop_time-start_time) + print(run_mode, stop_time - start_time) diff --git a/tests/executables/count.py b/tests/executables/count.py index 3e65d51e..4158a9e9 100644 --- a/tests/executables/count.py +++ b/tests/executables/count.py @@ -10,4 +10,4 @@ def count(iterations): if "shutdown" in user_input: break else: - count(iterations=int(user_input)) \ No newline at end of file + count(iterations=int(user_input)) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 3ceb5db5..27798b5c 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -18,7 +18,9 @@ def mpi_funct(i): class TestExecutorBackend(unittest.TestCase): def 
test_meta_executor_serial(self): - with Executor(max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: + with Executor( + max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=True + ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -28,7 +30,9 @@ def test_meta_executor_serial(self): self.assertTrue(fs_2.done()) def test_meta_executor_single(self): - with Executor(max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: + with Executor( + max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True + ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -38,7 +42,13 @@ def test_meta_executor_single(self): self.assertTrue(fs_2.done()) def test_meta_executor_parallel(self): - with Executor(max_cores=2, cores_per_worker=2, hostname_localhost=True, backend="mpi", block_allocation=True) as exe: + with Executor( + max_cores=2, + cores_per_worker=2, + hostname_localhost=True, + backend="mpi", + block_allocation=True, + ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) @@ -46,6 +56,18 @@ def test_meta_executor_parallel(self): def test_errors(self): with self.assertRaises(TypeError): - Executor(max_cores=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True, backend="mpi") + Executor( + max_cores=1, + cores_per_worker=1, + threads_per_core=2, + hostname_localhost=True, + backend="mpi", + ) with self.assertRaises(TypeError): - Executor(max_cores=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True, backend="mpi") + Executor( + max_cores=1, + cores_per_worker=1, + gpus_per_worker=1, + hostname_localhost=True, + backend="mpi", + ) diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 50406974..f1d968d1 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -76,7 +76,9 @@ def test_flux_executor_parallel(self): self.assertTrue(fs_1.done()) def test_single_task(self): - with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as p: + with PyFluxExecutor( + max_workers=1, cores_per_worker=2, executor=self.executor + ) as p: output = p.map(mpi_funct, [1, 2, 3]) self.assertEqual( list(output), @@ -116,7 +118,10 @@ def test_execute_task_threads(self): def test_internal_memory(self): with PyFluxExecutor( - max_workers=1, cores_per_worker=1, init_function=set_global, executor=self.executor + max_workers=1, + cores_per_worker=1, + init_function=set_global, + executor=self.executor, ) as p: f = p.submit(get_global) self.assertFalse(f.done()) diff --git a/tests/test_integration_pyiron_workflow.py b/tests/test_integration_pyiron_workflow.py index 6e757cd9..e9d101f2 100644 --- a/tests/test_integration_pyiron_workflow.py +++ b/tests/test_integration_pyiron_workflow.py @@ -5,6 +5,7 @@ be pickled using the standard pickle module, and thus poses a relatively thorough test for the general un-pickle-able case. """ + from concurrent.futures._base import TimeoutError as cfbTimeoutError from functools import partialmethod from time import sleep @@ -19,6 +20,7 @@ class Foo: A base class to be dynamically modified for putting an executor/serializer through its paces. """ + def __init__(self, fnc: callable): self.fnc = fnc self.result = None @@ -41,16 +43,12 @@ def dynamic_foo(): Overrides the `fnc` input of `Foo` with the decorated function. 
""" + def as_dynamic_foo(fnc: callable): return type( "DynamicFoo", (Foo,), # Define parentage - { - "__init__": partialmethod( - Foo.__init__, - fnc - ) - }, + {"__init__": partialmethod(Foo.__init__, fnc)}, ) return as_dynamic_foo @@ -84,7 +82,7 @@ def slowly_returns_dynamic(dynamic_arg): fs.result().attribute_on_dynamic, "attribute updated", msg="The submit callable should have modified the mutable, dynamically " - "defined object with a new attribute." + "defined object with a new attribute.", ) def test_callable(self): @@ -101,13 +99,10 @@ def slowly_returns_42(): dynamic_42 = slowly_returns_42() # Instantiate the dynamically defined class self.assertIsInstance( - dynamic_42, - Foo, - msg="Just a sanity check that the test is set up right" + dynamic_42, Foo, msg="Just a sanity check that the test is set up right" ) self.assertIsNone( - dynamic_42.result, - msg="Just a sanity check that the test is set up right" + dynamic_42.result, msg="Just a sanity check that the test is set up right" ) executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) @@ -116,18 +111,16 @@ def slowly_returns_42(): self.assertFalse( fs.done(), msg="The submit callable sleeps long enough that we expect to still be " - "running here -- did something fail to get submit to an executor??" + "running here -- did something fail to get submit to an executor??", ) self.assertEqual( - fortytwo, - fs.result(), - msg="The future is expected to behave as usual" + fortytwo, fs.result(), msg="The future is expected to behave as usual" ) self.assertEqual( fortytwo, dynamic_42.result, msg="The callback modifies its object and should run by the time the result" - "is available -- did it fail to get called?" + "is available -- did it fail to get called?", ) def test_callback(self): @@ -140,7 +133,7 @@ def returns_42(): dynamic_42 = returns_42() self.assertFalse( dynamic_42.running, - msg="Sanity check that the test starts in the expected condition" + msg="Sanity check that the test starts in the expected condition", ) executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) @@ -148,12 +141,12 @@ def returns_42(): fs.add_done_callback(dynamic_42.process_result) self.assertTrue( dynamic_42.running, - msg="Submit method need to be able to modify their owners" + msg="Submit method need to be able to modify their owners", ) fs.result() # Wait for the process to finish self.assertFalse( dynamic_42.running, - msg="Callback methods need to be able to modify their owners" + msg="Callback methods need to be able to modify their owners", ) def test_exception(self): @@ -171,7 +164,7 @@ def raise_error(): fs = executor.submit(re.run) with self.assertRaises( RuntimeError, - msg="The callable just raises an error -- this should get shown to the user" + msg="The callable just raises an error -- this should get shown to the user", ): fs.result() @@ -203,13 +196,13 @@ def slowly_returns_dynamic(): fs.result(), Foo, msg="Just a sanity check that we're getting the right type of dynamically " - "defined type of object" + "defined type of object", ) self.assertEqual( fs.result().result, "it was an inside job!", msg="The submit callable modifies the object that owns it, and this should" - "be reflected in the main process after deserialziation" + "be reflected in the main process after deserialziation", ) def test_timeout(self): @@ -231,13 +224,13 @@ def slow(): self.assertEqual( fs.result(timeout=30), fortytwo, - msg="waiting long enough should get the result" + 
msg="waiting long enough should get the result", ) with self.assertRaises( (TimeoutError, cfbTimeoutError), msg="With a timeout time smaller than our submit callable's sleep time, " - "we had better get an exception!" + "we had better get an exception!", ): fs = executor.submit(f.run) fs.result(timeout=0.0001) diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 721c3aca..1803d6d4 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -7,7 +7,11 @@ from pympipool.mpi.executor import PyMPIExecutor, MpiExecInterface from pympipool.shared.backend import call_funct -from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks, ExecutorBase +from pympipool.shared.executorbase import ( + cloudpickle_register, + execute_parallel_tasks, + ExecutorBase, +) def calc(i): @@ -70,21 +74,35 @@ def test_pympiexecutor_one_worker(self): def test_pympiexecutor_errors(self): with self.assertRaises(TypeError): - PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True) + PyMPIExecutor( + max_workers=1, + cores_per_worker=1, + threads_per_core=2, + hostname_localhost=True, + ) with self.assertRaises(TypeError): - PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True) + PyMPIExecutor( + max_workers=1, + cores_per_worker=1, + gpus_per_worker=1, + hostname_localhost=True, + ) class TestPyMpiExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as exe: + with PyMPIExecutor( + max_workers=1, cores_per_worker=2, hostname_localhost=True + ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs_1.done()) def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=2, hostname_localhost=True + ) as p: cloudpickle_register(ind=1) fs1 = p.submit(mpi_funct, 1) fs2 = p.submit(mpi_funct, 2) @@ -100,7 +118,9 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): ) def test_pympiexecutor_one_worker_with_mpi_echo(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=2, hostname_localhost=True + ) as p: cloudpickle_register(ind=1) output = p.submit(echo_funct, 2).result() self.assertEqual(output, [2, 2]) @@ -108,7 +128,12 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): class TestPyMpiExecutorInitFunction(unittest.TestCase): def test_internal_memory(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, init_function=set_global, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, + cores_per_worker=1, + init_function=set_global, + hostname_localhost=True, + ) as p: f = p.submit(get_global) self.assertFalse(f.done()) self.assertEqual(f.result(), np.array([5])) @@ -143,7 +168,9 @@ def test_execute_task(self): class TestFuturePool(unittest.TestCase): def test_pool_serial(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=1, hostname_localhost=True + ) as p: output = p.submit(calc_array, i=2) self.assertEqual(len(p), 1) self.assertTrue(isinstance(output, Future)) @@ -154,7 +181,9 @@ def 
test_pool_serial(self): self.assertEqual(output.result(), np.array(4)) def test_executor_multi_submission(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=1, hostname_localhost=True + ) as p: fs_1 = p.submit(calc_array, i=2) fs_2 = p.submit(calc_array, i=2) self.assertEqual(fs_1.result(), np.array(4)) @@ -175,34 +204,42 @@ def test_shutdown(self): fs2.result() def test_pool_serial_map(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=1, hostname_localhost=True + ) as p: output = p.map(calc_array, [1, 2, 3]) self.assertEqual(list(output), [np.array(1), np.array(4), np.array(9)]) def test_executor_exception(self): with self.assertRaises(RuntimeError): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=1, hostname_localhost=True + ) as p: p.submit(raise_error) def test_executor_exception_future(self): with self.assertRaises(RuntimeError): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=1, hostname_localhost=True + ) as p: fs = p.submit(raise_error) fs.result() def test_meta(self): meta_data_exe_dict = { - 'cores': 2, - 'interface_class': "", - 'hostname_localhost': True, - 'init_function': None, - 'cwd': None, - 'oversubscribe': False, - 'max_workers': 1 + "cores": 2, + "interface_class": "", + "hostname_localhost": True, + "init_function": None, + "cwd": None, + "oversubscribe": False, + "max_workers": 1, } - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as exe: + with PyMPIExecutor( + max_workers=1, cores_per_worker=2, hostname_localhost=True + ) as exe: for k, v in meta_data_exe_dict.items(): - if k != 'interface_class': + if k != "interface_class": self.assertEqual(exe.info[k], v) else: self.assertEqual(str(exe.info[k]), v) @@ -210,7 +247,9 @@ def test_meta(self): self.assertIsNone(exe.info) def test_pool_multi_core(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=2, hostname_localhost=True + ) as p: output = p.submit(mpi_funct, i=2) self.assertEqual(len(p), 1) self.assertTrue(isinstance(output, Future)) @@ -221,7 +260,9 @@ def test_pool_multi_core(self): self.assertEqual(output.result(), [(2, 2, 0), (2, 2, 1)]) def test_pool_multi_core_map(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=2, hostname_localhost=True + ) as p: output = p.map(mpi_funct, [1, 2, 3]) self.assertEqual( list(output), diff --git a/tests/test_mpi_executor_future.py b/tests/test_mpi_executor_future.py index f5d62582..737e51ec 100644 --- a/tests/test_mpi_executor_future.py +++ b/tests/test_mpi_executor_future.py @@ -13,7 +13,9 @@ def calc(i): class TestFuture(unittest.TestCase): def test_pool_serial(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=1, hostname_localhost=True + ) as p: output = p.submit(calc, i=2) self.assertTrue(isinstance(output, Future)) self.assertFalse(output.done()) @@ -22,7 +24,9 @@ def test_pool_serial(self): self.assertEqual(output.result(), np.array(4)) def 
test_pool_serial_multi_core(self): - with PyMPIExecutor(max_workers=1, cores_per_worker=2, hostname_localhost=True) as p: + with PyMPIExecutor( + max_workers=1, cores_per_worker=2, hostname_localhost=True + ) as p: output = p.submit(calc, i=2) self.assertTrue(isinstance(output, Future)) self.assertFalse(output.done()) @@ -41,6 +45,7 @@ def test_independence_from_executor(self): def slow_callable(): from time import sleep + sleep(1) return True @@ -57,28 +62,29 @@ def submit(): self.assertListEqual( [], mutable, - msg="Sanity check that test is starting in the expected condition" + msg="Sanity check that test is starting in the expected condition", ) future = submit() self.assertFalse( future.done(), - msg="The submit function is slow, it should be running still" + msg="The submit function is slow, it should be running still", ) self.assertListEqual( [], mutable, msg="While running, the mutable should not have been impacted by the " - "callback" + "callback", ) future.result() # Wait for the calculation to finish self.assertListEqual( ["Called back"], mutable, - msg="After completion, the callback should modify the mutable data" + msg="After completion, the callback should modify the mutable data", ) with self.subTest("From inside a class"): + class Foo: def __init__(self): self.running = False @@ -86,13 +92,16 @@ def __init__(self): def run(self): self.running = True - future = PyMPIExecutor(hostname_localhost=True).submit(self.return_42) + future = PyMPIExecutor(hostname_localhost=True).submit( + self.return_42 + ) future.add_done_callback(self.finished) return future def return_42(self): from time import sleep + sleep(1) return 42 @@ -102,15 +111,15 @@ def finished(self, future): foo = Foo() self.assertFalse( foo.running, - msg="Sanity check that the test starts in the expected condition" + msg="Sanity check that the test starts in the expected condition", ) fs = foo.run() self.assertTrue( foo.running, - msg="We should be able to exit the run method before the task completes" + msg="We should be able to exit the run method before the task completes", ) fs.result() # Wait for completion self.assertFalse( foo.running, - msg="After task completion, we expect the callback to modify the class" - ) \ No newline at end of file + msg="After task completion, we expect the callback to modify the class", + ) diff --git a/tests/test_shared_backend.py b/tests/test_shared_backend.py index 89a7c4d2..18e00bef 100644 --- a/tests/test_shared_backend.py +++ b/tests/test_shared_backend.py @@ -22,9 +22,7 @@ def test_command_local(self): "--zmqport", result_dict["zmqport"], ] - interface = MpiExecInterface( - cwd=None, cores=2, oversubscribe=True - ) + interface = MpiExecInterface(cwd=None, cores=2, oversubscribe=True) self.assertEqual( command_lst, interface.generate_command( diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py index 8eed9a65..940b4ee9 100644 --- a/tests/test_shared_communication.py +++ b/tests/test_shared_communication.py @@ -25,9 +25,7 @@ def test_interface(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}} interface = SocketInterface( - interface=MpiExecInterface( - cwd=None, cores=1, oversubscribe=False - ) + interface=MpiExecInterface(cwd=None, cores=1, oversubscribe=False) ) interface.bootup( command_lst=[ diff --git a/tests/test_shell_executor.py b/tests/test_shell_executor.py index 5aa19c3c..d2b8c255 100644 --- a/tests/test_shell_executor.py +++ b/tests/test_shell_executor.py @@ -9,7 +9,13 @@ class 
SubprocessExecutorTest(unittest.TestCase): def test_execute_single_task(self): test_queue = queue.Queue() f = Future() - test_queue.put({"future": f, "args": [["echo", "test"]], "kwargs": {"universal_newlines": True}}) + test_queue.put( + { + "future": f, + "args": [["echo", "test"]], + "kwargs": {"universal_newlines": True}, + } + ) test_queue.put({"shutdown": True}) self.assertFalse(f.done()) execute_single_task(future_queue=test_queue) @@ -25,7 +31,13 @@ def test_wrong_error(self): def test_broken_executable(self): test_queue = queue.Queue() f = Future() - test_queue.put({"future": f, "args": [["/executable/does/not/exist"]], "kwargs": {"universal_newlines": True}}) + test_queue.put( + { + "future": f, + "args": [["/executable/does/not/exist"]], + "kwargs": {"universal_newlines": True}, + } + ) with self.assertRaises(FileNotFoundError): execute_single_task(future_queue=test_queue) diff --git a/tests/test_shell_interactive.py b/tests/test_shell_interactive.py index ea756f55..7eee1ba0 100644 --- a/tests/test_shell_interactive.py +++ b/tests/test_shell_interactive.py @@ -8,15 +8,37 @@ class ShellInteractiveExecutorTest(unittest.TestCase): def setUp(self): - self.executable_path = os.path.join(os.path.dirname(__file__), "executables", "count.py") + self.executable_path = os.path.join( + os.path.dirname(__file__), "executables", "count.py" + ) def test_execute_single_task(self): test_queue = queue.Queue() future_lines = Future() future_pattern = Future() - test_queue.put({"init": True, "args": [["python", self.executable_path]], "kwargs": {"universal_newlines": True}}) - test_queue.put({"future": future_lines, "input": "4\n", "lines_to_read": 5, "stop_read_pattern": None}) - test_queue.put({"future": future_pattern, "input": "4\n", "lines_to_read": None, "stop_read_pattern": "done"}) + test_queue.put( + { + "init": True, + "args": [["python", self.executable_path]], + "kwargs": {"universal_newlines": True}, + } + ) + test_queue.put( + { + "future": future_lines, + "input": "4\n", + "lines_to_read": 5, + "stop_read_pattern": None, + } + ) + test_queue.put( + { + "future": future_pattern, + "input": "4\n", + "lines_to_read": None, + "stop_read_pattern": "done", + } + ) test_queue.put({"shutdown": True}) self.assertFalse(future_lines.done()) self.assertFalse(future_pattern.done()) @@ -27,9 +49,15 @@ def test_execute_single_task(self): self.assertEqual("0\n1\n2\n3\ndone\n", future_pattern.result()) def test_shell_interactive_executor(self): - with ShellExecutor(["python", self.executable_path], universal_newlines=True) as exe: - future_lines = exe.submit(string_input="4", lines_to_read=5, stop_read_pattern=None) - future_pattern = exe.submit(string_input="4", lines_to_read=None, stop_read_pattern="done") + with ShellExecutor( + ["python", self.executable_path], universal_newlines=True + ) as exe: + future_lines = exe.submit( + string_input="4", lines_to_read=5, stop_read_pattern=None + ) + future_pattern = exe.submit( + string_input="4", lines_to_read=None, stop_read_pattern="done" + ) self.assertFalse(future_lines.done()) self.assertFalse(future_pattern.done()) self.assertEqual("0\n1\n2\n3\ndone\n", future_lines.result()) @@ -40,4 +68,3 @@ def test_shell_interactive_executor(self): def test_meta(self): with ShellExecutor(["sleep"]) as exe: self.assertEqual(exe.info, {}) - From 505c5ef03fbbe1b43cd2e6ba332747d10800cd78 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 16:49:05 -0500 Subject: [PATCH 16/17] Fix block_allocation=False and add tests --- 
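For reference, a minimal usage sketch of the non-blocking mode exercised by the new
tests in tests/test_executor_backend_mpi_noblock.py below; the function calc and all
keyword arguments simply mirror those tests and are illustrative, not part of the
patch itself:

    from pympipool import Executor
    from pympipool.shared.executorbase import cloudpickle_register


    def calc(i):
        return i


    # With block_allocation=False each submitted function is handled by its own
    # worker rather than a pre-allocated pool of blocking workers - the code path
    # repaired in this patch.
    with Executor(
        max_cores=2,
        hostname_localhost=True,
        backend="mpi",
        block_allocation=False,
    ) as exe:
        cloudpickle_register(ind=1)
        fs = exe.submit(calc, 1)
        print(fs.result())  # prints 1, as asserted in the new tests
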
pympipool/shared/executorbase.py | 5 +- tests/test_executor_backend_mpi_noblock.py | 60 ++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 tests/test_executor_backend_mpi_noblock.py diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index b063e00b..f4ba9bb3 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -343,7 +343,10 @@ def execute_separate_tasks( qtask = queue.Queue() qtask.put(task_dict) qtask.put({"shutdown": True, "wait": True}) - active_task_dict[task_dict["future"]] = resource_dict["cores"] + if "cores" in resource_dict.keys(): + active_task_dict[task_dict["future"]] = resource_dict["cores"] + else: + active_task_dict[task_dict["future"]] = kwargs["cores"] task_kwargs = kwargs.copy() task_kwargs.update(resource_dict) task_kwargs.update( diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py new file mode 100644 index 00000000..0d040dd0 --- /dev/null +++ b/tests/test_executor_backend_mpi_noblock.py @@ -0,0 +1,60 @@ +import unittest + +from pympipool import Executor +from pympipool.shared.executorbase import cloudpickle_register + + +def calc(i): + return i + + +def mpi_funct(i): + from mpi4py import MPI + + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return i, size, rank + + +class TestExecutorBackend(unittest.TestCase): + def test_meta_executor_serial(self): + with Executor( + max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=False + ) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_meta_executor_single(self): + with Executor( + max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=False + ) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_errors(self): + with self.assertRaises(TypeError): + Executor( + max_cores=1, + cores_per_worker=1, + threads_per_core=2, + hostname_localhost=True, + backend="mpi", + ) + with self.assertRaises(TypeError): + Executor( + max_cores=1, + cores_per_worker=1, + gpus_per_worker=1, + hostname_localhost=True, + backend="mpi", + ) From 06fa381684262e5a0d106a9b7fa35861f9f3843e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 18 Apr 2024 16:55:39 -0500 Subject: [PATCH 17/17] Add more tests --- tests/test_mpi_executor.py | 95 +++++++++++++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 1803d6d4..707ddf48 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -5,7 +5,7 @@ import numpy as np -from pympipool.mpi.executor import PyMPIExecutor, MpiExecInterface +from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor, MpiExecInterface from pympipool.shared.backend import call_funct from pympipool.shared.executorbase import ( cloudpickle_register, @@ -89,6 +89,44 @@ def test_pympiexecutor_errors(self): ) +class TestPyMpiExecutorStepSerial(unittest.TestCase): + def test_pympiexecutor_two_workers(self): + with PyMPIStepExecutor(max_cores=2, hostname_localhost=True) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 
1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_pympiexecutor_one_worker(self): + with PyMPIStepExecutor(max_cores=1, hostname_localhost=True) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_pympiexecutor_errors(self): + with self.assertRaises(TypeError): + PyMPIStepExecutor( + max_cores=1, + cores_per_worker=1, + threads_per_core=2, + hostname_localhost=True, + ) + with self.assertRaises(TypeError): + PyMPIStepExecutor( + max_cores=1, + cores_per_worker=1, + gpus_per_worker=1, + hostname_localhost=True, + ) + + class TestPyMpiExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): with PyMPIExecutor( @@ -126,6 +164,43 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): self.assertEqual(output, [2, 2]) +class TestPyMpiStepExecutorMPI(unittest.TestCase): + def test_pympiexecutor_one_worker_with_mpi(self): + with PyMPIStepExecutor( + max_cores=2, cores_per_worker=2, hostname_localhost=True + ) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(mpi_funct, 1) + self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs_1.done()) + + def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): + with PyMPIStepExecutor( + max_cores=2, cores_per_worker=2, hostname_localhost=True + ) as p: + cloudpickle_register(ind=1) + fs1 = p.submit(mpi_funct, 1) + fs2 = p.submit(mpi_funct, 2) + fs3 = p.submit(mpi_funct, 3) + output = [ + fs1.result(), + fs2.result(), + fs3.result(), + ] + self.assertEqual( + output, + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) + + def test_pympiexecutor_one_worker_with_mpi_echo(self): + with PyMPIStepExecutor( + max_cores=2, cores_per_worker=2, hostname_localhost=True + ) as p: + cloudpickle_register(ind=1) + output = p.submit(echo_funct, 2).result() + self.assertEqual(output, [2, 2]) + + class TestPyMpiExecutorInitFunction(unittest.TestCase): def test_internal_memory(self): with PyMPIExecutor( @@ -246,6 +321,24 @@ def test_meta(self): with ExecutorBase() as exe: self.assertIsNone(exe.info) + def test_meta_step(self): + meta_data_exe_dict = { + "cores": 2, + "interface_class": "", + "hostname_localhost": True, + "cwd": None, + "oversubscribe": False, + "max_cores": 2, + } + with PyMPIStepExecutor( + max_cores=2, cores_per_worker=2, hostname_localhost=True + ) as exe: + for k, v in meta_data_exe_dict.items(): + if k != "interface_class": + self.assertEqual(exe.info[k], v) + else: + self.assertEqual(str(exe.info[k]), v) + def test_pool_multi_core(self): with PyMPIExecutor( max_workers=1, cores_per_worker=2, hostname_localhost=True