diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 73eda1ef..6a6ff0aa 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -1,9 +1,10 @@ from pympipool.shared.communication import ( SocketInterface, - connect_to_socket_interface, - send_result, - close_connection, - receive_instruction, + interface_connect, + interface_bootup, + interface_send, + interface_shutdown, + interface_receive, ) from pympipool.interfaces.taskbroker import HPCExecutor from pympipool.interfaces.taskexecutor import Executor diff --git a/pympipool/backend/mpiexec.py b/pympipool/backend/mpiexec.py index 452ad42d..44217cf0 100644 --- a/pympipool/backend/mpiexec.py +++ b/pympipool/backend/mpiexec.py @@ -5,10 +5,10 @@ import cloudpickle from pympipool.shared.communication import ( - connect_to_socket_interface, - send_result, - close_connection, - receive_instruction, + interface_connect, + interface_send, + interface_shutdown, + interface_receive, ) from pympipool.shared.backend import call_funct, parse_arguments @@ -26,7 +26,7 @@ def main(): argument_dict = parse_arguments(argument_lst=sys.argv) if mpi_rank_zero: - context, socket = connect_to_socket_interface( + context, socket = interface_connect( host=argument_dict["host"], port=argument_dict["zmqport"] ) else: @@ -43,7 +43,7 @@ def main(): while True: # Read from socket if mpi_rank_zero: - input_dict = receive_instruction(socket=socket) + input_dict = interface_receive(socket=socket) else: input_dict = None input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0) @@ -51,8 +51,8 @@ def main(): # Parse input if "shutdown" in input_dict.keys() and input_dict["shutdown"]: if mpi_rank_zero: - send_result(socket=socket, result_dict={"result": True}) - close_connection(socket=socket, context=context) + interface_send(socket=socket, result_dict={"result": True}) + interface_shutdown(socket=socket, context=context) break elif ( "fn" in input_dict.keys() @@ -69,14 +69,14 @@ def main(): output_reply = output except Exception as error: if mpi_rank_zero: - send_result( + interface_send( socket=socket, result_dict={"error": error, "error_type": str(type(error))}, ) else: # Send output if mpi_rank_zero: - send_result(socket=socket, result_dict={"result": output_reply}) + interface_send(socket=socket, result_dict={"result": output_reply}) elif ( "init" in input_dict.keys() and input_dict["init"] diff --git a/pympipool/legacy/backend/mpipool.py b/pympipool/legacy/backend/mpipool.py index 584dfcce..60230a57 100644 --- a/pympipool/legacy/backend/mpipool.py +++ b/pympipool/legacy/backend/mpipool.py @@ -5,10 +5,10 @@ import cloudpickle from pympipool.shared.communication import ( - connect_to_socket_interface, - send_result, - close_connection, - receive_instruction, + interface_connect, + interface_send, + interface_shutdown, + interface_receive, ) from pympipool.legacy.shared.backend import parse_socket_communication, parse_arguments @@ -36,27 +36,27 @@ def main(): path=sys.path, # required for flux interface - otherwise the current path is not included in the python path ) as executor: if executor is not None: - context, socket = connect_to_socket_interface( + context, socket = interface_connect( host=argument_dict["host"], port=argument_dict["zmqport"] ) while True: output = parse_socket_communication( executor=executor, - input_dict=receive_instruction(socket=socket), + input_dict=interface_receive(socket=socket), future_dict=future_dict, cores_per_task=int(argument_dict["cores_per_task"]), ) if "exit" in output.keys() and output["exit"]: if "result" in 
output.keys(): - send_result( + interface_send( socket=socket, result_dict={"result": output["result"]} ) else: - send_result(socket=socket, result_dict={"result": True}) - close_connection(socket=socket, context=context) + interface_send(socket=socket, result_dict={"result": True}) + interface_shutdown(socket=socket, context=context) break elif isinstance(output, dict): - send_result(socket=socket, result_dict=output) + interface_send(socket=socket, result_dict=output) if __name__ == "__main__": diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py index bf4d5d1c..fffdd60b 100644 --- a/pympipool/legacy/interfaces/pool.py +++ b/pympipool/legacy/interfaces/pool.py @@ -1,8 +1,8 @@ from abc import ABC -from pympipool.shared.communication import SocketInterface +from pympipool.shared.communication import interface_bootup from pympipool.shared.taskexecutor import cloudpickle_register -from pympipool.legacy.shared.interface import get_parallel_subprocess_command +from pympipool.legacy.shared.interface import get_pool_command class PoolBase(ABC): @@ -11,11 +11,9 @@ class PoolBase(ABC): alone. Rather it implements the __enter__(), __exit__() and shutdown() function shared between the derived classes. """ - def __init__(self, queue_adapter=None, queue_adapter_kwargs=None): + def __init__(self): self._future_dict = {} - self._interface = SocketInterface( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ) + self._interface = None cloudpickle_register(ind=3) def __enter__(self): @@ -71,23 +69,17 @@ def __init__( queue_adapter=None, queue_adapter_kwargs=None, ): - super().__init__( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ) - self._interface.bootup( - command_lst=get_parallel_subprocess_command( - port_selected=self._interface.bind_to_random_port(), - cores=max_workers, - cores_per_task=1, - gpus_per_task=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - enable_mpi4py_backend=True, - enable_multi_host=queue_adapter is not None, - ), + super().__init__() + self._interface = interface_bootup( + command_lst=get_pool_command(cores_total=max_workers, ranks_per_task=1)[0], cwd=cwd, cores=max_workers, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_adapter_kwargs=queue_adapter_kwargs, ) def map(self, func, iterable, chunksize=None): @@ -178,23 +170,20 @@ def __init__( queue_adapter=None, queue_adapter_kwargs=None, ): - super().__init__( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs + super().__init__() + command_lst, cores = get_pool_command( + cores_total=max_ranks, ranks_per_task=ranks_per_task ) - self._interface.bootup( - command_lst=get_parallel_subprocess_command( - port_selected=self._interface.bind_to_random_port(), - cores=max_ranks, - cores_per_task=ranks_per_task, - gpus_per_task=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_mpi4py_backend=True, - enable_multi_host=queue_adapter is not None, - ), + self._interface = interface_bootup( + command_lst=command_lst, cwd=cwd, - cores=max_ranks, + cores=cores, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=queue_adapter, + 
queue_adapter_kwargs=queue_adapter_kwargs, ) def map(self, func, iterable, chunksize=None): diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py index d181d15f..b8afb912 100644 --- a/pympipool/legacy/shared/interface.py +++ b/pympipool/legacy/shared/interface.py @@ -1,133 +1,8 @@ import os import queue -import socket import time - -from pympipool.shared.communication import SocketInterface - - -def command_line_options( - hostname, - port_selected, - path, - cores, - cores_per_task=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_mpi4py_backend=True, - enable_multi_host=False, -): - """ - Translate the individual parameters to command line options. - - Args: - hostname (str): name of the host where the SocketInterface instance runs the client process should conenct to. - port_selected (int): port the SocketInterface instance runs on. - path (str): path to the python script which should be executed as client process. - cores (int): defines the total number of MPI ranks to use - cores_per_task (int): number of MPI ranks per task - defaults to 1 - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - enable_mpi4py_backend (bool): enable the mpi4py.futures module - defaults to True - enable_multi_host (bool): communicate the host to connect to - defaults to False - - Returns: - list: list of strings to be executed on the command line - """ - if enable_flux_backend: - command_lst = ["flux", "run"] - elif enable_slurm_backend: - command_lst = ["srun"] - else: - command_lst = ["mpiexec"] - if gpus_per_task > 0 and (enable_flux_backend or enable_slurm_backend): - command_lst += ["--gpus-per-task=" + str(gpus_per_task)] - elif gpus_per_task > 0: - raise ValueError("GPU binding is only supported for flux and SLURM backend.") - if oversubscribe: - command_lst += ["--oversubscribe"] - if cores_per_task == 1 and enable_mpi4py_backend: - command_lst += ["-n", str(cores), "python", "-m", "mpi4py.futures"] - elif cores_per_task > 1 and enable_mpi4py_backend: - # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: - # https://github.com/mpi4py/mpi4py/issues/324 - command_lst += ["-n", "1", "python"] - else: - command_lst += ["-n", str(cores), "python"] - command_lst += [path] - if enable_flux_backend or enable_slurm_backend or enable_multi_host: - command_lst += [ - "--host", - hostname, - ] - command_lst += [ - "--zmqport", - str(port_selected), - ] - if enable_mpi4py_backend: - command_lst += [ - "--cores-per-task", - str(cores_per_task), - "--cores-total", - str(cores), - ] - return command_lst - - -def get_parallel_subprocess_command( - port_selected, - cores, - cores_per_task=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_mpi4py_backend=True, - enable_multi_host=False, -): - """ - Translate the individual parameters to command line options. - - Args: - port_selected (int): port the SocketInterface instance runs on. 
- cores (int): defines the total number of MPI ranks to use - cores_per_task (int): number of MPI ranks per task - defaults to 1 - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - enable_mpi4py_backend (bool): enable the mpi4py.futures module - defaults to True - enable_multi_host (bool): communicate the host to connect to - defaults to False - - Returns: - list: list of strings to be executed on the command line - """ - if enable_mpi4py_backend: - executable = os.path.abspath( - os.path.join(__file__, "..", "..", "backend", "mpipool.py") - ) - else: - executable = os.path.abspath( - os.path.join(__file__, "..", "..", "..", "backend", "mpiexec.py") - ) - return command_line_options( - hostname=socket.gethostname(), - port_selected=port_selected, - path=executable, - cores=cores, - cores_per_task=cores_per_task, - gpus_per_task=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - enable_mpi4py_backend=enable_mpi4py_backend, - enable_multi_host=enable_multi_host, - ) +from pympipool.shared.communication import interface_bootup def execute_serial_tasks( @@ -158,23 +33,16 @@ def execute_serial_tasks( queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter """ future_dict = {} - interface = SocketInterface( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ) - interface.bootup( - command_lst=get_parallel_subprocess_command( - port_selected=interface.bind_to_random_port(), - cores=cores, - gpus_per_task=gpus_per_task, - cores_per_task=1, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - enable_mpi4py_backend=True, - enable_multi_host=queue_adapter is not None, - ), + interface = interface_bootup( + command_lst=get_pool_command(cores_total=cores, ranks_per_task=1)[0], cwd=cwd, cores=cores, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_adapter_kwargs=queue_adapter_kwargs, ) _execute_serial_tasks_loop( interface=interface, @@ -184,6 +52,27 @@ def execute_serial_tasks( ) +def get_pool_command(cores_total, ranks_per_task=1): + executable = os.path.abspath( + os.path.join(__file__, "..", "..", "backend", "mpipool.py") + ) + if ranks_per_task == 1: + command_lst = ["python", "-m", "mpi4py.futures", executable] + cores = cores_total + else: + # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: + # https://github.com/mpi4py/mpi4py/issues/324 + command_lst = ["python", executable] + cores = 1 + command_lst += [ + "--cores-per-task", + str(ranks_per_task), + "--cores-total", + str(cores_total), + ] + return command_lst, cores + + def _execute_serial_tasks_loop( interface, future_queue, future_dict, sleep_interval=0.1 ): diff --git a/pympipool/shared/communication.py b/pympipool/shared/communication.py index e70641ff..35969c33 100644 --- a/pympipool/shared/communication.py +++ b/pympipool/shared/communication.py @@ -1,24 +1,23 @@ -import subprocess - import cloudpickle +import socket import zmq +from 
pympipool.shared.connections import get_connection_interface + class SocketInterface(object): """ The SocketInterface is an abstraction layer on top of the zero message queue. Args: - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter + interface (pympipool.shared.connections.BaseInterface): Interface for starting the parallel process """ - def __init__(self, queue_adapter=None, queue_adapter_kwargs=None): + def __init__(self, interface=None): self._context = zmq.Context() self._socket = self._context.socket(zmq.PAIR) self._process = None - self._queue_adapter = queue_adapter - self._queue_adapter_kwargs = queue_adapter_kwargs + self._interface = interface def send_dict(self, input_dict): """ @@ -68,44 +67,22 @@ def bind_to_random_port(self): """ return self._socket.bind_to_random_port("tcp://*") - def bootup(self, command_lst, cwd=None, cores=None): + def bootup(self, command_lst): """ Boot up the client process to connect to the SocketInterface. Args: command_lst (list): list of strings to start the client process - cwd (str/None): current working directory where the parallel python task is executed - cores (str/ None): if the job is submitted to a queuing system using the pysqa.queueadapter.QueueAdapter - then cores defines the number of cores to be used for the specific queuing system allocation to execute - the client process. """ - if self._queue_adapter is not None: - self._queue_adapter.submit_job( - working_directory=cwd, - cores=cores, - command=" ".join(command_lst), - **self._queue_adapter_kwargs - ) - else: - self._process = subprocess.Popen( - args=command_lst, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - cwd=cwd, - ) + self._interface.bootup(command_lst=command_lst) def shutdown(self, wait=True): result = None - if self._process is not None and self._process.poll() is None: - result = self.send_and_receive_dict( - input_dict={"shutdown": True, "wait": wait} - ) - self._process_close(wait=wait) - elif self._queue_adapter is not None and self._socket is not None: + if self._interface.poll(): result = self.send_and_receive_dict( input_dict={"shutdown": True, "wait": wait} ) + self._interface.shutdown(wait=wait) if self._socket is not None: self._socket.close() if self._context is not None: @@ -115,19 +92,48 @@ def shutdown(self, wait=True): self._context = None return result - def _process_close(self, wait=True): - self._process.terminate() - self._process.stdout.close() - self._process.stdin.close() - self._process.stderr.close() - if wait: - self._process.wait() - def __del__(self): self.shutdown(wait=True) -def connect_to_socket_interface(host, port): +def interface_bootup( + command_lst, + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, +): + if enable_flux_backend or enable_slurm_backend or queue_adapter is not None: + command_lst += [ + "--host", + socket.gethostname(), + ] + connections = get_connection_interface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_type=queue_type, + queue_adapter_kwargs=queue_adapter_kwargs, + ) + interface = SocketInterface(interface=connections) + 
command_lst += [ + "--zmqport", + str(interface.bind_to_random_port()), + ] + interface.bootup(command_lst=command_lst) + return interface + + +def interface_connect(host, port): """ Connect to an existing SocketInterface instance by providing the hostname and the port as strings. @@ -141,7 +147,7 @@ def connect_to_socket_interface(host, port): return context, socket -def send_result(socket, result_dict): +def interface_send(socket, result_dict): """ Send results to a SocketInterface instance. @@ -152,7 +158,7 @@ def send_result(socket, result_dict): socket.send(cloudpickle.dumps(result_dict)) -def receive_instruction(socket): +def interface_receive(socket): """ Receive instructions from a SocketInterface instance. @@ -162,7 +168,7 @@ def receive_instruction(socket): return cloudpickle.loads(socket.recv()) -def close_connection(socket, context): +def interface_shutdown(socket, context): """ Close the connection to a SocketInterface instance. diff --git a/pympipool/shared/connections.py b/pympipool/shared/connections.py new file mode 100644 index 00000000..8ed61982 --- /dev/null +++ b/pympipool/shared/connections.py @@ -0,0 +1,268 @@ +from abc import ABC +import subprocess + + +class BaseInterface(ABC): + def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): + self._cwd = cwd + self._cores = cores + self._gpus_per_core = gpus_per_core + self._oversubscribe = oversubscribe + + def bootup(self, command_lst): + raise NotImplementedError + + def shutdown(self, wait=True): + raise NotImplementedError + + def poll(self): + raise NotImplementedError + + +class SubprocessInterface(BaseInterface): + def __init__(self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + self._process = None + + def bootup(self, command_lst): + self._process = subprocess.Popen( + args=self.generate_command(command_lst=command_lst), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + cwd=self._cwd, + ) + + def generate_command(self, command_lst): + return command_lst + + def shutdown(self, wait=True): + self._process.terminate() + self._process.stdout.close() + self._process.stdin.close() + self._process.stderr.close() + if wait: + self._process.wait() + self._process = None + + def poll(self): + return self._process is not None and self._process.poll() is None + + +class MpiExecInterface(SubprocessInterface): + def generate_command(self, command_lst): + command_prepend_lst = generate_mpiexec_command( + cores=self._cores, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + return super().generate_command( + command_lst=command_prepend_lst + command_lst, + ) + + +class SlurmSubprocessInterface(SubprocessInterface): + def generate_command(self, command_lst): + command_prepend_lst = generate_slurm_command( + cores=self._cores, + cwd=self._cwd, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + return super().generate_command( + command_lst=command_prepend_lst + command_lst, + ) + + +class PysqaInterface(BaseInterface): + def __init__( + self, + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, + ): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + self._queue_adapter = queue_adapter + self._queue_type = queue_type + self._queue_adapter_kwargs = 
queue_adapter_kwargs + self._queue_id = None + + def bootup(self, command_lst): + if self._queue_type.lower() == "slurm": + command_prepend_lst = generate_slurm_command( + cores=self._cores, + cwd=self._cwd, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + else: + command_prepend_lst = generate_mpiexec_command( + cores=self._cores, + gpus_per_core=self._gpus_per_core, + oversubscribe=self._oversubscribe, + ) + self._queue_id = self._queue_adapter.submit_job( + working_directory=self._cwd, + cores=self._cores, + command=" ".join(command_prepend_lst + command_lst), + **self._queue_adapter_kwargs + ) + + def shutdown(self, wait=True): + self._queue_adapter.delete_job(process_id=self._queue_id) + + def poll(self): + return self._queue_adapter is not None + + +class FluxCmdInterface(SubprocessInterface): + def generate_command(self, command_lst): + command_prepend_lst = [ + "flux", + "run", + "-n", + str(self._cores), + ] + if self._cwd is not None: + command_prepend_lst += [ + "--cwd=" + self._cwd, + ] + if self._gpus_per_core > 0: + command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)] + return super().generate_command( + command_lst=command_prepend_lst + command_lst, + ) + + +class FluxPythonInterface(BaseInterface): + def __init__( + self, cwd=None, cores=1, gpus_per_core=0, oversubscribe=False, executor=None + ): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + self._executor = executor + self._future = None + + def bootup(self, command_lst): + import flux.job + + if self._oversubscribe: + raise ValueError( + "Oversubscribing is currently not supported for the Flux adapter." + ) + if self._executor is None: + self._executor = flux.job.FluxExecutor() + jobspec = flux.job.JobspecV1.from_command( + command=" ".join(command_lst), + num_tasks=1, + cores_per_task=self._cores, + gpus_per_task=self._gpus_per_core, + num_nodes=None, + exclusive=False, + ) + jobspec.cwd = self._cwd + self._future = self._executor.submit(jobspec) + + def shutdown(self, wait=True): + self._executor.shutdown(wait=wait) + + def poll(self): + return self._executor is not None + + +def generate_slurm_command(cores, cwd, gpus_per_core=0, oversubscribe=False): + command_prepend_lst = ["srun", "-n", str(cores), "-D", cwd] + if gpus_per_core > 0: + command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)] + if oversubscribe: + command_prepend_lst += ["--oversubscribe"] + return command_prepend_lst + + +def generate_mpiexec_command(cores, gpus_per_core=0, oversubscribe=False): + command_prepend_lst = ["mpiexec", "-n", str(cores)] + if oversubscribe: + command_prepend_lst += ["--oversubscribe"] + if gpus_per_core > 0: + raise ValueError() + return command_prepend_lst + + +def get_connection_interface( + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, +): + """ + Backwards compatibility adapter to get the connection interface + + Args: + cwd (str/None): current working directory where the parallel python task is executed + cores (int): defines the total number of MPI ranks to use + gpus_per_core (int): number of GPUs per MPI rank - defaults to 0 + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec + enable_slurm_backend (bool): 
enable the SLURM queueing system as backend - defaults to False + queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems + queue_type (str): type of the queuing system + queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter + + Returns: + pympipool.shared.connections.BaseInterface: Connection interface + """ + if queue_adapter is not None: + connections = PysqaInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + queue_adapter=queue_adapter, + queue_type=queue_type, + queue_adapter_kwargs=queue_adapter_kwargs, + ) + elif enable_flux_backend: + connections = FluxCmdInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + elif enable_slurm_backend: + connections = SlurmSubprocessInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + else: + connections = MpiExecInterface( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + return connections diff --git a/pympipool/shared/taskexecutor.py b/pympipool/shared/taskexecutor.py index 5e393f2e..c976bacd 100644 --- a/pympipool/shared/taskexecutor.py +++ b/pympipool/shared/taskexecutor.py @@ -1,11 +1,10 @@ import inspect import os -import socket import queue import cloudpickle -from pympipool.shared.communication import SocketInterface +from pympipool.shared.communication import interface_bootup def cancel_items_in_queue(que): @@ -46,59 +45,6 @@ def cloudpickle_register(ind=2): pass -def command_line_options( - hostname, - port_selected, - path, - cores, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_multi_host=False, -): - """ - Translate the individual parameters to command line options. - - Args: - hostname (str): name of the host where the SocketInterface instance runs the client process should conenct to. - port_selected (int): port the SocketInterface instance runs on. - path (str): path to the python script which should be executed as client process. 
- cores (int): defines the total number of MPI ranks to use - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - enable_multi_host (bool): communicate the host to connect to - defaults to False - - Returns: - list: list of strings to be executed on the command line - """ - if enable_flux_backend: - command_lst = ["flux", "run"] - elif enable_slurm_backend: - command_lst = ["srun"] - else: - command_lst = ["mpiexec"] - if gpus_per_task > 0 and (enable_flux_backend or enable_slurm_backend): - command_lst += ["--gpus-per-task=" + str(gpus_per_task)] - elif gpus_per_task > 0: - raise ValueError("GPU binding is only supported for flux and SLURM backend.") - if oversubscribe: - command_lst += ["--oversubscribe"] - command_lst += ["-n", str(cores), "python", path] - if enable_flux_backend or enable_slurm_backend or enable_multi_host: - command_lst += [ - "--host", - hostname, - ] - command_lst += [ - "--zmqport", - str(port_selected), - ] - return command_lst - - def execute_parallel_tasks( future_queue, cores, @@ -124,25 +70,20 @@ def execute_parallel_tasks( queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter """ - interface = SocketInterface( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ) - interface.bootup( - command_lst=command_line_options( - hostname=socket.gethostname(), - port_selected=interface.bind_to_random_port(), - path=os.path.abspath( - os.path.join(__file__, "..", "..", "backend", "mpiexec.py") - ), - cores=cores, - gpus_per_task=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - enable_multi_host=queue_adapter is not None, - ), + command_lst = [ + "python", + os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")), + ] + interface = interface_bootup( + command_lst=command_lst, cwd=cwd, cores=cores, + gpus_per_core=gpus_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=enable_slurm_backend, + queue_adapter=queue_adapter, + queue_adapter_kwargs=queue_adapter_kwargs, ) _execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) diff --git a/tests/test_connection.py b/tests/test_connection.py new file mode 100644 index 00000000..c5abd005 --- /dev/null +++ b/tests/test_connection.py @@ -0,0 +1,100 @@ +import os +import unittest +from pympipool.shared.connections import ( + BaseInterface, + MpiExecInterface, + SlurmSubprocessInterface, + PysqaInterface, + FluxCmdInterface, + get_connection_interface +) + + +class Interface(BaseInterface): + def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): + super().__init__( + cwd=cwd, + cores=cores, + gpus_per_core=gpus_per_core, + oversubscribe=oversubscribe, + ) + + +class TestExecutor(unittest.TestCase): + def setUp(self): + self.interface = Interface( + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False + ) + + def test_bootup(self): + with self.assertRaises(NotImplementedError): + self.interface.bootup(command_lst=[]) + + def test_shutdown(self): + with 
self.assertRaises(NotImplementedError): + self.interface.shutdown(wait=True) + + def test_poll(self): + with self.assertRaises(NotImplementedError): + self.interface.poll() + + +class TestInterfaceConnection(unittest.TestCase): + def test_mpiexec(self): + interface = get_connection_interface( + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, + ) + self.assertIsInstance(interface, MpiExecInterface) + + def test_slurm(self): + interface = get_connection_interface( + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=True, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, + ) + self.assertIsInstance(interface, SlurmSubprocessInterface) + + def test_pysqa(self): + interface = get_connection_interface( + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=False, + enable_slurm_backend=False, + queue_adapter=True, + queue_type=None, + queue_adapter_kwargs=None, + ) + self.assertIsInstance(interface, PysqaInterface) + + def test_flux_cmd(self): + interface = get_connection_interface( + cwd=None, + cores=1, + gpus_per_core=0, + oversubscribe=False, + enable_flux_backend=True, + enable_slurm_backend=False, + queue_adapter=None, + queue_type=None, + queue_adapter_kwargs=None, + ) + self.assertIsInstance(interface, FluxCmdInterface) diff --git a/tests/test_interface.py b/tests/test_interface.py index 0d717766..abcc3d8d 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -1,10 +1,10 @@ import os -import socket import numpy as np import unittest from pympipool.shared.communication import SocketInterface -from pympipool.shared.taskexecutor import command_line_options, cloudpickle_register +from pympipool.shared.taskexecutor import cloudpickle_register +from pympipool.shared.connections import MpiExecInterface def calc(i): @@ -15,19 +15,19 @@ class TestInterface(unittest.TestCase): def test_interface(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, 'args': (), "kwargs": {"i": 2}} - interface = SocketInterface(queue_adapter=None, queue_adapter_kwargs=None) - interface.bootup( - command_lst=command_line_options( - hostname=socket.gethostname(), - port_selected=interface.bind_to_random_port(), - path=os.path.abspath(os.path.join(__file__, "..", "..", "pympipool", "backend", "mpiexec.py")), + interface = SocketInterface( + interface=MpiExecInterface( + cwd=None, cores=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - enable_multi_host=False, + gpus_per_core=0, + oversubscribe=False ) ) + interface.bootup(command_lst=[ + "python", + os.path.abspath(os.path.join(__file__, "..", "..", "pympipool", "backend", "mpiexec.py")), + "--zmqport", + str(interface.bind_to_random_port()), + ]) self.assertEqual(interface.send_and_receive_dict(input_dict=task_dict), np.array(4)) interface.shutdown(wait=True) \ No newline at end of file diff --git a/tests/test_parse.py b/tests/test_parse.py index ebbe97f1..52b8fda9 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -1,6 +1,7 @@ +import os import unittest from pympipool.shared.backend import parse_arguments -from pympipool.shared.taskexecutor import command_line_options +from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface, SlurmSubprocessInterface class TestParser(unittest.TestCase): @@ -10,19 +11,17 
@@ def test_command_local(self): 'zmqport': '22', } command_lst = [ - 'mpiexec', '--oversubscribe', + 'mpiexec', '-n', '2', + '--oversubscribe', 'python', '/', '--zmqport', result_dict['zmqport'] ] - self.assertEqual(command_lst, command_line_options( - hostname=result_dict['host'], - port_selected=result_dict['zmqport'], - path="/", - cores=2, - oversubscribe=True, - enable_flux_backend=False, - )) + interface = MpiExecInterface(cwd=None, cores=2, gpus_per_core=0, oversubscribe=True) + self.assertEqual( + command_lst, + interface.generate_command(command_lst=['python', '/', '--zmqport', result_dict['zmqport']]) + ) self.assertEqual(result_dict, parse_arguments(command_lst)) def test_command_flux(self): @@ -31,16 +30,42 @@ def test_command_flux(self): 'zmqport': '22', } command_lst = [ - 'flux', 'run', '-n', '2', 'python', '/', + 'flux', 'run', '-n', '2', + "--cwd=" + os.path.abspath("."), + '--gpus-per-task=1', + 'python', '/', + '--host', result_dict['host'], + '--zmqport', result_dict['zmqport'] + ] + interface = FluxCmdInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=False) + self.assertEqual( + command_lst, + interface.generate_command(command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']]) + ) + self.assertEqual(result_dict, parse_arguments(command_lst)) + + def test_mpiexec_gpu(self): + interface = MpiExecInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True) + with self.assertRaises(ValueError): + interface.bootup(command_lst=[]) + + def test_command_slurm(self): + result_dict = { + 'host': "127.0.0.1", + 'zmqport': '22', + } + command_lst = [ + 'srun', '-n', '2', + "-D", os.path.abspath("."), + '--gpus-per-task=1', + '--oversubscribe', + 'python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport'] ] - self.assertEqual(command_lst, command_line_options( - hostname=result_dict['host'], - port_selected=result_dict['zmqport'], - path="/", - cores=2, - oversubscribe=False, - enable_flux_backend=True, - )) + interface = SlurmSubprocessInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True) + self.assertEqual( + command_lst, + interface.generate_command(command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']]) + ) self.assertEqual(result_dict, parse_arguments(command_lst)) diff --git a/tests/test_parse_legacy.py b/tests/test_parse_legacy.py index c4f28b76..cef871db 100644 --- a/tests/test_parse_legacy.py +++ b/tests/test_parse_legacy.py @@ -1,6 +1,7 @@ import unittest +import os from pympipool.legacy.shared.backend import parse_arguments -from pympipool.legacy.shared.interface import command_line_options +from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface class TestParser(unittest.TestCase): @@ -12,22 +13,30 @@ def test_command_local(self): 'cores_per_task': '1' } command_lst = [ - 'mpiexec', '--oversubscribe', + 'mpiexec', '-n', result_dict['total_cores'], + '--oversubscribe', 'python', '-m', 'mpi4py.futures', '/', '--zmqport', result_dict['zmqport'], '--cores-per-task', result_dict['cores_per_task'], '--cores-total', result_dict['total_cores'] ] - self.assertEqual(command_lst, command_line_options( - hostname=result_dict['host'], - port_selected=result_dict['zmqport'], - path="/", - cores=int(result_dict['total_cores']), - cores_per_task=int(result_dict['cores_per_task']), - oversubscribe=True, - enable_flux_backend=False, - )) + interface = MpiExecInterface( + cwd=None, + cores=2, + 
gpus_per_core=0, + oversubscribe=True + ) + self.assertEqual( + command_lst, + interface.generate_command( + command_lst=[ + 'python', '-m', 'mpi4py.futures', '/', + '--zmqport', result_dict['zmqport'], + '--cores-per-task', '1', '--cores-total', '2' + ] + ) + ) self.assertEqual(result_dict, parse_arguments(command_lst)) def test_command_flux(self): @@ -38,19 +47,28 @@ def test_command_flux(self): 'cores_per_task': '2' } command_lst = [ - 'flux', 'run', '-n', '1', 'python', '/', + 'flux', 'run', '-n', '1', + "--cwd=" + os.path.abspath("."), + 'python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport'], '--cores-per-task', result_dict['cores_per_task'], '--cores-total', result_dict['total_cores'] ] - self.assertEqual(command_lst, command_line_options( - hostname=result_dict['host'], - port_selected=result_dict['zmqport'], - path="/", - cores=int(result_dict['total_cores']), - cores_per_task=int(result_dict['cores_per_task']), - oversubscribe=False, - enable_flux_backend=True, - )) + interface = FluxCmdInterface( + cwd=os.path.abspath("."), + cores=1, + gpus_per_core=0, + oversubscribe=False + ) + self.assertEqual( + command_lst, + interface.generate_command( + command_lst=[ + 'python', '/', '--host', result_dict['host'], + '--zmqport', result_dict['zmqport'], + '--cores-per-task', '2', '--cores-total', '2' + ] + ) + ) self.assertEqual(result_dict, parse_arguments(command_lst)) diff --git a/tests/test_zmq.py b/tests/test_zmq.py index 2dbe78b6..72b0de06 100644 --- a/tests/test_zmq.py +++ b/tests/test_zmq.py @@ -1,10 +1,10 @@ import unittest import zmq from pympipool.shared.communication import ( - connect_to_socket_interface, - close_connection, - send_result, - receive_instruction + interface_connect, + interface_shutdown, + interface_send, + interface_receive ) @@ -16,8 +16,8 @@ def test_initialize_zmq(self): context_server = zmq.Context() socket_server = context_server.socket(zmq.PAIR) port = str(socket_server.bind_to_random_port("tcp://*")) - context_client, socket_client = connect_to_socket_interface(host=host, port=port) - send_result(socket=socket_server, result_dict={"message": message}) - self.assertEqual(receive_instruction(socket=socket_client), {"message": message}) - close_connection(socket=socket_client, context=context_client) - close_connection(socket=socket_server, context=context_server) + context_client, socket_client = interface_connect(host=host, port=port) + interface_send(socket=socket_server, result_dict={"message": message}) + self.assertEqual(interface_receive(socket=socket_client), {"message": message}) + interface_shutdown(socket=socket_client, context=context_client) + interface_shutdown(socket=socket_server, context=context_server)
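
---

For reviewers, a minimal usage sketch of the refactored API introduced by this patch (adapted from `tests/test_interface.py` above): the `SocketInterface` now receives a connection interface (here `MpiExecInterface`) instead of a queue adapter, and the caller assembles the backend command explicitly. The helper function `calc` and the repository-root-relative path to `pympipool/backend/mpiexec.py` are illustrative assumptions; the higher-level `interface_bootup()` helper performs the same wiring (binding a random port, appending `--zmqport`, and booting the connection interface) internally.

```python
import os

from pympipool.shared.communication import SocketInterface
from pympipool.shared.connections import MpiExecInterface
from pympipool.shared.taskexecutor import cloudpickle_register


def calc(i):
    # Illustrative task function, not part of the patch.
    return i + 1


cloudpickle_register(ind=1)

# The connection interface encapsulates how the backend process is launched
# (here: plain mpiexec); SocketInterface handles the zmq communication.
interface = SocketInterface(
    interface=MpiExecInterface(cwd=None, cores=1, gpus_per_core=0, oversubscribe=False)
)
interface.bootup(
    command_lst=[
        "python",
        # Path assumed relative to the repository root.
        os.path.abspath(os.path.join("pympipool", "backend", "mpiexec.py")),
        "--zmqport",
        str(interface.bind_to_random_port()),
    ]
)
print(interface.send_and_receive_dict(input_dict={"fn": calc, "args": (), "kwargs": {"i": 2}}))
interface.shutdown(wait=True)
```

The same selection logic is available through `get_connection_interface()`, which dispatches to `MpiExecInterface`, `SlurmSubprocessInterface`, `FluxCmdInterface`, or `PysqaInterface` depending on the `enable_*_backend` flags and whether a `queue_adapter` is provided, as exercised in `tests/test_connection.py`.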