diff --git a/pympipool/__init__.py b/pympipool/__init__.py index fb39ce25..80edaf05 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -1,19 +1,3 @@ -from pympipool.shared.communication import ( - SocketInterface, - interface_connect, - interface_bootup, - interface_send, - interface_shutdown, - interface_receive, -) -from pympipool.interfaces.taskbroker import HPCExecutor -from pympipool.interfaces.fluxbroker import PyFluxExecutor -from pympipool.interfaces.taskexecutor import Executor -from pympipool.legacy.interfaces.executor import PoolExecutor -from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool -from pympipool.shared.thread import RaisingThread -from pympipool.shared.taskexecutor import cancel_items_in_queue - from ._version import get_versions __version__ = get_versions()["version"] diff --git a/pympipool/flux/__init__.py b/pympipool/flux/__init__.py new file mode 100644 index 00000000..26f0a677 --- /dev/null +++ b/pympipool/flux/__init__.py @@ -0,0 +1 @@ +from pympipool.flux.fluxbroker import PyFluxExecutor diff --git a/pympipool/flux/fluxbroker.py b/pympipool/flux/fluxbroker.py new file mode 100644 index 00000000..0439549d --- /dev/null +++ b/pympipool/flux/fluxbroker.py @@ -0,0 +1,64 @@ +from pympipool.shared.executorbase import ( + ExecutorBase, + executor_broker, + get_executor_dict, +) +from pympipool.shared.thread import RaisingThread +from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor + + +class PyFluxExecutor(ExecutorBase): + def __init__( + self, + max_workers, + cores_per_worker=1, + threads_per_core=1, + gpus_per_worker=0, + init_function=None, + cwd=None, + sleep_interval=0.1, + executor=None, + ): + super().__init__() + self._process = RaisingThread( + target=_flux_executor_broker, + kwargs={ + "future_queue": self._future_queue, + "max_workers": max_workers, + "cores_per_worker": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_worker": gpus_per_worker, + "init_function": init_function, + "cwd": cwd, + "sleep_interval": sleep_interval, + "executor": executor, + }, + ) + self._process.start() + + +def _flux_executor_broker( + future_queue, + max_workers, + cores_per_worker=1, + threads_per_core=1, + gpus_per_worker=0, + init_function=None, + cwd=None, + sleep_interval=0.1, + executor=None, +): + executor_broker( + future_queue=future_queue, + meta_future_lst=get_executor_dict( + max_workers=max_workers, + executor_class=PyFluxSingleTaskExecutor, + cores=cores_per_worker, + threads_per_core=threads_per_core, + gpus_per_task=int(gpus_per_worker / cores_per_worker), + init_function=init_function, + cwd=cwd, + executor=executor, + ), + sleep_interval=sleep_interval, + ) diff --git a/pympipool/flux/fluxtask.py b/pympipool/flux/fluxtask.py new file mode 100644 index 00000000..05fc3697 --- /dev/null +++ b/pympipool/flux/fluxtask.py @@ -0,0 +1,166 @@ +import os + +import flux.job + +from pympipool.shared.executorbase import ( + cloudpickle_register, + ExecutorBase, + execute_parallel_tasks_loop, + get_backend_path, +) +from pympipool.shared.interface import BaseInterface +from pympipool.shared.communication import interface_bootup +from pympipool.shared.thread import RaisingThread + + +class PyFluxSingleTaskExecutor(ExecutorBase): + """ + The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. 
+ In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process + and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the + mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the + usability in particular when used in combination with Jupyter notebooks. + + Args: + cores (int): defines the number of MPI ranks to use for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + + Examples: + ``` + >>> import numpy as np + >>> from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> def init_k(): + >>> return {"k": 3} + >>> + >>> with PyFluxSingleTaskExecutor(cores=2, init_function=init_k) as p: + >>> fs = p.submit(calc, 2, j=4) + >>> print(fs.result()) + + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + ``` + """ + + def __init__( + self, + cores=1, + threads_per_core=1, + gpus_per_task=0, + init_function=None, + cwd=None, + executor=None, + ): + super().__init__() + self._process = RaisingThread( + target=_flux_execute_parallel_tasks, + kwargs={ + "future_queue": self._future_queue, + "cores": cores, + "threads_per_core": threads_per_core, + "gpus_per_task": gpus_per_task, + "cwd": cwd, + "executor": executor, + }, + ) + self._process.start() + if init_function is not None: + self._future_queue.put( + {"init": True, "fn": init_function, "args": (), "kwargs": {}} + ) + cloudpickle_register(ind=3) + + +class FluxPythonInterface(BaseInterface): + def __init__( + self, + cwd=None, + cores=1, + threads_per_core=1, + gpus_per_core=0, + oversubscribe=False, + executor=None, + ): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + threads_per_core=threads_per_core, + oversubscribe=oversubscribe, + ) + self._executor = executor + self._future = None + + def bootup(self, command_lst): + if self._oversubscribe: + raise ValueError( + "Oversubscribing is currently not supported for the Flux adapter." + ) + if self._executor is None: + self._executor = flux.job.FluxExecutor() + jobspec = flux.job.JobspecV1.from_command( + command=command_lst, + num_tasks=self._cores, + cores_per_task=self._threads_per_core, + gpus_per_task=self._gpus_per_core, + num_nodes=None, + exclusive=False, + ) + jobspec.environment = dict(os.environ) + if self._cwd is not None: + jobspec.cwd = self._cwd + self._future = self._executor.submit(jobspec) + + def shutdown(self, wait=True): + if self.poll(): + self._future.cancel() + # The flux future objects are not instantly updated, + # still showing running after cancel was called, + # so we wait until the execution is completed. + self._future.result() + + def poll(self): + return self._future is not None and not self._future.done() + + +def _flux_execute_parallel_tasks( + future_queue, + cores, + threads_per_core=1, + gpus_per_task=0, + cwd=None, + executor=None, +): + """ + Execute a single tasks in parallel using the message passing interface (MPI). 
+ + Args: + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + cores (int): defines the total number of MPI ranks to use + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 + cwd (str/None): current working directory where the parallel python task is executed + executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional + """ + execute_parallel_tasks_loop( + interface=interface_bootup( + command_lst=get_backend_path(cores=cores), + connections=FluxPythonInterface( + cwd=cwd, + cores=cores, + threads_per_core=threads_per_core, + gpus_per_core=gpus_per_task, + oversubscribe=False, + executor=executor, + ), + ), + future_queue=future_queue, + ) diff --git a/pympipool/interfaces/__init__.py b/pympipool/interfaces/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pympipool/interfaces/fluxbroker.py b/pympipool/interfaces/fluxbroker.py deleted file mode 100644 index 9a4dc2fa..00000000 --- a/pympipool/interfaces/fluxbroker.py +++ /dev/null @@ -1,239 +0,0 @@ -import os -import queue -from socket import gethostname -import sys -from time import sleep - -from pympipool.shared.broker import ( - get_future_done, - execute_task_dict, -) -from pympipool.shared.base import ExecutorBase -from pympipool.shared.thread import RaisingThread -from pympipool.shared.taskexecutor import ( - cloudpickle_register, - execute_parallel_tasks_loop, -) -from pympipool.shared.connections import FluxPythonInterface -from pympipool.shared.communication import SocketInterface - - -class SingleTaskExecutor(ExecutorBase): - """ - The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. - In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process - and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the - mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the - usability in particular when used in combination with Jupyter notebooks. 
- - Args: - cores (int): defines the number of MPI ranks to use for each function call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - - Examples: - ``` - >>> import numpy as np - >>> from pympipool import Executor - >>> - >>> def calc(i, j, k): - >>> from mpi4py import MPI - >>> size = MPI.COMM_WORLD.Get_size() - >>> rank = MPI.COMM_WORLD.Get_rank() - >>> return np.array([i, j, k]), size, rank - >>> - >>> def init_k(): - >>> return {"k": 3} - >>> - >>> with Executor(cores=2, init_function=init_k) as p: - >>> fs = p.submit(calc, 2, j=4) - >>> print(fs.result()) - - [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] - ``` - """ - - def __init__( - self, - cores, - threads_per_core=1, - gpus_per_task=0, - init_function=None, - cwd=None, - executor=None, - ): - super().__init__() - self._process = RaisingThread( - target=execute_parallel_tasks, - kwargs={ - "future_queue": self._future_queue, - "cores": cores, - "threads_per_core": threads_per_core, - "gpus_per_task": gpus_per_task, - "cwd": cwd, - "executor": executor, - }, - ) - self._process.start() - if init_function is not None: - self._future_queue.put( - {"init": True, "fn": init_function, "args": (), "kwargs": {}} - ) - cloudpickle_register(ind=3) - - -class PyFluxExecutor(ExecutorBase): - def __init__( - self, - max_workers, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - init_function=None, - cwd=None, - sleep_interval=0.1, - executor=None, - ): - super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - "future_queue": self._future_queue, - "max_workers": max_workers, - "cores_per_worker": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_worker": gpus_per_worker, - "init_function": init_function, - "cwd": cwd, - "sleep_interval": sleep_interval, - "executor": executor, - }, - ) - self._process.start() - - -def execute_parallel_tasks( - future_queue, - cores, - threads_per_core=1, - gpus_per_task=0, - cwd=None, - executor=None, -): - """ - Execute a single tasks in parallel using the message passing interface (MPI). 
- - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - cwd (str/None): current working directory where the parallel python task is executed - executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional - """ - command_lst = [sys.executable] - if cores > 1: - command_lst += [ - os.path.abspath( - os.path.join(__file__, "..", "..", "backend", "mpiexec.py") - ), - ] - else: - command_lst += [ - os.path.abspath(os.path.join(__file__, "..", "..", "backend", "serial.py")), - ] - interface = interface_bootup( - command_lst=command_lst, - cwd=cwd, - cores=cores, - threads_per_core=threads_per_core, - gpus_per_core=gpus_per_task, - executor=executor, - ) - execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) - - -def interface_bootup( - command_lst, - cwd=None, - cores=1, - threads_per_core=1, - gpus_per_core=0, - executor=None, -): - command_lst += [ - "--host", - gethostname(), - ] - connections = FluxPythonInterface( - cwd=cwd, - cores=cores, - threads_per_core=threads_per_core, - gpus_per_core=gpus_per_core, - oversubscribe=False, - executor=executor, - ) - interface = SocketInterface(interface=connections) - command_lst += [ - "--zmqport", - str(interface.bind_to_random_port()), - ] - interface.bootup(command_lst=command_lst) - return interface - - -def executor_broker( - future_queue, - max_workers, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - init_function=None, - cwd=None, - sleep_interval=0.1, - executor=None, -): - meta_future_lst = _get_executor_list( - max_workers=max_workers, - cores_per_worker=cores_per_worker, - threads_per_core=threads_per_core, - gpus_per_worker=gpus_per_worker, - init_function=init_function, - cwd=cwd, - executor=executor, - ) - while True: - try: - task_dict = future_queue.get_nowait() - except queue.Empty: - sleep(sleep_interval) - else: - if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): - future_queue.task_done() - else: - future_queue.task_done() - break - - -def _get_executor_list( - max_workers, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - init_function=None, - cwd=None, - executor=None, -): - return { - get_future_done(): SingleTaskExecutor( - cores=cores_per_worker, - threads_per_core=threads_per_core, - gpus_per_task=int(gpus_per_worker / cores_per_worker), - init_function=init_function, - cwd=cwd, - executor=executor, - ) - for _ in range(max_workers) - } diff --git a/pympipool/interfaces/taskbroker.py b/pympipool/interfaces/taskbroker.py deleted file mode 100644 index 1b962f9d..00000000 --- a/pympipool/interfaces/taskbroker.py +++ /dev/null @@ -1,39 +0,0 @@ -from pympipool.shared.base import ExecutorBase -from pympipool.shared.thread import RaisingThread -from pympipool.shared.broker import executor_broker - - -class HPCExecutor(ExecutorBase): - def __init__( - self, - max_workers, - cores_per_worker=1, - gpus_per_worker=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - init_function=None, - cwd=None, - sleep_interval=0.1, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - "future_queue": self._future_queue, - "max_workers": 
max_workers, - "cores_per_worker": cores_per_worker, - "gpus_per_worker": gpus_per_worker, - "oversubscribe": oversubscribe, - "enable_flux_backend": enable_flux_backend, - "enable_slurm_backend": enable_slurm_backend, - "init_function": init_function, - "cwd": cwd, - "sleep_interval": sleep_interval, - "queue_adapter": queue_adapter, - "queue_adapter_kwargs": queue_adapter_kwargs, - }, - ) - self._process.start() diff --git a/pympipool/interfaces/taskexecutor.py b/pympipool/interfaces/taskexecutor.py deleted file mode 100644 index 8039a4a2..00000000 --- a/pympipool/interfaces/taskexecutor.py +++ /dev/null @@ -1,81 +0,0 @@ -from pympipool.shared.base import ExecutorBase -from pympipool.shared.thread import RaisingThread -from pympipool.shared.taskexecutor import ( - execute_parallel_tasks, - cloudpickle_register, -) - - -class Executor(ExecutorBase): - """ - The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. - In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process - and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the - mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the - usability in particular when used in combination with Jupyter notebooks. - - Args: - cores (int): defines the number of MPI ranks to use for each function call - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - - Examples: - ``` - >>> import numpy as np - >>> from pympipool import Executor - >>> - >>> def calc(i, j, k): - >>> from mpi4py import MPI - >>> size = MPI.COMM_WORLD.Get_size() - >>> rank = MPI.COMM_WORLD.Get_rank() - >>> return np.array([i, j, k]), size, rank - >>> - >>> def init_k(): - >>> return {"k": 3} - >>> - >>> with Executor(cores=2, init_function=init_k) as p: - >>> fs = p.submit(calc, 2, j=4) - >>> print(fs.result()) - [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] - ``` - """ - - def __init__( - self, - cores, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - init_function=None, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - self._process = RaisingThread( - target=execute_parallel_tasks, - kwargs={ - "future_queue": self._future_queue, - "cores": cores, - "gpus_per_task": gpus_per_task, - "oversubscribe": oversubscribe, - "enable_flux_backend": enable_flux_backend, - "enable_slurm_backend": enable_slurm_backend, - "cwd": cwd, - "queue_adapter": queue_adapter, - "queue_adapter_kwargs": queue_adapter_kwargs, - }, - ) - self._process.start() - if init_function is not None: - self._future_queue.put( - {"init": True, "fn": init_function, "args": (), "kwargs": {}} - ) - 
cloudpickle_register(ind=3) diff --git a/pympipool/legacy/__init__.py b/pympipool/legacy/__init__.py index 19f7bf84..24948aa0 100644 --- a/pympipool/legacy/__init__.py +++ b/pympipool/legacy/__init__.py @@ -1,2 +1,2 @@ -from pympipool.legacy.interfaces.executor import PoolExecutor +from pympipool.legacy.interfaces.poolexecutor import PoolExecutor from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index da9b37fc..120e4708 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -1,8 +1,8 @@ from abc import ABC -from pympipool.shared.communication import interface_bootup -from pympipool.shared.taskexecutor import cloudpickle_register +from pympipool.shared.executorbase import cloudpickle_register from pympipool.legacy.shared.interface import get_pool_command +from pympipool.legacy.shared.connections import interface_bootup class PoolBase(ABC): diff --git a/pympipool/legacy/interfaces/executor.py b/pympipool/legacy/interfaces/poolexecutor.py similarity index 59% rename from pympipool/legacy/interfaces/executor.py rename to pympipool/legacy/interfaces/poolexecutor.py index 87dfeec9..d94b8b25 100644 --- a/pympipool/legacy/interfaces/executor.py +++ b/pympipool/legacy/interfaces/poolexecutor.py @@ -1,7 +1,10 @@ -from pympipool.shared.base import ExecutorBase +from pympipool.shared.executorbase import cloudpickle_register, ExecutorBase from pympipool.shared.thread import RaisingThread -from pympipool.legacy.shared.interface import execute_serial_tasks -from pympipool.shared.taskexecutor import cloudpickle_register +from pympipool.legacy.shared.connections import interface_bootup +from pympipool.legacy.shared.interface import ( + get_pool_command, + _execute_serial_tasks_loop, +) class PoolExecutor(ExecutorBase): @@ -70,3 +73,50 @@ def __init__( ) self._process.start() cloudpickle_register(ind=3) + + +def execute_serial_tasks( + future_queue, + cores, + gpus_per_task=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + cwd=None, + sleep_interval=0.1, + queue_adapter=None, + queue_adapter_kwargs=None, +): + """ + Execute a single tasks in serial. 
+ + Args: + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + cores (int): defines the total number of MPI ranks to use + gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 + oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False + enable_flux_backend (bool): enable the flux-framework as backend - defaults to False + enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False + cwd (str/None): current working directory where the parallel python task is executed + sleep_interval (float): + queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems + queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter + """ + future_dict = {} + interface = interface_bootup( + command_lst=get_pool_command(cores_total=cores, ranks_per_task=1)[0], + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_adapter_kwargs=queue_adapter_kwargs, + ) + _execute_serial_tasks_loop( + interface=interface, + future_queue=future_queue, + future_dict=future_dict, + sleep_interval=sleep_interval, + ) diff --git a/pympipool/legacy/shared/connections.py b/pympipool/legacy/shared/connections.py new file mode 100644 index 00000000..0b0a3577 --- /dev/null +++ b/pympipool/legacy/shared/connections.py @@ -0,0 +1,181 @@ +from socket import gethostname + +from pympipool.shared.interface import ( + BaseInterface, + MpiExecInterface, + SlurmSubprocessInterface, + SubprocessInterface, + generate_mpiexec_command, + generate_slurm_command, +) +from pympipool.shared.communication import SocketInterface + + +class PysqaInterface(BaseInterface): + def __init__( + self, + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, + ): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + self._queue_adapter = queue_adapter + self._queue_type = queue_type + self._queue_adapter_kwargs = queue_adapter_kwargs + self._queue_id = None + + def bootup(self, command_lst): + if self._queue_type.lower() == "slurm": + command_prepend_lst = generate_slurm_command( + cores=self._cores, + cwd=self._cwd, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + else: + command_prepend_lst = generate_mpiexec_command( + cores=self._cores, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + self._queue_id = self._queue_adapter.submit_job( + working_directory=self._cwd, + cores=self._cores, + command=" ".join(command_prepend_lst + command_lst), + **self._queue_adapter_kwargs + ) + + def shutdown(self, wait=True): + self._queue_adapter.delete_job(process_id=self._queue_id) + + def poll(self): + return self._queue_adapter is not None + + +class FluxCmdInterface(SubprocessInterface): + def generate_command(self, command_lst): + command_prepend_lst = [ + "flux", + "run", + "-n", + str(self._cores), + ] + if self._cwd is not None: + command_prepend_lst += [ + "--cwd=" + self._cwd, + ] + if self._threads_per_core > 1: + command_prepend_lst += ["--cores-per-task=" + str(self._threads_per_core)] + if self._gpus_per_core > 0: + command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)] + 
return super().generate_command( + command_lst=command_prepend_lst + command_lst, + ) + + +def get_connection_interface( + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, +): + """ + Backwards compatibility adapter to get the connection interface + + Args: + cwd (str/None): current working directory where the parallel python task is executed + cores (int): defines the total number of MPI ranks to use + gpus_per_core (int): number of GPUs per MPI rank - defaults to 0 + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec + enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False + queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems + queue_type (str): type of the queuing system + queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter + + Returns: + pympipool.shared.interface.BaseInterface: Connection interface + """ + if queue_adapter is not None: + connections = PysqaInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + queue_adapter=queue_adapter, + queue_type=queue_type, + queue_adapter_kwargs=queue_adapter_kwargs, + ) + elif enable_flux_backend: + connections = FluxCmdInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + elif enable_slurm_backend: + connections = SlurmSubprocessInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + else: + connections = MpiExecInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + return connections + + +def interface_bootup( + command_lst, + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, +): + if enable_flux_backend or enable_slurm_backend or queue_adapter is not None: + command_lst += [ + "--host", + gethostname(), + ] + connections = get_connection_interface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_type=queue_type, + queue_adapter_kwargs=queue_adapter_kwargs, + ) + interface = SocketInterface(interface=connections) + command_lst += [ + "--zmqport", + str(interface.bind_to_random_port()), + ] + interface.bootup(command_lst=command_lst) + return interface diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py index c9cd6b9a..edd5e778 100644 --- a/pympipool/legacy/shared/interface.py +++ b/pympipool/legacy/shared/interface.py @@ -3,55 +3,6 @@ import sys import time -from pympipool.shared.communication import interface_bootup - - -def execute_serial_tasks( - future_queue, - cores, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - cwd=None, - sleep_interval=0.1, - queue_adapter=None, - queue_adapter_kwargs=None, -): - """ - Execute a single tasks in serial. 
- - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - """ - future_dict = {} - interface = interface_bootup( - command_lst=get_pool_command(cores_total=cores, ranks_per_task=1)[0], - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - _execute_serial_tasks_loop( - interface=interface, - future_queue=future_queue, - future_dict=future_dict, - sleep_interval=sleep_interval, - ) - def get_pool_command(cores_total, ranks_per_task=1): executable = os.path.abspath( diff --git a/pympipool/mpi/__init__.py b/pympipool/mpi/__init__.py new file mode 100644 index 00000000..5d4bea6a --- /dev/null +++ b/pympipool/mpi/__init__.py @@ -0,0 +1 @@ +from pympipool.mpi.mpibroker import PyMPIExecutor diff --git a/pympipool/mpi/mpibroker.py b/pympipool/mpi/mpibroker.py new file mode 100644 index 00000000..d879923c --- /dev/null +++ b/pympipool/mpi/mpibroker.py @@ -0,0 +1,79 @@ +from pympipool.shared.executorbase import ( + ExecutorBase, + executor_broker, + get_executor_dict, +) +from pympipool.shared.thread import RaisingThread +from pympipool.mpi.mpitask import PyMPISingleTaskExecutor + + +class PyMPIExecutor(ExecutorBase): + def __init__( + self, + max_workers, + cores_per_worker=1, + threads_per_core=1, + gpus_per_worker=0, + oversubscribe=False, + init_function=None, + cwd=None, + sleep_interval=0.1, + enable_slurm_backend=False, + ): + super().__init__() + if not enable_slurm_backend: + if threads_per_core != 1: + raise ValueError( + "The MPI backend only supports threads_per_core=1, " + + "to manage threads use the SLURM queuing system enable_slurm_backend=True ." + ) + elif gpus_per_worker != 0: + raise ValueError( + "The MPI backend only supports gpus_per_core=0, " + + "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ." 
+ ) + self._process = RaisingThread( + target=_mpi_executor_broker, + kwargs={ + "future_queue": self._future_queue, + "max_workers": max_workers, + "cores_per_worker": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_worker": gpus_per_worker, + "oversubscribe": oversubscribe, + "init_function": init_function, + "cwd": cwd, + "sleep_interval": sleep_interval, + "enable_slurm_backend": enable_slurm_backend, + }, + ) + self._process.start() + + +def _mpi_executor_broker( + future_queue, + max_workers, + cores_per_worker=1, + threads_per_core=1, + gpus_per_worker=0, + oversubscribe=False, + init_function=None, + cwd=None, + sleep_interval=0.1, + enable_slurm_backend=False, +): + executor_broker( + future_queue=future_queue, + meta_future_lst=get_executor_dict( + max_workers=max_workers, + executor_class=PyMPISingleTaskExecutor, + cores=cores_per_worker, + threads_per_core=threads_per_core, + gpus_per_task=int(gpus_per_worker / cores_per_worker), + oversubscribe=oversubscribe, + init_function=init_function, + cwd=cwd, + enable_slurm_backend=enable_slurm_backend, + ), + sleep_interval=sleep_interval, + ) diff --git a/pympipool/mpi/mpitask.py b/pympipool/mpi/mpitask.py new file mode 100644 index 00000000..ec3acd4c --- /dev/null +++ b/pympipool/mpi/mpitask.py @@ -0,0 +1,141 @@ +from pympipool.shared.executorbase import ( + cloudpickle_register, + execute_parallel_tasks_loop, + ExecutorBase, + get_backend_path, +) +from pympipool.shared.thread import RaisingThread +from pympipool.shared.communication import interface_bootup +from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface + + +class PyMPISingleTaskExecutor(ExecutorBase): + """ + The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. + In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process + and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the + mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the + usability in particular when used in combination with Jupyter notebooks. 
+ + Args: + cores (int): defines the number of MPI ranks to use for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False + + Examples: + ``` + >>> import numpy as np + >>> from pympipool.mpi.mpitask import PyMPISingleTaskExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> def init_k(): + >>> return {"k": 3} + >>> + >>> with PyMPISingleTaskExecutor(cores=2, init_function=init_k) as p: + >>> fs = p.submit(calc, 2, j=4) + >>> print(fs.result()) + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + ``` + """ + + def __init__( + self, + cores=1, + threads_per_core=1, + gpus_per_task=0, + oversubscribe=False, + init_function=None, + cwd=None, + enable_slurm_backend=False, + ): + super().__init__() + self._process = RaisingThread( + target=_mpi_execute_parallel_tasks, + kwargs={ + "future_queue": self._future_queue, + "cores": cores, + "threads_per_core": threads_per_core, + "gpus_per_task": gpus_per_task, + "cwd": cwd, + "oversubscribe": oversubscribe, + "enable_slurm_backend": enable_slurm_backend, + }, + ) + self._process.start() + if init_function is not None: + self._future_queue.put( + {"init": True, "fn": init_function, "args": (), "kwargs": {}} + ) + cloudpickle_register(ind=3) + + +def _mpi_execute_parallel_tasks( + future_queue, + cores, + threads_per_core=1, + gpus_per_task=0, + cwd=None, + oversubscribe=False, + enable_slurm_backend=False, +): + """ + Execute a single tasks in parallel using the message passing interface (MPI). 
+ + Args: + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + cores (int): defines the total number of MPI ranks to use + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 + cwd (str/None): current working directory where the parallel python task is executed + oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False + enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False + """ + execute_parallel_tasks_loop( + interface=interface_bootup( + command_lst=get_backend_path(cores=cores), + connections=get_interface( + cores=cores, + threads_per_core=threads_per_core, + gpus_per_task=gpus_per_task, + cwd=cwd, + oversubscribe=oversubscribe, + enable_slurm_backend=enable_slurm_backend, + ), + ), + future_queue=future_queue, + ) + + +def get_interface( + cores=1, + threads_per_core=1, + gpus_per_task=0, + cwd=None, + oversubscribe=False, + enable_slurm_backend=False, +): + if not enable_slurm_backend: + return MpiExecInterface( + cwd=cwd, + cores=cores, + threads_per_core=threads_per_core, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + ) + else: + return SlurmSubprocessInterface( + cwd=cwd, + cores=cores, + threads_per_core=threads_per_core, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + ) diff --git a/pympipool/shared/__init__.py b/pympipool/shared/__init__.py index e69de29b..9e765c95 100644 --- a/pympipool/shared/__init__.py +++ b/pympipool/shared/__init__.py @@ -0,0 +1,9 @@ +from pympipool.shared.communication import ( + SocketInterface, + interface_connect, + interface_send, + interface_shutdown, + interface_receive, +) +from pympipool.shared.executorbase import cancel_items_in_queue +from pympipool.shared.thread import RaisingThread diff --git a/pympipool/shared/base.py b/pympipool/shared/base.py deleted file mode 100644 index 30cdfa15..00000000 --- a/pympipool/shared/base.py +++ /dev/null @@ -1,49 +0,0 @@ -from concurrent.futures import Executor as FutureExecutor, Future -import queue - -from pympipool.shared.taskexecutor import cancel_items_in_queue - - -class ExecutorBase(FutureExecutor): - def __init__(self): - self._future_queue = queue.Queue() - self._process = None - - @property - def future_queue(self): - return self._future_queue - - def submit(self, fn, *args, **kwargs): - """Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Returns: - A Future representing the given call. - """ - f = Future() - self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) - return f - - def shutdown(self, wait=True, *, cancel_futures=False): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - parallel_executors have been reclaimed. - cancel_futures: If True then shutdown will cancel all pending - futures. Futures that are completed or running will not be - cancelled. 
- """ - if cancel_futures: - cancel_items_in_queue(que=self._future_queue) - self._future_queue.put({"shutdown": True, "wait": wait}) - self._process.join() - - def __len__(self): - return self._future_queue.qsize() diff --git a/pympipool/shared/broker.py b/pympipool/shared/broker.py deleted file mode 100644 index f2c01091..00000000 --- a/pympipool/shared/broker.py +++ /dev/null @@ -1,93 +0,0 @@ -from concurrent.futures import as_completed, Future -import queue -from time import sleep - -from pympipool.interfaces.taskexecutor import Executor - - -def executor_broker( - future_queue, - max_workers, - cores_per_worker=1, - gpus_per_worker=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - init_function=None, - cwd=None, - sleep_interval=0.1, - queue_adapter=None, - queue_adapter_kwargs=None, -): - meta_future_lst = _get_executor_list( - max_workers=max_workers, - cores_per_worker=cores_per_worker, - gpus_per_worker=gpus_per_worker, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - init_function=init_function, - cwd=cwd, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - while True: - try: - task_dict = future_queue.get_nowait() - except queue.Empty: - sleep(sleep_interval) - else: - if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): - future_queue.task_done() - else: - future_queue.task_done() - break - - -def execute_task_dict(task_dict, meta_future_lst): - if "fn" in task_dict.keys(): - meta_future = next(as_completed(meta_future_lst.keys())) - executor = meta_future_lst.pop(meta_future) - executor.future_queue.put(task_dict) - meta_future_lst[task_dict["future"]] = executor - return True - elif "shutdown" in task_dict.keys() and task_dict["shutdown"]: - for executor in meta_future_lst.values(): - executor.shutdown(wait=task_dict["wait"]) - return False - else: - raise ValueError("Unrecognized Task in task_dict: ", task_dict) - - -def _get_executor_list( - max_workers, - cores_per_worker=1, - gpus_per_worker=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - init_function=None, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, -): - return { - get_future_done(): Executor( - cores=cores_per_worker, - gpus_per_task=int(gpus_per_worker / cores_per_worker), - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - init_function=init_function, - cwd=cwd, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - for _ in range(max_workers) - } - - -def get_future_done(): - f = Future() - f.set_result(True) - return f diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index 09fecb6c..6272bc96 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -1,8 +1,7 @@ -import cloudpickle from socket import gethostname -import zmq -from pympipool.shared.connections import get_connection_interface +import cloudpickle +import zmq class SocketInterface(object): @@ -10,7 +9,7 @@ class SocketInterface(object): The SocketInterface is an abstraction layer on top of the zero message queue. 
Args: - interface (pympipool.shared.connections.BaseInterface): Interface for starting the parallel process + interface (pympipool.shared.interface.BaseInterface): Interface for starting the parallel process """ def __init__(self, interface=None): @@ -98,32 +97,12 @@ def __del__(self): def interface_bootup( command_lst, - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, + connections, ): - if enable_flux_backend or enable_slurm_backend or queue_adapter is not None: - command_lst += [ - "--host", - gethostname(), - ] - connections = get_connection_interface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - queue_adapter=queue_adapter, - queue_type=queue_type, - queue_adapter_kwargs=queue_adapter_kwargs, - ) + command_lst += [ + "--host", + gethostname(), + ] interface = SocketInterface(interface=connections) command_lst += [ "--zmqport", diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py deleted file mode 100644 index de7be7dd..00000000 --- a/pympipool/shared/connections.py +++ /dev/null @@ -1,301 +0,0 @@ -from abc import ABC -import os -import subprocess - - -class BaseInterface(ABC): - def __init__( - self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False - ): - self._cwd = cwd - self._cores = cores - self._threads_per_core = threads_per_core - self._gpus_per_core = gpus_per_core - self._oversubscribe = oversubscribe - - def bootup(self, command_lst): - raise NotImplementedError - - def shutdown(self, wait=True): - raise NotImplementedError - - def poll(self): - raise NotImplementedError - - -class SubprocessInterface(BaseInterface): - def __init__( - self, - cwd=None, - cores=1, - threads_per_core=1, - gpus_per_core=0, - oversubscribe=False, - ): - super().__init__( - cwd=cwd, - cores=cores, - threads_per_core=threads_per_core, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - self._process = None - - def bootup(self, command_lst): - self._process = subprocess.Popen( - args=self.generate_command(command_lst=command_lst), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - cwd=self._cwd, - ) - - def generate_command(self, command_lst): - return command_lst - - def shutdown(self, wait=True): - self._process.terminate() - self._process.stdout.close() - self._process.stdin.close() - self._process.stderr.close() - if wait: - self._process.wait() - self._process = None - - def poll(self): - return self._process is not None and self._process.poll() is None - - -class MpiExecInterface(SubprocessInterface): - def generate_command(self, command_lst): - command_prepend_lst = generate_mpiexec_command( - cores=self._cores, - gpus_per_core=self._gpus_per_core, - oversubscribe=self._oversubscribe, - ) - return super().generate_command( - command_lst=command_prepend_lst + command_lst, - ) - - -class SlurmSubprocessInterface(SubprocessInterface): - def generate_command(self, command_lst): - command_prepend_lst = generate_slurm_command( - cores=self._cores, - cwd=self._cwd, - threads_per_core=self._threads_per_core, - gpus_per_core=self._gpus_per_core, - oversubscribe=self._oversubscribe, - ) - return super().generate_command( - command_lst=command_prepend_lst + command_lst, - ) - - -class PysqaInterface(BaseInterface): - def __init__( - self, - 
cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, - ): - super().__init__( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - self._queue_adapter = queue_adapter - self._queue_type = queue_type - self._queue_adapter_kwargs = queue_adapter_kwargs - self._queue_id = None - - def bootup(self, command_lst): - if self._queue_type.lower() == "slurm": - command_prepend_lst = generate_slurm_command( - cores=self._cores, - cwd=self._cwd, - gpus_per_core=self._gpus_per_core, - oversubscribe=self._oversubscribe, - ) - else: - command_prepend_lst = generate_mpiexec_command( - cores=self._cores, - gpus_per_core=self._gpus_per_core, - oversubscribe=self._oversubscribe, - ) - self._queue_id = self._queue_adapter.submit_job( - working_directory=self._cwd, - cores=self._cores, - command=" ".join(command_prepend_lst + command_lst), - **self._queue_adapter_kwargs - ) - - def shutdown(self, wait=True): - self._queue_adapter.delete_job(process_id=self._queue_id) - - def poll(self): - return self._queue_adapter is not None - - -class FluxCmdInterface(SubprocessInterface): - def generate_command(self, command_lst): - command_prepend_lst = [ - "flux", - "run", - "-n", - str(self._cores), - ] - if self._cwd is not None: - command_prepend_lst += [ - "--cwd=" + self._cwd, - ] - if self._threads_per_core > 1: - command_prepend_lst += ["--cores-per-task=" + str(self._threads_per_core)] - if self._gpus_per_core > 0: - command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)] - return super().generate_command( - command_lst=command_prepend_lst + command_lst, - ) - - -class FluxPythonInterface(BaseInterface): - def __init__( - self, - cwd=None, - cores=1, - threads_per_core=1, - gpus_per_core=0, - oversubscribe=False, - executor=None, - ): - super().__init__( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - threads_per_core=threads_per_core, - oversubscribe=oversubscribe, - ) - self._executor = executor - self._future = None - - def bootup(self, command_lst): - import flux.job - - if self._oversubscribe: - raise ValueError( - "Oversubscribing is currently not supported for the Flux adapter." - ) - if self._executor is None: - self._executor = flux.job.FluxExecutor() - jobspec = flux.job.JobspecV1.from_command( - command=command_lst, - num_tasks=self._cores, - cores_per_task=self._threads_per_core, - gpus_per_task=self._gpus_per_core, - num_nodes=None, - exclusive=False, - ) - jobspec.environment = dict(os.environ) - if self._cwd is not None: - jobspec.cwd = self._cwd - self._future = self._executor.submit(jobspec) - - def shutdown(self, wait=True): - if self.poll(): - self._future.cancel() - # The flux future objects are not instantly updated, - # still showing running after cancel was called, - # so we wait until the execution is completed. 
- self._future.result() - - def poll(self): - return self._future is not None and not self._future.done() - - -def generate_slurm_command( - cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False -): - command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd] - if threads_per_core > 1: - command_prepend_lst += ["--cpus-per-task" + str(threads_per_core)] - if gpus_per_core > 0: - command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)] - if oversubscribe: - command_prepend_lst += ["--oversubscribe"] - return command_prepend_lst - - -def generate_mpiexec_command(cores, gpus_per_core=0, oversubscribe=False): - command_prepend_lst = ["mpiexec", "-n", str(cores)] - if oversubscribe: - command_prepend_lst += ["--oversubscribe"] - if gpus_per_core > 0: - raise ValueError() - return command_prepend_lst - - -def get_connection_interface( - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, -): - """ - Backwards compatibility adapter to get the connection interface - - Args: - cwd (str/None): current working directory where the parallel python task is executed - cores (int): defines the total number of MPI ranks to use - gpus_per_core (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_type (str): type of the queuing system - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - - Returns: - pympipool.shared.connections.BaseInterface: Connection interface - """ - if queue_adapter is not None: - connections = PysqaInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - queue_adapter=queue_adapter, - queue_type=queue_type, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - elif enable_flux_backend: - connections = FluxCmdInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - elif enable_slurm_backend: - connections = SlurmSubprocessInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - else: - connections = MpiExecInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - return connections diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py new file mode 100644 index 00000000..c2eea3f0 --- /dev/null +++ b/pympipool/shared/executorbase.py @@ -0,0 +1,179 @@ +from concurrent.futures import ( + as_completed, + Executor as FutureExecutor, + Future, +) +import inspect +import os +import queue +import sys +from time import sleep + +import cloudpickle + + +class ExecutorBase(FutureExecutor): + def __init__(self): + self._future_queue = queue.Queue() + self._process = None + + @property + def future_queue(self): + return self._future_queue + + def submit(self, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. 
+ + Returns: + A Future representing the given call. + """ + f = Future() + self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) + return f + + def shutdown(self, wait=True, *, cancel_futures=False): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + parallel_executors have been reclaimed. + cancel_futures: If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. + """ + if cancel_futures: + cancel_items_in_queue(que=self._future_queue) + self._future_queue.put({"shutdown": True, "wait": wait}) + self._process.join() + + def __len__(self): + return self._future_queue.qsize() + + +def cancel_items_in_queue(que): + """ + Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the future + objects have to be cancelled when the executor shuts down. + + Args: + que (queue.Queue): Queue with task objects which should be executed + """ + while True: + try: + item = que.get_nowait() + if isinstance(item, dict) and "future" in item.keys(): + item["future"].cancel() + que.task_done() + except queue.Empty: + break + + +def cloudpickle_register(ind=2): + """ + Cloudpickle can either pickle by value or pickle by reference. The functions which are communicated have to + be pickled by value rather than by reference, so the module which calls the map function is pickled by value. + https://github.com/cloudpipe/cloudpickle#overriding-pickles-serialization-mechanism-for-importable-constructs + inspect can help to find the module which is calling pympipool + https://docs.python.org/3/library/inspect.html + to learn more about inspect another good read is: + http://pymotw.com/2/inspect/index.html#module-inspect + 1 refers to 1 level higher than the map function + + Args: + ind (int): index of the level at which pickle by value starts while for the rest pickle by reference is used + """ + try: # When executed in a jupyter notebook this can cause a ValueError - in this case we just ignore it. 
+ cloudpickle.register_pickle_by_value(inspect.getmodule(inspect.stack()[ind][0])) + except IndexError: + cloudpickle_register(ind=ind - 1) + except ValueError: + pass + + +def execute_parallel_tasks_loop(interface, future_queue): + while True: + task_dict = future_queue.get() + if "shutdown" in task_dict.keys() and task_dict["shutdown"]: + interface.shutdown(wait=task_dict["wait"]) + future_queue.task_done() + break + elif "fn" in task_dict.keys() and "future" in task_dict.keys(): + f = task_dict.pop("future") + if f.set_running_or_notify_cancel(): + try: + f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) + except Exception as thread_exception: + interface.shutdown(wait=True) + future_queue.task_done() + f.set_exception(exception=thread_exception) + raise thread_exception + else: + future_queue.task_done() + elif "fn" in task_dict.keys() and "init" in task_dict.keys(): + interface.send_dict(input_dict=task_dict) + future_queue.task_done() + + +def executor_broker( + future_queue, + meta_future_lst, + sleep_interval=0.1, +): + while True: + try: + task_dict = future_queue.get_nowait() + except queue.Empty: + sleep(sleep_interval) + else: + if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst): + future_queue.task_done() + else: + future_queue.task_done() + break + + +def execute_task_dict(task_dict, meta_future_lst): + if "fn" in task_dict.keys(): + meta_future = next(as_completed(meta_future_lst.keys())) + executor = meta_future_lst.pop(meta_future) + executor.future_queue.put(task_dict) + meta_future_lst[task_dict["future"]] = executor + return True + elif "shutdown" in task_dict.keys() and task_dict["shutdown"]: + for executor in meta_future_lst.values(): + executor.shutdown(wait=task_dict["wait"]) + return False + else: + raise ValueError("Unrecognized Task in task_dict: ", task_dict) + + +def get_backend_path(cores): + command_lst = [sys.executable] + if cores > 1: + command_lst += [ + os.path.abspath( + os.path.join(__file__, "..", "..", "backend", "mpiexec.py") + ), + ] + else: + command_lst += [ + os.path.abspath(os.path.join(__file__, "..", "..", "backend", "serial.py")), + ] + return command_lst + + +def get_executor_dict(max_workers, executor_class, **kwargs): + return {get_future_done(): executor_class(**kwargs) for _ in range(max_workers)} + + +def get_future_done(): + f = Future() + f.set_result(True) + return f diff --git a/pympipool/shared/interface.py b/pympipool/shared/interface.py new file mode 100644 index 00000000..af5a75c9 --- /dev/null +++ b/pympipool/shared/interface.py @@ -0,0 +1,113 @@ +from abc import ABC +import subprocess + + +class BaseInterface(ABC): + def __init__( + self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False + ): + self._cwd = cwd + self._cores = cores + self._threads_per_core = threads_per_core + self._gpus_per_core = gpus_per_core + self._oversubscribe = oversubscribe + + def bootup(self, command_lst): + raise NotImplementedError + + def shutdown(self, wait=True): + raise NotImplementedError + + def poll(self): + raise NotImplementedError + + +class SubprocessInterface(BaseInterface): + def __init__( + self, + cwd=None, + cores=1, + threads_per_core=1, + gpus_per_core=0, + oversubscribe=False, + ): + super().__init__( + cwd=cwd, + cores=cores, + threads_per_core=threads_per_core, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + self._process = None + + def bootup(self, command_lst): + self._process = subprocess.Popen( + 
args=self.generate_command(command_lst=command_lst),
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            stdin=subprocess.PIPE,
+            cwd=self._cwd,
+        )
+
+    def generate_command(self, command_lst):
+        return command_lst
+
+    def shutdown(self, wait=True):
+        self._process.terminate()
+        self._process.stdout.close()
+        self._process.stdin.close()
+        self._process.stderr.close()
+        if wait:
+            self._process.wait()
+        self._process = None
+
+    def poll(self):
+        return self._process is not None and self._process.poll() is None
+
+
+class MpiExecInterface(SubprocessInterface):
+    def generate_command(self, command_lst):
+        command_prepend_lst = generate_mpiexec_command(
+            cores=self._cores,
+            gpus_per_core=self._gpus_per_core,
+            oversubscribe=self._oversubscribe,
+        )
+        return super().generate_command(
+            command_lst=command_prepend_lst + command_lst,
+        )
+
+
+class SlurmSubprocessInterface(SubprocessInterface):
+    def generate_command(self, command_lst):
+        command_prepend_lst = generate_slurm_command(
+            cores=self._cores,
+            cwd=self._cwd,
+            threads_per_core=self._threads_per_core,
+            gpus_per_core=self._gpus_per_core,
+            oversubscribe=self._oversubscribe,
+        )
+        return super().generate_command(
+            command_lst=command_prepend_lst + command_lst,
+        )
+
+
+def generate_mpiexec_command(cores, gpus_per_core=0, oversubscribe=False):
+    command_prepend_lst = ["mpiexec", "-n", str(cores)]
+    if oversubscribe:
+        command_prepend_lst += ["--oversubscribe"]
+    if gpus_per_core > 0:
+        raise ValueError("GPU assignment is not supported by the mpiexec interface.")
+    return command_prepend_lst
+
+
+def generate_slurm_command(
+    cores, cwd, threads_per_core=1, gpus_per_core=0, oversubscribe=False
+):
+    command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd]
+    if threads_per_core > 1:
+        command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
+    if gpus_per_core > 0:
+        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
+    if oversubscribe:
+        command_prepend_lst += ["--oversubscribe"]
+    return command_prepend_lst
diff --git a/pympipool/shared/taskexecutor.py b/pympipool/shared/taskexecutor.py
deleted file mode 100644
index 4bffcd2b..00000000
--- a/pympipool/shared/taskexecutor.py
+++ /dev/null
@@ -1,115 +0,0 @@
-import inspect
-import os
-import queue
-import sys
-
-import cloudpickle
-
-from pympipool.shared.communication import interface_bootup
-
-
-def cancel_items_in_queue(que):
-    """
-    Cancel items which are still waiting in the queue. If the executor is busy tasks remain in the queue, so the future
-    objects have to be cancelled when the executor shuts down.
-
-    Args:
-        que (queue.Queue): Queue with task objects which should be executed
-    """
-    while True:
-        try:
-            item = que.get_nowait()
-            if isinstance(item, dict) and "future" in item.keys():
-                item["future"].cancel()
-                que.task_done()
-        except queue.Empty:
-            break
-
-
-def cloudpickle_register(ind=2):
-    """
-    Cloudpickle can either pickle by value or pickle by reference. The functions which are communicated have to
-    be pickled by value rather than by reference, so the module which calls the map function is pickled by value.
- https://github.com/cloudpipe/cloudpickle#overriding-pickles-serialization-mechanism-for-importable-constructs - inspect can help to find the module which is calling pympipool - https://docs.python.org/3/library/inspect.html - to learn more about inspect another good read is: - http://pymotw.com/2/inspect/index.html#module-inspect - 1 refers to 1 level higher than the map function - - Args: - ind (int): index of the level at which pickle by value starts while for the rest pickle by reference is used - """ - try: # When executed in a jupyter notebook this can cause a ValueError - in this case we just ignore it. - cloudpickle.register_pickle_by_value(inspect.getmodule(inspect.stack()[ind][0])) - except IndexError: - cloudpickle_register(ind=ind - 1) - except ValueError: - pass - - -def execute_parallel_tasks( - future_queue, - cores, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, -): - """ - Execute a single tasks in parallel using the message passing interface (MPI). - - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - cwd (str/None): current working directory where the parallel python task is executed - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - """ - command_lst = [ - sys.executable, - os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")), - ] - interface = interface_bootup( - command_lst=command_lst, - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) - - -def execute_parallel_tasks_loop(interface, future_queue): - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict.keys() and task_dict["shutdown"]: - interface.shutdown(wait=task_dict["wait"]) - future_queue.task_done() - break - elif "fn" in task_dict.keys() and "future" in task_dict.keys(): - f = task_dict.pop("future") - if f.set_running_or_notify_cancel(): - try: - f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) - except Exception as thread_exeception: - interface.shutdown(wait=True) - future_queue.task_done() - f.set_exception(exception=thread_exeception) - raise thread_exeception - else: - future_queue.task_done() - elif "fn" in task_dict.keys() and "init" in task_dict.keys(): - interface.send_dict(input_dict=task_dict) - future_queue.task_done() diff --git a/tests/test_communicator_split.py b/tests/test_communicator_split.py index 9e09f65b..44ab60ed 100644 --- a/tests/test_communicator_split.py +++ b/tests/test_communicator_split.py @@ -1,9 +1,10 @@ import unittest -from pympipool import MPISpawnPool +from pympipool.legacy.interfaces.pool import 
MPISpawnPool def get_ranks(input_parameter, comm=None): from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() rank = MPI.COMM_WORLD.Get_rank() if comm is not None: @@ -17,6 +18,7 @@ def get_ranks(input_parameter, comm=None): def get_ranks_multi_input(input_parameter1, input_parameter2, comm=None): from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() rank = MPI.COMM_WORLD.Get_rank() if comm is not None: @@ -47,8 +49,7 @@ def test_map_parallel(self): def test_starmap_serial(self): with MPISpawnPool(max_ranks=2, ranks_per_task=1) as p: output = p.starmap( - func=get_ranks_multi_input, - iterable=[[1, 1], [2, 2], [3, 3]] + func=get_ranks_multi_input, iterable=[[1, 1], [2, 2], [3, 3]] ) self.assertEqual(output[0], (2, 1, 0, 0, 1, 1)) self.assertEqual(output[1], (2, 1, 0, 0, 2, 2)) @@ -57,8 +58,7 @@ def test_starmap_serial(self): def test_starmap_parallel(self): with MPISpawnPool(max_ranks=2, ranks_per_task=2) as p: output = p.starmap( - func=get_ranks_multi_input, - iterable=[[1, 1], [2, 2], [3, 3], [4, 4]] + func=get_ranks_multi_input, iterable=[[1, 1], [2, 2], [3, 3], [4, 4]] ) self.assertEqual(output[0][::2], (2, 2, 1)) self.assertEqual(output[1][::2], (2, 2, 2)) diff --git a/tests/test_connection.py b/tests/test_connection.py index c5abd005..5533eef8 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,12 +1,10 @@ -import os import unittest -from pympipool.shared.connections import ( - BaseInterface, +from pympipool.shared.interface import BaseInterface, SlurmSubprocessInterface +from pympipool.legacy.shared.connections import ( MpiExecInterface, - SlurmSubprocessInterface, PysqaInterface, FluxCmdInterface, - get_connection_interface + get_connection_interface, ) @@ -23,10 +21,7 @@ def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): class TestExecutor(unittest.TestCase): def setUp(self): self.interface = Interface( - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False ) def test_bootup(self): diff --git a/tests/test_executor.py b/tests/test_executor.py index e87a742d..eb88a26c 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -29,7 +29,7 @@ def test_exec_funct_single_core_starmap(self): lst=[[1, 1], [2, 2]], cores_per_task=1, chunksize=1, - map_flag=False + map_flag=False, ) def test_parse_socket_communication_close(self): @@ -38,7 +38,7 @@ def test_parse_socket_communication_close(self): executor=executor, input_dict={"shutdown": True, "wait": True}, future_dict={}, - cores_per_task=1 + cores_per_task=1, ) self.assertEqual(output, {"exit": True}) @@ -46,9 +46,14 @@ def test_parse_socket_communication_execute(self): with ThreadPoolExecutor(max_workers=1) as executor: output = parse_socket_communication( executor=executor, - input_dict={"fn": sum, "iterable": [[1, 1]], "chunksize": 1, "map": True}, + input_dict={ + "fn": sum, + "iterable": [[1, 1]], + "chunksize": 1, + "map": True, + }, future_dict={}, - cores_per_task=1 + cores_per_task=1, ) self.assertEqual(output, {"result": [2]}) @@ -56,9 +61,14 @@ def test_parse_socket_communication_error(self): with ThreadPoolExecutor(max_workers=1) as executor: output = parse_socket_communication( executor=executor, - input_dict={"fn": sum, "iterable": [["a", "b"]], "chunksize": 1, "map": True}, + input_dict={ + "fn": sum, + "iterable": [["a", "b"]], + "chunksize": 1, + "map": True, + }, future_dict={}, - cores_per_task=1 + cores_per_task=1, ) self.assertEqual(output["error_type"], "") @@ -69,9 +79,9 @@ def 
test_parse_socket_communication_submit_args(self): executor=executor, input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}}, future_dict=future_dict, - cores_per_task=1 + cores_per_task=1, ) - future = future_dict[output['result']] + future = future_dict[output["result"]] self.assertEqual(future.result(), 2) def test_parse_socket_communication_submit_kwargs(self): @@ -79,11 +89,15 @@ def test_parse_socket_communication_submit_kwargs(self): with ThreadPoolExecutor(max_workers=1) as executor: output = parse_socket_communication( executor=executor, - input_dict={"fn": function_multi_args, "args": (), "kwargs": {"a": 1, "b": 2}}, + input_dict={ + "fn": function_multi_args, + "args": (), + "kwargs": {"a": 1, "b": 2}, + }, future_dict=future_dict, - cores_per_task=1 + cores_per_task=1, ) - future = future_dict[output['result']] + future = future_dict[output["result"]] self.assertEqual(future.result(), 3) def test_parse_socket_communication_submit_both(self): @@ -93,9 +107,9 @@ def test_parse_socket_communication_submit_both(self): executor=executor, input_dict={"fn": function_multi_args, "args": [1], "kwargs": {"b": 2}}, future_dict=future_dict, - cores_per_task=1 + cores_per_task=1, ) - future = future_dict[output['result']] + future = future_dict[output["result"]] self.assertEqual(future.result(), 3) def test_parse_socket_communication_update(self): @@ -105,14 +119,14 @@ def test_parse_socket_communication_update(self): executor=executor, input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}}, future_dict=future_dict, - cores_per_task=1 + cores_per_task=1, ) future_hash = output["result"] result = parse_socket_communication( executor=executor, input_dict={"update": [future_hash]}, future_dict=future_dict, - cores_per_task=1 + cores_per_task=1, ) self.assertEqual(result, {"result": {future_hash: 2}}) @@ -123,20 +137,18 @@ def test_parse_socket_communication_cancel(self): executor=executor, input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}}, future_dict=future_dict, - cores_per_task=1 + cores_per_task=1, ) future_hash = output["result"] result = parse_socket_communication( executor=executor, input_dict={"cancel": [future_hash]}, future_dict=future_dict, - cores_per_task=1 + cores_per_task=1, ) self.assertEqual(result, {"result": True}) def test_funct_call_default(self): - self.assertEqual(call_funct(input_dict={ - "fn": sum, - "args": [[1, 2, 3]], - "kwargs": {} - }), 6) + self.assertEqual( + call_funct(input_dict={"fn": sum, "args": [[1, 2, 3]], "kwargs": {}}), 6 + ) diff --git a/tests/test_flux.py b/tests/test_flux.py index 3b54ffbf..9ebd732c 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -4,12 +4,17 @@ import numpy as np import unittest -from pympipool.shared.taskexecutor import cloudpickle_register -from pympipool.interfaces.fluxbroker import SingleTaskExecutor, PyFluxExecutor, execute_parallel_tasks, executor_broker +from pympipool.shared.executorbase import cloudpickle_register try: - from flux.job import FluxExecutor + import flux.job + from pympipool.flux.fluxbroker import PyFluxExecutor, _flux_executor_broker + from pympipool.flux.fluxtask import ( + _flux_execute_parallel_tasks, + PyFluxSingleTaskExecutor, + ) + skip_flux_test = False except ImportError: skip_flux_test = True @@ -21,6 +26,7 @@ def calc(i): def mpi_funct(i): from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() rank = MPI.COMM_WORLD.Get_rank() return i, size, rank @@ -34,10 +40,12 @@ def set_global(): return {"memory": np.array([5])} -@unittest.skipIf(skip_flux_test, "Flux is not installed, so the 
flux tests are skipped.") +@unittest.skipIf( + skip_flux_test, "Flux is not installed, so the flux tests are skipped." +) class TestFlux(unittest.TestCase): def setUp(self): - self.executor = FluxExecutor() + self.executor = flux.job.FluxExecutor() def test_flux_executor_serial(self): with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: @@ -49,7 +57,9 @@ def test_flux_executor_serial(self): self.assertTrue(fs_2.done()) def test_flux_executor_threads(self): - with PyFluxExecutor(max_workers=1, threads_per_core=2, executor=self.executor) as exe: + with PyFluxExecutor( + max_workers=1, threads_per_core=2, executor=self.executor + ) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) self.assertEqual(fs_1.result(), 1) @@ -58,47 +68,47 @@ def test_flux_executor_threads(self): self.assertTrue(fs_2.done()) def test_flux_executor_parallel(self): - with PyFluxExecutor(max_workers=1, cores_per_worker=2, executor=self.executor) as exe: + with PyFluxExecutor( + max_workers=1, cores_per_worker=2, executor=self.executor + ) as exe: fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs_1.done()) def test_single_task(self): - with SingleTaskExecutor(cores=2, executor=self.executor) as p: + with PyFluxSingleTaskExecutor(cores=2, executor=self.executor) as p: output = p.map(mpi_funct, [1, 2, 3]) - self.assertEqual(list(output), [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]]) + self.assertEqual( + list(output), + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) def test_execute_task(self): f = Future() q = Queue() - q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f}) + q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( - future_queue=q, - cores=1, - executor=self.executor - ) + _flux_execute_parallel_tasks(future_queue=q, cores=1, executor=self.executor) self.assertEqual(f.result(), 2) q.join() def test_execute_task_threads(self): f = Future() q = Queue() - q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f}) + q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( - future_queue=q, - cores=1, - threads_per_core=1, - executor=self.executor + _flux_execute_parallel_tasks( + future_queue=q, cores=1, threads_per_core=1, executor=self.executor ) self.assertEqual(f.result(), 2) q.join() def test_internal_memory(self): - with SingleTaskExecutor(cores=1, init_function=set_global, executor=self.executor) as p: + with PyFluxSingleTaskExecutor( + cores=1, init_function=set_global, executor=self.executor + ) as p: f = p.submit(get_global) self.assertFalse(f.done()) self.assertEqual(f.result(), np.array([5])) @@ -109,7 +119,7 @@ def test_executor_broker(self): f = Future() q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) - executor_broker(future_queue=q, max_workers=1, executor=self.executor) + _flux_executor_broker(future_queue=q, max_workers=1, executor=self.executor) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) q.join() @@ -119,7 +129,9 @@ def test_executor_broker_threads(self): f = Future() q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) - executor_broker(future_queue=q, max_workers=1, threads_per_core=2, executor=self.executor) + 
_flux_executor_broker( + future_queue=q, max_workers=1, threads_per_core=2, executor=self.executor + ) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) q.join() diff --git a/tests/test_future.py b/tests/test_future.py index e7a9980d..cd96441e 100644 --- a/tests/test_future.py +++ b/tests/test_future.py @@ -1,17 +1,17 @@ import numpy as np import unittest from time import sleep -from pympipool import Executor +from pympipool.mpi.mpitask import PyMPISingleTaskExecutor from concurrent.futures import Future def calc(i): - return np.array(i ** 2) + return np.array(i**2) class TestFuture(unittest.TestCase): def test_pool_serial(self): - with Executor(cores=1) as p: + with PyMPISingleTaskExecutor(cores=1) as p: output = p.submit(calc, i=2) self.assertTrue(isinstance(output, Future)) self.assertFalse(output.done()) @@ -20,7 +20,7 @@ def test_pool_serial(self): self.assertEqual(output.result(), np.array(4)) def test_pool_serial_multi_core(self): - with Executor(cores=2) as p: + with PyMPISingleTaskExecutor(cores=2) as p: output = p.submit(calc, i=2) self.assertTrue(isinstance(output, Future)) self.assertFalse(output.done()) diff --git a/tests/test_interface.py b/tests/test_interface.py index 00ef3d05..61fb6a97 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -4,31 +4,36 @@ import numpy as np import unittest from pympipool.shared.communication import SocketInterface -from pympipool.shared.taskexecutor import cloudpickle_register -from pympipool.shared.connections import MpiExecInterface +from pympipool.shared.executorbase import cloudpickle_register +from pympipool.legacy.shared.connections import MpiExecInterface def calc(i): - return np.array(i ** 2) + return np.array(i**2) class TestInterface(unittest.TestCase): def test_interface(self): cloudpickle_register(ind=1) - task_dict = {"fn": calc, 'args': (), "kwargs": {"i": 2}} + task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}} interface = SocketInterface( interface=MpiExecInterface( - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False ) ) - interface.bootup(command_lst=[ - sys.executable, - os.path.abspath(os.path.join(__file__, "..", "..", "pympipool", "backend", "mpiexec.py")), - "--zmqport", - str(interface.bind_to_random_port()), - ]) - self.assertEqual(interface.send_and_receive_dict(input_dict=task_dict), np.array(4)) - interface.shutdown(wait=True) \ No newline at end of file + interface.bootup( + command_lst=[ + sys.executable, + os.path.abspath( + os.path.join( + __file__, "..", "..", "pympipool", "backend", "mpiexec.py" + ) + ), + "--zmqport", + str(interface.bind_to_random_port()), + ] + ) + self.assertEqual( + interface.send_and_receive_dict(input_dict=task_dict), np.array(4) + ) + interface.shutdown(wait=True) diff --git a/tests/test_meta.py b/tests/test_meta.py index d4351a44..e11d13c1 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -1,14 +1,16 @@ from concurrent.futures import as_completed, Future, Executor from queue import Queue import unittest -from pympipool.shared.broker import ( - executor_broker, +from pympipool.shared.executorbase import ( execute_task_dict, + get_executor_dict, get_future_done, - _get_executor_list, ) - -from pympipool.interfaces.taskbroker import HPCExecutor +from pympipool.mpi.mpitask import PyMPISingleTaskExecutor +from pympipool.mpi.mpibroker import ( + PyMPIExecutor, + _mpi_executor_broker, +) def calc(i): @@ -17,6 +19,7 @@ def calc(i): def mpi_funct(i): from mpi4py import MPI + size = 
MPI.COMM_WORLD.Get_size() rank = MPI.COMM_WORLD.Get_rank() return i, size, rank @@ -31,7 +34,10 @@ def test_get_future_done(self): class TestMetaExecutorFuture(unittest.TestCase): def test_meta_executor_future(self): - meta_future = _get_executor_list(max_workers=1) + meta_future = get_executor_dict( + max_workers=1, + executor_class=PyMPISingleTaskExecutor, + ) future_obj = list(meta_future.keys())[0] executor_obj = list(meta_future.values())[0] self.assertTrue(isinstance(future_obj, Future)) @@ -41,21 +47,31 @@ def test_meta_executor_future(self): executor_obj.shutdown(wait=True) def test_execute_task_dict(self): - meta_future_lst = _get_executor_list(max_workers=1) + meta_future_lst = get_executor_dict( + max_workers=1, + executor_class=PyMPISingleTaskExecutor, + ) f = Future() - self.assertTrue(execute_task_dict( - task_dict={"fn": calc, "args": (1,), "kwargs": {}, "future": f}, - meta_future_lst=meta_future_lst - )) + self.assertTrue( + execute_task_dict( + task_dict={"fn": calc, "args": (1,), "kwargs": {}, "future": f}, + meta_future_lst=meta_future_lst, + ) + ) self.assertEqual(f.result(), 1) self.assertTrue(f.done()) - self.assertFalse(execute_task_dict( - task_dict={"shutdown": True, "wait": True}, - meta_future_lst=meta_future_lst - )) + self.assertFalse( + execute_task_dict( + task_dict={"shutdown": True, "wait": True}, + meta_future_lst=meta_future_lst, + ) + ) def test_execute_task_dict_error(self): - meta_future_lst = _get_executor_list(max_workers=1) + meta_future_lst = get_executor_dict( + max_workers=1, + executor_class=PyMPISingleTaskExecutor, + ) with self.assertRaises(ValueError): execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst) list(meta_future_lst.values())[0].shutdown(wait=True) @@ -65,7 +81,7 @@ def test_executor_broker(self): f = Future() q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) - executor_broker(future_queue=q, max_workers=1) + _mpi_executor_broker(future_queue=q, max_workers=1) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) q.join() @@ -73,7 +89,7 @@ def test_executor_broker(self): class TestMetaExecutor(unittest.TestCase): def test_meta_executor_serial(self): - with HPCExecutor(max_workers=2) as exe: + with PyMPIExecutor(max_workers=2) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) self.assertEqual(fs_1.result(), 1) @@ -82,7 +98,7 @@ def test_meta_executor_serial(self): self.assertTrue(fs_2.done()) def test_meta_executor_single(self): - with HPCExecutor(max_workers=1) as exe: + with PyMPIExecutor(max_workers=1) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) self.assertEqual(fs_1.result(), 1) @@ -91,7 +107,13 @@ def test_meta_executor_single(self): self.assertTrue(fs_2.done()) def test_meta_executor_parallel(self): - with HPCExecutor(max_workers=1, cores_per_worker=2) as exe: + with PyMPIExecutor(max_workers=1, cores_per_worker=2) as exe: fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) self.assertTrue(fs_1.done()) + + def test_errors(self): + with self.assertRaises(ValueError): + PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2) + with self.assertRaises(ValueError): + PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1) diff --git a/tests/test_multitask.py b/tests/test_multitask.py index 9b5ea46a..a9bdb17d 100644 --- a/tests/test_multitask.py +++ b/tests/test_multitask.py @@ -2,14 +2,13 @@ import unittest from queue import Queue from time import sleep -from pympipool import 
PoolExecutor -from pympipool.legacy.shared.interface import execute_serial_tasks -from pympipool.shared.taskexecutor import cloudpickle_register +from pympipool.legacy.interfaces.poolexecutor import PoolExecutor, execute_serial_tasks +from pympipool.shared.executorbase import cloudpickle_register from concurrent.futures import Future def calc(i): - return np.array(i ** 2) + return np.array(i**2) def sleep_one(i): @@ -19,7 +18,7 @@ def sleep_one(i): def wait_and_calc(n): sleep(1) - return n ** 2 + return n**2 def call_back(future): @@ -44,14 +43,11 @@ def test_pool_serial(self): def test_execute_task(self): f = Future() q = Queue() - q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f}) + q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) execute_serial_tasks( - future_queue=q, - cores=1, - oversubscribe=False, - enable_flux_backend=False + future_queue=q, cores=1, oversubscribe=False, enable_flux_backend=False ) self.assertEqual(f.result(), np.array(4)) q.join() @@ -76,14 +72,11 @@ def test_cancel_task(self): fs1 = Future() fs1.cancel() q = Queue() - q.put({"fn": sleep_one, 'args': (), "kwargs": {"i": 1}, "future": fs1}) + q.put({"fn": sleep_one, "args": (), "kwargs": {"i": 1}, "future": fs1}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) execute_serial_tasks( - future_queue=q, - cores=1, - oversubscribe=False, - enable_flux_backend=False + future_queue=q, cores=1, oversubscribe=False, enable_flux_backend=False ) self.assertTrue(fs1.done()) self.assertTrue(fs1.cancelled()) diff --git a/tests/test_parse.py b/tests/test_parse.py index 40cc8c3f..f44ef613 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -2,71 +2,115 @@ import sys import unittest from pympipool.shared.backend import parse_arguments -from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface, SlurmSubprocessInterface +from pympipool.shared.interface import SlurmSubprocessInterface +from pympipool.legacy.shared.connections import MpiExecInterface, FluxCmdInterface class TestParser(unittest.TestCase): def test_command_local(self): result_dict = { - 'host': 'localhost', - 'zmqport': '22', + "host": "localhost", + "zmqport": "22", } command_lst = [ - 'mpiexec', - '-n', '2', - '--oversubscribe', - sys.executable, '/', - '--zmqport', result_dict['zmqport'] + "mpiexec", + "-n", + "2", + "--oversubscribe", + sys.executable, + "/", + "--zmqport", + result_dict["zmqport"], ] - interface = MpiExecInterface(cwd=None, cores=2, gpus_per_core=0, oversubscribe=True) + interface = MpiExecInterface( + cwd=None, cores=2, gpus_per_core=0, oversubscribe=True + ) self.assertEqual( command_lst, - interface.generate_command(command_lst=[sys.executable, '/', '--zmqport', result_dict['zmqport']]) + interface.generate_command( + command_lst=[sys.executable, "/", "--zmqport", result_dict["zmqport"]] + ), ) self.assertEqual(result_dict, parse_arguments(command_lst)) def test_command_flux(self): result_dict = { - 'host': "127.0.0.1", - 'zmqport': '22', + "host": "127.0.0.1", + "zmqport": "22", } command_lst = [ - 'flux', 'run', '-n', '2', + "flux", + "run", + "-n", + "2", "--cwd=" + os.path.abspath("."), - '--gpus-per-task=1', - sys.executable, '/', - '--host', result_dict['host'], - '--zmqport', result_dict['zmqport'] + "--gpus-per-task=1", + sys.executable, + "/", + "--host", + result_dict["host"], + "--zmqport", + result_dict["zmqport"], ] - interface = FluxCmdInterface(cwd=os.path.abspath("."), cores=2, 
gpus_per_core=1, oversubscribe=False) + interface = FluxCmdInterface( + cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=False + ) self.assertEqual( command_lst, - interface.generate_command(command_lst=[sys.executable, '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']]) + interface.generate_command( + command_lst=[ + sys.executable, + "/", + "--host", + result_dict["host"], + "--zmqport", + result_dict["zmqport"], + ] + ), ) self.assertEqual(result_dict, parse_arguments(command_lst)) def test_mpiexec_gpu(self): - interface = MpiExecInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True) + interface = MpiExecInterface( + cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True + ) with self.assertRaises(ValueError): interface.bootup(command_lst=[]) def test_command_slurm(self): result_dict = { - 'host': "127.0.0.1", - 'zmqport': '22', + "host": "127.0.0.1", + "zmqport": "22", } command_lst = [ - 'srun', '-n', '2', - "-D", os.path.abspath("."), - '--gpus-per-task=1', - '--oversubscribe', - sys.executable, '/', - '--host', result_dict['host'], - '--zmqport', result_dict['zmqport'] + "srun", + "-n", + "2", + "-D", + os.path.abspath("."), + "--gpus-per-task=1", + "--oversubscribe", + sys.executable, + "/", + "--host", + result_dict["host"], + "--zmqport", + result_dict["zmqport"], ] - interface = SlurmSubprocessInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True) + interface = SlurmSubprocessInterface( + cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True + ) self.assertEqual( command_lst, - interface.generate_command(command_lst=[sys.executable, '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']]) + interface.generate_command( + command_lst=[ + sys.executable, + "/", + "--host", + result_dict["host"], + "--zmqport", + result_dict["zmqport"], + ] + ), ) self.assertEqual(result_dict, parse_arguments(command_lst)) diff --git a/tests/test_parse_legacy.py b/tests/test_parse_legacy.py index 2e35e148..46b4c041 100644 --- a/tests/test_parse_legacy.py +++ b/tests/test_parse_legacy.py @@ -3,74 +3,97 @@ import sys from pympipool.legacy.shared.backend import parse_arguments -from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface +from pympipool.legacy.shared.connections import MpiExecInterface, FluxCmdInterface class TestParser(unittest.TestCase): def test_command_local(self): result_dict = { - 'host': 'localhost', - 'total_cores': '2', - 'zmqport': '22', - 'cores_per_task': '1' + "host": "localhost", + "total_cores": "2", + "zmqport": "22", + "cores_per_task": "1", } command_lst = [ - 'mpiexec', - '-n', result_dict['total_cores'], - '--oversubscribe', - sys.executable, '-m', 'mpi4py.futures', '/', - '--zmqport', result_dict['zmqport'], - '--cores-per-task', result_dict['cores_per_task'], - '--cores-total', result_dict['total_cores'] + "mpiexec", + "-n", + result_dict["total_cores"], + "--oversubscribe", + sys.executable, + "-m", + "mpi4py.futures", + "/", + "--zmqport", + result_dict["zmqport"], + "--cores-per-task", + result_dict["cores_per_task"], + "--cores-total", + result_dict["total_cores"], ] interface = MpiExecInterface( - cwd=None, - cores=2, - gpus_per_core=0, - oversubscribe=True + cwd=None, cores=2, gpus_per_core=0, oversubscribe=True ) self.assertEqual( command_lst, interface.generate_command( command_lst=[ - sys.executable, '-m', 'mpi4py.futures', '/', - '--zmqport', result_dict['zmqport'], - '--cores-per-task', '1', 
'--cores-total', '2' + sys.executable, + "-m", + "mpi4py.futures", + "/", + "--zmqport", + result_dict["zmqport"], + "--cores-per-task", + "1", + "--cores-total", + "2", ] - ) + ), ) self.assertEqual(result_dict, parse_arguments(command_lst)) def test_command_flux(self): result_dict = { - 'host': "127.0.0.1", - 'total_cores': '2', - 'zmqport': '22', - 'cores_per_task': '2' + "host": "127.0.0.1", + "total_cores": "2", + "zmqport": "22", + "cores_per_task": "2", } command_lst = [ - 'flux', 'run', '-n', '1', + "flux", + "run", + "-n", + "1", "--cwd=" + os.path.abspath("."), - sys.executable, '/', - '--host', result_dict['host'], - '--zmqport', result_dict['zmqport'], - '--cores-per-task', result_dict['cores_per_task'], - '--cores-total', result_dict['total_cores'] + sys.executable, + "/", + "--host", + result_dict["host"], + "--zmqport", + result_dict["zmqport"], + "--cores-per-task", + result_dict["cores_per_task"], + "--cores-total", + result_dict["total_cores"], ] interface = FluxCmdInterface( - cwd=os.path.abspath("."), - cores=1, - gpus_per_core=0, - oversubscribe=False + cwd=os.path.abspath("."), cores=1, gpus_per_core=0, oversubscribe=False ) self.assertEqual( command_lst, interface.generate_command( command_lst=[ - sys.executable, '/', '--host', result_dict['host'], - '--zmqport', result_dict['zmqport'], - '--cores-per-task', '2', '--cores-total', '2' + sys.executable, + "/", + "--host", + result_dict["host"], + "--zmqport", + result_dict["zmqport"], + "--cores-per-task", + "2", + "--cores-total", + "2", ] - ) + ), ) self.assertEqual(result_dict, parse_arguments(command_lst)) diff --git a/tests/test_pool.py b/tests/test_pool.py index f5a6e56c..f574b055 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -1,10 +1,10 @@ import numpy as np import unittest -from pympipool import Pool +from pympipool.legacy.interfaces.pool import Pool def calc(i): - return np.array(i ** 2) + return np.array(i**2) def calc_add(i, j): diff --git a/tests/test_queue.py b/tests/test_queue.py index 0e61b811..0fd49d09 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,7 +1,7 @@ import unittest from concurrent.futures import Future, CancelledError from queue import Queue -from pympipool.shared.taskexecutor import cancel_items_in_queue +from pympipool.shared.executorbase import cancel_items_in_queue class TestQueue(unittest.TestCase): diff --git a/tests/test_serial.py b/tests/test_serial.py index a3861a9d..2f4a7075 100644 --- a/tests/test_serial.py +++ b/tests/test_serial.py @@ -14,14 +14,18 @@ def set_global(): def submit(socket): - socket.send(cloudpickle.dumps({"init": True, "fn": set_global, "args": (), "kwargs": {}})) - socket.send(cloudpickle.dumps({"fn": calc, 'args': (), "kwargs": {"i": 2}})) + socket.send( + cloudpickle.dumps({"init": True, "fn": set_global, "args": (), "kwargs": {}}) + ) + socket.send(cloudpickle.dumps({"fn": calc, "args": (), "kwargs": {"i": 2}})) socket.send(cloudpickle.dumps({"shutdown": True, "wait": True})) def submit_error(socket): - socket.send(cloudpickle.dumps({"init": True, "fn": set_global, "args": (), "kwargs": {}})) - socket.send(cloudpickle.dumps({"fn": calc, 'args': (), "kwargs": {}})) + socket.send( + cloudpickle.dumps({"init": True, "fn": set_global, "args": (), "kwargs": {}}) + ) + socket.send(cloudpickle.dumps({"fn": calc, "args": (), "kwargs": {}})) socket.send(cloudpickle.dumps({"shutdown": True, "wait": True})) @@ -33,8 +37,8 @@ def test_main_as_thread(self): t = Thread(target=main, kwargs={"argument_lst": ["--zmqport", str(port)]}) t.start() 
submit(socket=socket) - self.assertEqual(cloudpickle.loads(socket.recv()), {'result': 7}) - self.assertEqual(cloudpickle.loads(socket.recv()), {'result': True}) + self.assertEqual(cloudpickle.loads(socket.recv()), {"result": 7}) + self.assertEqual(cloudpickle.loads(socket.recv()), {"result": True}) socket.close() context.term() @@ -45,8 +49,10 @@ def test_main_as_thread_error(self): t = Thread(target=main, kwargs={"argument_lst": ["--zmqport", str(port)]}) t.start() submit_error(socket=socket) - self.assertEqual(cloudpickle.loads(socket.recv())['error_type'], "") - self.assertEqual(cloudpickle.loads(socket.recv()), {'result': True}) + self.assertEqual( + cloudpickle.loads(socket.recv())["error_type"], "" + ) + self.assertEqual(cloudpickle.loads(socket.recv()), {"result": True}) socket.close() context.term() @@ -57,8 +63,8 @@ def test_submit_as_thread(self): t = Thread(target=submit, kwargs={"socket": socket}) t.start() main(argument_lst=["--zmqport", str(port)]) - self.assertEqual(cloudpickle.loads(socket.recv()), {'result': 7}) - self.assertEqual(cloudpickle.loads(socket.recv()), {'result': True}) + self.assertEqual(cloudpickle.loads(socket.recv()), {"result": 7}) + self.assertEqual(cloudpickle.loads(socket.recv()), {"result": True}) socket.close() context.term() @@ -69,7 +75,9 @@ def test_submit_as_thread_error(self): t = Thread(target=submit_error, kwargs={"socket": socket}) t.start() main(argument_lst=["--zmqport", str(port)]) - self.assertEqual(cloudpickle.loads(socket.recv())['error_type'], "") - self.assertEqual(cloudpickle.loads(socket.recv()), {'result': True}) + self.assertEqual( + cloudpickle.loads(socket.recv())["error_type"], "" + ) + self.assertEqual(cloudpickle.loads(socket.recv()), {"result": True}) socket.close() context.term() diff --git a/tests/test_task.py b/tests/test_task.py index 2fb12da3..bf1e166b 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -1,5 +1,5 @@ import unittest -from pympipool import Executor +from pympipool.mpi.mpitask import PyMPISingleTaskExecutor def echo_funct(i): @@ -8,6 +8,7 @@ def echo_funct(i): def mpi_funct(i): from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() rank = MPI.COMM_WORLD.Get_rank() return i, size, rank @@ -15,17 +16,17 @@ def mpi_funct(i): class TestTask(unittest.TestCase): def test_echo(self): - with Executor(cores=2) as p: + with PyMPISingleTaskExecutor(cores=2) as p: output = p.submit(echo_funct, 2).result() self.assertEqual(output, [2, 2]) def test_mpi(self): - with Executor(cores=2) as p: + with PyMPISingleTaskExecutor(cores=2) as p: output = p.submit(mpi_funct, 2).result() self.assertEqual(output, [(2, 2, 0), (2, 2, 1)]) def test_mpi_multiple(self): - with Executor(cores=2) as p: + with PyMPISingleTaskExecutor(cores=2) as p: fs1 = p.submit(mpi_funct, 1) fs2 = p.submit(mpi_funct, 2) fs3 = p.submit(mpi_funct, 3) @@ -34,8 +35,7 @@ def test_mpi_multiple(self): fs2.result(), fs3.result(), ] - self.assertEqual(output, [ - [(1, 2, 0), (1, 2, 1)], - [(2, 2, 0), (2, 2, 1)], - [(3, 2, 0), (3, 2, 1)] - ]) + self.assertEqual( + output, + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) diff --git a/tests/test_worker.py b/tests/test_worker.py index 1222467f..546b5626 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,13 +3,13 @@ from queue import Queue from time import sleep from concurrent.futures import CancelledError -from pympipool import Executor -from pympipool.shared.taskexecutor import execute_parallel_tasks, cloudpickle_register +from pympipool.mpi.mpitask import 
PyMPISingleTaskExecutor, _mpi_execute_parallel_tasks +from pympipool.shared.executorbase import cloudpickle_register from concurrent.futures import Future def calc(i): - return np.array(i ** 2) + return np.array(i**2) def sleep_one(i): @@ -19,6 +19,7 @@ def sleep_one(i): def mpi_funct(i): from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() rank = MPI.COMM_WORLD.Get_rank() return i, size, rank @@ -30,7 +31,7 @@ def raise_error(): class TestFuturePool(unittest.TestCase): def test_pool_serial(self): - with Executor(cores=1) as p: + with PyMPISingleTaskExecutor(cores=1) as p: output = p.submit(calc, i=2) self.assertEqual(len(p), 1) self.assertTrue(isinstance(output, Future)) @@ -41,7 +42,7 @@ def test_pool_serial(self): self.assertEqual(output.result(), np.array(4)) def test_executor_multi_submission(self): - with Executor(cores=1) as p: + with PyMPISingleTaskExecutor(cores=1) as p: fs_1 = p.submit(calc, i=2) fs_2 = p.submit(calc, i=2) self.assertEqual(fs_1.result(), np.array(4)) @@ -50,7 +51,7 @@ def test_executor_multi_submission(self): self.assertTrue(fs_2.done()) def test_shutdown(self): - p = Executor(cores=1) + p = PyMPISingleTaskExecutor(cores=1) fs1 = p.submit(sleep_one, i=2) fs2 = p.submit(sleep_one, i=4) sleep(1) @@ -62,23 +63,23 @@ def test_shutdown(self): fs2.result() def test_pool_serial_map(self): - with Executor(cores=1) as p: + with PyMPISingleTaskExecutor(cores=1) as p: output = p.map(calc, [1, 2, 3]) self.assertEqual(list(output), [np.array(1), np.array(4), np.array(9)]) def test_executor_exception(self): with self.assertRaises(RuntimeError): - with Executor(cores=1) as p: + with PyMPISingleTaskExecutor(cores=1) as p: p.submit(raise_error) def test_executor_exception_future(self): with self.assertRaises(RuntimeError): - with Executor(cores=1) as p: + with PyMPISingleTaskExecutor(cores=1) as p: fs = p.submit(raise_error) fs.result() def test_pool_multi_core(self): - with Executor(cores=2) as p: + with PyMPISingleTaskExecutor(cores=2) as p: output = p.submit(mpi_funct, i=2) self.assertEqual(len(p), 1) self.assertTrue(isinstance(output, Future)) @@ -89,49 +90,49 @@ def test_pool_multi_core(self): self.assertEqual(output.result(), [(2, 2, 0), (2, 2, 1)]) def test_pool_multi_core_map(self): - with Executor(cores=2) as p: + with PyMPISingleTaskExecutor(cores=2) as p: output = p.map(mpi_funct, [1, 2, 3]) - self.assertEqual(list(output), [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]]) + self.assertEqual( + list(output), + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) def test_execute_task_failed_no_argument(self): f = Future() q = Queue() - q.put({"fn": calc, 'args': (), "kwargs": {}, "future": f}) + q.put({"fn": calc, "args": (), "kwargs": {}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_parallel_tasks( + _mpi_execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, - enable_flux_backend=False ) q.join() def test_execute_task_failed_wrong_argument(self): f = Future() q = Queue() - q.put({"fn": calc, 'args': (), "kwargs": {"j": 4}, "future": f}) + q.put({"fn": calc, "args": (), "kwargs": {"j": 4}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_parallel_tasks( + _mpi_execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, - enable_flux_backend=False ) q.join() def test_execute_task(self): f = Future() q = Queue() - q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f}) + q.put({"fn": calc, "args": (), 
"kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( + _mpi_execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, - enable_flux_backend=False ) self.assertEqual(f.result(), np.array(4)) q.join() @@ -139,14 +140,13 @@ def test_execute_task(self): def test_execute_task_parallel(self): f = Future() q = Queue() - q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f}) + q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( + _mpi_execute_parallel_tasks( future_queue=q, cores=2, oversubscribe=False, - enable_flux_backend=False ) self.assertEqual(f.result(), [np.array(4), np.array(4)]) q.join() diff --git a/tests/test_worker_memory.py b/tests/test_worker_memory.py index 468657fc..f49ee1cf 100644 --- a/tests/test_worker_memory.py +++ b/tests/test_worker_memory.py @@ -1,9 +1,9 @@ import unittest import numpy as np from queue import Queue -from pympipool import Executor from pympipool.shared.backend import call_funct -from pympipool.shared.taskexecutor import execute_parallel_tasks, cloudpickle_register +from pympipool.shared.executorbase import cloudpickle_register +from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, _mpi_execute_parallel_tasks from concurrent.futures import Future @@ -17,30 +17,32 @@ def set_global(): class TestWorkerMemory(unittest.TestCase): def test_internal_memory(self): - with Executor(cores=1, init_function=set_global) as p: + with PyMPISingleTaskExecutor(cores=1, init_function=set_global) as p: f = p.submit(get_global) self.assertFalse(f.done()) self.assertEqual(f.result(), np.array([5])) self.assertTrue(f.done()) def test_call_funct(self): - self.assertEqual(call_funct( - input_dict={"fn": get_global, "args": (), "kwargs": {}}, - memory={"memory": 4} - ), 4) + self.assertEqual( + call_funct( + input_dict={"fn": get_global, "args": (), "kwargs": {}}, + memory={"memory": 4}, + ), + 4, + ) def test_execute_task(self): f = Future() q = Queue() q.put({"init": True, "fn": set_global, "args": (), "kwargs": {}}) - q.put({"fn": get_global, 'args': (), "kwargs": {}, "future": f}) + q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( + _mpi_execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, - enable_flux_backend=False ) self.assertEqual(f.result(), np.array([5])) q.join() diff --git a/tests/test_zmq.py b/tests/test_zmq.py index 72b0de06..4fc48ee7 100644 --- a/tests/test_zmq.py +++ b/tests/test_zmq.py @@ -4,7 +4,7 @@ interface_connect, interface_shutdown, interface_send, - interface_receive + interface_receive, )