From 6d265452f076706001e937bb56e4ac6d2a1f4eea Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Tue, 31 Oct 2023 11:26:48 +0100
Subject: [PATCH 1/8] Split Slurm and MPI interface

---
 pympipool/mpi/executor.py     |  59 +----------------
 pympipool/shared/interface.py |  31 +++++----
 pympipool/slurm/__init__.py   |   1 +
 pympipool/slurm/executor.py   | 119 ++++++++++++++++++++++++++++++++++
 4 files changed, 141 insertions(+), 69 deletions(-)
 create mode 100644 pympipool/slurm/__init__.py
 create mode 100644 pympipool/slurm/executor.py

diff --git a/pympipool/mpi/executor.py b/pympipool/mpi/executor.py
index 9aac7289..5d137789 100644
--- a/pympipool/mpi/executor.py
+++ b/pympipool/mpi/executor.py
@@ -4,7 +4,7 @@
     ExecutorBase,
     executor_broker,
 )
-from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface
+from pympipool.shared.interface import MpiExecInterface
 from pympipool.shared.thread import RaisingThread
 
 
@@ -13,39 +13,22 @@ class PyMPIExecutor(ExecutorBase):
     Args:
         max_workers (int): defines the number workers which can execute functions in parallel
         cores_per_worker (int): number of MPI cores to be used for each function call
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_worker (int): number of GPUs per worker - defaults to 0
         oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
         init_function (None): optional function to preset arguments for functions which are submitted later
         cwd (str/None): current working directory where the parallel python task is executed
         sleep_interval (float): synchronization interval - default 0.1
-        enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
     """
 
     def __init__(
         self,
         max_workers,
         cores_per_worker=1,
-        threads_per_core=1,
-        gpus_per_worker=0,
         oversubscribe=False,
         init_function=None,
         cwd=None,
         sleep_interval=0.1,
-        enable_slurm_backend=False,
     ):
         super().__init__()
-        if not enable_slurm_backend:
-            if threads_per_core != 1:
-                raise ValueError(
-                    "The MPI backend only supports threads_per_core=1, "
-                    + "to manage threads use the SLURM queuing system enable_slurm_backend=True ."
-                )
-            elif gpus_per_worker != 0:
-                raise ValueError(
-                    "The MPI backend only supports gpus_per_core=0, "
-                    + "to manage GPUs use the SLURM queuing system enable_slurm_backend=True ."
-                )
         self._process = RaisingThread(
             target=executor_broker,
             kwargs={
@@ -56,12 +39,9 @@ def __init__(
                 "executor_class": PyMPISingleTaskExecutor,
                 # Executor Arguments
                 "cores": cores_per_worker,
-                "threads_per_core": threads_per_core,
-                "gpus_per_task": int(gpus_per_worker / cores_per_worker),
                 "oversubscribe": oversubscribe,
                 "init_function": init_function,
                 "cwd": cwd,
-                "enable_slurm_backend": enable_slurm_backend,
             },
         )
         self._process.start()
@@ -77,12 +57,9 @@ class PyMPISingleTaskExecutor(ExecutorBase):
 
     Args:
         cores (int): defines the number of MPI ranks to use for each function call
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
         oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
         init_function (None): optional function to preset arguments for functions which are submitted later
         cwd (str/None): current working directory where the parallel python task is executed
-        enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
 
     Examples:
         ```
@@ -108,12 +85,9 @@ class PyMPISingleTaskExecutor(ExecutorBase):
     def __init__(
         self,
         cores=1,
-        threads_per_core=1,
-        gpus_per_task=0,
         oversubscribe=False,
         init_function=None,
         cwd=None,
-        enable_slurm_backend=False,
     ):
         super().__init__()
         self._process = RaisingThread(
@@ -122,41 +96,12 @@ def __init__(
                 # Executor Arguments
                 "future_queue": self._future_queue,
                 "cores": cores,
-                "interface_class": get_interface,
+                "interface_class": MpiExecInterface,
                 # Interface Arguments
-                "threads_per_core": threads_per_core,
-                "gpus_per_core": gpus_per_task,
                 "cwd": cwd,
                 "oversubscribe": oversubscribe,
             },
         )
         self._process.start()
         self._set_init_function(init_function=init_function)
         cloudpickle_register(ind=3)
-
-
-def get_interface(
-    cores=1,
-    threads_per_core=1,
-    gpus_per_core=0,
-    cwd=None,
-    oversubscribe=False,
-    enable_slurm_backend=False,
-):
-    if not enable_slurm_backend:
-        return MpiExecInterface(
-            cwd=cwd,
-            cores=cores,
-            threads_per_core=threads_per_core,
-            gpus_per_core=gpus_per_core,
-            oversubscribe=oversubscribe,
-        )
-    else:
-        return SlurmSubprocessInterface(
-            cwd=cwd,
-            cores=cores,
-            threads_per_core=threads_per_core,
-            gpus_per_core=gpus_per_core,
-            oversubscribe=oversubscribe,
-        )
diff --git a/pympipool/shared/interface.py b/pympipool/shared/interface.py
index 8885cb36..147ca578 100644
--- a/pympipool/shared/interface.py
+++ b/pympipool/shared/interface.py
@@ -4,12 +4,10 @@
 class BaseInterface(ABC):
     def __init__(
-        self, cwd, cores=1, threads_per_core=1, gpus_per_core=0, oversubscribe=False
+        self, cwd, cores=1, oversubscribe=False
     ):
         self._cwd = cwd
         self._cores = cores
-        self._threads_per_core = threads_per_core
-        self._gpus_per_core = gpus_per_core
         self._oversubscribe = oversubscribe
 
     def bootup(self, command_lst):
@@ -27,15 +25,11 @@ def __init__(
         self,
         cwd=None,
         cores=1,
-        threads_per_core=1,
-        gpus_per_core=0,
         oversubscribe=False,
     ):
         super().__init__(
             cwd=cwd,
             cores=cores,
-            threads_per_core=threads_per_core,
-            gpus_per_core=gpus_per_core,
             oversubscribe=oversubscribe,
         )
         self._process = None
@@ -63,7 +57,6 @@ class MpiExecInterface(SubprocessInterface):
     def generate_command(self, command_lst):
         command_prepend_lst = generate_mpiexec_command(
             cores=self._cores,
-            gpus_per_core=self._gpus_per_core,
             oversubscribe=self._oversubscribe,
         )
         return super().generate_command(
@@ -71,7 +64,23 @@ def generate_command(self, command_lst):
         )
 
 
-class SlurmSubprocessInterface(SubprocessInterface):
+class SrunInterface(SubprocessInterface):
+    def __init__(
+        self,
+        cwd=None,
+        cores=1,
+        threads_per_core=1,
+        gpus_per_core=0,
+        oversubscribe=False,
+    ):
+        super().__init__(
+            cwd=cwd,
+            cores=cores,
+            oversubscribe=oversubscribe,
+        )
+        self._threads_per_core = threads_per_core
+        self._gpus_per_core = gpus_per_core
+
     def generate_command(self, command_lst):
         command_prepend_lst = generate_slurm_command(
             cores=self._cores,
@@ -85,12 +94,10 @@ def generate_command(self, command_lst):
         )
 
 
-def generate_mpiexec_command(cores, gpus_per_core=0, oversubscribe=False):
+def generate_mpiexec_command(cores, oversubscribe=False):
     command_prepend_lst = ["mpiexec", "-n", str(cores)]
     if oversubscribe:
         command_prepend_lst += ["--oversubscribe"]
-    if gpus_per_core > 0:
-        raise ValueError()
     return command_prepend_lst
 
 
diff --git a/pympipool/slurm/__init__.py b/pympipool/slurm/__init__.py
new file mode 100644
index 00000000..d9534d99
--- /dev/null
+++ b/pympipool/slurm/__init__.py
@@ -0,0 +1 @@
+from pympipool.slurm.executor import PySlurmExecutor
\ No newline at end of file
diff --git a/pympipool/slurm/executor.py b/pympipool/slurm/executor.py
new file mode 100644
index 00000000..2adf0264
--- /dev/null
+++ b/pympipool/slurm/executor.py
@@ -0,0 +1,119 @@
+from pympipool.shared.executorbase import (
+    cloudpickle_register,
+    execute_parallel_tasks,
+    ExecutorBase,
+    executor_broker,
+)
+from pympipool.shared.interface import SrunInterface
+from pympipool.shared.thread import RaisingThread
+
+
+class PySlurmExecutor(ExecutorBase):
+    """
+    Args:
+        max_workers (int): defines the number of workers which can execute functions in parallel
+        cores_per_worker (int): number of MPI cores to be used for each function call
+        threads_per_core (int): number of OpenMP threads to be used for each function call
+        gpus_per_worker (int): number of GPUs per worker - defaults to 0
+        oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        cwd (str/None): current working directory where the parallel python task is executed
+        sleep_interval (float): synchronization interval - default 0.1
+    """
+
+    def __init__(
+        self,
+        max_workers,
+        cores_per_worker=1,
+        threads_per_core=1,
+        gpus_per_worker=0,
+        oversubscribe=False,
+        init_function=None,
+        cwd=None,
+        sleep_interval=0.1,
+    ):
+        super().__init__()
+        self._process = RaisingThread(
+            target=executor_broker,
+            kwargs={
+                # Broker Arguments
+                "future_queue": self._future_queue,
+                "max_workers": max_workers,
+                "sleep_interval": sleep_interval,
+                "executor_class": PySlurmSingleTaskExecutor,
+                # Executor Arguments
+                "cores": cores_per_worker,
+                "threads_per_core": threads_per_core,
+                "gpus_per_task": int(gpus_per_worker / cores_per_worker),
+                "oversubscribe": oversubscribe,
+                "init_function": init_function,
+                "cwd": cwd,
+            },
+        )
+        self._process.start()
+
+
+class PySlurmSingleTaskExecutor(ExecutorBase):
+    """
+    The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks.
+    In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process
+    and does not require the python script to be executed with MPI.
+    Still, internally the pympipool.Executor uses the mpi4py.futures.MPIPoolExecutor; consequently, it is primarily
+    an abstraction of its functionality to improve usability, in particular when used in combination with Jupyter
+    notebooks.
+
+    Args:
+        cores (int): defines the number of MPI ranks to use for each function call
+        threads_per_core (int): number of OpenMP threads to be used for each function call
+        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
+        oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        cwd (str/None): current working directory where the parallel python task is executed
+
+    Examples:
+        ```
+        >>> import numpy as np
+        >>> from pympipool.slurm.executor import PySlurmSingleTaskExecutor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> def init_k():
+        >>>     return {"k": 3}
+        >>>
+        >>> with PySlurmSingleTaskExecutor(cores=2, init_function=init_k) as p:
+        >>>     fs = p.submit(calc, 2, j=4)
+        >>>     print(fs.result())
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+        ```
+    """
+
+    def __init__(
+        self,
+        cores=1,
+        threads_per_core=1,
+        gpus_per_task=0,
+        oversubscribe=False,
+        init_function=None,
+        cwd=None,
+    ):
+        super().__init__()
+        self._process = RaisingThread(
+            target=execute_parallel_tasks,
+            kwargs={
+                # Executor Arguments
+                "future_queue": self._future_queue,
+                "cores": cores,
+                "interface_class": SrunInterface,
+                # Interface Arguments
+                "threads_per_core": threads_per_core,
+                "gpus_per_core": gpus_per_task,
+                "cwd": cwd,
+                "oversubscribe": oversubscribe,
+            },
+        )
+        self._process.start()
+        self._set_init_function(init_function=init_function)
+        cloudpickle_register(ind=3)

From 8189887381b5b5b51e7c41ec072ec96f03a50ec6 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Tue, 31 Oct 2023 11:30:37 +0100
Subject: [PATCH 2/8] update tests

---
 tests/test_parse.py         |  2 +-
 tests/test_worker.py        | 10 +++++-----
 tests/test_worker_memory.py |  4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/tests/test_parse.py b/tests/test_parse.py
index 32123162..a94c048d 100644
--- a/tests/test_parse.py
+++ b/tests/test_parse.py
@@ -2,7 +2,7 @@
 import sys
 import unittest
 from pympipool.shared.backend import parse_arguments
-from pympipool.shared.interface import SlurmSubprocessInterface, MpiExecInterface
+from pympipool.shared.interface import SrunInterface, MpiExecInterface
 
 
 class TestParser(unittest.TestCase):
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 5bc26747..386fb16f 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -3,7 +3,7 @@
 from queue import Queue
 from time import sleep
 from concurrent.futures import CancelledError
-from pympipool.mpi.executor import PyMPISingleTaskExecutor, get_interface
+from pympipool.mpi.executor import PyMPISingleTaskExecutor, MpiExecInterface
 from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks
 from concurrent.futures import Future
 
@@ -107,7 +107,7 @@ def test_execute_task_failed_no_argument(self):
             future_queue=q,
             cores=1,
             oversubscribe=False,
-            interface_class=get_interface,
+            interface_class=MpiExecInterface,
         )
         q.join()
 
@@ -121,7 +121,7 @@ def test_execute_task_failed_wrong_argument(self):
             future_queue=q,
             cores=1,
             oversubscribe=False,
-            interface_class=get_interface,
+            interface_class=MpiExecInterface,
         )
         q.join()
 
@@ -135,7 +135,7 @@ def test_execute_task(self):
             future_queue=q,
             cores=1,
             oversubscribe=False,
-            interface_class=get_interface,
+            interface_class=MpiExecInterface,
         )
         self.assertEqual(f.result(), np.array(4))
         q.join()
@@ -150,7 +150,7 @@ def test_execute_task_parallel(self):
             future_queue=q,
             cores=2,
             oversubscribe=False,
-            interface_class=get_interface,
+            interface_class=MpiExecInterface,
         )
         self.assertEqual(f.result(), [np.array(4), np.array(4)])
         q.join()
diff --git a/tests/test_worker_memory.py b/tests/test_worker_memory.py
index f8aaa7cb..2d4da738 100644
--- a/tests/test_worker_memory.py
+++ b/tests/test_worker_memory.py
@@ -3,7 +3,7 @@
 from queue import Queue
 from pympipool.shared.backend import call_funct
 from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks
-from pympipool.mpi.executor import PyMPISingleTaskExecutor, get_interface
+from pympipool.mpi.executor import PyMPISingleTaskExecutor, MpiExecInterface
 from concurrent.futures import Future
 
 
@@ -43,7 +43,7 @@ def test_execute_task(self):
             future_queue=q,
             cores=1,
             oversubscribe=False,
-            interface_class=get_interface,
+            interface_class=MpiExecInterface,
         )
         self.assertEqual(f.result(), np.array([5]))
         q.join()

From c6bf57659e40791435b41c9b73558135de6dd177 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Tue, 31 Oct 2023 11:31:09 +0100
Subject: [PATCH 3/8] black formatting

---
 pympipool/shared/interface.py | 4 +---
 pympipool/slurm/__init__.py   | 2 +-
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/pympipool/shared/interface.py b/pympipool/shared/interface.py
index 147ca578..54818c79 100644
--- a/pympipool/shared/interface.py
+++ b/pympipool/shared/interface.py
@@ -3,9 +3,7 @@
 
 class BaseInterface(ABC):
-    def __init__(
-        self, cwd, cores=1, oversubscribe=False
-    ):
+    def __init__(self, cwd, cores=1, oversubscribe=False):
         self._cwd = cwd
         self._cores = cores
         self._oversubscribe = oversubscribe
diff --git a/pympipool/slurm/__init__.py b/pympipool/slurm/__init__.py
index d9534d99..ed4aeab3 100644
--- a/pympipool/slurm/__init__.py
+++ b/pympipool/slurm/__init__.py
@@ -1 +1 @@
-from pympipool.slurm.executor import PySlurmExecutor
\ No newline at end of file
+from pympipool.slurm.executor import PySlurmExecutor

From cf737272571a3e37eb241ed0360cf5f79ced9aa1 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Tue, 31 Oct 2023 11:34:13 +0100
Subject: [PATCH 4/8] Fix import

---
 pympipool/shared/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pympipool/shared/__init__.py b/pympipool/shared/__init__.py
index 73fadf43..2daf2f44 100644
--- a/pympipool/shared/__init__.py
+++ b/pympipool/shared/__init__.py
@@ -8,4 +8,4 @@
 )
 from pympipool.shared.executorbase import cancel_items_in_queue
 from pympipool.shared.thread import RaisingThread
-from pympipool.shared.interface import MpiExecInterface, SlurmSubprocessInterface
+from pympipool.shared.interface import MpiExecInterface, SrunInterface

From faa6643e92edba3f990d755710adead07a85885e Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Tue, 31 Oct 2023 11:36:56 +0100
Subject: [PATCH 5/8] no more GPU support for MPI

---
 tests/test_interface.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_interface.py b/tests/test_interface.py
index 149e5f03..b8cac32e 100644
--- a/tests/test_interface.py
+++ b/tests/test_interface.py
@@ -18,7 +18,7 @@ def test_interface(self):
         task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}}
         interface = SocketInterface(
             interface=MpiExecInterface(
-                cwd=None, cores=1, gpus_per_core=0, oversubscribe=False
+                cwd=None, cores=1, oversubscribe=False
             )
         )
         interface.bootup(

From cf7f52250456abf685b2eb1854207a4e931a054d Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Tue, 31 Oct 2023 11:40:17 +0100
Subject: [PATCH 6/8] Error type changes

---
 tests/test_meta.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/test_meta.py b/tests/test_meta.py
index 643883c8..1bba9a07 100644
--- a/tests/test_meta.py
+++ b/tests/test_meta.py
@@ -110,7 +110,7 @@ def test_meta_executor_parallel(self):
         self.assertTrue(fs_1.done())
 
     def test_errors(self):
-        with self.assertRaises(ValueError):
+        with self.assertRaises(TypeError):
             PyMPIExecutor(max_workers=1, cores_per_worker=1, threads_per_core=2)
-        with self.assertRaises(ValueError):
+        with self.assertRaises(TypeError):
             PyMPIExecutor(max_workers=1, cores_per_worker=1, gpus_per_worker=1)

From 29ae9103d264a573dc3d3c3e4177c18e5c35a1e2 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Tue, 31 Oct 2023 11:44:11 +0100
Subject: [PATCH 7/8] more fixes

---
 tests/test_parse.py | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/tests/test_parse.py b/tests/test_parse.py
index a94c048d..eaa953d1 100644
--- a/tests/test_parse.py
+++ b/tests/test_parse.py
@@ -22,7 +22,7 @@ def test_command_local(self):
             result_dict["zmqport"],
         ]
         interface = MpiExecInterface(
-            cwd=None, cores=2, gpus_per_core=0, oversubscribe=True
+            cwd=None, cores=2, oversubscribe=True
         )
         self.assertEqual(
             command_lst,
@@ -32,13 +32,6 @@ def test_command_local(self):
         )
         self.assertEqual(result_dict, parse_arguments(command_lst))
 
-    def test_mpiexec_gpu(self):
-        interface = MpiExecInterface(
-            cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True
-        )
-        with self.assertRaises(ValueError):
-            interface.bootup(command_lst=[])
-
     def test_command_slurm(self):
         result_dict = {
             "host": "127.0.0.1",
@@ -59,7 +52,7 @@ def test_command_slurm(self):
             "--zmqport",
             result_dict["zmqport"],
         ]
-        interface = SlurmSubprocessInterface(
+        interface = SrunInterface(
             cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True
         )
         self.assertEqual(

From 95206700a4351a4396b4937eebd71eff0919fe48 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Tue, 31 Oct 2023 11:58:05 +0100
Subject: [PATCH 8/8] fixes for flux

---
 pympipool/flux/executor.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pympipool/flux/executor.py b/pympipool/flux/executor.py
index 4c8930da..95425118 100644
--- a/pympipool/flux/executor.py
+++ b/pympipool/flux/executor.py
@@ -136,10 +136,10 @@ def __init__(
         super().__init__(
             cwd=cwd,
             cores=cores,
-            gpus_per_core=gpus_per_core,
-            threads_per_core=threads_per_core,
             oversubscribe=oversubscribe,
         )
+        self._threads_per_core = threads_per_core
+        self._gpus_per_core = gpus_per_core
         self._executor = executor
         self._future = None
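
Editor's note: the sketch below is not part of the patch series; it is a minimal usage illustration of the MPI/SLURM split introduced above. It assumes a pympipool installation that already contains these changes, uses the context-manager style shown in the docstring examples (the executors follow the concurrent.futures.Executor pattern), and the SLURM variant additionally needs to run inside an allocation where `srun` is available.

```python
# Minimal sketch of the split between the MPI and SLURM backends (illustrative only).
from pympipool.mpi.executor import PyMPIExecutor
from pympipool.slurm.executor import PySlurmExecutor


def calc(i):
    # Trivial task submitted to a worker process.
    return i ** 2


# MPI backend: threads_per_core / gpus_per_worker are no longer accepted here.
with PyMPIExecutor(max_workers=2, cores_per_worker=1) as exe:
    fs = exe.submit(calc, 3)
    print(fs.result())

# SLURM backend: thread and GPU assignment is now handled via srun by PySlurmExecutor.
with PySlurmExecutor(max_workers=1, cores_per_worker=1, threads_per_core=1) as exe:
    fs = exe.submit(calc, 4)
    print(fs.result())
```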