From 98a8652b34f1f0551fb7e07865254bd1de85173e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 17:40:11 -0600 Subject: [PATCH 1/7] Separate the input for the broker and the executor --- pympipool/flux/fluxbroker.py | 35 ++++++++++------------------ pympipool/mpi/mpibroker.py | 39 +++++++++++--------------------- pympipool/shared/executorbase.py | 26 ++++++++++++++------- tests/test_meta.py | 12 +++++----- 4 files changed, 49 insertions(+), 63 deletions(-) diff --git a/pympipool/flux/fluxbroker.py b/pympipool/flux/fluxbroker.py index ac96332f..330f7c5c 100644 --- a/pympipool/flux/fluxbroker.py +++ b/pympipool/flux/fluxbroker.py @@ -1,7 +1,6 @@ from pympipool.shared.executorbase import ( ExecutorBase, executor_broker, - get_executor_dict, ) from pympipool.shared.thread import RaisingThread from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor @@ -37,13 +36,15 @@ def __init__( kwargs={ "future_queue": self._future_queue, "max_workers": max_workers, - "cores_per_worker": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_worker": gpus_per_worker, - "init_function": init_function, - "cwd": cwd, "sleep_interval": sleep_interval, - "executor": executor, + "executor_kwargs": { + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "init_function": init_function, + "cwd": cwd, + "executor": executor, + }, }, ) self._process.start() @@ -52,25 +53,13 @@ def __init__( def _flux_executor_broker( future_queue, max_workers, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - init_function=None, - cwd=None, + executor_kwargs, sleep_interval=0.1, - executor=None, ): executor_broker( future_queue=future_queue, - meta_future_lst=get_executor_dict( - max_workers=max_workers, - executor_class=PyFluxSingleTaskExecutor, - cores=cores_per_worker, - threads_per_core=threads_per_core, - gpus_per_task=int(gpus_per_worker / cores_per_worker), - init_function=init_function, - cwd=cwd, - executor=executor, - ), + max_workers=max_workers, + executor_class=PyFluxSingleTaskExecutor, + executor_kwargs=executor_kwargs, sleep_interval=sleep_interval, ) diff --git a/pympipool/mpi/mpibroker.py b/pympipool/mpi/mpibroker.py index 76c26797..3a349085 100644 --- a/pympipool/mpi/mpibroker.py +++ b/pympipool/mpi/mpibroker.py @@ -1,7 +1,6 @@ from pympipool.shared.executorbase import ( ExecutorBase, executor_broker, - get_executor_dict, ) from pympipool.shared.thread import RaisingThread from pympipool.mpi.mpitask import PyMPISingleTaskExecutor @@ -50,14 +49,16 @@ def __init__( kwargs={ "future_queue": self._future_queue, "max_workers": max_workers, - "cores_per_worker": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_worker": gpus_per_worker, - "oversubscribe": oversubscribe, - "init_function": init_function, - "cwd": cwd, "sleep_interval": sleep_interval, - "enable_slurm_backend": enable_slurm_backend, + "executor_kwargs": { + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "oversubscribe": oversubscribe, + "init_function": init_function, + "cwd": cwd, + "enable_slurm_backend": enable_slurm_backend, + }, }, ) self._process.start() @@ -66,27 +67,13 @@ def __init__( def _mpi_executor_broker( future_queue, max_workers, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, - init_function=None, - cwd=None, + executor_kwargs, sleep_interval=0.1, - 
enable_slurm_backend=False, ): executor_broker( future_queue=future_queue, - meta_future_lst=get_executor_dict( - max_workers=max_workers, - executor_class=PyMPISingleTaskExecutor, - cores=cores_per_worker, - threads_per_core=threads_per_core, - gpus_per_task=int(gpus_per_worker / cores_per_worker), - oversubscribe=oversubscribe, - init_function=init_function, - cwd=cwd, - enable_slurm_backend=enable_slurm_backend, - ), + max_workers=max_workers, + executor_class=PyMPISingleTaskExecutor, + executor_kwargs=executor_kwargs, sleep_interval=sleep_interval, ) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 8f2aafc2..eb9d5c93 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -123,9 +123,16 @@ def execute_parallel_tasks_loop(interface, future_queue): def executor_broker( future_queue, - meta_future_lst, + max_workers, + executor_class, + executor_kwargs, sleep_interval=0.1, ): + meta_future_lst = _get_executor_dict( + max_workers=max_workers, + executor_class=executor_class, + executor_kwargs=executor_kwargs, + ) while True: try: task_dict = future_queue.get_nowait() @@ -154,10 +161,6 @@ def execute_task_dict(task_dict, meta_future_lst): raise ValueError("Unrecognized Task in task_dict: ", task_dict) -def _get_command_path(executable): - return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable)) - - def get_backend_path(cores): command_lst = [sys.executable] if cores > 1: @@ -167,11 +170,18 @@ def get_backend_path(cores): return command_lst -def get_executor_dict(max_workers, executor_class, **kwargs): - return {get_future_done(): executor_class(**kwargs) for _ in range(max_workers)} +def _get_command_path(executable): + return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable)) + + +def _get_executor_dict(max_workers, executor_class, executor_kwargs): + return { + _get_future_done(): executor_class(**executor_kwargs) + for _ in range(max_workers) + } -def get_future_done(): +def _get_future_done(): f = Future() f.set_result(True) return f diff --git a/tests/test_meta.py b/tests/test_meta.py index e11d13c1..70528f1a 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -3,8 +3,8 @@ import unittest from pympipool.shared.executorbase import ( execute_task_dict, - get_executor_dict, - get_future_done, + _get_executor_dict, + _get_future_done, ) from pympipool.mpi.mpitask import PyMPISingleTaskExecutor from pympipool.mpi.mpibroker import ( @@ -27,14 +27,14 @@ def mpi_funct(i): class TestFutureCreation(unittest.TestCase): def test_get_future_done(self): - f = get_future_done() + f = _get_future_done() self.assertTrue(isinstance(f, Future)) self.assertTrue(f.done()) class TestMetaExecutorFuture(unittest.TestCase): def test_meta_executor_future(self): - meta_future = get_executor_dict( + meta_future = _get_executor_dict( max_workers=1, executor_class=PyMPISingleTaskExecutor, ) @@ -47,7 +47,7 @@ def test_meta_executor_future(self): executor_obj.shutdown(wait=True) def test_execute_task_dict(self): - meta_future_lst = get_executor_dict( + meta_future_lst = _get_executor_dict( max_workers=1, executor_class=PyMPISingleTaskExecutor, ) @@ -68,7 +68,7 @@ def test_execute_task_dict(self): ) def test_execute_task_dict_error(self): - meta_future_lst = get_executor_dict( + meta_future_lst = _get_executor_dict( max_workers=1, executor_class=PyMPISingleTaskExecutor, ) From bbb6f955371013f5961dfee8e4929ae4a8fe198b Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 17:49:16 
-0600 Subject: [PATCH 2/7] fixes --- pympipool/mpi/mpibroker.py | 2 +- pympipool/shared/executorbase.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pympipool/mpi/mpibroker.py b/pympipool/mpi/mpibroker.py index 3a349085..f765cc1c 100644 --- a/pympipool/mpi/mpibroker.py +++ b/pympipool/mpi/mpibroker.py @@ -74,6 +74,6 @@ def _mpi_executor_broker( future_queue=future_queue, max_workers=max_workers, executor_class=PyMPISingleTaskExecutor, - executor_kwargs=executor_kwargs, sleep_interval=sleep_interval, + **executor_kwargs, ) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index eb9d5c93..057efaf1 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -125,13 +125,13 @@ def executor_broker( future_queue, max_workers, executor_class, - executor_kwargs, sleep_interval=0.1, + **kwargs, ): meta_future_lst = _get_executor_dict( max_workers=max_workers, executor_class=executor_class, - executor_kwargs=executor_kwargs, + **kwargs, ) while True: try: @@ -174,9 +174,9 @@ def _get_command_path(executable): return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable)) -def _get_executor_dict(max_workers, executor_class, executor_kwargs): +def _get_executor_dict(max_workers, executor_class, **kwargs): return { - _get_future_done(): executor_class(**executor_kwargs) + _get_future_done(): executor_class(**kwargs) for _ in range(max_workers) } From a12bfaa13ceff120972e4216a8271adde7b54f7c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 17:54:58 -0600 Subject: [PATCH 3/7] Remove executor specific broker functions --- pympipool/flux/fluxbroker.py | 35 +++++++++++----------------------- pympipool/mpi/mpibroker.py | 37 ++++++++++++------------------------ tests/test_meta.py | 8 +++----- 3 files changed, 26 insertions(+), 54 deletions(-) diff --git a/pympipool/flux/fluxbroker.py b/pympipool/flux/fluxbroker.py index 330f7c5c..a4145cf6 100644 --- a/pympipool/flux/fluxbroker.py +++ b/pympipool/flux/fluxbroker.py @@ -32,34 +32,21 @@ def __init__( ): super().__init__() self._process = RaisingThread( - target=_flux_executor_broker, + target=executor_broker, kwargs={ + # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, "sleep_interval": sleep_interval, - "executor_kwargs": { - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "init_function": init_function, - "cwd": cwd, - "executor": executor, - }, + "executor_class": PyFluxSingleTaskExecutor, + + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "init_function": init_function, + "cwd": cwd, + "executor": executor, }, ) self._process.start() - - -def _flux_executor_broker( - future_queue, - max_workers, - executor_kwargs, - sleep_interval=0.1, -): - executor_broker( - future_queue=future_queue, - max_workers=max_workers, - executor_class=PyFluxSingleTaskExecutor, - executor_kwargs=executor_kwargs, - sleep_interval=sleep_interval, - ) diff --git a/pympipool/mpi/mpibroker.py b/pympipool/mpi/mpibroker.py index f765cc1c..64943e82 100644 --- a/pympipool/mpi/mpibroker.py +++ b/pympipool/mpi/mpibroker.py @@ -45,35 +45,22 @@ def __init__( + "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ." 
) self._process = RaisingThread( - target=_mpi_executor_broker, + target=executor_broker, kwargs={ + # Broker Arguments "future_queue": self._future_queue, "max_workers": max_workers, "sleep_interval": sleep_interval, - "executor_kwargs": { - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "oversubscribe": oversubscribe, - "init_function": init_function, - "cwd": cwd, - "enable_slurm_backend": enable_slurm_backend, - }, + "executor_class": PyMPISingleTaskExecutor, + + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "oversubscribe": oversubscribe, + "init_function": init_function, + "cwd": cwd, + "enable_slurm_backend": enable_slurm_backend, }, ) self._process.start() - - -def _mpi_executor_broker( - future_queue, - max_workers, - executor_kwargs, - sleep_interval=0.1, -): - executor_broker( - future_queue=future_queue, - max_workers=max_workers, - executor_class=PyMPISingleTaskExecutor, - sleep_interval=sleep_interval, - **executor_kwargs, - ) diff --git a/tests/test_meta.py b/tests/test_meta.py index 70528f1a..2981204d 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -2,15 +2,13 @@ from queue import Queue import unittest from pympipool.shared.executorbase import ( + executor_broker, execute_task_dict, _get_executor_dict, _get_future_done, ) from pympipool.mpi.mpitask import PyMPISingleTaskExecutor -from pympipool.mpi.mpibroker import ( - PyMPIExecutor, - _mpi_executor_broker, -) +from pympipool.mpi.mpibroker import PyMPIExecutor def calc(i): @@ -81,7 +79,7 @@ def test_executor_broker(self): f = Future() q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) - _mpi_executor_broker(future_queue=q, max_workers=1) + executor_broker(future_queue=q, max_workers=1, executor_class=PyMPISingleTaskExecutor) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) q.join() From 7359e50858a08aa4ceddc7769d6022cffa9d79c6 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 17:55:41 -0600 Subject: [PATCH 4/7] black formatting --- pympipool/flux/fluxbroker.py | 1 - pympipool/mpi/mpibroker.py | 1 - pympipool/shared/executorbase.py | 5 +---- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/pympipool/flux/fluxbroker.py b/pympipool/flux/fluxbroker.py index a4145cf6..b0edafd2 100644 --- a/pympipool/flux/fluxbroker.py +++ b/pympipool/flux/fluxbroker.py @@ -39,7 +39,6 @@ def __init__( "max_workers": max_workers, "sleep_interval": sleep_interval, "executor_class": PyFluxSingleTaskExecutor, - # Executor Arguments "cores": cores_per_worker, "threads_per_core": threads_per_core, diff --git a/pympipool/mpi/mpibroker.py b/pympipool/mpi/mpibroker.py index 64943e82..0b660a21 100644 --- a/pympipool/mpi/mpibroker.py +++ b/pympipool/mpi/mpibroker.py @@ -52,7 +52,6 @@ def __init__( "max_workers": max_workers, "sleep_interval": sleep_interval, "executor_class": PyMPISingleTaskExecutor, - # Executor Arguments "cores": cores_per_worker, "threads_per_core": threads_per_core, diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 057efaf1..5e0276d0 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -175,10 +175,7 @@ def _get_command_path(executable): def _get_executor_dict(max_workers, executor_class, **kwargs): - return { - _get_future_done(): executor_class(**kwargs) - for _ in 
range(max_workers) - } + return {_get_future_done(): executor_class(**kwargs) for _ in range(max_workers)} def _get_future_done(): From ef62a88e915a551a7869c6c4cc981a7bbdd13dff Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 18:10:42 -0600 Subject: [PATCH 5/7] Fix flux tests --- tests/test_flux.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/tests/test_flux.py b/tests/test_flux.py index 9ebd732c..c96e8b80 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -4,12 +4,12 @@ import numpy as np import unittest -from pympipool.shared.executorbase import cloudpickle_register +from pympipool.shared.executorbase import cloudpickle_register, executor_broker try: import flux.job - from pympipool.flux.fluxbroker import PyFluxExecutor, _flux_executor_broker + from pympipool.flux.fluxbroker import PyFluxExecutor from pympipool.flux.fluxtask import ( _flux_execute_parallel_tasks, PyFluxSingleTaskExecutor, @@ -119,7 +119,12 @@ def test_executor_broker(self): f = Future() q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) - _flux_executor_broker(future_queue=q, max_workers=1, executor=self.executor) + executor_broker( + future_queue=q, + max_workers=1, + executor=self.executor, + executor_class=PyFluxSingleTaskExecutor, + ) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) q.join() @@ -129,8 +134,12 @@ def test_executor_broker_threads(self): f = Future() q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) - _flux_executor_broker( - future_queue=q, max_workers=1, threads_per_core=2, executor=self.executor + executor_broker( + future_queue=q, + max_workers=1, + threads_per_core=2, + executor=self.executor, + executor_class=PyFluxSingleTaskExecutor, ) self.assertTrue(f.done()) self.assertEqual(f.result(), 1) From f15ccf7e45921eaa645bfab591eb486fc3c2c59b Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 19:03:49 -0600 Subject: [PATCH 6/7] remove _flux_execute_parallel_tasks() and _mpi_execute_parallel_tasks() --- pympipool/flux/fluxtask.py | 44 ++++-------------------------- pympipool/mpi/mpitask.py | 46 ++++---------------------------- pympipool/shared/executorbase.py | 27 ++++++++++++++++++- tests/test_flux.py | 22 +++++++++------ tests/test_worker.py | 16 ++++++----- tests/test_worker_memory.py | 7 ++--- 6 files changed, 64 insertions(+), 98 deletions(-) diff --git a/pympipool/flux/fluxtask.py b/pympipool/flux/fluxtask.py index 05fc3697..62d4b5c4 100644 --- a/pympipool/flux/fluxtask.py +++ b/pympipool/flux/fluxtask.py @@ -5,11 +5,9 @@ from pympipool.shared.executorbase import ( cloudpickle_register, ExecutorBase, - execute_parallel_tasks_loop, - get_backend_path, + execute_parallel_tasks, ) from pympipool.shared.interface import BaseInterface -from pympipool.shared.communication import interface_bootup from pympipool.shared.thread import RaisingThread @@ -61,10 +59,13 @@ def __init__( ): super().__init__() self._process = RaisingThread( - target=_flux_execute_parallel_tasks, + target=execute_parallel_tasks, kwargs={ + # Executor Arguments "future_queue": self._future_queue, "cores": cores, + "interface_class": FluxPythonInterface, + # Interface Arguments "threads_per_core": threads_per_core, "gpus_per_task": gpus_per_task, "cwd": cwd, @@ -129,38 +130,3 @@ def shutdown(self, wait=True): def poll(self): return self._future is not None and not self._future.done() - - -def _flux_execute_parallel_tasks( - future_queue, - 
cores, - threads_per_core=1, - gpus_per_task=0, - cwd=None, - executor=None, -): - """ - Execute a single tasks in parallel using the message passing interface (MPI). - - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - cwd (str/None): current working directory where the parallel python task is executed - executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional - """ - execute_parallel_tasks_loop( - interface=interface_bootup( - command_lst=get_backend_path(cores=cores), - connections=FluxPythonInterface( - cwd=cwd, - cores=cores, - threads_per_core=threads_per_core, - gpus_per_core=gpus_per_task, - oversubscribe=False, - executor=executor, - ), - ), - future_queue=future_queue, - ) diff --git a/pympipool/mpi/mpitask.py b/pympipool/mpi/mpitask.py index ec3acd4c..3f974009 100644 --- a/pympipool/mpi/mpitask.py +++ b/pympipool/mpi/mpitask.py @@ -1,11 +1,9 @@ from pympipool.shared.executorbase import ( cloudpickle_register, - execute_parallel_tasks_loop, + execute_parallel_tasks, ExecutorBase, - get_backend_path, ) from pympipool.shared.thread import RaisingThread -from pympipool.shared.communication import interface_bootup from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface @@ -59,10 +57,13 @@ def __init__( ): super().__init__() self._process = RaisingThread( - target=_mpi_execute_parallel_tasks, + target=execute_parallel_tasks, kwargs={ + # Executor Arguments "future_queue": self._future_queue, "cores": cores, + "interface_class": get_interface, + # Interface Arguments "threads_per_core": threads_per_core, "gpus_per_task": gpus_per_task, "cwd": cwd, @@ -78,43 +79,6 @@ def __init__( cloudpickle_register(ind=3) -def _mpi_execute_parallel_tasks( - future_queue, - cores, - threads_per_core=1, - gpus_per_task=0, - cwd=None, - oversubscribe=False, - enable_slurm_backend=False, -): - """ - Execute a single tasks in parallel using the message passing interface (MPI). 
- - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - cwd (str/None): current working directory where the parallel python task is executed - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - """ - execute_parallel_tasks_loop( - interface=interface_bootup( - command_lst=get_backend_path(cores=cores), - connections=get_interface( - cores=cores, - threads_per_core=threads_per_core, - gpus_per_task=gpus_per_task, - cwd=cwd, - oversubscribe=oversubscribe, - enable_slurm_backend=enable_slurm_backend, - ), - ), - future_queue=future_queue, - ) - - def get_interface( cores=1, threads_per_core=1, diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 5e0276d0..ae4b687b 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -11,6 +11,8 @@ import cloudpickle +from pympipool.shared.communication import interface_bootup + class ExecutorBase(FutureExecutor): def __init__(self): @@ -97,6 +99,29 @@ def cloudpickle_register(ind=2): pass +def execute_parallel_tasks( + future_queue, + cores, + interface_class, + **kwargs, +): + """ + Execute a single tasks in parallel using the message passing interface (MPI). + + Args: + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + cores (int): defines the total number of MPI ranks to use + interface_class: + """ + execute_parallel_tasks_loop( + interface=interface_bootup( + command_lst=_get_backend_path(cores=cores), + connections=interface_class(cores=cores, **kwargs), + ), + future_queue=future_queue, + ) + + def execute_parallel_tasks_loop(interface, future_queue): while True: task_dict = future_queue.get() @@ -161,7 +186,7 @@ def execute_task_dict(task_dict, meta_future_lst): raise ValueError("Unrecognized Task in task_dict: ", task_dict) -def get_backend_path(cores): +def _get_backend_path(cores): command_lst = [sys.executable] if cores > 1: command_lst += [_get_command_path(executable="mpiexec.py")] diff --git a/tests/test_flux.py b/tests/test_flux.py index c96e8b80..4cfdf5c9 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -4,16 +4,13 @@ import numpy as np import unittest -from pympipool.shared.executorbase import cloudpickle_register, executor_broker +from pympipool.shared.executorbase import cloudpickle_register, executor_broker, execute_parallel_tasks try: import flux.job from pympipool.flux.fluxbroker import PyFluxExecutor - from pympipool.flux.fluxtask import ( - _flux_execute_parallel_tasks, - PyFluxSingleTaskExecutor, - ) + from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor, FluxPythonInterface skip_flux_test = False except ImportError: @@ -89,7 +86,12 @@ def test_execute_task(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _flux_execute_parallel_tasks(future_queue=q, cores=1, executor=self.executor) + execute_parallel_tasks( + future_queue=q, + cores=1, + executor=self.executor, + interface_class=FluxPythonInterface, + ) self.assertEqual(f.result(), 2) q.join() @@ -99,8 +101,12 @@ def 
test_execute_task_threads(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _flux_execute_parallel_tasks( - future_queue=q, cores=1, threads_per_core=1, executor=self.executor + execute_parallel_tasks( + future_queue=q, + cores=1, + threads_per_core=1, + executor=self.executor, + interface_class=FluxPythonInterface, ) self.assertEqual(f.result(), 2) q.join() diff --git a/tests/test_worker.py b/tests/test_worker.py index 546b5626..7a396701 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,8 +3,8 @@ from queue import Queue from time import sleep from concurrent.futures import CancelledError -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, _mpi_execute_parallel_tasks -from pympipool.shared.executorbase import cloudpickle_register +from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, get_interface +from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks from concurrent.futures import Future @@ -103,10 +103,11 @@ def test_execute_task_failed_no_argument(self): q.put({"fn": calc, "args": (), "kwargs": {}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, + interface_class=get_interface, ) q.join() @@ -116,10 +117,11 @@ def test_execute_task_failed_wrong_argument(self): q.put({"fn": calc, "args": (), "kwargs": {"j": 4}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, + interface_class=get_interface, ) q.join() @@ -129,10 +131,11 @@ def test_execute_task(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, + interface_class=get_interface, ) self.assertEqual(f.result(), np.array(4)) q.join() @@ -143,10 +146,11 @@ def test_execute_task_parallel(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=2, oversubscribe=False, + interface_class=get_interface, ) self.assertEqual(f.result(), [np.array(4), np.array(4)]) q.join() diff --git a/tests/test_worker_memory.py b/tests/test_worker_memory.py index f49ee1cf..3c668dff 100644 --- a/tests/test_worker_memory.py +++ b/tests/test_worker_memory.py @@ -2,8 +2,8 @@ import numpy as np from queue import Queue from pympipool.shared.backend import call_funct -from pympipool.shared.executorbase import cloudpickle_register -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, _mpi_execute_parallel_tasks +from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks +from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, get_interface from concurrent.futures import Future @@ -39,10 +39,11 @@ def test_execute_task(self): q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, + interface_class=get_interface, ) self.assertEqual(f.result(), np.array([5])) q.join() From 
b1669dcf22cbbc267fa1af3638b2d769dd34b992 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 19:07:04 -0600 Subject: [PATCH 7/7] Fix gpus_per_core and gpus_per_task --- pympipool/flux/fluxtask.py | 2 +- pympipool/mpi/mpitask.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pympipool/flux/fluxtask.py b/pympipool/flux/fluxtask.py index 62d4b5c4..89925cea 100644 --- a/pympipool/flux/fluxtask.py +++ b/pympipool/flux/fluxtask.py @@ -67,7 +67,7 @@ def __init__( "interface_class": FluxPythonInterface, # Interface Arguments "threads_per_core": threads_per_core, - "gpus_per_task": gpus_per_task, + "gpus_per_core": gpus_per_task, "cwd": cwd, "executor": executor, }, diff --git a/pympipool/mpi/mpitask.py b/pympipool/mpi/mpitask.py index 3f974009..61834f19 100644 --- a/pympipool/mpi/mpitask.py +++ b/pympipool/mpi/mpitask.py @@ -65,7 +65,7 @@ def __init__( "interface_class": get_interface, # Interface Arguments "threads_per_core": threads_per_core, - "gpus_per_task": gpus_per_task, + "gpus_per_core": gpus_per_task, "cwd": cwd, "oversubscribe": oversubscribe, "enable_slurm_backend": enable_slurm_backend, @@ -82,7 +82,7 @@ def __init__( def get_interface( cores=1, threads_per_core=1, - gpus_per_task=0, + gpus_per_core=0, cwd=None, oversubscribe=False, enable_slurm_backend=False, @@ -92,7 +92,7 @@ def get_interface( cwd=cwd, cores=cores, threads_per_core=threads_per_core, - gpus_per_core=gpus_per_task, + gpus_per_core=gpus_per_core, oversubscribe=oversubscribe, ) else: @@ -100,6 +100,6 @@ def get_interface( cwd=cwd, cores=cores, threads_per_core=threads_per_core, - gpus_per_core=gpus_per_task, + gpus_per_core=gpus_per_core, oversubscribe=oversubscribe, )
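Taken together, the series replaces the backend-specific broker and worker wrapper functions with the shared executor_broker() and execute_parallel_tasks() entry points, so executor-specific options travel as plain keyword arguments. Below is a minimal sketch of the resulting calling convention, mirroring the updated tests/test_meta.py; the calc() function and the task-dictionary contents are illustrative stand-ins, not part of the patches, and an MPI-capable environment is assumed.

from concurrent.futures import Future
from queue import Queue

from pympipool.shared.executorbase import executor_broker
from pympipool.mpi.mpitask import PyMPISingleTaskExecutor


def calc(i):
    # illustrative user function; any picklable callable works here
    return i


q = Queue()
f = Future()
# task dictionaries carry the callable, its arguments, and the Future to resolve
q.put({"fn": calc, "args": (1,), "kwargs": {}, "future": f})
# the shutdown dictionary stops the broker loop once pending work is handled
q.put({"shutdown": True, "wait": True})

# executor-specific options (cores, threads_per_core, ...) would be passed as extra
# keyword arguments here and are forwarded to the executor_class constructor
executor_broker(future_queue=q, max_workers=1, executor_class=PyMPISingleTaskExecutor)
assert f.done() and f.result() == 1
q.join()

PyMPIExecutor and PyFluxExecutor construct essentially this call on a background RaisingThread, flattening the per-worker options into the keyword arguments shown above.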