diff --git a/.github/workflows/unittest-flux.yml b/.github/workflows/unittest-flux.yml
index 221a5357..410a60df 100644
--- a/.github/workflows/unittest-flux.yml
+++ b/.github/workflows/unittest-flux.yml
@@ -55,7 +55,7 @@ jobs:
       timeout-minutes: 5
       run: >
         flux start
-        coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest tests/test_flux_executor.py;
+        coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
       env:
         OMPI_MCA_plm: 'isolated'
         OMPI_MCA_rmaps_base_oversubscribe: 'yes'
diff --git a/pympipool/__init__.py b/pympipool/__init__.py
index 199f5b78..f89f824f 100644
--- a/pympipool/__init__.py
+++ b/pympipool/__init__.py
@@ -2,14 +2,14 @@ import shutil
 from typing import Optional
 
 from ._version import get_versions
-from pympipool.mpi.executor import PyMPIExecutor
+from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor
 from pympipool.shared.interface import SLURM_COMMAND
 from pympipool.shell.executor import SubprocessExecutor
 from pympipool.shell.interactive import ShellExecutor
-from pympipool.slurm.executor import PySlurmExecutor
+from pympipool.slurm.executor import PySlurmExecutor, PySlurmStepExecutor
 
 try:  # The PyFluxExecutor requires flux-core to be installed.
-    from pympipool.flux.executor import PyFluxExecutor
+    from pympipool.flux.executor import PyFluxExecutor, PyFluxStepExecutor
 
     flux_installed = "FLUX_URI" in os.environ
 except ImportError:
@@ -32,12 +32,11 @@ class Executor:
     an interactive Jupyter notebook.
 
     Args:
-        max_workers (int): defines the number workers which can execute functions in parallel
+        max_cores (int): defines the number of cores which can be used in parallel
         cores_per_worker (int): number of MPI cores to be used for each function call
         threads_per_core (int): number of OpenMP threads to be used for each function call
         gpus_per_worker (int): number of GPUs per worker - defaults to 0
         oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
-        init_function (None): optional function to preset arguments for functions which are submitted later
         cwd (str/None): current working directory where the parallel python task is executed
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                       context of an HPC cluster this essential to be able to communicate to an
@@ -46,6 +45,13 @@ class Executor:
                                       points to the same address as localhost. Still MacOS >= 12 seems to disable
                                       this look up for security reasons. So on MacOS it is required to set this
                                       option to true
+        backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto"
+                       is selected (the default) the available backend is determined automatically.
+        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
+                                    requirements, pympipool supports block allocation. In this case all resources have
+                                    to be defined on the executor, rather than during the submission of the individual
+                                    function.
+        init_function (None): optional function to preset arguments for functions which are submitted later
 
     Examples:
         ```
@@ -70,32 +76,34 @@ class Executor:
 
     def __init__(
         self,
-        max_workers: int = 1,
+        max_cores: int = 1,
         cores_per_worker: int = 1,
         threads_per_core: int = 1,
         gpus_per_worker: int = 0,
         oversubscribe: bool = False,
-        init_function: Optional[callable] = None,
         cwd: Optional[str] = None,
         executor=None,
         hostname_localhost: bool = False,
         backend="auto",
+        block_allocation: bool = True,
+        init_function: Optional[callable] = None,
     ):
         # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
         pass
 
     def __new__(
         cls,
-        max_workers: int = 1,
+        max_cores: int = 1,
         cores_per_worker: int = 1,
         threads_per_core: int = 1,
         gpus_per_worker: int = 0,
         oversubscribe: bool = False,
-        init_function: Optional[callable] = None,
        cwd: Optional[str] = None,
         executor=None,
         hostname_localhost: bool = False,
         backend: str = "auto",
+        block_allocation: bool = False,
+        init_function: Optional[callable] = None,
     ):
         """
         Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor,
@@ -106,12 +114,11 @@ def __new__(
         requires the SLURM workload manager to be installed on the system.
 
         Args:
-            max_workers (int): defines the number workers which can execute functions in parallel
+            max_cores (int): defines the number of cores which can be used in parallel
             cores_per_worker (int): number of MPI cores to be used for each function call
             threads_per_core (int): number of OpenMP threads to be used for each function call
             gpus_per_worker (int): number of GPUs per worker - defaults to 0
             oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
-            init_function (None): optional function to preset arguments for functions which are submitted later
             cwd (str/None): current working directory where the parallel python task is executed
             hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                           context of an HPC cluster this essential to be able to communicate to an
@@ -122,8 +129,15 @@ def __new__(
                                           option to true
             backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto"
                            is selected (the default) the available backend is determined automatically.
+            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
+                                        resource requirements, pympipool supports block allocation. In this case all
+                                        resources have to be defined on the executor, rather than during the submission
+                                        of the individual function.
+            init_function (None): optional function to preset arguments for functions which are submitted later
 
         """
+        if not block_allocation and init_function is not None:
+            raise ValueError("")
         if backend not in ["auto", "mpi", "slurm", "flux"]:
             raise ValueError(
                 'The currently implemented backends are ["flux", "mpi", "slurm"]. '
@@ -137,23 +151,47 @@ def __new__(
                     "Oversubscribing is not supported for the pympipool.flux.PyFLuxExecutor backend."
                     "Please use oversubscribe=False instead of oversubscribe=True."
                 )
-            return PyFluxExecutor(
-                max_workers=max_workers,
-                cores_per_worker=cores_per_worker,
-                threads_per_core=threads_per_core,
-                gpus_per_worker=gpus_per_worker,
-                init_function=init_function,
-                cwd=cwd,
-                hostname_localhost=hostname_localhost,
-            )
+            if block_allocation:
+                return PyFluxExecutor(
+                    max_workers=int(max_cores / cores_per_worker),
+                    cores_per_worker=cores_per_worker,
+                    threads_per_core=threads_per_core,
+                    gpus_per_worker=gpus_per_worker,
+                    init_function=init_function,
+                    cwd=cwd,
+                    hostname_localhost=hostname_localhost,
+                )
+            else:
+                return PyFluxStepExecutor(
+                    max_cores=max_cores,
+                    cores_per_worker=cores_per_worker,
+                    threads_per_core=threads_per_core,
+                    gpus_per_worker=gpus_per_worker,
+                    cwd=cwd,
+                    hostname_localhost=hostname_localhost,
+                )
         elif backend == "slurm" or (backend == "auto" and slurm_installed):
-            return PySlurmExecutor(
-                max_workers=max_workers,
-                cores_per_worker=cores_per_worker,
-                init_function=init_function,
-                cwd=cwd,
-                hostname_localhost=hostname_localhost,
-            )
+            if block_allocation:
+                return PySlurmExecutor(
+                    max_workers=int(max_cores / cores_per_worker),
+                    cores_per_worker=cores_per_worker,
+                    threads_per_core=threads_per_core,
+                    gpus_per_worker=gpus_per_worker,
+                    oversubscribe=oversubscribe,
+                    init_function=init_function,
+                    cwd=cwd,
+                    hostname_localhost=hostname_localhost,
+                )
+            else:
+                return PySlurmStepExecutor(
+                    max_cores=max_cores,
+                    cores_per_worker=cores_per_worker,
+                    threads_per_core=threads_per_core,
+                    gpus_per_worker=gpus_per_worker,
+                    oversubscribe=oversubscribe,
+                    cwd=cwd,
+                    hostname_localhost=hostname_localhost,
+                )
         else:  # backend="mpi"
             if threads_per_core != 1:
                 raise TypeError(
@@ -169,10 +207,18 @@ def __new__(
                     + str(gpus_per_worker)
                     + "."
                 )
-            return PyMPIExecutor(
-                max_workers=max_workers,
-                cores_per_worker=cores_per_worker,
-                init_function=init_function,
-                cwd=cwd,
-                hostname_localhost=hostname_localhost,
-            )
+            if block_allocation:
+                return PyMPIExecutor(
+                    max_workers=int(max_cores / cores_per_worker),
+                    cores_per_worker=cores_per_worker,
+                    init_function=init_function,
+                    cwd=cwd,
+                    hostname_localhost=hostname_localhost,
+                )
+            else:
+                return PyMPIStepExecutor(
+                    max_cores=max_cores,
+                    cores_per_worker=cores_per_worker,
+                    cwd=cwd,
+                    hostname_localhost=hostname_localhost,
+                )
diff --git a/pympipool/flux/__init__.py b/pympipool/flux/__init__.py
index 44e9e4f1..ba4b53b9 100644
--- a/pympipool/flux/__init__.py
+++ b/pympipool/flux/__init__.py
@@ -1 +1 @@
-from pympipool.flux.executor import PyFluxExecutor
+from pympipool.flux.executor import PyFluxExecutor, PyFluxStepExecutor
diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py
index 5856133a..32389de4 100644
--- a/pympipool/flux/executor.py
+++ b/pympipool/flux/executor.py
@@ -4,8 +4,10 @@
 import flux.job
 
 from pympipool.shared.executorbase import (
-    ExecutorBroker,
     execute_parallel_tasks,
+    execute_separate_tasks,
+    ExecutorBroker,
+    ExecutorSteps,
 )
 from pympipool.shared.interface import BaseInterface
 from pympipool.shared.thread import RaisingThread
@@ -90,6 +92,79 @@ def __init__(
         )
 
 
+class PyFluxStepExecutor(ExecutorSteps):
+    """
+    The pympipool.flux.PyFluxStepExecutor leverages the flux framework to distribute python tasks within a queuing
+    system allocation. In analogy to the pympipool.slurm.PySlurmExecutor it provides the option to specify the number
+    of threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
+    worker.
+ + Args: + max_cores (int): defines the number workers which can execute functions in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_worker (int): number of GPUs per worker - defaults to 0 + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + + Examples: + + >>> import numpy as np + >>> from pympipool.flux import PyFluxStepExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> with PyFluxStepExecutor(max_cores=2) as p: + >>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2}) + >>> print(fs.result()) + + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + + """ + + def __init__( + self, + max_cores: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + cwd: Optional[str] = None, + executor: Optional[flux.job.FluxExecutor] = None, + hostname_localhost: Optional[bool] = False, + ): + super().__init__() + self._set_process( + RaisingThread( + target=execute_separate_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores_per_worker, + "interface_class": FluxPythonInterface, + "max_cores": max_cores, + "hostname_localhost": hostname_localhost, + # Interface Arguments + "threads_per_core": threads_per_core, + "gpus_per_core": int(gpus_per_worker / cores_per_worker), + "cwd": cwd, + "executor": executor, + }, + ) + ) + + class FluxPythonInterface(BaseInterface): def __init__( self, diff --git a/pympipool/mpi/__init__.py b/pympipool/mpi/__init__.py index 24aa8369..3ec1c1ff 100644 --- a/pympipool/mpi/__init__.py +++ b/pympipool/mpi/__init__.py @@ -1 +1 @@ -from pympipool.mpi.executor import PyMPIExecutor +from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py index b1e39048..0cbec602 100644 --- a/pympipool/mpi/executor.py +++ b/pympipool/mpi/executor.py @@ -2,7 +2,9 @@ from pympipool.shared.executorbase import ( execute_parallel_tasks, + execute_separate_tasks, ExecutorBroker, + ExecutorSteps, ) from pympipool.shared.interface import MpiExecInterface from pympipool.shared.thread import RaisingThread @@ -80,3 +82,70 @@ def __init__( for _ in range(max_workers) ], ) + + +class PyMPIStepExecutor(ExecutorSteps): + """ + The pympipool.mpi.PyMPIStepExecutor leverages the message passing interface MPI to distribute python tasks within an + MPI allocation. 
In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyMPIStepExecutor can be executed + in a serial python process and does not require the python script to be executed with MPI. Consequently, it is + primarily an abstraction of its functionality to improve the usability in particular when used in combination with \ + Jupyter notebooks. + + Args: + max_cores (int): defines the number cores which can be used in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + cwd (str/None): current working directory where the parallel python task is executed + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + + Examples: + + >>> import numpy as np + >>> from pympipool.mpi import PyMPIStepExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> with PyMPIStepExecutor(max_cores=2) as p: + >>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2}) + >>> print(fs.result()) + + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + + """ + + def __init__( + self, + max_cores: int = 1, + cores_per_worker: int = 1, + oversubscribe: bool = False, + cwd: Optional[str] = None, + hostname_localhost: bool = False, + ): + super().__init__() + self._set_process( + RaisingThread( + target=execute_separate_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores_per_worker, + "interface_class": MpiExecInterface, + "max_cores": max_cores, + "hostname_localhost": hostname_localhost, + # Interface Arguments + "cwd": cwd, + "oversubscribe": oversubscribe, + }, + ) + ) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 00f974c3..f4ba9bb3 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -41,21 +41,42 @@ def info(self): def future_queue(self): return self._future_queue - def submit(self, fn: callable, *args, **kwargs): - """Submits a callable to be executed with the given arguments. + def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): + """ + Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable. + Args: + fn (callable): function to submit for execution + args: arguments for the submitted function + kwargs: keyword arguments for the submitted function + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the + function. Example resource dictionary: { + cores: 1, + threads_per_core: 1, + gpus_per_worker: 0, + oversubscribe: False, + cwd: None, + executor: None, + hostname_localhost: False, + } + Returns: A Future representing the given call. 
""" + if len(resource_dict) > 0: + raise ValueError( + "When block_allocation is enabled, the resource requirements have to be defined on the executor level." + ) f = Future() self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) return f def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): - """Clean-up the resources associated with the Executor. + """ + Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other methods can be called after this one. @@ -124,6 +145,69 @@ def _set_process(self, process: List[RaisingThread]): process.start() +class ExecutorSteps(ExecutorBase): + def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): + """ + Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Args: + fn (callable): function to submit for execution + args: arguments for the submitted function + kwargs: keyword arguments for the submitted function + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the + function. Example resource dictionary: { + cores: 1, + threads_per_core: 1, + gpus_per_worker: 0, + oversubscribe: False, + cwd: None, + executor: None, + hostname_localhost: False, + } + + Returns: + A Future representing the given call. + """ + f = Future() + self._future_queue.put( + { + "fn": fn, + "args": args, + "kwargs": kwargs, + "future": f, + "resource_dict": resource_dict, + } + ) + return f + + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + parallel_executors have been reclaimed. + cancel_futures: If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. + """ + if cancel_futures: + cancel_items_in_queue(que=self._future_queue) + if self._process is not None: + self._future_queue.put({"shutdown": True, "wait": wait}) + if wait: + self._process.join() + self._future_queue.join() + self._process = None + self._future_queue = None + + def cancel_items_in_queue(que: queue.Queue): """ Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the future @@ -218,6 +302,70 @@ def execute_parallel_tasks( future_queue.task_done() +def execute_separate_tasks( + future_queue: queue.Queue, + interface_class: BaseInterface, + max_cores: int, + hostname_localhost: bool = False, + **kwargs, +): + """ + Execute a single tasks in parallel using the message passing interface (MPI). + + Args: + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + interface_class (BaseInterface): Interface to start process on selected compute resources + max_cores (int): defines the number cores which can be used in parallel + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. 
And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + """ + active_task_dict = {} + process_lst = [] + while True: + task_dict = future_queue.get() + if "shutdown" in task_dict.keys() and task_dict["shutdown"]: + if task_dict["wait"]: + _ = [process.join() for process in process_lst] + future_queue.task_done() + future_queue.join() + break + elif "fn" in task_dict.keys() and "future" in task_dict.keys(): + active_task_dict = _wait_for_free_slots( + active_task_dict=active_task_dict, + max_cores=max_cores, + ) + resource_dict = task_dict.pop("resource_dict") + qtask = queue.Queue() + qtask.put(task_dict) + qtask.put({"shutdown": True, "wait": True}) + if "cores" in resource_dict.keys(): + active_task_dict[task_dict["future"]] = resource_dict["cores"] + else: + active_task_dict[task_dict["future"]] = kwargs["cores"] + task_kwargs = kwargs.copy() + task_kwargs.update(resource_dict) + task_kwargs.update( + { + "future_queue": qtask, + "interface_class": interface_class, + "hostname_localhost": hostname_localhost, + "init_function": None, + } + ) + process = RaisingThread( + target=execute_parallel_tasks, + kwargs=task_kwargs, + ) + process.start() + process_lst.append(process) + future_queue.task_done() + + def _get_backend_path(cores: int): command_lst = [sys.executable] if cores > 1: @@ -229,3 +377,9 @@ def _get_backend_path(cores: int): def _get_command_path(executable: str): return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable)) + + +def _wait_for_free_slots(active_task_dict, max_cores): + while sum(active_task_dict.values()) >= max_cores: + active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + return active_task_dict diff --git a/pympipool/slurm/__init__.py b/pympipool/slurm/__init__.py index ed4aeab3..0bdf170f 100644 --- a/pympipool/slurm/__init__.py +++ b/pympipool/slurm/__init__.py @@ -1 +1 @@ -from pympipool.slurm.executor import PySlurmExecutor +from pympipool.slurm.executor import PySlurmExecutor, PySlurmStepExecutor diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py index a587b73e..ed6ddc23 100644 --- a/pympipool/slurm/executor.py +++ b/pympipool/slurm/executor.py @@ -1,7 +1,9 @@ from typing import Optional from pympipool.shared.executorbase import ( execute_parallel_tasks, + execute_separate_tasks, ExecutorBroker, + ExecutorSteps, ) from pympipool.shared.interface import SrunInterface from pympipool.shared.thread import RaisingThread @@ -86,3 +88,76 @@ def __init__( for _ in range(max_workers) ], ) + + +class PySlurmStepExecutor(ExecutorSteps): + """ + The pympipool.slurm.PySlurmStepExecutor leverages the srun command to distribute python tasks within a SLURM queuing + system allocation. In analogy to the pympipool.flux.PyFluxExecutor it provides the option to specify the number of + threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per + worker. 
+ + Args: + max_cores (int): defines the number cores which can be used in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_worker (int): number of GPUs per worker - defaults to 0 + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + cwd (str/None): current working directory where the parallel python task is executed + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + + Examples: + + >>> import numpy as np + >>> from pympipool.slurm import PySlurmStepExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> with PySlurmStepExecutor(max_cores=2) as p: + >>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2}) + >>> print(fs.result()) + + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + """ + + def __init__( + self, + max_cores: int = 1, + cores_per_worker: int = 1, + threads_per_core: int = 1, + gpus_per_worker: int = 0, + oversubscribe: bool = False, + cwd: Optional[str] = None, + hostname_localhost: bool = False, + command_line_argument_lst: list[str] = [], + ): + super().__init__() + self._set_process( + RaisingThread( + target=execute_separate_tasks, + kwargs={ + # Executor Arguments + "future_queue": self._future_queue, + "cores": cores_per_worker, + "interface_class": SrunInterface, + "max_cores": max_cores, + "hostname_localhost": hostname_localhost, + # Interface Arguments + "threads_per_core": threads_per_core, + "gpus_per_core": int(gpus_per_worker / cores_per_worker), + "cwd": cwd, + "oversubscribe": oversubscribe, + "command_line_argument_lst": command_line_argument_lst, + }, + ) + ) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 708b6137..13b92b0d 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -49,9 +49,10 @@ def setUp(self): def test_flux_executor_serial(self): with Executor( - max_workers=2, + max_cores=2, executor=self.executor, backend="flux", + block_allocation=True, ) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -62,10 +63,11 @@ def test_flux_executor_serial(self): def test_flux_executor_threads(self): with Executor( - max_workers=1, + max_cores=1, threads_per_core=2, executor=self.executor, backend="flux", + block_allocation=True, ) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -76,10 +78,11 @@ def test_flux_executor_threads(self): def test_flux_executor_parallel(self): with Executor( - max_workers=1, + max_cores=2, cores_per_worker=2, executor=self.executor, backend="flux", + block_allocation=True, ) as exe: fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) @@ -87,10 +90,11 @@ def test_flux_executor_parallel(self): def test_single_task(self): with Executor( - max_workers=1, + max_cores=2, cores_per_worker=2, 
executor=self.executor, backend="flux", + block_allocation=True, ) as p: output = p.map(mpi_funct, [1, 2, 3]) self.assertEqual( @@ -100,11 +104,12 @@ def test_single_task(self): def test_internal_memory(self): with Executor( - max_workers=1, + max_cores=1, cores_per_worker=1, init_function=set_global, executor=self.executor, backend="flux", + block_allocation=True, ) as p: f = p.submit(get_global) self.assertFalse(f.done()) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index fbc4a236..27798b5c 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -19,7 +19,7 @@ def mpi_funct(i): class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial(self): with Executor( - max_workers=2, hostname_localhost=True, backend="mpi" + max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=True ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -31,7 +31,7 @@ def test_meta_executor_serial(self): def test_meta_executor_single(self): with Executor( - max_workers=1, hostname_localhost=True, backend="mpi" + max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=True ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) @@ -43,10 +43,11 @@ def test_meta_executor_single(self): def test_meta_executor_parallel(self): with Executor( - max_workers=1, + max_cores=2, cores_per_worker=2, hostname_localhost=True, backend="mpi", + block_allocation=True, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(mpi_funct, 1) @@ -56,7 +57,7 @@ def test_meta_executor_parallel(self): def test_errors(self): with self.assertRaises(TypeError): Executor( - max_workers=1, + max_cores=1, cores_per_worker=1, threads_per_core=2, hostname_localhost=True, @@ -64,7 +65,7 @@ def test_errors(self): ) with self.assertRaises(TypeError): Executor( - max_workers=1, + max_cores=1, cores_per_worker=1, gpus_per_worker=1, hostname_localhost=True, diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py new file mode 100644 index 00000000..0d040dd0 --- /dev/null +++ b/tests/test_executor_backend_mpi_noblock.py @@ -0,0 +1,60 @@ +import unittest + +from pympipool import Executor +from pympipool.shared.executorbase import cloudpickle_register + + +def calc(i): + return i + + +def mpi_funct(i): + from mpi4py import MPI + + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return i, size, rank + + +class TestExecutorBackend(unittest.TestCase): + def test_meta_executor_serial(self): + with Executor( + max_cores=2, hostname_localhost=True, backend="mpi", block_allocation=False + ) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_meta_executor_single(self): + with Executor( + max_cores=1, hostname_localhost=True, backend="mpi", block_allocation=False + ) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_errors(self): + with self.assertRaises(TypeError): + Executor( + max_cores=1, + cores_per_worker=1, + threads_per_core=2, + hostname_localhost=True, + backend="mpi", + ) + with self.assertRaises(TypeError): + Executor( + max_cores=1, + cores_per_worker=1, + 
gpus_per_worker=1, + hostname_localhost=True, + backend="mpi", + ) diff --git a/tests/test_integration_pyiron_workflow.py b/tests/test_integration_pyiron_workflow.py index d3f0ceda..e9d101f2 100644 --- a/tests/test_integration_pyiron_workflow.py +++ b/tests/test_integration_pyiron_workflow.py @@ -74,7 +74,7 @@ def slowly_returns_dynamic(dynamic_arg): return dynamic_arg dynamic_dynamic = slowly_returns_dynamic() - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) dynamic_object = does_nothing() fs = executor.submit(dynamic_dynamic.run, dynamic_object) @@ -104,7 +104,7 @@ def slowly_returns_42(): self.assertIsNone( dynamic_42.result, msg="Just a sanity check that the test is set up right" ) - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(dynamic_42.run) fs.add_done_callback(dynamic_42.process_result) @@ -135,7 +135,7 @@ def returns_42(): dynamic_42.running, msg="Sanity check that the test starts in the expected condition", ) - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(dynamic_42.run) fs.add_done_callback(dynamic_42.process_result) @@ -159,7 +159,7 @@ def raise_error(): raise RuntimeError re = raise_error() - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(re.run) with self.assertRaises( @@ -189,7 +189,7 @@ def slowly_returns_dynamic(): return inside_variable dynamic_dynamic = slowly_returns_dynamic() - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(dynamic_dynamic.run) self.assertIsInstance( @@ -218,7 +218,7 @@ def slow(): return fortytwo f = slow() - executor = Executor(hostname_localhost=True) + executor = Executor(hostname_localhost=True, block_allocation=True) cloudpickle_register(ind=1) fs = executor.submit(f.run) self.assertEqual( diff --git a/tests/test_mpi_executor.py b/tests/test_mpi_executor.py index 1803d6d4..707ddf48 100644 --- a/tests/test_mpi_executor.py +++ b/tests/test_mpi_executor.py @@ -5,7 +5,7 @@ import numpy as np -from pympipool.mpi.executor import PyMPIExecutor, MpiExecInterface +from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor, MpiExecInterface from pympipool.shared.backend import call_funct from pympipool.shared.executorbase import ( cloudpickle_register, @@ -89,6 +89,44 @@ def test_pympiexecutor_errors(self): ) +class TestPyMpiExecutorStepSerial(unittest.TestCase): + def test_pympiexecutor_two_workers(self): + with PyMPIStepExecutor(max_cores=2, hostname_localhost=True) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_pympiexecutor_one_worker(self): + with PyMPIStepExecutor(max_cores=1, hostname_localhost=True) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(calc, 1) + fs_2 = exe.submit(calc, 2) + self.assertEqual(fs_1.result(), 1) + self.assertEqual(fs_2.result(), 2) + self.assertTrue(fs_1.done()) + self.assertTrue(fs_2.done()) + + def test_pympiexecutor_errors(self): + with 
self.assertRaises(TypeError): + PyMPIStepExecutor( + max_cores=1, + cores_per_worker=1, + threads_per_core=2, + hostname_localhost=True, + ) + with self.assertRaises(TypeError): + PyMPIStepExecutor( + max_cores=1, + cores_per_worker=1, + gpus_per_worker=1, + hostname_localhost=True, + ) + + class TestPyMpiExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): with PyMPIExecutor( @@ -126,6 +164,43 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): self.assertEqual(output, [2, 2]) +class TestPyMpiStepExecutorMPI(unittest.TestCase): + def test_pympiexecutor_one_worker_with_mpi(self): + with PyMPIStepExecutor( + max_cores=2, cores_per_worker=2, hostname_localhost=True + ) as exe: + cloudpickle_register(ind=1) + fs_1 = exe.submit(mpi_funct, 1) + self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs_1.done()) + + def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): + with PyMPIStepExecutor( + max_cores=2, cores_per_worker=2, hostname_localhost=True + ) as p: + cloudpickle_register(ind=1) + fs1 = p.submit(mpi_funct, 1) + fs2 = p.submit(mpi_funct, 2) + fs3 = p.submit(mpi_funct, 3) + output = [ + fs1.result(), + fs2.result(), + fs3.result(), + ] + self.assertEqual( + output, + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) + + def test_pympiexecutor_one_worker_with_mpi_echo(self): + with PyMPIStepExecutor( + max_cores=2, cores_per_worker=2, hostname_localhost=True + ) as p: + cloudpickle_register(ind=1) + output = p.submit(echo_funct, 2).result() + self.assertEqual(output, [2, 2]) + + class TestPyMpiExecutorInitFunction(unittest.TestCase): def test_internal_memory(self): with PyMPIExecutor( @@ -246,6 +321,24 @@ def test_meta(self): with ExecutorBase() as exe: self.assertIsNone(exe.info) + def test_meta_step(self): + meta_data_exe_dict = { + "cores": 2, + "interface_class": "", + "hostname_localhost": True, + "cwd": None, + "oversubscribe": False, + "max_cores": 2, + } + with PyMPIStepExecutor( + max_cores=2, cores_per_worker=2, hostname_localhost=True + ) as exe: + for k, v in meta_data_exe_dict.items(): + if k != "interface_class": + self.assertEqual(exe.info[k], v) + else: + self.assertEqual(str(exe.info[k]), v) + def test_pool_multi_core(self): with PyMPIExecutor( max_workers=1, cores_per_worker=2, hostname_localhost=True
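
The two submission modes introduced by this patch can be exercised end to end through the top-level `pympipool.Executor`. The sketch below is a minimal usage example, not part of the patch itself; it assumes the `mpi` backend with a working OpenMPI and mpi4py installation, mirrors the `hostname_localhost=True` setting used in the tests, and the `max_cores`/`cores` values are illustrative only. With `block_allocation=True` all resources (and an optional `init_function`) are fixed on the executor; with `block_allocation=False` each `submit()` call may carry its own `resource_dict`, as handled by `ExecutorSteps.submit()` and `execute_separate_tasks()` above.

```python
from pympipool import Executor


def calc(i, j, k):
    # MPI-parallel function: each rank reports the communicator size and its own rank
    from mpi4py import MPI

    return i + j + k, MPI.COMM_WORLD.Get_size(), MPI.COMM_WORLD.Get_rank()


# block allocation: the MPI resources are defined once on the executor
with Executor(
    max_cores=2,
    cores_per_worker=2,
    backend="mpi",
    block_allocation=True,
    hostname_localhost=True,
) as exe:
    print(exe.submit(calc, 1, j=2, k=3).result())  # one result tuple per MPI rank

# separate steps: the resources are attached to the individual submission instead
with Executor(
    max_cores=2,
    backend="mpi",
    block_allocation=False,
    hostname_localhost=True,
) as exe:
    fs = exe.submit(calc, 1, j=2, k=3, resource_dict={"cores": 2})
    print(fs.result())
```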