diff --git a/pympipool/flux/fluxbroker.py b/pympipool/flux/fluxbroker.py
index ac96332f..b0edafd2 100644
--- a/pympipool/flux/fluxbroker.py
+++ b/pympipool/flux/fluxbroker.py
@@ -1,7 +1,6 @@
 from pympipool.shared.executorbase import (
     ExecutorBase,
     executor_broker,
-    get_executor_dict,
 )
 from pympipool.shared.thread import RaisingThread
 from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor
@@ -33,44 +32,20 @@ def __init__(
     ):
         super().__init__()
         self._process = RaisingThread(
-            target=_flux_executor_broker,
+            target=executor_broker,
             kwargs={
+                # Broker Arguments
                 "future_queue": self._future_queue,
                 "max_workers": max_workers,
-                "cores_per_worker": cores_per_worker,
+                "sleep_interval": sleep_interval,
+                "executor_class": PyFluxSingleTaskExecutor,
+                # Executor Arguments
+                "cores": cores_per_worker,
                 "threads_per_core": threads_per_core,
-                "gpus_per_worker": gpus_per_worker,
+                "gpus_per_task": int(gpus_per_worker / cores_per_worker),
                 "init_function": init_function,
                 "cwd": cwd,
-                "sleep_interval": sleep_interval,
                 "executor": executor,
             },
         )
         self._process.start()
-
-
-def _flux_executor_broker(
-    future_queue,
-    max_workers,
-    cores_per_worker=1,
-    threads_per_core=1,
-    gpus_per_worker=0,
-    init_function=None,
-    cwd=None,
-    sleep_interval=0.1,
-    executor=None,
-):
-    executor_broker(
-        future_queue=future_queue,
-        meta_future_lst=get_executor_dict(
-            max_workers=max_workers,
-            executor_class=PyFluxSingleTaskExecutor,
-            cores=cores_per_worker,
-            threads_per_core=threads_per_core,
-            gpus_per_task=int(gpus_per_worker / cores_per_worker),
-            init_function=init_function,
-            cwd=cwd,
-            executor=executor,
-        ),
-        sleep_interval=sleep_interval,
-    )
diff --git a/pympipool/flux/fluxtask.py b/pympipool/flux/fluxtask.py
index 05fc3697..89925cea 100644
--- a/pympipool/flux/fluxtask.py
+++ b/pympipool/flux/fluxtask.py
@@ -5,11 +5,9 @@
 from pympipool.shared.executorbase import (
     cloudpickle_register,
     ExecutorBase,
-    execute_parallel_tasks_loop,
-    get_backend_path,
+    execute_parallel_tasks,
 )
 from pympipool.shared.interface import BaseInterface
-from pympipool.shared.communication import interface_bootup
 from pympipool.shared.thread import RaisingThread


@@ -61,12 +59,15 @@ def __init__(
     ):
         super().__init__()
         self._process = RaisingThread(
-            target=_flux_execute_parallel_tasks,
+            target=execute_parallel_tasks,
             kwargs={
+                # Executor Arguments
                 "future_queue": self._future_queue,
                 "cores": cores,
+                "interface_class": FluxPythonInterface,
+                # Interface Arguments
                 "threads_per_core": threads_per_core,
-                "gpus_per_task": gpus_per_task,
+                "gpus_per_core": gpus_per_task,
                 "cwd": cwd,
                 "executor": executor,
             },
@@ -129,38 +130,3 @@ def shutdown(self, wait=True):

     def poll(self):
         return self._future is not None and not self._future.done()
-
-
-def _flux_execute_parallel_tasks(
-    future_queue,
-    cores,
-    threads_per_core=1,
-    gpus_per_task=0,
-    cwd=None,
-    executor=None,
-):
-    """
-    Execute a single tasks in parallel using the message passing interface (MPI).
-
-    Args:
-        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
-        cores (int): defines the total number of MPI ranks to use
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        cwd (str/None): current working directory where the parallel python task is executed
-        executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional
-    """
-    execute_parallel_tasks_loop(
-        interface=interface_bootup(
-            command_lst=get_backend_path(cores=cores),
-            connections=FluxPythonInterface(
-                cwd=cwd,
-                cores=cores,
-                threads_per_core=threads_per_core,
-                gpus_per_core=gpus_per_task,
-                oversubscribe=False,
-                executor=executor,
-            ),
-        ),
-        future_queue=future_queue,
-    )
diff --git a/pympipool/mpi/mpibroker.py b/pympipool/mpi/mpibroker.py
index 76c26797..0b660a21 100644
--- a/pympipool/mpi/mpibroker.py
+++ b/pympipool/mpi/mpibroker.py
@@ -1,7 +1,6 @@
 from pympipool.shared.executorbase import (
     ExecutorBase,
     executor_broker,
-    get_executor_dict,
 )
 from pympipool.shared.thread import RaisingThread
 from pympipool.mpi.mpitask import PyMPISingleTaskExecutor
@@ -46,47 +45,21 @@ def __init__(
                 + "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ."
             )
         self._process = RaisingThread(
-            target=_mpi_executor_broker,
+            target=executor_broker,
             kwargs={
+                # Broker Arguments
                 "future_queue": self._future_queue,
                 "max_workers": max_workers,
-                "cores_per_worker": cores_per_worker,
+                "sleep_interval": sleep_interval,
+                "executor_class": PyMPISingleTaskExecutor,
+                # Executor Arguments
+                "cores": cores_per_worker,
                 "threads_per_core": threads_per_core,
-                "gpus_per_worker": gpus_per_worker,
+                "gpus_per_task": int(gpus_per_worker / cores_per_worker),
                 "oversubscribe": oversubscribe,
                 "init_function": init_function,
                 "cwd": cwd,
-                "sleep_interval": sleep_interval,
                 "enable_slurm_backend": enable_slurm_backend,
             },
         )
         self._process.start()
-
-
-def _mpi_executor_broker(
-    future_queue,
-    max_workers,
-    cores_per_worker=1,
-    threads_per_core=1,
-    gpus_per_worker=0,
-    oversubscribe=False,
-    init_function=None,
-    cwd=None,
-    sleep_interval=0.1,
-    enable_slurm_backend=False,
-):
-    executor_broker(
-        future_queue=future_queue,
-        meta_future_lst=get_executor_dict(
-            max_workers=max_workers,
-            executor_class=PyMPISingleTaskExecutor,
-            cores=cores_per_worker,
-            threads_per_core=threads_per_core,
-            gpus_per_task=int(gpus_per_worker / cores_per_worker),
-            oversubscribe=oversubscribe,
-            init_function=init_function,
-            cwd=cwd,
-            enable_slurm_backend=enable_slurm_backend,
-        ),
-        sleep_interval=sleep_interval,
-    )
diff --git a/pympipool/mpi/mpitask.py b/pympipool/mpi/mpitask.py
index ec3acd4c..61834f19 100644
--- a/pympipool/mpi/mpitask.py
+++ b/pympipool/mpi/mpitask.py
@@ -1,11 +1,9 @@
 from pympipool.shared.executorbase import (
     cloudpickle_register,
-    execute_parallel_tasks_loop,
+    execute_parallel_tasks,
     ExecutorBase,
-    get_backend_path,
 )
 from pympipool.shared.thread import RaisingThread
-from pympipool.shared.communication import interface_bootup
 from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface


@@ -59,12 +57,15 @@ def __init__(
     ):
         super().__init__()
         self._process = RaisingThread(
-            target=_mpi_execute_parallel_tasks,
+            target=execute_parallel_tasks,
             kwargs={
+                # Executor Arguments
                 "future_queue": self._future_queue,
                 "cores": cores,
+                "interface_class": get_interface,
+                # Interface Arguments
                 "threads_per_core": threads_per_core,
-                "gpus_per_task": gpus_per_task,
+                "gpus_per_core": gpus_per_task,
                 "cwd": cwd,
                 "oversubscribe": oversubscribe,
                 "enable_slurm_backend": enable_slurm_backend,
@@ -78,47 +79,10 @@ def __init__(
         cloudpickle_register(ind=3)


-def _mpi_execute_parallel_tasks(
-    future_queue,
-    cores,
-    threads_per_core=1,
-    gpus_per_task=0,
-    cwd=None,
-    oversubscribe=False,
-    enable_slurm_backend=False,
-):
-    """
-    Execute a single tasks in parallel using the message passing interface (MPI).
-
-    Args:
-        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
-        cores (int): defines the total number of MPI ranks to use
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        cwd (str/None): current working directory where the parallel python task is executed
-        oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False
-        enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
-    """
-    execute_parallel_tasks_loop(
-        interface=interface_bootup(
-            command_lst=get_backend_path(cores=cores),
-            connections=get_interface(
-                cores=cores,
-                threads_per_core=threads_per_core,
-                gpus_per_task=gpus_per_task,
-                cwd=cwd,
-                oversubscribe=oversubscribe,
-                enable_slurm_backend=enable_slurm_backend,
-            ),
-        ),
-        future_queue=future_queue,
-    )
-
-
 def get_interface(
     cores=1,
     threads_per_core=1,
-    gpus_per_task=0,
+    gpus_per_core=0,
     cwd=None,
     oversubscribe=False,
     enable_slurm_backend=False,
@@ -128,7 +92,7 @@ def get_interface(
             cwd=cwd,
             cores=cores,
             threads_per_core=threads_per_core,
-            gpus_per_core=gpus_per_task,
+            gpus_per_core=gpus_per_core,
             oversubscribe=oversubscribe,
         )
     else:
@@ -136,6 +100,6 @@ def get_interface(
             cwd=cwd,
             cores=cores,
             threads_per_core=threads_per_core,
-            gpus_per_core=gpus_per_task,
+            gpus_per_core=gpus_per_core,
             oversubscribe=oversubscribe,
         )
diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py
index 8f2aafc2..ae4b687b 100644
--- a/pympipool/shared/executorbase.py
+++ b/pympipool/shared/executorbase.py
@@ -11,6 +11,8 @@

 import cloudpickle

+from pympipool.shared.communication import interface_bootup
+

 class ExecutorBase(FutureExecutor):
     def __init__(self):
@@ -97,6 +99,29 @@ def cloudpickle_register(ind=2):
         pass


+def execute_parallel_tasks(
+    future_queue,
+    cores,
+    interface_class,
+    **kwargs,
+):
+    """
+    Execute a single task in parallel using the message passing interface (MPI).
+
+    Args:
+        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
+        cores (int): defines the total number of MPI ranks to use
+        interface_class: interface class used to connect to the parallel process, e.g. MpiExecInterface, SlurmSubprocessInterface or FluxPythonInterface
+    """
+    execute_parallel_tasks_loop(
+        interface=interface_bootup(
+            command_lst=_get_backend_path(cores=cores),
+            connections=interface_class(cores=cores, **kwargs),
+        ),
+        future_queue=future_queue,
+    )
+
+
 def execute_parallel_tasks_loop(interface, future_queue):
     while True:
         task_dict = future_queue.get()
@@ -123,9 +148,16 @@ def execute_parallel_tasks_loop(interface, future_queue):

 def executor_broker(
     future_queue,
-    meta_future_lst,
+    max_workers,
+    executor_class,
     sleep_interval=0.1,
+    **kwargs,
 ):
+    meta_future_lst = _get_executor_dict(
+        max_workers=max_workers,
+        executor_class=executor_class,
+        **kwargs,
+    )
     while True:
         try:
             task_dict = future_queue.get_nowait()
@@ -154,11 +186,7 @@ def execute_task_dict(task_dict, meta_future_lst):
         raise ValueError("Unrecognized Task in task_dict: ", task_dict)


-def _get_command_path(executable):
-    return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))
-
-
-def get_backend_path(cores):
+def _get_backend_path(cores):
     command_lst = [sys.executable]
     if cores > 1:
         command_lst += [_get_command_path(executable="mpiexec.py")]
@@ -167,11 +195,15 @@ def get_backend_path(cores):
     return command_lst


-def get_executor_dict(max_workers, executor_class, **kwargs):
-    return {get_future_done(): executor_class(**kwargs) for _ in range(max_workers)}
+def _get_command_path(executable):
+    return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))
+
+
+def _get_executor_dict(max_workers, executor_class, **kwargs):
+    return {_get_future_done(): executor_class(**kwargs) for _ in range(max_workers)}


-def get_future_done():
+def _get_future_done():
     f = Future()
     f.set_result(True)
     return f
diff --git a/tests/test_flux.py b/tests/test_flux.py
index 9ebd732c..4cfdf5c9 100644
--- a/tests/test_flux.py
+++ b/tests/test_flux.py
@@ -4,16 +4,13 @@
 import numpy as np
 import unittest

-from pympipool.shared.executorbase import cloudpickle_register
+from pympipool.shared.executorbase import cloudpickle_register, executor_broker, execute_parallel_tasks

 try:
     import flux.job

-    from pympipool.flux.fluxbroker import PyFluxExecutor, _flux_executor_broker
-    from pympipool.flux.fluxtask import (
-        _flux_execute_parallel_tasks,
-        PyFluxSingleTaskExecutor,
-    )
+    from pympipool.flux.fluxbroker import PyFluxExecutor
+    from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor, FluxPythonInterface

     skip_flux_test = False
 except ImportError:
@@ -89,7 +86,12 @@ def test_execute_task(self):
         q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        _flux_execute_parallel_tasks(future_queue=q, cores=1, executor=self.executor)
+        execute_parallel_tasks(
+            future_queue=q,
+            cores=1,
+            executor=self.executor,
+            interface_class=FluxPythonInterface,
+        )
         self.assertEqual(f.result(), 2)
         q.join()

@@ -99,8 +101,12 @@ def test_execute_task_threads(self):
         q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        _flux_execute_parallel_tasks(
-            future_queue=q, cores=1, threads_per_core=1, executor=self.executor
+        execute_parallel_tasks(
+            future_queue=q,
+            cores=1,
+            threads_per_core=1,
+            executor=self.executor,
+            interface_class=FluxPythonInterface,
         )
         self.assertEqual(f.result(), 2)
         q.join()
@@ -119,7 +125,12 @@ def test_executor_broker(self):
         f = Future()
         q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
         q.put({"shutdown": True, "wait": True})
-        _flux_executor_broker(future_queue=q, max_workers=1, executor=self.executor)
+        executor_broker(
+            future_queue=q,
+            max_workers=1,
+            executor=self.executor,
+            executor_class=PyFluxSingleTaskExecutor,
+        )
         self.assertTrue(f.done())
         self.assertEqual(f.result(), 1)
         q.join()
@@ -129,8 +140,12 @@ def test_executor_broker_threads(self):
         f = Future()
         q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
         q.put({"shutdown": True, "wait": True})
-        _flux_executor_broker(
-            future_queue=q, max_workers=1, threads_per_core=2, executor=self.executor
+        executor_broker(
+            future_queue=q,
+            max_workers=1,
+            threads_per_core=2,
+            executor=self.executor,
+            executor_class=PyFluxSingleTaskExecutor,
         )
         self.assertTrue(f.done())
         self.assertEqual(f.result(), 1)
diff --git a/tests/test_meta.py b/tests/test_meta.py
index e11d13c1..2981204d 100644
--- a/tests/test_meta.py
+++ b/tests/test_meta.py
@@ -2,15 +2,13 @@
 from queue import Queue
 import unittest
 from pympipool.shared.executorbase import (
+    executor_broker,
     execute_task_dict,
-    get_executor_dict,
-    get_future_done,
+    _get_executor_dict,
+    _get_future_done,
 )
 from pympipool.mpi.mpitask import PyMPISingleTaskExecutor
-from pympipool.mpi.mpibroker import (
-    PyMPIExecutor,
-    _mpi_executor_broker,
-)
+from pympipool.mpi.mpibroker import PyMPIExecutor


 def calc(i):
@@ -27,14 +25,14 @@ def mpi_funct(i):

 class TestFutureCreation(unittest.TestCase):
     def test_get_future_done(self):
-        f = get_future_done()
+        f = _get_future_done()
         self.assertTrue(isinstance(f, Future))
         self.assertTrue(f.done())


 class TestMetaExecutorFuture(unittest.TestCase):
     def test_meta_executor_future(self):
-        meta_future = get_executor_dict(
+        meta_future = _get_executor_dict(
             max_workers=1,
             executor_class=PyMPISingleTaskExecutor,
         )
@@ -47,7 +45,7 @@
         executor_obj.shutdown(wait=True)

     def test_execute_task_dict(self):
-        meta_future_lst = get_executor_dict(
+        meta_future_lst = _get_executor_dict(
             max_workers=1,
             executor_class=PyMPISingleTaskExecutor,
         )
@@ -68,7 +66,7 @@
         )

     def test_execute_task_dict_error(self):
-        meta_future_lst = get_executor_dict(
+        meta_future_lst = _get_executor_dict(
             max_workers=1,
             executor_class=PyMPISingleTaskExecutor,
         )
@@ -81,7 +79,7 @@ def test_executor_broker(self):
         f = Future()
         q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
         q.put({"shutdown": True, "wait": True})
-        _mpi_executor_broker(future_queue=q, max_workers=1)
+        executor_broker(future_queue=q, max_workers=1, executor_class=PyMPISingleTaskExecutor)
         self.assertTrue(f.done())
         self.assertEqual(f.result(), 1)
         q.join()
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 546b5626..7a396701 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -3,8 +3,8 @@
 from queue import Queue
 from time import sleep
 from concurrent.futures import CancelledError
-from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, _mpi_execute_parallel_tasks
-from pympipool.shared.executorbase import cloudpickle_register
+from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, get_interface
+from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks
 from concurrent.futures import Future


@@ -103,10 +103,11 @@ def test_execute_task_failed_no_argument(self):
         q.put({"fn": calc, "args": (), "kwargs": {}, "future": f})
         cloudpickle_register(ind=1)
         with self.assertRaises(TypeError):
-            _mpi_execute_parallel_tasks(
+            execute_parallel_tasks(
                 future_queue=q,
                 cores=1,
                 oversubscribe=False,
+                interface_class=get_interface,
             )
         q.join()

@@ -116,10 +117,11 @@ def test_execute_task_failed_wrong_argument(self):
         q.put({"fn": calc, "args": (), "kwargs": {"j": 4}, "future": f})
         cloudpickle_register(ind=1)
         with self.assertRaises(TypeError):
-            _mpi_execute_parallel_tasks(
+            execute_parallel_tasks(
                 future_queue=q,
                 cores=1,
                 oversubscribe=False,
+                interface_class=get_interface,
             )
         q.join()

@@ -129,10 +131,11 @@ def test_execute_task(self):
         q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        _mpi_execute_parallel_tasks(
+        execute_parallel_tasks(
             future_queue=q,
             cores=1,
             oversubscribe=False,
+            interface_class=get_interface,
         )
         self.assertEqual(f.result(), np.array(4))
         q.join()
@@ -143,10 +146,11 @@ def test_execute_task_parallel(self):
         q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        _mpi_execute_parallel_tasks(
+        execute_parallel_tasks(
             future_queue=q,
             cores=2,
             oversubscribe=False,
+            interface_class=get_interface,
         )
         self.assertEqual(f.result(), [np.array(4), np.array(4)])
         q.join()
diff --git a/tests/test_worker_memory.py b/tests/test_worker_memory.py
index f49ee1cf..3c668dff 100644
--- a/tests/test_worker_memory.py
+++ b/tests/test_worker_memory.py
@@ -2,8 +2,8 @@
 import numpy as np
 from queue import Queue
 from pympipool.shared.backend import call_funct
-from pympipool.shared.executorbase import cloudpickle_register
-from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, _mpi_execute_parallel_tasks
+from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks
+from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, get_interface
 from concurrent.futures import Future


@@ -39,10 +39,11 @@ def test_execute_task(self):
         q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        _mpi_execute_parallel_tasks(
+        execute_parallel_tasks(
             future_queue=q,
             cores=1,
             oversubscribe=False,
+            interface_class=get_interface,
         )
         self.assertEqual(f.result(), np.array([5]))
         q.join()
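Usage note (not part of the patch): after this refactoring the MPI- and Flux-specific wrappers are gone; callers pass the interface or executor class directly to the shared helpers in pympipool.shared.executorbase. A minimal sketch, mirroring the updated tests/test_worker.py above, using the MPI path via get_interface; the calc() function and the printed value are stand-ins for illustration:

    # Minimal sketch: submit one task through the generic execute_parallel_tasks()
    # with an injected interface, then shut the worker down via the queue.
    from concurrent.futures import Future
    from queue import Queue

    from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks
    from pympipool.mpi.mpitask import get_interface


    def calc(i):  # stand-in task function, not part of the patch
        return i + 1


    q = Queue()
    f = Future()
    q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
    q.put({"shutdown": True, "wait": True})
    cloudpickle_register(ind=1)

    # Extra keyword arguments are forwarded to interface_class(cores=cores, **kwargs);
    # here get_interface() selects MpiExecInterface or SlurmSubprocessInterface.
    execute_parallel_tasks(
        future_queue=q,
        cores=1,
        oversubscribe=False,
        interface_class=get_interface,
    )
    print(f.result())  # 3
    q.join()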