From 9ba95927e6d3e843d8934157c02c7b84b5341a18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 19:07:05 -0600 Subject: [PATCH 01/20] Update connection backend to support Flux python API --- pympipool/shared/communication.py | 48 ++----- pympipool/shared/connections.py | 203 ++++++++++++++++++++++++++++++ pympipool/shared/taskexecutor.py | 123 ++++++++---------- tests/test_interface.py | 23 ++-- tests/test_parse.py | 36 +++--- 5 files changed, 288 insertions(+), 145 deletions(-) create mode 100644 pympipool/shared/connections.py diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index e70641ff..75b9661e 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -1,5 +1,3 @@ -import subprocess - import cloudpickle import zmq @@ -9,16 +7,14 @@ class SocketInterface(object): The SocketInterface is an abstraction layer on top of the zero message queue. Args: - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter + interface (pympipool.shared.connections.BaseInterface): Interface for starting the parallel process """ - def __init__(self, queue_adapter=None, queue_adapter_kwargs=None): + def __init__(self, interface=None): self._context = zmq.Context() self._socket = self._context.socket(zmq.PAIR) self._process = None - self._queue_adapter = queue_adapter - self._queue_adapter_kwargs = queue_adapter_kwargs + self._interface = interface def send_dict(self, input_dict): """ @@ -68,44 +64,22 @@ def bind_to_random_port(self): """ return self._socket.bind_to_random_port("tcp://*") - def bootup(self, command_lst, cwd=None, cores=None): + def bootup(self, command_lst): """ Boot up the client process to connect to the SocketInterface. Args: command_lst (list): list of strings to start the client process - cwd (str/None): current working directory where the parallel python task is executed - cores (str/ None): if the job is submitted to a queuing system using the pysqa.queueadapter.QueueAdapter - then cores defines the number of cores to be used for the specific queuing system allocation to execute - the client process. 
""" - if self._queue_adapter is not None: - self._queue_adapter.submit_job( - working_directory=cwd, - cores=cores, - command=" ".join(command_lst), - **self._queue_adapter_kwargs - ) - else: - self._process = subprocess.Popen( - args=command_lst, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - cwd=cwd, - ) + self._interface.bootup(command_lst=command_lst) def shutdown(self, wait=True): result = None - if self._process is not None and self._process.poll() is None: - result = self.send_and_receive_dict( - input_dict={"shutdown": True, "wait": wait} - ) - self._process_close(wait=wait) - elif self._queue_adapter is not None and self._socket is not None: + if self._interface.poll(): result = self.send_and_receive_dict( input_dict={"shutdown": True, "wait": wait} ) + self._interface.shutdown(wait=wait) if self._socket is not None: self._socket.close() if self._context is not None: @@ -115,14 +89,6 @@ def shutdown(self, wait=True): self._context = None return result - def _process_close(self, wait=True): - self._process.terminate() - self._process.stdout.close() - self._process.stdin.close() - self._process.stderr.close() - if wait: - self._process.wait() - def __del__(self): self.shutdown(wait=True) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py new file mode 100644 index 00000000..05190b7a --- /dev/null +++ b/pympipool/shared/connections.py @@ -0,0 +1,203 @@ +from abc import ABC +import subprocess + + +class BaseInterface(ABC): + def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): + self._cwd = cwd + self._cores = cores + self._gpus_per_core = gpus_per_core + self._oversubscribe = oversubscribe + + def bootup(self, command_lst): + raise NotImplementedError + + def shutdown(self, wait=True): + raise NotImplementedError + + def poll(self): + raise NotImplementedError + + +class SubprocessInterface(BaseInterface): + def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + self._process = None + + def bootup(self, command_lst): + self._process = subprocess.Popen( + args=self.generate_command(command_lst=command_lst), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + cwd=self._cwd, + ) + + def generate_command(self, command_lst): + return command_lst + + def shutdown(self, wait=True): + self._process.terminate() + self._process.stdout.close() + self._process.stdin.close() + self._process.stderr.close() + if wait: + self._process.wait() + self._process = None + + def poll(self): + return self._process is not None and self._process.poll() is None + + +class MpiExecInterface(SubprocessInterface): + def generate_command(self, command_lst): + command_prepend_lst = generate_mpiexec_command( + cores=self._cores, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + return super().generate_command( + command_lst=command_prepend_lst + command_lst, + ) + + +class SlurmSubprocessInterface(SubprocessInterface): + def generate_command(self, command_lst): + command_prepend_lst = generate_slurm_command( + cores=self._cores, + cwd=self._cwd, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + return super().generate_command( + command_lst=command_prepend_lst + command_lst, + ) + + +class PysqaInterface(BaseInterface): + def __init__( + self, + cwd, + cores=1, + gpus_per_core=0, + oversubscribe=False, + queue_adapter=None, + 
queue_type=None, + queue_adapter_kwargs={}, + ): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + self._queue_adapter = queue_adapter + self._queue_type = queue_type + self._queue_adapter_kwargs = queue_adapter_kwargs + self._queue_id = None + + def bootup(self, command_lst): + if self._queue_type.lower() == "slurm": + command_prepend_lst = generate_slurm_command( + cores=self._cores, + cwd=self._cwd, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + else: + command_prepend_lst = generate_mpiexec_command( + cores=self._cores, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + self._queue_id = self._queue_adapter.submit_job( + working_directory=self._cwd, + cores=self._cores, + command=" ".join(command_prepend_lst + command_lst), + **self._queue_adapter_kwargs + ) + + def shutdown(self, wait=True): + self._queue_adapter.delete_job(process_id=self._queue_id) + + def poll(self): + return self._queue_adapter is not None + + +class FluxCmdInterface(SubprocessInterface): + def generate_command(self, command_lst): + command_prepend_lst = [ + "flux", + "run", + "-n", + str(self._cores), + "--cwd=" + self._cwd, + ] + if self._gpus_per_core > 0: + command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)] + return super().generate_command( + command_lst=command_prepend_lst + command_lst, + ) + + +class FluxPythonInterface(BaseInterface): + def __init__( + self, cwd, cores=1, gpus_per_core=0, oversubscribe=False, executor=None + ): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + self._executor = executor + self._future = None + + def bootup(self, command_lst): + import flux.job + + if self._oversubscribe: + raise ValueError( + "Oversubscribing is currently not supported for the Flux adapter." 
+ ) + if self._executor is None: + self._executor = flux.job.FluxExecutor() + jobspec = flux.job.JobspecV1.from_command( + command=" ".join(command_lst), + num_tasks=1, + cores_per_task=self._cores, + gpus_per_task=self._gpus_per_core, + num_nodes=None, + exclusive=False, + ) + jobspec.cwd = self._cwd + self._future = self._executor.submit(jobspec) + + def shutdown(self, wait=True): + self._executor.shutdown(wait=wait) + + def poll(self): + return self._executor is not None + + +def generate_slurm_command(cores, cwd, gpus_per_core=0, oversubscribe=False): + command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd] + if gpus_per_core > 0: + command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)] + if oversubscribe: + command_prepend_lst += ["--oversubscribe"] + return command_prepend_lst + + +def generate_mpiexec_command(cores, gpus_per_core=0, oversubscribe=False): + command_prepend_lst = ["mpiexec", "-n", str(cores)] + if oversubscribe: + command_prepend_lst += ["--oversubscribe"] + if gpus_per_core > 0: + raise ValueError() + return command_prepend_lst diff --git a/pympipool/shared/taskexecutor.py b/pympipool/shared/taskexecutor.py index 00133ace..94df0349 100644 --- a/pympipool/shared/taskexecutor.py +++ b/pympipool/shared/taskexecutor.py @@ -6,6 +6,12 @@ import cloudpickle from pympipool.shared.communication import SocketInterface +from pympipool.shared.connections import ( + MpiExecInterface, + SlurmSubprocessInterface, + FluxCmdInterface, + PysqaInterface, +) def cancel_items_in_queue(que): @@ -46,61 +52,6 @@ def cloudpickle_register(ind=2): pass -def command_line_options( - hostname, - port_selected, - path, - cores, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_multi_host=False, -): - """ - Translate the individual parameters to command line options. - - Args: - hostname (str): name of the host where the SocketInterface instance runs the client process should conenct to. - port_selected (int): port the SocketInterface instance runs on. - path (str): path to the python script which should be executed as client process. 
- cores (int): defines the total number of MPI ranks to use - cores_per_task (int): number of MPI ranks per task - defaults to 1 - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - enable_mpi4py_backend (bool): enable the mpi4py.futures module - defaults to True - enable_multi_host (bool): communicate the host to connect to - defaults to False - - Returns: - list: list of strings to be executed on the command line - """ - if enable_flux_backend: - command_lst = ["flux", "run"] - elif enable_slurm_backend: - command_lst = ["srun"] - else: - command_lst = ["mpiexec"] - if gpus_per_task > 0 and (enable_flux_backend or enable_slurm_backend): - command_lst += ["--gpus-per-task=" + str(gpus_per_task)] - elif gpus_per_task > 0: - raise ValueError("GPU binding is only supported for flux and SLURM backend.") - if oversubscribe: - command_lst += ["--oversubscribe"] - command_lst += ["-n", str(cores), "python", path] - if enable_flux_backend or enable_slurm_backend or enable_multi_host: - command_lst += [ - "--host", - hostname, - ] - command_lst += [ - "--zmqport", - str(port_selected), - ] - return command_lst - - def execute_parallel_tasks( future_queue, cores, @@ -126,26 +77,52 @@ def execute_parallel_tasks( queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter """ - interface = SocketInterface( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ) - interface.bootup( - command_lst=command_line_options( - hostname=socket.gethostname(), - port_selected=interface.bind_to_random_port(), - path=os.path.abspath( - os.path.join(__file__, "..", "..", "backend", "mpiexec.py") - ), + command_lst = [ + "python", + os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")), + ] + if enable_flux_backend or enable_slurm_backend: + command_lst += [ + "--host", + socket.gethostname(), + ] + if queue_adapter is not None: + connections = PysqaInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + queue_adapter=queue_adapter, + queue_type=None, + queue_adapter_kwargs=queue_adapter_kwargs, + ) + elif enable_flux_backend: + connections = FluxCmdInterface( + cwd=cwd, cores=cores, - gpus_per_task=gpus_per_task, + gpus_per_core=gpus_per_task, oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - enable_multi_host=queue_adapter is not None, - ), - cwd=cwd, - cores=cores, - ) + ) + elif enable_slurm_backend: + connections = SlurmSubprocessInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + ) + else: + connections = MpiExecInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + ) + interface = SocketInterface(interface=connections) + command_lst += [ + "--zmqport", + str(interface.bind_to_random_port()), + ] + interface.bootup(command_lst=command_lst) _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) diff --git a/tests/test_interface.py b/tests/test_interface.py index 0d717766..9ff69fab 100644 --- 
a/tests/test_interface.py +++ b/tests/test_interface.py @@ -4,7 +4,8 @@ import numpy as np import unittest from pympipool.shared.communication import SocketInterface -from pympipool.shared.taskexecutor import command_line_options, cloudpickle_register +from pympipool.shared.taskexecutor import cloudpickle_register +from pympipool.shared.connections import MpiExecInterface def calc(i): @@ -15,19 +16,17 @@ class TestInterface(unittest.TestCase): def test_interface(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, 'args': (), "kwargs": {"i": 2}} - interface = SocketInterface(queue_adapter=None, queue_adapter_kwargs=None) - interface.bootup( - command_lst=command_line_options( - hostname=socket.gethostname(), - port_selected=interface.bind_to_random_port(), - path=os.path.abspath(os.path.join(__file__, "..", "..", "pympipool", "backend", "mpiexec.py")), + interface = SocketInterface( + interface=MpiExecInterface( + cwd=os.path.abspath("."), cores=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_multi_host=False, + gpus_per_core=0, + oversubscribe=False ) ) + interface.bootup(command_lst=[ + "python", + os.path.abspath(os.path.join(__file__, "..", "..", "pympipool", "backend", "mpiexec.py")) + ]) self.assertEqual(interface.send_and_receive_dict(input_dict=task_dict), np.array(4)) interface.shutdown(wait=True) \ No newline at end of file diff --git a/tests/test_parse.py b/tests/test_parse.py index ebbe97f1..e8bd889c 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -1,6 +1,7 @@ +import os import unittest from pympipool.shared.backend import parse_arguments -from pympipool.shared.taskexecutor import command_line_options +from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface class TestParser(unittest.TestCase): @@ -10,19 +11,17 @@ def test_command_local(self): 'zmqport': '22', } command_lst = [ - 'mpiexec', '--oversubscribe', + 'mpiexec', '-n', '2', + '--oversubscribe', 'python', '/', '--zmqport', result_dict['zmqport'] ] - self.assertEqual(command_lst, command_line_options( - hostname=result_dict['host'], - port_selected=result_dict['zmqport'], - path="/", - cores=2, - oversubscribe=True, - enable_flux_backend=False, - )) + interface = MpiExecInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=0, oversubscribe=True) + self.assertEqual( + command_lst, + interface.generate_command(command_lst=['python', '/', '--zmqport', result_dict['zmqport']]) + ) self.assertEqual(result_dict, parse_arguments(command_lst)) def test_command_flux(self): @@ -31,16 +30,15 @@ def test_command_flux(self): 'zmqport': '22', } command_lst = [ - 'flux', 'run', '-n', '2', 'python', '/', + 'flux', 'run', '-n', '2', + "--cwd=" + os.path.abspath("."), + 'python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport'] ] - self.assertEqual(command_lst, command_line_options( - hostname=result_dict['host'], - port_selected=result_dict['zmqport'], - path="/", - cores=2, - oversubscribe=False, - enable_flux_backend=True, - )) + interface = FluxCmdInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=0, oversubscribe=False) + self.assertEqual( + command_lst, + interface.generate_command(command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']]) + ) self.assertEqual(result_dict, parse_arguments(command_lst)) From c00a29620dcc704b7673542d4799f709ba46dcb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 19:07:51 -0600 Subject: 
[PATCH 02/20] Export new interfaces in init --- pympipool/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 73eda1ef..412d0658 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -11,6 +11,13 @@ from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool from pympipool.shared.thread import RaisingThread from pympipool.shared.taskexecutor import cancel_items_in_queue +from pympipool.shared.connections import ( + MpiExecInterface, + SlurmSubprocessInterface, + FluxPythonInterface, + FluxCmdInterface, + PysqaInterface, +) from ._version import get_versions From 7c055d1da96eb93e92c3e5cd5aae227dd907babd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 20:09:46 -0600 Subject: [PATCH 03/20] More refactoring --- pympipool/legacy/interfaces/pool.py | 81 ++++++++------ pympipool/legacy/shared/interface.py | 157 +++------------------------ pympipool/shared/connections.py | 45 ++++++++ pympipool/shared/taskexecutor.py | 81 +++++++------- 4 files changed, 147 insertions(+), 217 deletions(-) diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index bf4d5d1c..4f6b0b1a 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -1,8 +1,7 @@ from abc import ABC +import os -from pympipool.shared.communication import SocketInterface -from pympipool.shared.taskexecutor import cloudpickle_register -from pympipool.legacy.shared.interface import get_parallel_subprocess_command +from pympipool.shared.taskexecutor import cloudpickle_register, interface_init class PoolBase(ABC): @@ -11,11 +10,9 @@ class PoolBase(ABC): alone. Rather it implements the __enter__(), __exit__() and shutdown() function shared between the derived classes. 
""" - def __init__(self, queue_adapter=None, queue_adapter_kwargs=None): + def __init__(self): self._future_dict = {} - self._interface = SocketInterface( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ) + self._interface = None cloudpickle_register(ind=3) def __enter__(self): @@ -71,24 +68,27 @@ def __init__( queue_adapter=None, queue_adapter_kwargs=None, ): - super().__init__( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ) - self._interface.bootup( - command_lst=get_parallel_subprocess_command( - port_selected=self._interface.bind_to_random_port(), - cores=max_workers, - cores_per_task=1, - gpus_per_task=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - enable_mpi4py_backend=True, - enable_multi_host=queue_adapter is not None, + super().__init__() + command_lst = [ + "python", + "-m", + "mpi4py.futures", + os.path.abspath( + os.path.join(__file__, "..", "..", "backend", "mpipool.py") ), + ] + self._interface, command_lst = interface_init( + command_lst=command_lst, cwd=cwd, cores=max_workers, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_adapter_kwargs=queue_adapter_kwargs, ) + self._interface.bootup(command_lst=command_lst) def map(self, func, iterable, chunksize=None): """ @@ -178,24 +178,33 @@ def __init__( queue_adapter=None, queue_adapter_kwargs=None, ): - super().__init__( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ) - self._interface.bootup( - command_lst=get_parallel_subprocess_command( - port_selected=self._interface.bind_to_random_port(), - cores=max_ranks, - cores_per_task=ranks_per_task, - gpus_per_task=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_mpi4py_backend=True, - enable_multi_host=queue_adapter is not None, + super().__init__() + command_lst = [ + "python", + "-m", + "mpi4py.futures", + os.path.abspath( + os.path.join(__file__, "..", "..", "backend", "mpipool.py") ), + ] + self._interface, command_lst = interface_init( + command_lst=command_lst, cwd=cwd, - cores=max_ranks, + cores=1, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=queue_adapter, + queue_adapter_kwargs=queue_adapter_kwargs, ) + command_lst += [ + "--cores-per-task", + str(ranks_per_task), + "--cores-total", + str(max_ranks), + ] + self._interface.bootup(command_lst=command_lst) def map(self, func, iterable, chunksize=None): """ diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py index d181d15f..62279579 100644 --- a/pympipool/legacy/shared/interface.py +++ b/pympipool/legacy/shared/interface.py @@ -1,133 +1,8 @@ import os import queue -import socket import time - -from pympipool.shared.communication import SocketInterface - - -def command_line_options( - hostname, - port_selected, - path, - cores, - cores_per_task=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_mpi4py_backend=True, - enable_multi_host=False, -): - """ - Translate the individual parameters to command line options. - - Args: - hostname (str): name of the host where the SocketInterface instance runs the client process should conenct to. 
- port_selected (int): port the SocketInterface instance runs on. - path (str): path to the python script which should be executed as client process. - cores (int): defines the total number of MPI ranks to use - cores_per_task (int): number of MPI ranks per task - defaults to 1 - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - enable_mpi4py_backend (bool): enable the mpi4py.futures module - defaults to True - enable_multi_host (bool): communicate the host to connect to - defaults to False - - Returns: - list: list of strings to be executed on the command line - """ - if enable_flux_backend: - command_lst = ["flux", "run"] - elif enable_slurm_backend: - command_lst = ["srun"] - else: - command_lst = ["mpiexec"] - if gpus_per_task > 0 and (enable_flux_backend or enable_slurm_backend): - command_lst += ["--gpus-per-task=" + str(gpus_per_task)] - elif gpus_per_task > 0: - raise ValueError("GPU binding is only supported for flux and SLURM backend.") - if oversubscribe: - command_lst += ["--oversubscribe"] - if cores_per_task == 1 and enable_mpi4py_backend: - command_lst += ["-n", str(cores), "python", "-m", "mpi4py.futures"] - elif cores_per_task > 1 and enable_mpi4py_backend: - # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: - # https://github.com/mpi4py/mpi4py/issues/324 - command_lst += ["-n", "1", "python"] - else: - command_lst += ["-n", str(cores), "python"] - command_lst += [path] - if enable_flux_backend or enable_slurm_backend or enable_multi_host: - command_lst += [ - "--host", - hostname, - ] - command_lst += [ - "--zmqport", - str(port_selected), - ] - if enable_mpi4py_backend: - command_lst += [ - "--cores-per-task", - str(cores_per_task), - "--cores-total", - str(cores), - ] - return command_lst - - -def get_parallel_subprocess_command( - port_selected, - cores, - cores_per_task=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_mpi4py_backend=True, - enable_multi_host=False, -): - """ - Translate the individual parameters to command line options. - - Args: - port_selected (int): port the SocketInterface instance runs on. 
- cores (int): defines the total number of MPI ranks to use - cores_per_task (int): number of MPI ranks per task - defaults to 1 - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - enable_mpi4py_backend (bool): enable the mpi4py.futures module - defaults to True - enable_multi_host (bool): communicate the host to connect to - defaults to False - - Returns: - list: list of strings to be executed on the command line - """ - if enable_mpi4py_backend: - executable = os.path.abspath( - os.path.join(__file__, "..", "..", "backend", "mpipool.py") - ) - else: - executable = os.path.abspath( - os.path.join(__file__, "..", "..", "..", "backend", "mpiexec.py") - ) - return command_line_options( - hostname=socket.gethostname(), - port_selected=port_selected, - path=executable, - cores=cores, - cores_per_task=cores_per_task, - gpus_per_task=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - enable_mpi4py_backend=enable_mpi4py_backend, - enable_multi_host=enable_multi_host, - ) +from pympipool.shared.taskexecutor import interface_init def execute_serial_tasks( @@ -158,24 +33,24 @@ def execute_serial_tasks( queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter """ future_dict = {} - interface = SocketInterface( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ) - interface.bootup( - command_lst=get_parallel_subprocess_command( - port_selected=interface.bind_to_random_port(), - cores=cores, - gpus_per_task=gpus_per_task, - cores_per_task=1, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - enable_mpi4py_backend=True, - enable_multi_host=queue_adapter is not None, - ), + command_lst = [ + "python", + "-m", + "mpi4py.futures", + os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpipool.py")), + ] + interface, command_lst = interface_init( + command_lst=command_lst, cwd=cwd, cores=cores, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_adapter_kwargs=queue_adapter_kwargs, ) + interface.bootup(command_lst=command_lst) _execute_serial_tasks_loop( interface=interface, future_queue=future_queue, diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index 05190b7a..565ecd21 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -201,3 +201,48 @@ def generate_mpiexec_command(cores, gpus_per_core=0, oversubscribe=False): if gpus_per_core > 0: raise ValueError() return command_prepend_lst + + +def get_connection_interface( + cwd, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, +): + if queue_adapter is not None: + connections = PysqaInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + queue_adapter=queue_adapter, + queue_type=queue_type, + queue_adapter_kwargs=queue_adapter_kwargs, + ) + elif enable_flux_backend: 
+ connections = FluxCmdInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + elif enable_slurm_backend: + connections = SlurmSubprocessInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + else: + connections = MpiExecInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + return connections diff --git a/pympipool/shared/taskexecutor.py b/pympipool/shared/taskexecutor.py index 94df0349..6073e5bb 100644 --- a/pympipool/shared/taskexecutor.py +++ b/pympipool/shared/taskexecutor.py @@ -6,12 +6,7 @@ import cloudpickle from pympipool.shared.communication import SocketInterface -from pympipool.shared.connections import ( - MpiExecInterface, - SlurmSubprocessInterface, - FluxCmdInterface, - PysqaInterface, -) +from pympipool.shared.connections import get_connection_interface def cancel_items_in_queue(que): @@ -81,49 +76,55 @@ def execute_parallel_tasks( "python", os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")), ] - if enable_flux_backend or enable_slurm_backend: + interface, command_lst = interface_init( + command_lst=command_lst, + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_adapter_kwargs=queue_adapter_kwargs, + ) + interface.bootup(command_lst=command_lst) + _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) + + +def interface_init( + command_lst, + cwd, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, +): + if enable_flux_backend or enable_slurm_backend or queue_adapter is not None: command_lst += [ "--host", socket.gethostname(), ] - if queue_adapter is not None: - connections = PysqaInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - queue_adapter=queue_adapter, - queue_type=None, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - elif enable_flux_backend: - connections = FluxCmdInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - ) - elif enable_slurm_backend: - connections = SlurmSubprocessInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - ) - else: - connections = MpiExecInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - ) + connections = get_connection_interface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_type=queue_type, + queue_adapter_kwargs=queue_adapter_kwargs, + ) interface = SocketInterface(interface=connections) command_lst += [ "--zmqport", str(interface.bind_to_random_port()), ] - interface.bootup(command_lst=command_lst) - _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) + return interface, command_lst def _execute_parallel_tasks_loop(interface, future_queue): From 78e116b258e2a53d11fbcd0e4e5ed2f84b1bba45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 20:41:30 -0600 Subject: [PATCH 04/20] minor fixes --- pympipool/legacy/interfaces/pool.py | 8 ++++++-- 
pympipool/legacy/shared/interface.py | 6 ++++++ pympipool/shared/connections.py | 2 +- tests/test_interface.py | 5 +++-- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index 4f6b0b1a..e7c31d04 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -88,6 +88,12 @@ def __init__( queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs, ) + command_lst += [ + "--cores-per-task", + str(1), + "--cores-total", + str(max_workers), + ] self._interface.bootup(command_lst=command_lst) def map(self, func, iterable, chunksize=None): @@ -181,8 +187,6 @@ def __init__( super().__init__() command_lst = [ "python", - "-m", - "mpi4py.futures", os.path.abspath( os.path.join(__file__, "..", "..", "backend", "mpipool.py") ), diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py index 62279579..0bc32dfe 100644 --- a/pympipool/legacy/shared/interface.py +++ b/pympipool/legacy/shared/interface.py @@ -50,6 +50,12 @@ def execute_serial_tasks( queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs, ) + command_lst += [ + "--cores-per-task", + str(1), + "--cores-total", + str(cores), + ] interface.bootup(command_lst=command_lst) _execute_serial_tasks_loop( interface=interface, diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index 565ecd21..dce3cab7 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -88,7 +88,7 @@ def __init__( oversubscribe=False, queue_adapter=None, queue_type=None, - queue_adapter_kwargs={}, + queue_adapter_kwargs=None, ): super().__init__( cwd=cwd, diff --git a/tests/test_interface.py b/tests/test_interface.py index 9ff69fab..b3562068 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -1,5 +1,4 @@ import os -import socket import numpy as np import unittest @@ -26,7 +25,9 @@ def test_interface(self): ) interface.bootup(command_lst=[ "python", - os.path.abspath(os.path.join(__file__, "..", "..", "pympipool", "backend", "mpiexec.py")) + os.path.abspath(os.path.join(__file__, "..", "..", "pympipool", "backend", "mpiexec.py")), + "--zmqport", + str(interface.bind_to_random_port()), ]) self.assertEqual(interface.send_and_receive_dict(input_dict=task_dict), np.array(4)) interface.shutdown(wait=True) \ No newline at end of file From f512ee7a7e454ce8af45c198fcd837bd078505ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 20:52:15 -0600 Subject: [PATCH 05/20] small fix --- pympipool/legacy/interfaces/pool.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index e7c31d04..0ee2c07e 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -185,12 +185,15 @@ def __init__( queue_adapter_kwargs=None, ): super().__init__() - command_lst = [ - "python", - os.path.abspath( - os.path.join(__file__, "..", "..", "backend", "mpipool.py") - ), - ] + executable = os.path.abspath( + os.path.join(__file__, "..", "..", "backend", "mpipool.py") + ) + if ranks_per_task == 1: + command_lst = ["python", "-m", "mpi4py.futures", executable] + else: + # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: + # https://github.com/mpi4py/mpi4py/issues/324 + command_lst = ["-n", "1", "python", executable] self._interface, command_lst = interface_init( 
command_lst=command_lst, cwd=cwd, From ae9c654fcd29210513c32a25bdb5d8ba400c90cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 21:00:05 -0600 Subject: [PATCH 06/20] more bug fixes --- pympipool/legacy/interfaces/pool.py | 34 +++++++++++++--------------- pympipool/legacy/shared/interface.py | 15 +++++------- pympipool/shared/taskexecutor.py | 8 +++---- 3 files changed, 26 insertions(+), 31 deletions(-) diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index 0ee2c07e..b8cec78d 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -1,7 +1,7 @@ from abc import ABC import os -from pympipool.shared.taskexecutor import cloudpickle_register, interface_init +from pympipool.shared.taskexecutor import cloudpickle_register, interface_bootup class PoolBase(ABC): @@ -76,8 +76,12 @@ def __init__( os.path.abspath( os.path.join(__file__, "..", "..", "backend", "mpipool.py") ), + "--cores-per-task", + str(1), + "--cores-total", + str(max_workers), ] - self._interface, command_lst = interface_init( + self._interface = interface_bootup( command_lst=command_lst, cwd=cwd, cores=max_workers, @@ -88,13 +92,6 @@ def __init__( queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs, ) - command_lst += [ - "--cores-per-task", - str(1), - "--cores-total", - str(max_workers), - ] - self._interface.bootup(command_lst=command_lst) def map(self, func, iterable, chunksize=None): """ @@ -190,14 +187,22 @@ def __init__( ) if ranks_per_task == 1: command_lst = ["python", "-m", "mpi4py.futures", executable] + cores = max_ranks else: # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: # https://github.com/mpi4py/mpi4py/issues/324 command_lst = ["-n", "1", "python", executable] - self._interface, command_lst = interface_init( + cores = 1 + command_lst += [ + "--cores-per-task", + str(ranks_per_task), + "--cores-total", + str(max_ranks), + ] + self._interface = interface_bootup( command_lst=command_lst, cwd=cwd, - cores=1, + cores=cores, gpus_per_core=gpus_per_task, oversubscribe=oversubscribe, enable_flux_backend=False, @@ -205,13 +210,6 @@ def __init__( queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs, ) - command_lst += [ - "--cores-per-task", - str(ranks_per_task), - "--cores-total", - str(max_ranks), - ] - self._interface.bootup(command_lst=command_lst) def map(self, func, iterable, chunksize=None): """ diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py index 0bc32dfe..d11930e6 100644 --- a/pympipool/legacy/shared/interface.py +++ b/pympipool/legacy/shared/interface.py @@ -2,7 +2,7 @@ import queue import time -from pympipool.shared.taskexecutor import interface_init +from pympipool.shared.taskexecutor import interface_bootup def execute_serial_tasks( @@ -38,8 +38,12 @@ def execute_serial_tasks( "-m", "mpi4py.futures", os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpipool.py")), + "--cores-per-task", + str(1), + "--cores-total", + str(cores), ] - interface, command_lst = interface_init( + interface = interface_bootup( command_lst=command_lst, cwd=cwd, cores=cores, @@ -50,13 +54,6 @@ def execute_serial_tasks( queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs, ) - command_lst += [ - "--cores-per-task", - str(1), - "--cores-total", - str(cores), - ] - interface.bootup(command_lst=command_lst) _execute_serial_tasks_loop( interface=interface, future_queue=future_queue, diff --git 
a/pympipool/shared/taskexecutor.py b/pympipool/shared/taskexecutor.py index 6073e5bb..563ae326 100644 --- a/pympipool/shared/taskexecutor.py +++ b/pympipool/shared/taskexecutor.py @@ -76,7 +76,7 @@ def execute_parallel_tasks( "python", os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")), ] - interface, command_lst = interface_init( + interface = interface_bootup( command_lst=command_lst, cwd=cwd, cores=cores, @@ -87,11 +87,10 @@ def execute_parallel_tasks( queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs, ) - interface.bootup(command_lst=command_lst) _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) -def interface_init( +def interface_bootup( command_lst, cwd, cores=1, @@ -124,7 +123,8 @@ def interface_init( "--zmqport", str(interface.bind_to_random_port()), ] - return interface, command_lst + interface.bootup(command_lst=command_lst) + return interface def _execute_parallel_tasks_loop(interface, future_queue): From b87679460b755f2706f467ab3ad3bc42e8a40567 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 21:04:26 -0600 Subject: [PATCH 07/20] add docstring for get_connection_interface() --- pympipool/shared/connections.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index dce3cab7..7bf31240 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -214,6 +214,23 @@ def get_connection_interface( queue_type=None, queue_adapter_kwargs=None, ): + """ + Backwards compatibility adapter to get the connection interface + + Args: + cwd (str/None): current working directory where the parallel python task is executed + cores (int): defines the total number of MPI ranks to use + gpus_per_core (int): number of GPUs per MPI rank - defaults to 0 + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec + enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False + queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems + queue_type (str): type of the queuing system + queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter + + Returns: + pympipool.shared.connections.BaseInterface: Connection interface + """ if queue_adapter is not None: connections = PysqaInterface( cwd=cwd, From 7d7f41ab0f6664afa125214af0c477c7912d4869 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 21:11:51 -0600 Subject: [PATCH 08/20] fix test parse legacy --- tests/test_parse_legacy.py | 39 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/tests/test_parse_legacy.py b/tests/test_parse_legacy.py index c4f28b76..b04a2997 100644 --- a/tests/test_parse_legacy.py +++ b/tests/test_parse_legacy.py @@ -1,6 +1,7 @@ import unittest +import os from pympipool.legacy.shared.backend import parse_arguments -from pympipool.legacy.shared.interface import command_line_options +from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface class TestParser(unittest.TestCase): @@ -12,22 +13,19 @@ def test_command_local(self): 'cores_per_task': '1' } command_lst = [ - 'mpiexec', '--oversubscribe', + 'mpiexec', '-n', result_dict['total_cores'], + '--oversubscribe', 'python', 
'-m', 'mpi4py.futures', '/', '--zmqport', result_dict['zmqport'], '--cores-per-task', result_dict['cores_per_task'], '--cores-total', result_dict['total_cores'] ] - self.assertEqual(command_lst, command_line_options( - hostname=result_dict['host'], - port_selected=result_dict['zmqport'], - path="/", - cores=int(result_dict['total_cores']), - cores_per_task=int(result_dict['cores_per_task']), - oversubscribe=True, - enable_flux_backend=False, - )) + interface = MpiExecInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=0, oversubscribe=True) + self.assertEqual( + command_lst, + interface.generate_command(command_lst=['python', '-m', 'mpi4py.futures', '/', '--zmqport', result_dict['zmqport'], '--cores-per-task', '1', '--cores-total', '2']) + ) self.assertEqual(result_dict, parse_arguments(command_lst)) def test_command_flux(self): @@ -38,19 +36,18 @@ def test_command_flux(self): 'cores_per_task': '2' } command_lst = [ - 'flux', 'run', '-n', '1', 'python', '/', + 'flux', 'run', '-n', '1', + "--cwd=" + os.path.abspath("."), + 'python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport'], '--cores-per-task', result_dict['cores_per_task'], '--cores-total', result_dict['total_cores'] ] - self.assertEqual(command_lst, command_line_options( - hostname=result_dict['host'], - port_selected=result_dict['zmqport'], - path="/", - cores=int(result_dict['total_cores']), - cores_per_task=int(result_dict['cores_per_task']), - oversubscribe=False, - enable_flux_backend=True, - )) + interface = FluxCmdInterface(cwd=os.path.abspath("."), cores=1, gpus_per_core=0, oversubscribe=False) + self.assertEqual( + command_lst, + interface.generate_command( + command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport'], '--cores-per-task', '2', '--cores-total', '2']) + ) self.assertEqual(result_dict, parse_arguments(command_lst)) From c2fbe93a1872cd4f9fd1648239534e9f06c21de3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 21:20:54 -0600 Subject: [PATCH 09/20] reformat legacy tests --- tests/test_parse_legacy.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/tests/test_parse_legacy.py b/tests/test_parse_legacy.py index b04a2997..ef0ec746 100644 --- a/tests/test_parse_legacy.py +++ b/tests/test_parse_legacy.py @@ -21,10 +21,21 @@ def test_command_local(self): '--cores-per-task', result_dict['cores_per_task'], '--cores-total', result_dict['total_cores'] ] - interface = MpiExecInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=0, oversubscribe=True) + interface = MpiExecInterface( + cwd=os.path.abspath("."), + cores=2, + gpus_per_core=0, + oversubscribe=True + ) self.assertEqual( command_lst, - interface.generate_command(command_lst=['python', '-m', 'mpi4py.futures', '/', '--zmqport', result_dict['zmqport'], '--cores-per-task', '1', '--cores-total', '2']) + interface.generate_command( + command_lst=[ + 'python', '-m', 'mpi4py.futures', '/', + '--zmqport', result_dict['zmqport'], + '--cores-per-task', '1', '--cores-total', '2' + ] + ) ) self.assertEqual(result_dict, parse_arguments(command_lst)) @@ -44,10 +55,20 @@ def test_command_flux(self): '--cores-per-task', result_dict['cores_per_task'], '--cores-total', result_dict['total_cores'] ] - interface = FluxCmdInterface(cwd=os.path.abspath("."), cores=1, gpus_per_core=0, oversubscribe=False) + interface = FluxCmdInterface( + cwd=os.path.abspath("."), + cores=1, + gpus_per_core=0, + oversubscribe=False + ) self.assertEqual( 
command_lst, interface.generate_command( - command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport'], '--cores-per-task', '2', '--cores-total', '2']) + command_lst=[ + 'python', '/', '--host', result_dict['host'], + '--zmqport', result_dict['zmqport'], + '--cores-per-task', '2', '--cores-total', '2' + ] + ) ) self.assertEqual(result_dict, parse_arguments(command_lst)) From db3c875e4d8bfac50f98eb41a167e7a38745c49f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 22:03:08 -0600 Subject: [PATCH 10/20] add test for SLURM interface --- tests/test_parse.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/tests/test_parse.py b/tests/test_parse.py index e8bd889c..eafc6b26 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -1,7 +1,7 @@ import os import unittest from pympipool.shared.backend import parse_arguments -from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface +from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface, SlurmSubprocessInterface class TestParser(unittest.TestCase): @@ -42,3 +42,22 @@ def test_command_flux(self): interface.generate_command(command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']]) ) self.assertEqual(result_dict, parse_arguments(command_lst)) + + def test_command_slurm(self): + result_dict = { + 'host': "127.0.0.1", + 'zmqport': '22', + } + command_lst = [ + 'srun', '-n', '2', + "-D", os.path.abspath("."), + 'python', '/', + '--host', result_dict['host'], + '--zmqport', result_dict['zmqport'] + ] + interface = SlurmSubprocessInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=0, oversubscribe=False) + self.assertEqual( + command_lst, + interface.generate_command(command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']]) + ) + self.assertEqual(result_dict, parse_arguments(command_lst)) From 71744e1fb3e00c7af85ac3bfdd53dc22d7558039 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 22:25:28 -0600 Subject: [PATCH 11/20] Add test for connection class --- tests/test_connection.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 tests/test_connection.py diff --git a/tests/test_connection.py b/tests/test_connection.py new file mode 100644 index 00000000..6353dff8 --- /dev/null +++ b/tests/test_connection.py @@ -0,0 +1,35 @@ +import os +import unittest +from pympipool.shared.connections import BaseInterface + + +class Interface(BaseInterface): + def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + + +class TestExecutor(unittest.TestCase): + def setUp(self): + self.interface = Interface( + cwd=os.path.abspath("."), + cores=1, + gpus_per_core=0, + oversubscribe=False + ) + + def test_bootup(self): + with self.assertRaises(NotImplementedError): + self.interface.bootup(command_lst=[]) + + def test_shutdown(self): + with self.assertRaises(NotImplementedError): + self.interface.shutdown(wait=True) + + def test_poll(self): + with self.assertRaises(NotImplementedError): + self.interface.poll() From f0c694e0db94e256f04bfe9abbd73e9d536e0787 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 22:49:11 -0600 Subject: [PATCH 12/20] More tests --- tests/test_connection.py | 67 
+++++++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/tests/test_connection.py b/tests/test_connection.py index 6353dff8..4ef6a1df 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,6 +1,13 @@ import os import unittest -from pympipool.shared.connections import BaseInterface +from pympipool.shared.connections import ( + BaseInterface, + MpiExecInterface, + SlurmSubprocessInterface, + PysqaInterface, + FluxCmdInterface, + get_connection_interface +) class Interface(BaseInterface): @@ -33,3 +40,61 @@ def test_shutdown(self): def test_poll(self): with self.assertRaises(NotImplementedError): self.interface.poll() + + +class TestInterfaceConnection(unittest.TestCase): + def test_mpiexec(self): + interface = get_connection_interface( + cwd=os.path.abspath("."), + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, + ) + self.assertIsInstance(interface, MpiExecInterface) + + def test_slurm(self): + interface = get_connection_interface( + cwd=os.path.abspath("."), + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=True, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, + ) + self.assertIsInstance(interface, SlurmSubprocessInterface) + + def test_pysqa(self): + interface = get_connection_interface( + cwd=os.path.abspath("."), + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=True, + queue_type=None, + queue_adapter_kwargs=None, + ) + self.assertIsInstance(interface, PysqaInterface) + + def test_flux_cmd(self): + interface = get_connection_interface( + cwd=os.path.abspath("."), + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=True, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, + ) + self.assertIsInstance(interface, FluxCmdInterface) From 00d31b23aff7cf9cd832fdaa6df8adafe2acd9af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 22:55:46 -0600 Subject: [PATCH 13/20] extend parse test --- tests/test_parse.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/test_parse.py b/tests/test_parse.py index eafc6b26..bb12b7ab 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -32,17 +32,23 @@ def test_command_flux(self): command_lst = [ 'flux', 'run', '-n', '2', "--cwd=" + os.path.abspath("."), + '--gpus-per-task=1', 'python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport'] ] - interface = FluxCmdInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=0, oversubscribe=False) + interface = FluxCmdInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=False) self.assertEqual( command_lst, interface.generate_command(command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']]) ) self.assertEqual(result_dict, parse_arguments(command_lst)) + def test_mpiexec_gpu(self): + interface = MpiExecInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True) + with self.assertRaises(ValueError): + interface.bootup(command_lst=[]) + def test_command_slurm(self): result_dict = { 'host': "127.0.0.1", @@ -51,11 +57,13 @@ def test_command_slurm(self): command_lst = [ 'srun', '-n', '2', "-D", os.path.abspath("."), + 
'--gpus-per-task=1', + '--oversubscribe', 'python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport'] ] - interface = SlurmSubprocessInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=0, oversubscribe=False) + interface = SlurmSubprocessInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True) self.assertEqual( command_lst, interface.generate_command(command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']]) From 4ba63c105ffc7a9f763aa7e834c4391d143693b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 23:06:29 -0600 Subject: [PATCH 14/20] Fixes for working directory --- pympipool/shared/connections.py | 13 ++++++++----- pympipool/shared/taskexecutor.py | 2 +- tests/test_connection.py | 10 +++++----- tests/test_interface.py | 2 +- tests/test_parse.py | 2 +- tests/test_parse_legacy.py | 2 +- 6 files changed, 17 insertions(+), 14 deletions(-) diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py index 7bf31240..8ed61982 100644 --- a/pympipool/shared/connections.py +++ b/pympipool/shared/connections.py @@ -20,7 +20,7 @@ def poll(self): class SubprocessInterface(BaseInterface): - def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): + def __init__(self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False): super().__init__( cwd=cwd, cores=cores, @@ -82,7 +82,7 @@ def generate_command(self, command_lst): class PysqaInterface(BaseInterface): def __init__( self, - cwd, + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, @@ -136,8 +136,11 @@ def generate_command(self, command_lst): "run", "-n", str(self._cores), - "--cwd=" + self._cwd, ] + if self._cwd is not None: + command_prepend_lst += [ + "--cwd=" + self._cwd, + ] if self._gpus_per_core > 0: command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)] return super().generate_command( @@ -147,7 +150,7 @@ def generate_command(self, command_lst): class FluxPythonInterface(BaseInterface): def __init__( - self, cwd, cores=1, gpus_per_core=0, oversubscribe=False, executor=None + self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, executor=None ): super().__init__( cwd=cwd, @@ -204,7 +207,7 @@ def generate_mpiexec_command(cores, gpus_per_core=0, oversubscribe=False): def get_connection_interface( - cwd, + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, diff --git a/pympipool/shared/taskexecutor.py b/pympipool/shared/taskexecutor.py index 563ae326..a872ae43 100644 --- a/pympipool/shared/taskexecutor.py +++ b/pympipool/shared/taskexecutor.py @@ -92,7 +92,7 @@ def execute_parallel_tasks( def interface_bootup( command_lst, - cwd, + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, diff --git a/tests/test_connection.py b/tests/test_connection.py index 4ef6a1df..c5abd005 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -23,7 +23,7 @@ def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): class TestExecutor(unittest.TestCase): def setUp(self): self.interface = Interface( - cwd=os.path.abspath("."), + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False @@ -45,7 +45,7 @@ def test_poll(self): class TestInterfaceConnection(unittest.TestCase): def test_mpiexec(self): interface = get_connection_interface( - cwd=os.path.abspath("."), + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, @@ -59,7 +59,7 @@ def test_mpiexec(self): def test_slurm(self): interface = get_connection_interface( - 
cwd=os.path.abspath("."), + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, @@ -73,7 +73,7 @@ def test_slurm(self): def test_pysqa(self): interface = get_connection_interface( - cwd=os.path.abspath("."), + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, @@ -87,7 +87,7 @@ def test_pysqa(self): def test_flux_cmd(self): interface = get_connection_interface( - cwd=os.path.abspath("."), + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, diff --git a/tests/test_interface.py b/tests/test_interface.py index b3562068..abcc3d8d 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -17,7 +17,7 @@ def test_interface(self): task_dict = {"fn": calc, 'args': (), "kwargs": {"i": 2}} interface = SocketInterface( interface=MpiExecInterface( - cwd=os.path.abspath("."), + cwd=None, cores=1, gpus_per_core=0, oversubscribe=False diff --git a/tests/test_parse.py b/tests/test_parse.py index bb12b7ab..52b8fda9 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -17,7 +17,7 @@ def test_command_local(self): 'python', '/', '--zmqport', result_dict['zmqport'] ] - interface = MpiExecInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=0, oversubscribe=True) + interface = MpiExecInterface(cwd=None, cores=2, gpus_per_core=0, oversubscribe=True) self.assertEqual( command_lst, interface.generate_command(command_lst=['python', '/', '--zmqport', result_dict['zmqport']]) diff --git a/tests/test_parse_legacy.py b/tests/test_parse_legacy.py index ef0ec746..cef871db 100644 --- a/tests/test_parse_legacy.py +++ b/tests/test_parse_legacy.py @@ -22,7 +22,7 @@ def test_command_local(self): '--cores-total', result_dict['total_cores'] ] interface = MpiExecInterface( - cwd=os.path.abspath("."), + cwd=None, cores=2, gpus_per_core=0, oversubscribe=True From 909d1374d6c6391e7ce12385abb5a49dfa3c73fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 23:19:05 -0600 Subject: [PATCH 15/20] try to fix mpiexec bug --- pympipool/legacy/interfaces/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index b8cec78d..a3ac9bcc 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -191,7 +191,7 @@ def __init__( else: # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: # https://github.com/mpi4py/mpi4py/issues/324 - command_lst = ["-n", "1", "python", executable] + command_lst = ["python", executable] cores = 1 command_lst += [ "--cores-per-task", From 4ae1193cff72bf19e0269e54fd99c069dd0ad4d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 23:28:17 -0600 Subject: [PATCH 16/20] Implement central get_pool_command() function --- pympipool/legacy/interfaces/pool.py | 37 ++++++---------------------- pympipool/legacy/shared/interface.py | 36 ++++++++++++++++++--------- 2 files changed, 33 insertions(+), 40 deletions(-) diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index a3ac9bcc..14d0baa9 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -2,6 +2,7 @@ import os from pympipool.shared.taskexecutor import cloudpickle_register, interface_bootup +from pympipool.legacy.shared.interface import get_pool_command class PoolBase(ABC): @@ -69,20 +70,11 @@ def __init__( queue_adapter_kwargs=None, ): super().__init__() - command_lst = [ - "python", - "-m", - "mpi4py.futures", - os.path.abspath( - 
os.path.join(__file__, "..", "..", "backend", "mpipool.py") - ), - "--cores-per-task", - str(1), - "--cores-total", - str(max_workers), - ] self._interface = interface_bootup( - command_lst=command_lst, + command_lst=get_pool_command( + cores_total=max_workers, + ranks_per_task=1 + )[0], cwd=cwd, cores=max_workers, gpus_per_core=gpus_per_task, @@ -182,23 +174,10 @@ def __init__( queue_adapter_kwargs=None, ): super().__init__() - executable = os.path.abspath( - os.path.join(__file__, "..", "..", "backend", "mpipool.py") + command_lst, cores = get_pool_command( + cores_total=max_ranks, + ranks_per_task=ranks_per_task ) - if ranks_per_task == 1: - command_lst = ["python", "-m", "mpi4py.futures", executable] - cores = max_ranks - else: - # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: - # https://github.com/mpi4py/mpi4py/issues/324 - command_lst = ["python", executable] - cores = 1 - command_lst += [ - "--cores-per-task", - str(ranks_per_task), - "--cores-total", - str(max_ranks), - ] self._interface = interface_bootup( command_lst=command_lst, cwd=cwd, diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py index d11930e6..99489f6f 100644 --- a/pympipool/legacy/shared/interface.py +++ b/pympipool/legacy/shared/interface.py @@ -33,18 +33,11 @@ def execute_serial_tasks( queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter """ future_dict = {} - command_lst = [ - "python", - "-m", - "mpi4py.futures", - os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpipool.py")), - "--cores-per-task", - str(1), - "--cores-total", - str(cores), - ] interface = interface_bootup( - command_lst=command_lst, + command_lst=get_pool_command( + cores_total=cores, + ranks_per_task=1 + )[0], cwd=cwd, cores=cores, gpus_per_core=gpus_per_task, @@ -62,6 +55,27 @@ def execute_serial_tasks( ) +def get_pool_command(cores_total, ranks_per_task=1): + executable = os.path.abspath( + os.path.join(__file__, "..", "..", "backend", "mpipool.py") + ) + if ranks_per_task == 1: + command_lst = ["python", "-m", "mpi4py.futures", executable] + cores = cores_total + else: + # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: + # https://github.com/mpi4py/mpi4py/issues/324 + command_lst = ["python", executable] + cores = 1 + command_lst += [ + "--cores-per-task", + str(ranks_per_task), + "--cores-total", + str(cores_total), + ] + return command_lst, cores + + def _execute_serial_tasks_loop( interface, future_queue, future_dict, sleep_interval=0.1 ): From 7fb3b20cee2829db8fb63c23216d9e4daa63c9a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 29 Jul 2023 23:31:23 -0600 Subject: [PATCH 17/20] black formatting --- pympipool/legacy/interfaces/pool.py | 8 ++------ pympipool/legacy/shared/interface.py | 5 +---- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index 14d0baa9..df485489 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -71,10 +71,7 @@ def __init__( ): super().__init__() self._interface = interface_bootup( - command_lst=get_pool_command( - cores_total=max_workers, - ranks_per_task=1 - )[0], + command_lst=get_pool_command(cores_total=max_workers, ranks_per_task=1)[0], cwd=cwd, cores=max_workers, gpus_per_core=gpus_per_task, @@ -175,8 +172,7 @@ def __init__( ): super().__init__() command_lst, cores = get_pool_command( - 
cores_total=max_ranks, - ranks_per_task=ranks_per_task + cores_total=max_ranks, ranks_per_task=ranks_per_task ) self._interface = interface_bootup( command_lst=command_lst, diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py index 99489f6f..e4f425fa 100644 --- a/pympipool/legacy/shared/interface.py +++ b/pympipool/legacy/shared/interface.py @@ -34,10 +34,7 @@ def execute_serial_tasks( """ future_dict = {} interface = interface_bootup( - command_lst=get_pool_command( - cores_total=cores, - ranks_per_task=1 - )[0], + command_lst=get_pool_command(cores_total=cores, ranks_per_task=1)[0], cwd=cwd, cores=cores, gpus_per_core=gpus_per_task, From 71161b59a62b3063876e28aebf3c83031482e1bf Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 29 Jul 2023 23:43:29 -0600 Subject: [PATCH 18/20] Update __init__.py --- pympipool/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 412d0658..b440c8ca 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -12,11 +12,12 @@ from pympipool.shared.thread import RaisingThread from pympipool.shared.taskexecutor import cancel_items_in_queue from pympipool.shared.connections import ( - MpiExecInterface, - SlurmSubprocessInterface, - FluxPythonInterface, + get_connection_interface, FluxCmdInterface, + FluxPythonInterface, + MpiExecInterface, PysqaInterface, + SlurmSubprocessInterface, ) from ._version import get_versions From 9df37cc041440e39735db1f06be98954f7a1eac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 07:46:53 -0600 Subject: [PATCH 19/20] Move interface_bootup() to communication interface --- pympipool/legacy/interfaces/pool.py | 4 +-- pympipool/legacy/shared/interface.py | 2 +- pympipool/shared/communication.py | 40 +++++++++++++++++++++++++++ pympipool/shared/taskexecutor.py | 41 +--------------------------- 4 files changed, 44 insertions(+), 43 deletions(-) diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index df485489..fffdd60b 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -1,7 +1,7 @@ from abc import ABC -import os -from pympipool.shared.taskexecutor import cloudpickle_register, interface_bootup +from pympipool.shared.communication import interface_bootup +from pympipool.shared.taskexecutor import cloudpickle_register from pympipool.legacy.shared.interface import get_pool_command diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py index e4f425fa..b8afb912 100644 --- a/pympipool/legacy/shared/interface.py +++ b/pympipool/legacy/shared/interface.py @@ -2,7 +2,7 @@ import queue import time -from pympipool.shared.taskexecutor import interface_bootup +from pympipool.shared.communication import interface_bootup def execute_serial_tasks( diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index 75b9661e..c3adf120 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -1,6 +1,9 @@ import cloudpickle +import socket import zmq +from pympipool.shared.connections import get_connection_interface + class SocketInterface(object): """ @@ -93,6 +96,43 @@ def __del__(self): self.shutdown(wait=True) +def interface_bootup( + command_lst, + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + 
queue_adapter_kwargs=None, +): + if enable_flux_backend or enable_slurm_backend or queue_adapter is not None: + command_lst += [ + "--host", + socket.gethostname(), + ] + connections = get_connection_interface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_type=queue_type, + queue_adapter_kwargs=queue_adapter_kwargs, + ) + interface = SocketInterface(interface=connections) + command_lst += [ + "--zmqport", + str(interface.bind_to_random_port()), + ] + interface.bootup(command_lst=command_lst) + return interface + + def connect_to_socket_interface(host, port): """ Connect to an existing SocketInterface instance by providing the hostname and the port as strings. diff --git a/pympipool/shared/taskexecutor.py b/pympipool/shared/taskexecutor.py index a872ae43..c976bacd 100644 --- a/pympipool/shared/taskexecutor.py +++ b/pympipool/shared/taskexecutor.py @@ -1,12 +1,10 @@ import inspect import os -import socket import queue import cloudpickle -from pympipool.shared.communication import SocketInterface -from pympipool.shared.connections import get_connection_interface +from pympipool.shared.communication import interface_bootup def cancel_items_in_queue(que): @@ -90,43 +88,6 @@ def execute_parallel_tasks( _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) -def interface_bootup( - command_lst, - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, -): - if enable_flux_backend or enable_slurm_backend or queue_adapter is not None: - command_lst += [ - "--host", - socket.gethostname(), - ] - connections = get_connection_interface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - queue_adapter=queue_adapter, - queue_type=queue_type, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - interface = SocketInterface(interface=connections) - command_lst += [ - "--zmqport", - str(interface.bind_to_random_port()), - ] - interface.bootup(command_lst=command_lst) - return interface - - def _execute_parallel_tasks_loop(interface, future_queue): while True: task_dict = future_queue.get() From ea81df7167abb758ad2d4069f35cc24c24c145d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 08:24:09 -0600 Subject: [PATCH 20/20] socket interface refactoring --- pympipool/__init__.py | 17 +++++------------ pympipool/backend/mpiexec.py | 20 ++++++++++---------- pympipool/legacy/backend/mpipool.py | 20 ++++++++++---------- pympipool/shared/communication.py | 8 ++++---- tests/test_zmq.py | 18 +++++++++--------- 5 files changed, 38 insertions(+), 45 deletions(-) diff --git a/pympipool/__init__.py b/pympipool/__init__.py index b440c8ca..6a6ff0aa 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -1,9 +1,10 @@ from pympipool.shared.communication import ( SocketInterface, - connect_to_socket_interface, - send_result, - close_connection, - receive_instruction, + interface_connect, + interface_bootup, + interface_send, + interface_shutdown, + interface_receive, ) from pympipool.interfaces.taskbroker import HPCExecutor from pympipool.interfaces.taskexecutor import Executor @@ -11,14 +12,6 @@ from pympipool.legacy.interfaces.pool 
import Pool, MPISpawnPool from pympipool.shared.thread import RaisingThread from pympipool.shared.taskexecutor import cancel_items_in_queue -from pympipool.shared.connections import ( - get_connection_interface, - FluxCmdInterface, - FluxPythonInterface, - MpiExecInterface, - PysqaInterface, - SlurmSubprocessInterface, -) from ._version import get_versions diff --git a/pympipool/backend/mpiexec.py b/pympipool/backend/mpiexec.py index 452ad42d..44217cf0 100644 --- a/pympipool/backend/mpiexec.py +++ b/pympipool/backend/mpiexec.py @@ -5,10 +5,10 @@ import cloudpickle from pympipool.shared.communication import ( - connect_to_socket_interface, - send_result, - close_connection, - receive_instruction, + interface_connect, + interface_send, + interface_shutdown, + interface_receive, ) from pympipool.shared.backend import call_funct, parse_arguments @@ -26,7 +26,7 @@ def main(): argument_dict = parse_arguments(argument_lst=sys.argv) if mpi_rank_zero: - context, socket = connect_to_socket_interface( + context, socket = interface_connect( host=argument_dict["host"], port=argument_dict["zmqport"] ) else: @@ -43,7 +43,7 @@ def main(): while True: # Read from socket if mpi_rank_zero: - input_dict = receive_instruction(socket=socket) + input_dict = interface_receive(socket=socket) else: input_dict = None input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0) @@ -51,8 +51,8 @@ def main(): # Parse input if "shutdown" in input_dict.keys() and input_dict["shutdown"]: if mpi_rank_zero: - send_result(socket=socket, result_dict={"result": True}) - close_connection(socket=socket, context=context) + interface_send(socket=socket, result_dict={"result": True}) + interface_shutdown(socket=socket, context=context) break elif ( "fn" in input_dict.keys() @@ -69,14 +69,14 @@ def main(): output_reply = output except Exception as error: if mpi_rank_zero: - send_result( + interface_send( socket=socket, result_dict={"error": error, "error_type": str(type(error))}, ) else: # Send output if mpi_rank_zero: - send_result(socket=socket, result_dict={"result": output_reply}) + interface_send(socket=socket, result_dict={"result": output_reply}) elif ( "init" in input_dict.keys() and input_dict["init"] diff --git a/pympipool/legacy/backend/mpipool.py b/pympipool/legacy/backend/mpipool.py index 584dfcce..60230a57 100644 --- a/pympipool/legacy/backend/mpipool.py +++ b/pympipool/legacy/backend/mpipool.py @@ -5,10 +5,10 @@ import cloudpickle from pympipool.shared.communication import ( - connect_to_socket_interface, - send_result, - close_connection, - receive_instruction, + interface_connect, + interface_send, + interface_shutdown, + interface_receive, ) from pympipool.legacy.shared.backend import parse_socket_communication, parse_arguments @@ -36,27 +36,27 @@ def main(): path=sys.path, # required for flux interface - otherwise the current path is not included in the python path ) as executor: if executor is not None: - context, socket = connect_to_socket_interface( + context, socket = interface_connect( host=argument_dict["host"], port=argument_dict["zmqport"] ) while True: output = parse_socket_communication( executor=executor, - input_dict=receive_instruction(socket=socket), + input_dict=interface_receive(socket=socket), future_dict=future_dict, cores_per_task=int(argument_dict["cores_per_task"]), ) if "exit" in output.keys() and output["exit"]: if "result" in output.keys(): - send_result( + interface_send( socket=socket, result_dict={"result": output["result"]} ) else: - send_result(socket=socket, result_dict={"result": True}) - 
close_connection(socket=socket, context=context) + interface_send(socket=socket, result_dict={"result": True}) + interface_shutdown(socket=socket, context=context) break elif isinstance(output, dict): - send_result(socket=socket, result_dict=output) + interface_send(socket=socket, result_dict=output) if __name__ == "__main__": diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index c3adf120..35969c33 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -133,7 +133,7 @@ def interface_bootup( return interface -def connect_to_socket_interface(host, port): +def interface_connect(host, port): """ Connect to an existing SocketInterface instance by providing the hostname and the port as strings. @@ -147,7 +147,7 @@ def connect_to_socket_interface(host, port): return context, socket -def send_result(socket, result_dict): +def interface_send(socket, result_dict): """ Send results to a SocketInterface instance. @@ -158,7 +158,7 @@ def send_result(socket, result_dict): socket.send(cloudpickle.dumps(result_dict)) -def receive_instruction(socket): +def interface_receive(socket): """ Receive instructions from a SocketInterface instance. @@ -168,7 +168,7 @@ def receive_instruction(socket): return cloudpickle.loads(socket.recv()) -def close_connection(socket, context): +def interface_shutdown(socket, context): """ Close the connection to a SocketInterface instance. diff --git a/tests/test_zmq.py b/tests/test_zmq.py index 2dbe78b6..72b0de06 100644 --- a/tests/test_zmq.py +++ b/tests/test_zmq.py @@ -1,10 +1,10 @@ import unittest import zmq from pympipool.shared.communication import ( - connect_to_socket_interface, - close_connection, - send_result, - receive_instruction + interface_connect, + interface_shutdown, + interface_send, + interface_receive ) @@ -16,8 +16,8 @@ def test_initialize_zmq(self): context_server = zmq.Context() socket_server = context_server.socket(zmq.PAIR) port = str(socket_server.bind_to_random_port("tcp://*")) - context_client, socket_client = connect_to_socket_interface(host=host, port=port) - send_result(socket=socket_server, result_dict={"message": message}) - self.assertEqual(receive_instruction(socket=socket_client), {"message": message}) - close_connection(socket=socket_client, context=context_client) - close_connection(socket=socket_server, context=context_server) + context_client, socket_client = interface_connect(host=host, port=port) + interface_send(socket=socket_server, result_dict={"message": message}) + self.assertEqual(interface_receive(socket=socket_client), {"message": message}) + interface_shutdown(socket=socket_client, context=context_client) + interface_shutdown(socket=socket_server, context=context_server)
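
Usage note on the final rename (PATCH 20/20): the free functions around the ZeroMQ PAIR socket now follow a single interface_* naming scheme (interface_connect, interface_send, interface_receive, interface_shutdown). The minimal round-trip sketch below mirrors tests/test_zmq.py from this series; it is an illustration only, keeps both ends in one process, and substitutes "localhost" for socket.gethostname() purely for brevity.

    import zmq

    from pympipool.shared.communication import (
        interface_connect,
        interface_send,
        interface_receive,
        interface_shutdown,
    )

    # Server side: bind a PAIR socket to a random port, as tests/test_zmq.py does.
    context_server = zmq.Context()
    socket_server = context_server.socket(zmq.PAIR)
    port = str(socket_server.bind_to_random_port("tcp://*"))

    # Client side: connect with the renamed helper; it returns the (context, socket) pair.
    context_client, socket_client = interface_connect(host="localhost", port=port)

    # The sending end cloudpickles a result dictionary; the receiving end unpickles it.
    interface_send(socket=socket_server, result_dict={"message": "hello"})
    assert interface_receive(socket=socket_client) == {"message": "hello"}

    # Both ends tear down their socket and context via interface_shutdown().
    interface_shutdown(socket=socket_client, context=context_client)
    interface_shutdown(socket=socket_server, context=context_server)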