From beaa54e8e9ba356c75ad1870eed91569ac12c675 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 18:35:55 -0600 Subject: [PATCH 1/5] refactor --- pympipool/{flux/fluxtask.py => flux.py} | 90 +++++++++++--------- pympipool/flux/__init__.py | 1 - pympipool/flux/fluxbroker.py | 51 ------------ pympipool/{mpi/mpitask.py => mpi.py} | 106 +++++++++++++++--------- pympipool/mpi/__init__.py | 1 - pympipool/mpi/mpibroker.py | 65 --------------- pympipool/shared/executorbase.py | 30 ++++++- tests/test_flux.py | 23 +++-- tests/test_future.py | 2 +- tests/test_meta.py | 3 +- tests/test_task.py | 2 +- tests/test_worker.py | 16 ++-- tests/test_worker_memory.py | 7 +- 13 files changed, 176 insertions(+), 221 deletions(-) rename pympipool/{flux/fluxtask.py => flux.py} (69%) delete mode 100644 pympipool/flux/__init__.py delete mode 100644 pympipool/flux/fluxbroker.py rename pympipool/{mpi/mpitask.py => mpi.py} (57%) delete mode 100644 pympipool/mpi/__init__.py delete mode 100644 pympipool/mpi/mpibroker.py diff --git a/pympipool/flux/fluxtask.py b/pympipool/flux.py similarity index 69% rename from pympipool/flux/fluxtask.py rename to pympipool/flux.py index 05fc3697..6565c868 100644 --- a/pympipool/flux/fluxtask.py +++ b/pympipool/flux.py @@ -5,14 +5,58 @@ from pympipool.shared.executorbase import ( cloudpickle_register, ExecutorBase, - execute_parallel_tasks_loop, - get_backend_path, + execute_parallel_tasks, + executor_broker, ) from pympipool.shared.interface import BaseInterface -from pympipool.shared.communication import interface_bootup from pympipool.shared.thread import RaisingThread +class PyFluxExecutor(ExecutorBase): + """ + Args: + max_workers (int): defines the number workers which can execute functions in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_worker (int): number of GPUs per worker - defaults to 0 + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + sleep_interval (float): synchronization interval - default 0.1 + executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux + """ + + def __init__( + self, + max_workers, + cores_per_worker=1, + threads_per_core=1, + gpus_per_worker=0, + init_function=None, + cwd=None, + sleep_interval=0.1, + executor=None, + ): + super().__init__() + self._process = RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "sleep_interval": sleep_interval, + "executor_class": PyFluxSingleTaskExecutor, + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "init_function": init_function, + "cwd": cwd, + "executor": executor, + }, + ) + self._process.start() + + class PyFluxSingleTaskExecutor(ExecutorBase): """ The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. @@ -61,10 +105,13 @@ def __init__( ): super().__init__() self._process = RaisingThread( - target=_flux_execute_parallel_tasks, + target=execute_parallel_tasks, kwargs={ + # Executor Arguments "future_queue": self._future_queue, "cores": cores, + "interface_class": FluxPythonInterface, + # Interface Arguments "threads_per_core": threads_per_core, "gpus_per_task": gpus_per_task, "cwd": cwd, @@ -129,38 +176,3 @@ def shutdown(self, wait=True): def poll(self): return self._future is not None and not self._future.done() - - -def _flux_execute_parallel_tasks( - future_queue, - cores, - threads_per_core=1, - gpus_per_task=0, - cwd=None, - executor=None, -): - """ - Execute a single tasks in parallel using the message passing interface (MPI). - - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - cwd (str/None): current working directory where the parallel python task is executed - executor (flux.job.FluxExecutor/None): flux executor to submit tasks to - optional - """ - execute_parallel_tasks_loop( - interface=interface_bootup( - command_lst=get_backend_path(cores=cores), - connections=FluxPythonInterface( - cwd=cwd, - cores=cores, - threads_per_core=threads_per_core, - gpus_per_core=gpus_per_task, - oversubscribe=False, - executor=executor, - ), - ), - future_queue=future_queue, - ) diff --git a/pympipool/flux/__init__.py b/pympipool/flux/__init__.py deleted file mode 100644 index 26f0a677..00000000 --- a/pympipool/flux/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from pympipool.flux.fluxbroker import PyFluxExecutor diff --git a/pympipool/flux/fluxbroker.py b/pympipool/flux/fluxbroker.py deleted file mode 100644 index b0edafd2..00000000 --- a/pympipool/flux/fluxbroker.py +++ /dev/null @@ -1,51 +0,0 @@ -from pympipool.shared.executorbase import ( - ExecutorBase, - executor_broker, -) -from pympipool.shared.thread import RaisingThread -from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor - - -class PyFluxExecutor(ExecutorBase): - """ - Args: - max_workers (int): defines the number workers which can execute functions in parallel - cores_per_worker (int): number of MPI cores to be used for each function call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 - executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - """ - - def __init__( - self, - max_workers, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - init_function=None, - cwd=None, - sleep_interval=0.1, - executor=None, - ): - super().__init__() - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "sleep_interval": sleep_interval, - "executor_class": PyFluxSingleTaskExecutor, - # Executor Arguments - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "init_function": init_function, - "cwd": cwd, - "executor": executor, - }, - ) - self._process.start() diff --git a/pympipool/mpi/mpitask.py b/pympipool/mpi.py similarity index 57% rename from pympipool/mpi/mpitask.py rename to pympipool/mpi.py index ec3acd4c..5836c316 100644 --- a/pympipool/mpi/mpitask.py +++ b/pympipool/mpi.py @@ -1,14 +1,72 @@ from pympipool.shared.executorbase import ( cloudpickle_register, - execute_parallel_tasks_loop, + execute_parallel_tasks, ExecutorBase, - get_backend_path, + executor_broker, ) from pympipool.shared.thread import RaisingThread -from pympipool.shared.communication import interface_bootup from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface +class PyMPIExecutor(ExecutorBase): + """ + Args: + max_workers (int): defines the number workers which can execute functions in parallel + cores_per_worker (int): number of MPI cores to be used for each function call + threads_per_core (int): number of OpenMP threads to be used for each function call + gpus_per_worker (int): number of GPUs per worker - defaults to 0 + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + sleep_interval (float): synchronization interval - default 0.1 + enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False + """ + + def __init__( + self, + max_workers, + cores_per_worker=1, + threads_per_core=1, + gpus_per_worker=0, + oversubscribe=False, + init_function=None, + cwd=None, + sleep_interval=0.1, + enable_slurm_backend=False, + ): + super().__init__() + if not enable_slurm_backend: + if threads_per_core != 1: + raise ValueError( + "The MPI backend only supports threads_per_core=1, " + + "to manage threads use the SLURM queuing system enable_slurm_backend=True ." + ) + elif gpus_per_worker != 0: + raise ValueError( + "The MPI backend only supports gpus_per_core=0, " + + "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ." + ) + self._process = RaisingThread( + target=executor_broker, + kwargs={ + # Broker Arguments + "future_queue": self._future_queue, + "max_workers": max_workers, + "sleep_interval": sleep_interval, + "executor_class": PyMPISingleTaskExecutor, + # Executor Arguments + "cores": cores_per_worker, + "threads_per_core": threads_per_core, + "gpus_per_task": int(gpus_per_worker / cores_per_worker), + "oversubscribe": oversubscribe, + "init_function": init_function, + "cwd": cwd, + "enable_slurm_backend": enable_slurm_backend, + }, + ) + self._process.start() + + class PyMPISingleTaskExecutor(ExecutorBase): """ The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. @@ -59,10 +117,13 @@ def __init__( ): super().__init__() self._process = RaisingThread( - target=_mpi_execute_parallel_tasks, + target=execute_parallel_tasks, kwargs={ + # Executor Arguments "future_queue": self._future_queue, "cores": cores, + "interface_class": get_interface, + # Interface Arguments "threads_per_core": threads_per_core, "gpus_per_task": gpus_per_task, "cwd": cwd, @@ -78,43 +139,6 @@ def __init__( cloudpickle_register(ind=3) -def _mpi_execute_parallel_tasks( - future_queue, - cores, - threads_per_core=1, - gpus_per_task=0, - cwd=None, - oversubscribe=False, - enable_slurm_backend=False, -): - """ - Execute a single tasks in parallel using the message passing interface (MPI). - - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - cwd (str/None): current working directory where the parallel python task is executed - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - """ - execute_parallel_tasks_loop( - interface=interface_bootup( - command_lst=get_backend_path(cores=cores), - connections=get_interface( - cores=cores, - threads_per_core=threads_per_core, - gpus_per_task=gpus_per_task, - cwd=cwd, - oversubscribe=oversubscribe, - enable_slurm_backend=enable_slurm_backend, - ), - ), - future_queue=future_queue, - ) - - def get_interface( cores=1, threads_per_core=1, diff --git a/pympipool/mpi/__init__.py b/pympipool/mpi/__init__.py deleted file mode 100644 index 5d4bea6a..00000000 --- a/pympipool/mpi/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from pympipool.mpi.mpibroker import PyMPIExecutor diff --git a/pympipool/mpi/mpibroker.py b/pympipool/mpi/mpibroker.py deleted file mode 100644 index 0b660a21..00000000 --- a/pympipool/mpi/mpibroker.py +++ /dev/null @@ -1,65 +0,0 @@ -from pympipool.shared.executorbase import ( - ExecutorBase, - executor_broker, -) -from pympipool.shared.thread import RaisingThread -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor - - -class PyMPIExecutor(ExecutorBase): - """ - Args: - max_workers (int): defines the number workers which can execute functions in parallel - cores_per_worker (int): number of MPI cores to be used for each function call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): synchronization interval - default 0.1 - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - """ - - def __init__( - self, - max_workers, - cores_per_worker=1, - threads_per_core=1, - gpus_per_worker=0, - oversubscribe=False, - init_function=None, - cwd=None, - sleep_interval=0.1, - enable_slurm_backend=False, - ): - super().__init__() - if not enable_slurm_backend: - if threads_per_core != 1: - raise ValueError( - "The MPI backend only supports threads_per_core=1, " - + "to manage threads use the SLURM queuing system enable_slurm_backend=True ." - ) - elif gpus_per_worker != 0: - raise ValueError( - "The MPI backend only supports gpus_per_core=0, " - + "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ." - ) - self._process = RaisingThread( - target=executor_broker, - kwargs={ - # Broker Arguments - "future_queue": self._future_queue, - "max_workers": max_workers, - "sleep_interval": sleep_interval, - "executor_class": PyMPISingleTaskExecutor, - # Executor Arguments - "cores": cores_per_worker, - "threads_per_core": threads_per_core, - "gpus_per_task": int(gpus_per_worker / cores_per_worker), - "oversubscribe": oversubscribe, - "init_function": init_function, - "cwd": cwd, - "enable_slurm_backend": enable_slurm_backend, - }, - ) - self._process.start() diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 5e0276d0..dc188b2e 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -11,6 +11,8 @@ import cloudpickle +from pympipool.shared.communication import interface_bootup + class ExecutorBase(FutureExecutor): def __init__(self): @@ -97,6 +99,32 @@ def cloudpickle_register(ind=2): pass +def execute_parallel_tasks( + future_queue, + cores, + interface_class, + **kwargs, +): + """ + Execute a single tasks in parallel using the message passing interface (MPI). + + Args: + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + cores (int): defines the total number of MPI ranks to use + interface_class: + """ + execute_parallel_tasks_loop( + interface=interface_bootup( + command_lst=_get_backend_path(cores=cores), + connections=interface_class( + cores=cores, + **kwargs + ), + ), + future_queue=future_queue, + ) + + def execute_parallel_tasks_loop(interface, future_queue): while True: task_dict = future_queue.get() @@ -161,7 +189,7 @@ def execute_task_dict(task_dict, meta_future_lst): raise ValueError("Unrecognized Task in task_dict: ", task_dict) -def get_backend_path(cores): +def _get_backend_path(cores): command_lst = [sys.executable] if cores > 1: command_lst += [_get_command_path(executable="mpiexec.py")] diff --git a/tests/test_flux.py b/tests/test_flux.py index c96e8b80..544e6e64 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -4,16 +4,12 @@ import numpy as np import unittest -from pympipool.shared.executorbase import cloudpickle_register, executor_broker +from pympipool.shared.executorbase import cloudpickle_register, executor_broker, execute_parallel_tasks try: import flux.job - from pympipool.flux.fluxbroker import PyFluxExecutor - from pympipool.flux.fluxtask import ( - _flux_execute_parallel_tasks, - PyFluxSingleTaskExecutor, - ) + from pympipool.flux import PyFluxExecutor, PyFluxSingleTaskExecutor, FluxPythonInterface skip_flux_test = False except ImportError: @@ -89,7 +85,12 @@ def test_execute_task(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _flux_execute_parallel_tasks(future_queue=q, cores=1, executor=self.executor) + execute_parallel_tasks( + future_queue=q, + cores=1, + interface_class=FluxPythonInterface, + executor=self.executor, + ) self.assertEqual(f.result(), 2) q.join() @@ -99,8 +100,12 @@ def test_execute_task_threads(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _flux_execute_parallel_tasks( - future_queue=q, cores=1, threads_per_core=1, executor=self.executor + execute_parallel_tasks( + future_queue=q, + cores=1, + threads_per_core=1, + interface_class=FluxPythonInterface, + executor=self.executor, ) self.assertEqual(f.result(), 2) q.join() diff --git a/tests/test_future.py b/tests/test_future.py index cd96441e..9a2723fc 100644 --- a/tests/test_future.py +++ b/tests/test_future.py @@ -1,7 +1,7 @@ import numpy as np import unittest from time import sleep -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor +from pympipool.mpi import PyMPISingleTaskExecutor from concurrent.futures import Future diff --git a/tests/test_meta.py b/tests/test_meta.py index 2981204d..b3cd9da6 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -7,8 +7,7 @@ _get_executor_dict, _get_future_done, ) -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor -from pympipool.mpi.mpibroker import PyMPIExecutor +from pympipool.mpi import PyMPISingleTaskExecutor, PyMPIExecutor def calc(i): diff --git a/tests/test_task.py b/tests/test_task.py index bf1e166b..62fbcc78 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -1,5 +1,5 @@ import unittest -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor +from pympipool.mpi import PyMPISingleTaskExecutor def echo_funct(i): diff --git a/tests/test_worker.py b/tests/test_worker.py index 546b5626..bf30a03f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,8 +3,8 @@ from queue import Queue from time import sleep from concurrent.futures import CancelledError -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, _mpi_execute_parallel_tasks -from pympipool.shared.executorbase import cloudpickle_register +from pympipool.mpi import PyMPISingleTaskExecutor, get_interface +from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks from concurrent.futures import Future @@ -103,10 +103,11 @@ def test_execute_task_failed_no_argument(self): q.put({"fn": calc, "args": (), "kwargs": {}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, + interface_class=get_interface, ) q.join() @@ -116,10 +117,11 @@ def test_execute_task_failed_wrong_argument(self): q.put({"fn": calc, "args": (), "kwargs": {"j": 4}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, + interface_class=get_interface, ) q.join() @@ -129,10 +131,11 @@ def test_execute_task(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, + interface_class=get_interface, ) self.assertEqual(f.result(), np.array(4)) q.join() @@ -143,10 +146,11 @@ def test_execute_task_parallel(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=2, oversubscribe=False, + interface_class=get_interface, ) self.assertEqual(f.result(), [np.array(4), np.array(4)]) q.join() diff --git a/tests/test_worker_memory.py b/tests/test_worker_memory.py index f49ee1cf..9a03ae53 100644 --- a/tests/test_worker_memory.py +++ b/tests/test_worker_memory.py @@ -2,8 +2,8 @@ import numpy as np from queue import Queue from pympipool.shared.backend import call_funct -from pympipool.shared.executorbase import cloudpickle_register -from pympipool.mpi.mpitask import PyMPISingleTaskExecutor, _mpi_execute_parallel_tasks +from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks +from pympipool.mpi import PyMPISingleTaskExecutor, get_interface from concurrent.futures import Future @@ -39,10 +39,11 @@ def test_execute_task(self): q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - _mpi_execute_parallel_tasks( + execute_parallel_tasks( future_queue=q, cores=1, oversubscribe=False, + interface_class=get_interface, ) self.assertEqual(f.result(), np.array([5])) q.join() From 08ea7bad8238dab14ecfeb2880668b4c0ca5bb6d Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 18:39:40 -0600 Subject: [PATCH 2/5] rename flux to flux_executor and mpi to mpi_executor --- pympipool/{flux.py => flux_executor.py} | 0 pympipool/{mpi.py => mpi_executor.py} | 0 tests/test_flux.py | 2 +- tests/test_future.py | 2 +- tests/test_meta.py | 2 +- tests/test_task.py | 2 +- tests/test_worker.py | 2 +- tests/test_worker_memory.py | 2 +- 8 files changed, 6 insertions(+), 6 deletions(-) rename pympipool/{flux.py => flux_executor.py} (100%) rename pympipool/{mpi.py => mpi_executor.py} (100%) diff --git a/pympipool/flux.py b/pympipool/flux_executor.py similarity index 100% rename from pympipool/flux.py rename to pympipool/flux_executor.py diff --git a/pympipool/mpi.py b/pympipool/mpi_executor.py similarity index 100% rename from pympipool/mpi.py rename to pympipool/mpi_executor.py diff --git a/tests/test_flux.py b/tests/test_flux.py index 544e6e64..705db0fd 100644 --- a/tests/test_flux.py +++ b/tests/test_flux.py @@ -9,7 +9,7 @@ try: import flux.job - from pympipool.flux import PyFluxExecutor, PyFluxSingleTaskExecutor, FluxPythonInterface + from pympipool.flux_executor import PyFluxExecutor, PyFluxSingleTaskExecutor, FluxPythonInterface skip_flux_test = False except ImportError: diff --git a/tests/test_future.py b/tests/test_future.py index 9a2723fc..d363306b 100644 --- a/tests/test_future.py +++ b/tests/test_future.py @@ -1,7 +1,7 @@ import numpy as np import unittest from time import sleep -from pympipool.mpi import PyMPISingleTaskExecutor +from pympipool.mpi_executor import PyMPISingleTaskExecutor from concurrent.futures import Future diff --git a/tests/test_meta.py b/tests/test_meta.py index b3cd9da6..c5508657 100644 --- a/tests/test_meta.py +++ b/tests/test_meta.py @@ -7,7 +7,7 @@ _get_executor_dict, _get_future_done, ) -from pympipool.mpi import PyMPISingleTaskExecutor, PyMPIExecutor +from pympipool.mpi_executor import PyMPISingleTaskExecutor, PyMPIExecutor def calc(i): diff --git a/tests/test_task.py b/tests/test_task.py index 62fbcc78..e9a0168c 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -1,5 +1,5 @@ import unittest -from pympipool.mpi import PyMPISingleTaskExecutor +from pympipool.mpi_executor import PyMPISingleTaskExecutor def echo_funct(i): diff --git a/tests/test_worker.py b/tests/test_worker.py index bf30a03f..ddedc3ef 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,7 +3,7 @@ from queue import Queue from time import sleep from concurrent.futures import CancelledError -from pympipool.mpi import PyMPISingleTaskExecutor, get_interface +from pympipool.mpi_executor import PyMPISingleTaskExecutor, get_interface from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks from concurrent.futures import Future diff --git a/tests/test_worker_memory.py b/tests/test_worker_memory.py index 9a03ae53..07781ac1 100644 --- a/tests/test_worker_memory.py +++ b/tests/test_worker_memory.py @@ -3,7 +3,7 @@ from queue import Queue from pympipool.shared.backend import call_funct from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks -from pympipool.mpi import PyMPISingleTaskExecutor, get_interface +from pympipool.mpi_executor import PyMPISingleTaskExecutor, get_interface from concurrent.futures import Future From 8a238798e141e4fb32d49320718bf4b7d7e71a02 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 18:40:27 -0600 Subject: [PATCH 3/5] black formatting --- pympipool/shared/executorbase.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index dc188b2e..ae4b687b 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -116,10 +116,7 @@ def execute_parallel_tasks( execute_parallel_tasks_loop( interface=interface_bootup( command_lst=_get_backend_path(cores=cores), - connections=interface_class( - cores=cores, - **kwargs - ), + connections=interface_class(cores=cores, **kwargs), ), future_queue=future_queue, ) From 06913170a644909bdaece49468b1a8aca56d6ef5 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 18:42:38 -0600 Subject: [PATCH 4/5] update code examples --- pympipool/flux_executor.py | 2 +- pympipool/mpi_executor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pympipool/flux_executor.py b/pympipool/flux_executor.py index 6565c868..c8c10e02 100644 --- a/pympipool/flux_executor.py +++ b/pympipool/flux_executor.py @@ -75,7 +75,7 @@ class PyFluxSingleTaskExecutor(ExecutorBase): Examples: ``` >>> import numpy as np - >>> from pympipool.flux.fluxtask import PyFluxSingleTaskExecutor + >>> from pympipool.flux_executor import PyFluxSingleTaskExecutor >>> >>> def calc(i, j, k): >>> from mpi4py import MPI diff --git a/pympipool/mpi_executor.py b/pympipool/mpi_executor.py index 5836c316..e69f62e4 100644 --- a/pympipool/mpi_executor.py +++ b/pympipool/mpi_executor.py @@ -87,7 +87,7 @@ class PyMPISingleTaskExecutor(ExecutorBase): Examples: ``` >>> import numpy as np - >>> from pympipool.mpi.mpitask import PyMPISingleTaskExecutor + >>> from pympipool.mpi_executor import PyMPISingleTaskExecutor >>> >>> def calc(i, j, k): >>> from mpi4py import MPI From ee3a144f1c1aa6588a1681daf9d8a621f2ccd107 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Fri, 11 Aug 2023 19:08:18 -0600 Subject: [PATCH 5/5] Fix confusion with gpus_per_core and gpus_per_task --- pympipool/flux_executor.py | 2 +- pympipool/mpi_executor.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pympipool/flux_executor.py b/pympipool/flux_executor.py index c8c10e02..bb350740 100644 --- a/pympipool/flux_executor.py +++ b/pympipool/flux_executor.py @@ -113,7 +113,7 @@ def __init__( "interface_class": FluxPythonInterface, # Interface Arguments "threads_per_core": threads_per_core, - "gpus_per_task": gpus_per_task, + "gpus_per_core": gpus_per_task, "cwd": cwd, "executor": executor, }, diff --git a/pympipool/mpi_executor.py b/pympipool/mpi_executor.py index e69f62e4..17864395 100644 --- a/pympipool/mpi_executor.py +++ b/pympipool/mpi_executor.py @@ -125,7 +125,7 @@ def __init__( "interface_class": get_interface, # Interface Arguments "threads_per_core": threads_per_core, - "gpus_per_task": gpus_per_task, + "gpus_per_core": gpus_per_task, "cwd": cwd, "oversubscribe": oversubscribe, "enable_slurm_backend": enable_slurm_backend, @@ -142,7 +142,7 @@ def __init__( def get_interface( cores=1, threads_per_core=1, - gpus_per_task=0, + gpus_per_core=0, cwd=None, oversubscribe=False, enable_slurm_backend=False, @@ -152,7 +152,7 @@ def get_interface( cwd=cwd, cores=cores, threads_per_core=threads_per_core, - gpus_per_core=gpus_per_task, + gpus_per_core=gpus_per_core, oversubscribe=oversubscribe, ) else: @@ -160,6 +160,6 @@ def get_interface( cwd=cwd, cores=cores, threads_per_core=threads_per_core, - gpus_per_core=gpus_per_task, + gpus_per_core=gpus_per_core, oversubscribe=oversubscribe, )