From 77fefaba55d8f3e23b0760406a54f6d915762805 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Thu, 27 Jul 2023 17:31:22 -0600
Subject: [PATCH] Remove legacy interface

---
 pympipool/__init__.py                   |   2 -
 pympipool/legacy/__init__.py            |   0
 pympipool/legacy/backend/__init__.py    |   0
 pympipool/legacy/backend/mpipool.py     |  63 ------
 pympipool/legacy/interfaces/__init__.py |   0
 pympipool/legacy/interfaces/executor.py |  70 -------
 pympipool/legacy/interfaces/pool.py     | 246 ------------
 pympipool/legacy/shared/__init__.py     |   0
 pympipool/legacy/shared/backend.py      | 133 -------------
 pympipool/legacy/shared/interface.py    | 227 ----------------------
 tests/test_communicator_split.py        |  66 -------
 tests/test_executor.py                  | 127 ------------
 tests/test_multitask.py                 |  99 ----------
 tests/test_parse_legacy.py              |  56 ------
 tests/test_pool.py                      |  76 --------
 15 files changed, 1165 deletions(-)
 delete mode 100644 pympipool/legacy/__init__.py
 delete mode 100644 pympipool/legacy/backend/__init__.py
 delete mode 100644 pympipool/legacy/backend/mpipool.py
 delete mode 100644 pympipool/legacy/interfaces/__init__.py
 delete mode 100644 pympipool/legacy/interfaces/executor.py
 delete mode 100644 pympipool/legacy/interfaces/pool.py
 delete mode 100644 pympipool/legacy/shared/__init__.py
 delete mode 100644 pympipool/legacy/shared/backend.py
 delete mode 100644 pympipool/legacy/shared/interface.py
 delete mode 100644 tests/test_communicator_split.py
 delete mode 100644 tests/test_multitask.py
 delete mode 100644 tests/test_parse_legacy.py
 delete mode 100644 tests/test_pool.py

diff --git a/pympipool/__init__.py b/pympipool/__init__.py
index 73eda1ef..b0fcd107 100644
--- a/pympipool/__init__.py
+++ b/pympipool/__init__.py
@@ -7,8 +7,6 @@
 )
 from pympipool.interfaces.taskbroker import HPCExecutor
 from pympipool.interfaces.taskexecutor import Executor
-from pympipool.legacy.interfaces.executor import PoolExecutor
-from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool
 from pympipool.shared.thread import RaisingThread
 from pympipool.shared.taskexecutor import cancel_items_in_queue
diff --git a/pympipool/legacy/__init__.py b/pympipool/legacy/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/pympipool/legacy/backend/__init__.py b/pympipool/legacy/backend/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/pympipool/legacy/backend/mpipool.py b/pympipool/legacy/backend/mpipool.py
deleted file mode 100644
index 584dfcce..00000000
--- a/pympipool/legacy/backend/mpipool.py
+++ /dev/null
@@ -1,63 +0,0 @@
-from os.path import abspath
-import pickle
-import sys
-
-import cloudpickle
-
-from pympipool.shared.communication import (
-    connect_to_socket_interface,
-    send_result,
-    close_connection,
-    receive_instruction,
-)
-from pympipool.legacy.shared.backend import parse_socket_communication, parse_arguments
-
-
-def main():
-    from mpi4py import MPI
-
-    MPI.pickle.__init__(
-        cloudpickle.dumps,
-        cloudpickle.loads,
-        pickle.HIGHEST_PROTOCOL,
-    )
-    from mpi4py.futures import MPIPoolExecutor
-
-    future_dict = {}
-    argument_dict = parse_arguments(argument_lst=sys.argv)
-
-    # required for flux interface - otherwise the current path is not included in the python path
-    cwd = abspath(".")
-    if cwd not in sys.path:
-        sys.path.insert(1, cwd)
-
-    with MPIPoolExecutor(
-        max_workers=int(argument_dict["total_cores"]),
-        path=sys.path,  # required for flux interface - otherwise the current path is not included in the python path
-    ) as executor:
-        if executor is not None:
-            context, socket = connect_to_socket_interface(
-                host=argument_dict["host"], port=argument_dict["zmqport"]
-            )
-            while True:
-                output = parse_socket_communication(
-                    executor=executor,
-                    input_dict=receive_instruction(socket=socket),
-                    future_dict=future_dict,
-                    cores_per_task=int(argument_dict["cores_per_task"]),
-                )
-                if "exit" in output.keys() and output["exit"]:
-                    if "result" in output.keys():
-                        send_result(
-                            socket=socket, result_dict={"result": output["result"]}
-                        )
-                    else:
-                        send_result(socket=socket, result_dict={"result": True})
-                    close_connection(socket=socket, context=context)
-                    break
-                elif isinstance(output, dict):
-                    send_result(socket=socket, result_dict=output)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/pympipool/legacy/interfaces/__init__.py b/pympipool/legacy/interfaces/__init__.py
deleted file mode 100644
index e69de29b..00000000
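A note on the cloudpickle registration performed by `main()` above: the default pickler used by mpi4py cannot serialize interactively defined callables (lambdas, closures, functions defined in `__main__`), which is exactly what a Jupyter-oriented pool receives, so the backend swaps it for cloudpickle, which serializes functions by value. A minimal standalone sketch of the difference (no MPI required; illustrative only, not part of this patch):

```python
import pickle

import cloudpickle

square = lambda x: x ** 2  # interactively defined callable

# cloudpickle serializes the function by value, so it survives transport
restored = cloudpickle.loads(cloudpickle.dumps(square))
print(restored(4))  # 16

# plain pickle serializes by reference and fails for a __main__ lambda
try:
    pickle.dumps(square)
except (pickle.PicklingError, AttributeError) as error:
    print("pickle failed:", error)
```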
diff --git a/pympipool/legacy/interfaces/executor.py b/pympipool/legacy/interfaces/executor.py
deleted file mode 100644
index 72281e45..00000000
--- a/pympipool/legacy/interfaces/executor.py
+++ /dev/null
@@ -1,70 +0,0 @@
-from pympipool.interfaces.taskexecutor import ExecutorBase
-from pympipool.shared.thread import RaisingThread
-from pympipool.legacy.shared.interface import execute_serial_tasks
-
-
-class PoolExecutor(ExecutorBase):
-    """
-    The pympipool.PoolExecutor combines the functionality of the pympipool.Pool and the pympipool.Executor by
-    connecting to the mpi4py.futures.MPIPoolExecutor. In contrast to the pympipool.Pool it does not implement the
-    map() and starmap() functions, but rather the submit() function of the concurrent.futures.Executor interface.
-    Load balancing happens internally, and max_workers defines the maximum number of parallel tasks. Only serial
-    python tasks can be executed, in contrast to the pympipool.Executor, which can also execute MPI parallel python
-    tasks.
-
-    Args:
-        max_workers (int): defines the total number of MPI ranks to use
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - defaults to False
-        enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - defaults to False
-        enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
-        cwd (str/None): current working directory where the parallel python task is executed
-        sleep_interval (float): time in seconds to sleep between checking the status of submitted tasks - defaults to 0.1
-        queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
-        queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter
-
-    Simple example:
-        ```
-        from pympipool import PoolExecutor
-
-        def calc(i, j):
-            return i + j
-
-        with PoolExecutor(max_workers=2) as p:
-            fs1 = p.submit(calc, 1, 2)
-            fs2 = p.submit(calc, 3, 4)
-            fs3 = p.submit(calc, 5, 6)
-            fs4 = p.submit(calc, 7, 8)
-            print(fs1.result(), fs2.result(), fs3.result(), fs4.result())
-        ```
-    """
-
-    def __init__(
-        self,
-        max_workers=1,
-        gpus_per_task=0,
-        oversubscribe=False,
-        enable_flux_backend=False,
-        enable_slurm_backend=False,
-        cwd=None,
-        sleep_interval=0.1,
-        queue_adapter=None,
-        queue_adapter_kwargs=None,
-    ):
-        super().__init__()
-        self._process = RaisingThread(
-            target=execute_serial_tasks,
-            kwargs={
-                "future_queue": self._future_queue,
-                "cores": max_workers,
-                "gpus_per_task": gpus_per_task,
-                "oversubscribe": oversubscribe,
-                "enable_flux_backend": enable_flux_backend,
-                "enable_slurm_backend": enable_slurm_backend,
-                "cwd": cwd,
-                "sleep_interval": sleep_interval,
-                "queue_adapter": queue_adapter,
-                "queue_adapter_kwargs": queue_adapter_kwargs,
-            },
-        )
-        self._process.start()
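Since the removed PoolExecutor implemented the submit() contract of the concurrent.futures.Executor interface, its docstring example carries over almost verbatim to the standard library. For reference, the same pattern with the stdlib ProcessPoolExecutor (a generic illustration, not pympipool code):

```python
from concurrent.futures import ProcessPoolExecutor


def calc(i, j):
    return i + j


if __name__ == "__main__":
    # submit() returns concurrent.futures.Future objects, exactly as PoolExecutor did
    with ProcessPoolExecutor(max_workers=2) as p:
        fs1 = p.submit(calc, 1, 2)
        fs2 = p.submit(calc, 3, 4)
        print(fs1.result(), fs2.result())  # 3 7
```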
diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py
deleted file mode 100644
index bf4d5d1c..00000000
--- a/pympipool/legacy/interfaces/pool.py
+++ /dev/null
@@ -1,246 +0,0 @@
-from abc import ABC
-
-from pympipool.shared.communication import SocketInterface
-from pympipool.shared.taskexecutor import cloudpickle_register
-from pympipool.legacy.shared.interface import get_parallel_subprocess_command
-
-
-class PoolBase(ABC):
-    """
-    Base class for the Pool and MPISpawnPool classes defined below. The PoolBase class is not intended to be used
-    alone. Rather it implements the __enter__(), __exit__() and shutdown() functions shared between the derived
-    classes.
-    """
-
-    def __init__(self, queue_adapter=None, queue_adapter_kwargs=None):
-        self._future_dict = {}
-        self._interface = SocketInterface(
-            queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs
-        )
-        cloudpickle_register(ind=3)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.shutdown(wait=True)
-        return False
-
-    def shutdown(self, wait=True):
-        self._interface.shutdown(wait=wait)
-
-
-class Pool(PoolBase):
-    """
-    The pympipool.Pool behaves like the multiprocessing.Pool, but it uses mpi4py to distribute tasks. In contrast to
-    the mpi4py.futures.MPIPoolExecutor, the pympipool.Pool can be executed in a serial python process and does not
-    require the python script to be executed with MPI. Internally the pympipool.Pool still uses the
-    mpi4py.futures.MPIPoolExecutor, so it is primarily an abstraction of its functionality to improve the usability,
-    in particular when used in combination with Jupyter notebooks.
-
-    Args:
-        max_workers (int): defines the total number of MPI ranks to use
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - defaults to False
-        enable_flux_backend (bool): use the flux-framework as backend - defaults to False
-        enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
-        cwd (str/None): current working directory where the parallel python task is executed
-        queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
-        queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter
-
-    Simple example:
-        ```
-        import numpy as np
-        from pympipool import Pool
-
-        def calc(i):
-            return np.array(i ** 2)
-
-        with Pool(max_workers=2) as p:
-            print(p.map(func=calc, iterable=[1, 2, 3, 4]))
-        ```
-    """
-
-    def __init__(
-        self,
-        max_workers=1,
-        gpus_per_task=0,
-        oversubscribe=False,
-        enable_flux_backend=False,
-        enable_slurm_backend=False,
-        cwd=None,
-        queue_adapter=None,
-        queue_adapter_kwargs=None,
-    ):
-        super().__init__(
-            queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs
-        )
-        self._interface.bootup(
-            command_lst=get_parallel_subprocess_command(
-                port_selected=self._interface.bind_to_random_port(),
-                cores=max_workers,
-                cores_per_task=1,
-                gpus_per_task=gpus_per_task,
-                oversubscribe=oversubscribe,
-                enable_flux_backend=enable_flux_backend,
-                enable_slurm_backend=enable_slurm_backend,
-                enable_mpi4py_backend=True,
-                enable_multi_host=queue_adapter is not None,
-            ),
-            cwd=cwd,
-            cores=max_workers,
-        )
-
-    def map(self, func, iterable, chunksize=None):
-        """
-        Map a given function on a list of arguments.
-
-        Args:
-            func: function to be applied to each element of the following list
-            iterable (list): list of arguments the function should be applied on
-            chunksize (int/None): number of tasks bundled per communication step - defaults to 1
-
-        Returns:
-            list: list of output generated from applying the function on the list of arguments
-        """
-        # multiprocessing.pool.Pool and mpi4py.futures.MPIPoolExecutor have different defaults
-        if chunksize is None:
-            chunksize = 1
-        return self._interface.send_and_receive_dict(
-            input_dict={
-                "fn": func,
-                "iterable": iterable,
-                "chunksize": chunksize,
-                "map": True,
-            }
-        )
-
-    def starmap(self, func, iterable, chunksize=None):
-        """
-        Map a given function on a list of argument tuples.
-
-        Args:
-            func: function to be applied to each element of the following list
-            iterable (list): list of argument tuples the function should be applied on
-            chunksize (int/None): number of tasks bundled per communication step - defaults to 1
-
-        Returns:
-            list: list of output generated from applying the function on the list of arguments
-        """
-        # multiprocessing.pool.Pool and mpi4py.futures.MPIPoolExecutor have different defaults
-        if chunksize is None:
-            chunksize = 1
-        return self._interface.send_and_receive_dict(
-            input_dict={
-                "fn": func,
-                "iterable": iterable,
-                "chunksize": chunksize,
-                "map": False,
-            }
-        )
-
-
-class MPISpawnPool(PoolBase):
-    """
-    The pympipool.MPISpawnPool behaves like the multiprocessing.Pool, but it uses mpi4py to distribute tasks. In
-    contrast to the mpi4py.futures.MPIPoolExecutor, the pympipool.MPISpawnPool can be executed in a serial python
-    process and does not require the python script to be executed with MPI. In addition, the pympipool.MPISpawnPool
-    can assign multiple MPI ranks per task and hands the corresponding sub-communicator to the function as the comm
-    argument. Internally it still uses the mpi4py.futures.MPIPoolExecutor, so it is primarily an abstraction of its
-    functionality to improve the usability, in particular when used in combination with Jupyter notebooks.
-
-    Args:
-        max_ranks (int): defines the total number of MPI ranks to use
-        ranks_per_task (int): defines the number of MPI ranks per task - defaults to 1
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - defaults to False
-        cwd (str/None): current working directory where the parallel python task is executed
-        queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
-        queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter
-
-    Simple example:
-        ```
-        from pympipool import MPISpawnPool
-
-        def calc(i, comm):
-            return i, comm.Get_size(), comm.Get_rank()
-
-        with MPISpawnPool(max_ranks=4, ranks_per_task=2) as p:
-            print(p.map(func=calc, iterable=[1, 2, 3, 4]))
-        ```
-    """
-
-    def __init__(
-        self,
-        max_ranks=1,
-        ranks_per_task=1,
-        gpus_per_task=0,
-        oversubscribe=False,
-        cwd=None,
-        queue_adapter=None,
-        queue_adapter_kwargs=None,
-    ):
-        super().__init__(
-            queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs
-        )
-        self._interface.bootup(
-            command_lst=get_parallel_subprocess_command(
-                port_selected=self._interface.bind_to_random_port(),
-                cores=max_ranks,
-                cores_per_task=ranks_per_task,
-                gpus_per_task=gpus_per_task,
-                oversubscribe=oversubscribe,
-                enable_flux_backend=False,
-                enable_slurm_backend=False,
-                enable_mpi4py_backend=True,
-                enable_multi_host=queue_adapter is not None,
-            ),
-            cwd=cwd,
-            cores=max_ranks,
-        )
-
-    def map(self, func, iterable, chunksize=None):
-        """
-        Map a given function on a list of arguments.
-
-        Args:
-            func: function to be applied to each element of the following list
-            iterable (list): list of arguments the function should be applied on
-            chunksize (int/None): number of tasks bundled per communication step - defaults to 1
-
-        Returns:
-            list: list of output generated from applying the function on the list of arguments
-        """
-        # multiprocessing.pool.Pool and mpi4py.futures.MPIPoolExecutor have different defaults
-        if chunksize is None:
-            chunksize = 1
-        return self._interface.send_and_receive_dict(
-            input_dict={
-                "fn": func,
-                "iterable": iterable,
-                "chunksize": chunksize,
-                "map": True,
-            }
-        )
-
-    def starmap(self, func, iterable, chunksize=None):
-        """
-        Map a given function on a list of argument tuples.
-
-        Args:
-            func: function to be applied to each element of the following list
-            iterable (list): list of argument tuples the function should be applied on
-            chunksize (int/None): number of tasks bundled per communication step - defaults to 1
-
-        Returns:
-            list: list of output generated from applying the function on the list of arguments
-        """
-        # multiprocessing.pool.Pool and mpi4py.futures.MPIPoolExecutor have different defaults
-        if chunksize is None:
-            chunksize = 1
-        return self._interface.send_and_receive_dict(
-            input_dict={
-                "fn": func,
-                "iterable": iterable,
-                "chunksize": chunksize,
-                "map": False,
-            }
-        )
diff --git a/pympipool/legacy/shared/__init__.py b/pympipool/legacy/shared/__init__.py
deleted file mode 100644
index e69de29b..00000000
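As the map()/starmap() docstrings above state, map() applies the function to each element while starmap() unpacks each element as positional arguments; the chunksize defaults differ between multiprocessing.pool.Pool and mpi4py.futures.MPIPoolExecutor, which is why both methods normalize it to 1. The stdlib equivalents show the two calling conventions (for reference only):

```python
from itertools import starmap


def calc_add(i, j):
    return i + j


# map-style: one argument per element
print(list(map(lambda i: i ** 2, [1, 2, 3, 4])))  # [1, 4, 9, 16]

# starmap-style: each element is unpacked into positional arguments
print(list(starmap(calc_add, [[1, 2], [3, 4]])))  # [3, 7]
```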
diff --git a/pympipool/legacy/shared/backend.py b/pympipool/legacy/shared/backend.py
deleted file mode 100644
index 854297ad..00000000
--- a/pympipool/legacy/shared/backend.py
+++ /dev/null
@@ -1,133 +0,0 @@
-from tqdm import tqdm
-
-from pympipool.shared.backend import call_funct, _update_default_dict_from_arguments
-
-
-def map_funct(executor, funct, lst, chunksize=1, cores_per_task=1, map_flag=True):
-    if cores_per_task == 1:
-        if map_flag:
-            results = executor.map(funct, lst, chunksize=chunksize)
-        else:
-            results = executor.starmap(funct, lst, chunksize=chunksize)
-        return list(tqdm(results, desc="Tasks", total=len(lst)))
-    else:
-        lst_parallel = []
-        for input_parameter in lst:
-            for _ in range(cores_per_task):
-                lst_parallel.append(input_parameter)
-        if map_flag:
-            results = executor.map(
-                _wrap(funct=funct, number_of_cores_per_communicator=cores_per_task),
-                lst_parallel,
-                chunksize=chunksize,
-            )
-        else:
-            results = executor.starmap(
-                _wrap(funct=funct, number_of_cores_per_communicator=cores_per_task),
-                lst_parallel,
-                chunksize=chunksize,
-            )
-        return list(tqdm(results, desc="Tasks", total=len(lst_parallel)))[
-            ::cores_per_task
-        ]
-
-
-def parse_arguments(argument_lst):
-    """
-    Simple function to parse command line arguments
-
-    Args:
-        argument_lst (list): list of arguments as strings
-
-    Returns:
-        dict: dictionary with the parsed arguments and their corresponding values
-    """
-    return _update_default_dict_from_arguments(
-        argument_lst=argument_lst,
-        argument_dict={
-            "total_cores": "--cores-total",
-            "zmqport": "--zmqport",
-            "cores_per_task": "--cores-per-task",
-            "host": "--host",
-        },
-        default_dict={"host": "localhost"},
-    )
-
-
-def parse_socket_communication(executor, input_dict, future_dict, cores_per_task=1):
-    if "shutdown" in input_dict.keys() and input_dict["shutdown"]:
-        executor.shutdown(wait=input_dict["wait"])
-        done_dict = _update_futures(future_dict=future_dict)
-        # If "shutdown" is communicated, the executor is shut down and the process exits.
-        if done_dict is not None and len(done_dict) > 0:
-            return {"exit": True, "result": done_dict}
-        else:
-            return {"exit": True}
-    elif "fn" in input_dict.keys() and "iterable" in input_dict.keys():
-        # If a function "fn" and a list of arguments "iterable" are communicated,
-        # pympipool uses the map() function to apply the function on the list.
-        try:
-            output = map_funct(
-                executor=executor,
-                funct=input_dict["fn"],
-                lst=input_dict["iterable"],
-                cores_per_task=cores_per_task,
-                chunksize=input_dict["chunksize"],
-                map_flag=input_dict["map"],
-            )
-        except Exception as error:
-            return {"error": error, "error_type": str(type(error))}
-        else:
-            return {"result": output}
-    elif (
-        "fn" in input_dict.keys()
-        and "args" in input_dict.keys()
-        and "kwargs" in input_dict.keys()
-    ):
-        # If a function "fn", arguments "args" and keyword arguments "kwargs" are
-        # communicated, pympipool uses submit() to asynchronously apply the function
-        # on the arguments and/or keyword arguments.
-        future = call_funct(input_dict=input_dict, funct=executor.submit)
-        future_hash = hash(future)
-        future_dict[future_hash] = future
-        return {"result": future_hash}
-    elif "update" in input_dict.keys():
-        # If an "update" request is communicated, pympipool checks which of the
-        # asynchronously submitted functions have completed in the meantime and
-        # communicates their results.
-        done_dict = _update_futures(
-            future_dict=future_dict, hash_lst=input_dict["update"]
-        )
-        return {"result": done_dict}
-    elif "cancel" in input_dict.keys():
-        for k in input_dict["cancel"]:
-            future_dict[k].cancel()
-        return {"result": True}
-
-
-def _update_futures(future_dict, hash_lst=None):
-    if hash_lst is None:
-        hash_lst = list(future_dict.keys())
-    done_dict = {
-        k: f.result()
-        for k, f in {k: future_dict[k] for k in hash_lst}.items()
-        if f.done()
-    }
-    for k in done_dict.keys():
-        del future_dict[k]
-    return done_dict
-
-
-def _wrap(funct, number_of_cores_per_communicator=1):
-    def functwrapped(*args, **kwargs):
-        from mpi4py import MPI
-
-        MPI.COMM_WORLD.Barrier()
-        rank = MPI.COMM_WORLD.Get_rank()
-        comm_new = MPI.COMM_WORLD.Split(
-            rank // number_of_cores_per_communicator,
-            rank % number_of_cores_per_communicator,
-        )
-        comm_new.Barrier()
-        return funct(*args, comm=comm_new, **kwargs)
-
-    return functwrapped
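The _wrap() helper above splits MPI.COMM_WORLD into one sub-communicator per task and passes it to the wrapped function as the comm argument. A standalone sketch of the same split, to be launched with `mpiexec -n 4 python <script>` (assumes mpi4py is installed; illustrative only):

```python
from mpi4py import MPI

RANKS_PER_TASK = 2  # matches number_of_cores_per_communicator in _wrap()

rank = MPI.COMM_WORLD.Get_rank()
# ranks with the same color end up in the same sub-communicator;
# the key fixes the rank order inside each sub-communicator
comm_new = MPI.COMM_WORLD.Split(rank // RANKS_PER_TASK, rank % RANKS_PER_TASK)
print(
    "world rank", rank,
    "-> sub-communicator size", comm_new.Get_size(),
    "sub-rank", comm_new.Get_rank(),
)
```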
diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py
deleted file mode 100644
index d181d15f..00000000
--- a/pympipool/legacy/shared/interface.py
+++ /dev/null
@@ -1,227 +0,0 @@
-import os
-import queue
-import socket
-import time
-
-
-from pympipool.shared.communication import SocketInterface
-
-
-def command_line_options(
-    hostname,
-    port_selected,
-    path,
-    cores,
-    cores_per_task=1,
-    gpus_per_task=0,
-    oversubscribe=False,
-    enable_flux_backend=False,
-    enable_slurm_backend=False,
-    enable_mpi4py_backend=True,
-    enable_multi_host=False,
-):
-    """
-    Translate the individual parameters to command line options.
-
-    Args:
-        hostname (str): name of the host where the SocketInterface instance runs, which the client process should connect to.
-        port_selected (int): port the SocketInterface instance runs on.
-        path (str): path to the python script which should be executed as client process.
-        cores (int): defines the total number of MPI ranks to use
-        cores_per_task (int): number of MPI ranks per task - defaults to 1
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        oversubscribe (bool): enable or disable the oversubscribe feature of OpenMPI - defaults to False
-        enable_flux_backend (bool): enable the flux-framework as backend - defaults to False
-        enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
-        enable_mpi4py_backend (bool): enable the mpi4py.futures module - defaults to True
-        enable_multi_host (bool): communicate the host to connect to - defaults to False
-
-    Returns:
-        list: list of strings to be executed on the command line
-    """
-    if enable_flux_backend:
-        command_lst = ["flux", "run"]
-    elif enable_slurm_backend:
-        command_lst = ["srun"]
-    else:
-        command_lst = ["mpiexec"]
-    if gpus_per_task > 0 and (enable_flux_backend or enable_slurm_backend):
-        command_lst += ["--gpus-per-task=" + str(gpus_per_task)]
-    elif gpus_per_task > 0:
-        raise ValueError("GPU binding is only supported for flux and SLURM backend.")
-    if oversubscribe:
-        command_lst += ["--oversubscribe"]
-    if cores_per_task == 1 and enable_mpi4py_backend:
-        command_lst += ["-n", str(cores), "python", "-m", "mpi4py.futures"]
-    elif cores_per_task > 1 and enable_mpi4py_backend:
-        # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn:
-        # https://github.com/mpi4py/mpi4py/issues/324
-        command_lst += ["-n", "1", "python"]
-    else:
-        command_lst += ["-n", str(cores), "python"]
-    command_lst += [path]
-    if enable_flux_backend or enable_slurm_backend or enable_multi_host:
-        command_lst += [
-            "--host",
-            hostname,
-        ]
-    command_lst += [
-        "--zmqport",
-        str(port_selected),
-    ]
-    if enable_mpi4py_backend:
-        command_lst += [
-            "--cores-per-task",
-            str(cores_per_task),
-            "--cores-total",
-            str(cores),
-        ]
-    return command_lst
-
-
-def get_parallel_subprocess_command(
-    port_selected,
-    cores,
-    cores_per_task=1,
-    gpus_per_task=0,
-    oversubscribe=False,
-    enable_flux_backend=False,
-    enable_slurm_backend=False,
-    enable_mpi4py_backend=True,
-    enable_multi_host=False,
-):
-    """
-    Translate the individual parameters to command line options.
-
-    Args:
-        port_selected (int): port the SocketInterface instance runs on.
-        cores (int): defines the total number of MPI ranks to use
-        cores_per_task (int): number of MPI ranks per task - defaults to 1
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        oversubscribe (bool): enable or disable the oversubscribe feature of OpenMPI - defaults to False
-        enable_flux_backend (bool): enable the flux-framework as backend - defaults to False
-        enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
-        enable_mpi4py_backend (bool): enable the mpi4py.futures module - defaults to True
-        enable_multi_host (bool): communicate the host to connect to - defaults to False
-
-    Returns:
-        list: list of strings to be executed on the command line
-    """
-    if enable_mpi4py_backend:
-        executable = os.path.abspath(
-            os.path.join(__file__, "..", "..", "backend", "mpipool.py")
-        )
-    else:
-        executable = os.path.abspath(
-            os.path.join(__file__, "..", "..", "..", "backend", "mpiexec.py")
-        )
-    return command_line_options(
-        hostname=socket.gethostname(),
-        port_selected=port_selected,
-        path=executable,
-        cores=cores,
-        cores_per_task=cores_per_task,
-        gpus_per_task=gpus_per_task,
-        oversubscribe=oversubscribe,
-        enable_flux_backend=enable_flux_backend,
-        enable_slurm_backend=enable_slurm_backend,
-        enable_mpi4py_backend=enable_mpi4py_backend,
-        enable_multi_host=enable_multi_host,
-    )
-
-
-def execute_serial_tasks(
-    future_queue,
-    cores,
-    gpus_per_task=0,
-    oversubscribe=False,
-    enable_flux_backend=False,
-    enable_slurm_backend=False,
-    cwd=None,
-    sleep_interval=0.1,
-    queue_adapter=None,
-    queue_adapter_kwargs=None,
-):
-    """
-    Execute the serial tasks submitted to the future queue.
-
-    Args:
-        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
-        cores (int): defines the total number of MPI ranks to use
-        gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
-        oversubscribe (bool): enable or disable the oversubscribe feature of OpenMPI - defaults to False
-        enable_flux_backend (bool): enable the flux-framework as backend - defaults to False
-        enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
-        cwd (str/None): current working directory where the parallel python task is executed
-        sleep_interval (float): time in seconds to sleep between checking the status of submitted tasks - defaults to 0.1
-        queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
-        queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter
-    """
-    future_dict = {}
-    interface = SocketInterface(
-        queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs
-    )
-    interface.bootup(
-        command_lst=get_parallel_subprocess_command(
-            port_selected=interface.bind_to_random_port(),
-            cores=cores,
-            gpus_per_task=gpus_per_task,
-            cores_per_task=1,
-            oversubscribe=oversubscribe,
-            enable_flux_backend=enable_flux_backend,
-            enable_slurm_backend=enable_slurm_backend,
-            enable_mpi4py_backend=True,
-            enable_multi_host=queue_adapter is not None,
-        ),
-        cwd=cwd,
-        cores=cores,
-    )
-    _execute_serial_tasks_loop(
-        interface=interface,
-        future_queue=future_queue,
-        future_dict=future_dict,
-        sleep_interval=sleep_interval,
-    )
-
-
-def _execute_serial_tasks_loop(interface, future_queue, future_dict, sleep_interval=0.1):
-    while True:
-        try:
-            task_dict = future_queue.get_nowait()
-        except queue.Empty:
-            pass
-        else:
-            if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
-                done_dict = interface.shutdown(wait=task_dict["wait"])
-                if isinstance(done_dict, dict):
-                    for k, v in done_dict.items():
-                        if k in future_dict.keys() and not future_dict[k].cancelled():
-                            future_dict.pop(k).set_result(v)
-                future_queue.task_done()
-                break
-            elif "fn" in task_dict.keys() and "future" in task_dict.keys():
-                f = task_dict.pop("future")
-                future_hash = interface.send_and_receive_dict(input_dict=task_dict)
-                future_dict[future_hash] = f
-                future_queue.task_done()
-        _update_future_dict(
-            interface=interface, future_dict=future_dict, sleep_interval=sleep_interval
-        )
-
-
-def _update_future_dict(interface, future_dict, sleep_interval=0.1):
-    time.sleep(sleep_interval)
-    hash_to_update = [h for h, f in future_dict.items() if not f.done()]
-    hash_to_cancel = [h for h, f in future_dict.items() if f.cancelled()]
-    if len(hash_to_update) > 0:
-        for k, v in interface.send_and_receive_dict(
-            input_dict={"update": hash_to_update}
-        ).items():
-            future_dict.pop(k).set_result(v)
-    if len(hash_to_cancel) > 0 and interface.send_and_receive_dict(
-        input_dict={"cancel": hash_to_cancel}
-    ):
-        for h in hash_to_cancel:
-            del future_dict[h]
diff --git a/tests/test_communicator_split.py b/tests/test_communicator_split.py
deleted file mode 100644
index 9e09f65b..00000000
--- a/tests/test_communicator_split.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import unittest
-from pympipool import MPISpawnPool
-
-
-def get_ranks(input_parameter, comm=None):
-    from mpi4py import MPI
-    size = MPI.COMM_WORLD.Get_size()
-    rank = MPI.COMM_WORLD.Get_rank()
-    if comm is not None:
-        size_new = comm.Get_size()
-        rank_new = comm.Get_rank()
-    else:
-        size_new = 0
-        rank_new = 0
-    return size, rank, size_new, rank_new, input_parameter
-
-
-def get_ranks_multi_input(input_parameter1, input_parameter2, comm=None):
-    from mpi4py import MPI
-    size = MPI.COMM_WORLD.Get_size()
-    rank = MPI.COMM_WORLD.Get_rank()
-    if comm is not None:
-        size_new = comm.Get_size()
-        rank_new = comm.Get_rank()
-    else:
-        size_new = 0
-        rank_new = 0
-    return size, rank, size_new, rank_new, input_parameter1, input_parameter2
-
-
-class TestCommunicator(unittest.TestCase):
-    def test_map_serial(self):
-        with MPISpawnPool(max_ranks=2, ranks_per_task=1) as p:
-            output = p.map(func=get_ranks, iterable=[1, 2, 3])
-            self.assertEqual(output[0], (2, 1, 0, 0, 1))
-            self.assertEqual(output[1], (2, 1, 0, 0, 2))
-            self.assertEqual(output[2], (2, 1, 0, 0, 3))
-
-    def test_map_parallel(self):
-        with MPISpawnPool(max_ranks=2, ranks_per_task=2) as p:
-            output = p.map(func=get_ranks, iterable=[1, 2, 3, 4])
-            self.assertEqual(output[0][::2], (2, 2, 1))
-            self.assertEqual(output[1][::2], (2, 2, 2))
-            self.assertEqual(output[2][::2], (2, 2, 3))
-            self.assertEqual(output[3][::2], (2, 2, 4))
-
-    def test_starmap_serial(self):
-        with MPISpawnPool(max_ranks=2, ranks_per_task=1) as p:
-            output = p.starmap(
-                func=get_ranks_multi_input,
-                iterable=[[1, 1], [2, 2], [3, 3]]
-            )
-            self.assertEqual(output[0], (2, 1, 0, 0, 1, 1))
-            self.assertEqual(output[1], (2, 1, 0, 0, 2, 2))
-            self.assertEqual(output[2], (2, 1, 0, 0, 3, 3))
-
-    def test_starmap_parallel(self):
-        with MPISpawnPool(max_ranks=2, ranks_per_task=2) as p:
-            output = p.starmap(
-                func=get_ranks_multi_input,
-                iterable=[[1, 1], [2, 2], [3, 3], [4, 4]]
-            )
-            self.assertEqual(output[0][::2], (2, 2, 1))
-            self.assertEqual(output[1][::2], (2, 2, 2))
-            self.assertEqual(output[2][::2], (2, 2, 3))
-            self.assertEqual(output[3][::2], (2, 2, 4))
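The interface module above boots the backend with a --zmqport argument and exchanges dictionaries through SocketInterface.send_and_receive_dict(). Assuming the transport is a ZeroMQ request/reply pair, as the zmq naming suggests, the exchange has roughly the following shape; this is a generic pyzmq sketch, not pympipool's actual wire format (which also involves cloudpickle and the pysqa queue adapter):

```python
import zmq

context = zmq.Context()

# backend side: reply socket bound to a random TCP port
server = context.socket(zmq.REP)
port = server.bind_to_random_port("tcp://*")

# client side: request socket connected to that port
client = context.socket(zmq.REQ)
client.connect(f"tcp://localhost:{port}")

# the legacy protocol exchanges plain dictionaries
client.send_pyobj({"fn": sum, "args": [[1, 1]], "kwargs": {}})
instruction = server.recv_pyobj()
server.send_pyobj({"result": instruction["fn"](*instruction["args"])})
print(client.recv_pyobj())  # {'result': 2}

client.close()
server.close()
context.term()
```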
diff --git a/tests/test_executor.py b/tests/test_executor.py
index e87a742d..1c39c79a 100644
--- a/tests/test_executor.py
+++ b/tests/test_executor.py
@@ -1,6 +1,4 @@
 import unittest
-from concurrent.futures import ThreadPoolExecutor
-from pympipool.legacy.shared.backend import map_funct, parse_socket_communication
 from pympipool.shared.backend import call_funct
 
 
@@ -9,131 +7,6 @@ def function_multi_args(a, b):
 class TestExecutor(unittest.TestCase):
-    def test_exec_funct_single_core_map(self):
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            output = map_funct(
-                executor=executor,
-                funct=sum,
-                lst=[[1, 1], [2, 2]],
-                cores_per_task=1,
-                chunksize=1,
-            )
-            self.assertEqual(output, [2, 4])
-
-    def test_exec_funct_single_core_starmap(self):
-        with self.assertRaises(AttributeError):
-            with ThreadPoolExecutor(max_workers=1) as executor:
-                map_funct(
-                    executor=executor,
-                    funct=sum,
-                    lst=[[1, 1], [2, 2]],
-                    cores_per_task=1,
-                    chunksize=1,
-                    map_flag=False
-                )
-
-    def test_parse_socket_communication_close(self):
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            output = parse_socket_communication(
-                executor=executor,
-                input_dict={"shutdown": True, "wait": True},
-                future_dict={},
-                cores_per_task=1
-            )
-            self.assertEqual(output, {"exit": True})
-
-    def test_parse_socket_communication_execute(self):
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            output = parse_socket_communication(
-                executor=executor,
-                input_dict={"fn": sum, "iterable": [[1, 1]], "chunksize": 1, "map": True},
-                future_dict={},
-                cores_per_task=1
-            )
-            self.assertEqual(output, {"result": [2]})
-
-    def test_parse_socket_communication_error(self):
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            output = parse_socket_communication(
-                executor=executor,
-                input_dict={"fn": sum, "iterable": [["a", "b"]], "chunksize": 1, "map": True},
-                future_dict={},
-                cores_per_task=1
-            )
-            self.assertEqual(output["error_type"], "<class 'TypeError'>")
-
-    def test_parse_socket_communication_submit_args(self):
-        future_dict = {}
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            output = parse_socket_communication(
-                executor=executor,
-                input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}},
-                future_dict=future_dict,
-                cores_per_task=1
-            )
-            future = future_dict[output['result']]
-            self.assertEqual(future.result(), 2)
-
-    def test_parse_socket_communication_submit_kwargs(self):
-        future_dict = {}
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            output = parse_socket_communication(
-                executor=executor,
-                input_dict={"fn": function_multi_args, "args": (), "kwargs": {"a": 1, "b": 2}},
-                future_dict=future_dict,
-                cores_per_task=1
-            )
-            future = future_dict[output['result']]
-            self.assertEqual(future.result(), 3)
-
-    def test_parse_socket_communication_submit_both(self):
-        future_dict = {}
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            output = parse_socket_communication(
-                executor=executor,
-                input_dict={"fn": function_multi_args, "args": [1], "kwargs": {"b": 2}},
-                future_dict=future_dict,
-                cores_per_task=1
-            )
-            future = future_dict[output['result']]
-            self.assertEqual(future.result(), 3)
-
-    def test_parse_socket_communication_update(self):
-        future_dict = {}
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            output = parse_socket_communication(
-                executor=executor,
-                input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}},
-                future_dict=future_dict,
-                cores_per_task=1
-            )
-            future_hash = output["result"]
-            result = parse_socket_communication(
-                executor=executor,
-                input_dict={"update": [future_hash]},
-                future_dict=future_dict,
-                cores_per_task=1
-            )
-            self.assertEqual(result, {"result": {future_hash: 2}})
-
-    def test_parse_socket_communication_cancel(self):
-        future_dict = {}
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            output = parse_socket_communication(
-                executor=executor,
-                input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}},
-                future_dict=future_dict,
-                cores_per_task=1
-            )
-            future_hash = output["result"]
-            result = parse_socket_communication(
-                executor=executor,
-                input_dict={"cancel": [future_hash]},
-                future_dict=future_dict,
-                cores_per_task=1
-            )
-            self.assertEqual(result, {"result": True})
-
     def test_funct_call_default(self):
         self.assertEqual(call_funct(input_dict={
             "fn": sum,
diff --git a/tests/test_multitask.py b/tests/test_multitask.py
deleted file mode 100644
index 9b5ea46a..00000000
--- a/tests/test_multitask.py
+++ /dev/null
@@ -1,99 +0,0 @@
-import numpy as np
-import unittest
-from queue import Queue
-from time import sleep
-from pympipool import PoolExecutor
-from pympipool.legacy.shared.interface import execute_serial_tasks
-from pympipool.shared.taskexecutor import cloudpickle_register
-from concurrent.futures import Future
-
-
-def calc(i):
-    return np.array(i ** 2)
-
-
-def sleep_one(i):
-    sleep(1)
-    return i
-
-
-def wait_and_calc(n):
-    sleep(1)
-    return n ** 2
-
-
-def call_back(future):
-    global_lst.append(future.result())
-
-
-global_lst = []
-
-
-class TestFuturePool(unittest.TestCase):
-    def test_pool_serial(self):
-        with PoolExecutor(max_workers=1) as p:
-            output = p.submit(calc, i=2)
-            self.assertEqual(len(p), 1)
-            self.assertTrue(isinstance(output, Future))
-            self.assertFalse(output.done())
-            sleep(1)
-            self.assertTrue(output.done())
-            self.assertEqual(len(p), 0)
-            self.assertEqual(output.result(), np.array(4))
-
-    def test_execute_task(self):
-        f = Future()
-        q = Queue()
-        q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
-        q.put({"shutdown": True, "wait": True})
-        cloudpickle_register(ind=1)
-        execute_serial_tasks(
-            future_queue=q,
-            cores=1,
-            oversubscribe=False,
-            enable_flux_backend=False
-        )
-        self.assertEqual(f.result(), np.array(4))
-        q.join()
-
-    def test_pool_cancel(self):
-        with PoolExecutor(max_workers=2, sleep_interval=0) as p:
-            fs1 = p.submit(sleep_one, i=2)
-            fs2 = p.submit(sleep_one, i=2)
-            fs3 = p.submit(sleep_one, i=2)
-            fs4 = p.submit(sleep_one, i=2)
-            sleep(1)
-            fs1.cancel()
-            fs2.cancel()
-            fs3.cancel()
-            fs4.cancel()
-            self.assertTrue(fs1.done())
-            self.assertTrue(fs2.done())
-            self.assertTrue(fs3.done())
-            self.assertTrue(fs4.done())
-
-    def test_cancel_task(self):
-        fs1 = Future()
-        fs1.cancel()
-        q = Queue()
-        q.put({"fn": sleep_one, "args": (), "kwargs": {"i": 1}, "future": fs1})
-        q.put({"shutdown": True, "wait": True})
-        cloudpickle_register(ind=1)
-        execute_serial_tasks(
-            future_queue=q,
-            cores=1,
-            oversubscribe=False,
-            enable_flux_backend=False
-        )
-        self.assertTrue(fs1.done())
-        self.assertTrue(fs1.cancelled())
-        q.join()
-
-    def test_waiting(self):
-        exe = PoolExecutor(max_workers=2)
-        f1 = exe.submit(wait_and_calc, 42)
-        f2 = exe.submit(wait_and_calc, 84)
-        f1.add_done_callback(call_back)
-        f2.add_done_callback(call_back)
-        exe.shutdown(wait=True)
-        self.assertEqual([42**2, 84**2], global_lst)
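The test_waiting case above relies on Future.add_done_callback() firing as each future completes and on shutdown(wait=True) blocking until all submitted work is done. The same behavior can be observed with the stdlib executor (for reference only):

```python
from concurrent.futures import ThreadPoolExecutor

results = []


def wait_and_calc(n):
    return n ** 2


def call_back(future):
    results.append(future.result())


exe = ThreadPoolExecutor(max_workers=2)
f1 = exe.submit(wait_and_calc, 42)
f2 = exe.submit(wait_and_calc, 84)
f1.add_done_callback(call_back)
f2.add_done_callback(call_back)
exe.shutdown(wait=True)  # blocks until both futures (and their callbacks) finished
print(sorted(results))  # [1764, 7056]
```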
diff --git a/tests/test_parse_legacy.py b/tests/test_parse_legacy.py
deleted file mode 100644
index c4f28b76..00000000
--- a/tests/test_parse_legacy.py
+++ /dev/null
@@ -1,56 +0,0 @@
-import unittest
-from pympipool.legacy.shared.backend import parse_arguments
-from pympipool.legacy.shared.interface import command_line_options
-
-
-class TestParser(unittest.TestCase):
-    def test_command_local(self):
-        result_dict = {
-            'host': 'localhost',
-            'total_cores': '2',
-            'zmqport': '22',
-            'cores_per_task': '1'
-        }
-        command_lst = [
-            'mpiexec', '--oversubscribe',
-            '-n', result_dict['total_cores'],
-            'python', '-m', 'mpi4py.futures', '/',
-            '--zmqport', result_dict['zmqport'],
-            '--cores-per-task', result_dict['cores_per_task'],
-            '--cores-total', result_dict['total_cores']
-        ]
-        self.assertEqual(command_lst, command_line_options(
-            hostname=result_dict['host'],
-            port_selected=result_dict['zmqport'],
-            path="/",
-            cores=int(result_dict['total_cores']),
-            cores_per_task=int(result_dict['cores_per_task']),
-            oversubscribe=True,
-            enable_flux_backend=False,
-        ))
-        self.assertEqual(result_dict, parse_arguments(command_lst))
-
-    def test_command_flux(self):
-        result_dict = {
-            'host': "127.0.0.1",
-            'total_cores': '2',
-            'zmqport': '22',
-            'cores_per_task': '2'
-        }
-        command_lst = [
-            'flux', 'run', '-n', '1', 'python', '/',
-            '--host', result_dict['host'],
-            '--zmqport', result_dict['zmqport'],
-            '--cores-per-task', result_dict['cores_per_task'],
-            '--cores-total', result_dict['total_cores']
-        ]
-        self.assertEqual(command_lst, command_line_options(
-            hostname=result_dict['host'],
-            port_selected=result_dict['zmqport'],
-            path="/",
-            cores=int(result_dict['total_cores']),
-            cores_per_task=int(result_dict['cores_per_task']),
-            oversubscribe=False,
-            enable_flux_backend=True,
-        ))
-        self.assertEqual(result_dict, parse_arguments(command_lst))
diff --git a/tests/test_pool.py b/tests/test_pool.py
deleted file mode 100644
index f5a6e56c..00000000
--- a/tests/test_pool.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import numpy as np
-import unittest
-from pympipool import Pool
-
-
-def calc(i):
-    return np.array(i ** 2)
-
-
-def calc_add(i, j):
-    return i + j
-
-
-def calc_none(i):
-    return None
-
-
-def calc_error_value_error(i):
-    raise ValueError("calc_error value error")
-
-
-def calc_error_type_error(i):
-    raise TypeError("calc_error type error")
-
-
-class TestPool(unittest.TestCase):
-    def test_map_serial(self):
-        with Pool(max_workers=1) as p:
-            output = p.map(func=calc, iterable=[1, 2, 3, 4])
-            self.assertEqual(output[0], 1)
-            self.assertEqual(output[1], 4)
-            self.assertEqual(output[2], 9)
-            self.assertEqual(output[3], 16)
-
-    def test_starmap_serial(self):
-        with Pool(max_workers=1) as p:
-            output = p.starmap(func=calc_add, iterable=[[1, 2], [3, 4]])
-            self.assertEqual(output[0], 3)
-            self.assertEqual(output[1], 7)
-
-    def test_map_parallel(self):
-        with Pool(max_workers=2) as p:
-            output = p.map(func=calc, iterable=[1, 2, 3, 4])
-            self.assertEqual(output[0], 1)
-            self.assertEqual(output[1], 4)
-            self.assertEqual(output[2], 9)
-            self.assertEqual(output[3], 16)
-
-    def test_starmap_parallel(self):
-        with Pool(max_workers=2) as p:
-            output = p.starmap(func=calc_add, iterable=[[1, 2], [3, 4]])
-            self.assertEqual(output[0], 3)
-            self.assertEqual(output[1], 7)
-
-    def test_pool_none(self):
-        with Pool(max_workers=2) as p:
-            output = p.map(func=calc_none, iterable=[1, 2, 3, 4])
-            self.assertEqual(output, [None, None, None, None])
-
-    def test_pool_error(self):
-        with self.assertRaises(ValueError):
-            with Pool(max_workers=2) as p:
-                p.map(func=calc_error_value_error, iterable=[1, 2, 3, 4])
-        with self.assertRaises(TypeError):
-            with Pool(max_workers=2) as p:
-                p.map(func=calc_error_type_error, iterable=[1, 2, 3, 4])
-
-    def test_shutdown(self):
-        p = Pool(max_workers=1)
-        output = p.map(func=calc, iterable=[1, 2, 3, 4])
-        p.shutdown(wait=True)
-        p.shutdown(wait=False)
-        self.assertEqual(output[0], 1)
-        self.assertEqual(output[1], 4)
-        self.assertEqual(output[2], 9)
-        self.assertEqual(output[3], 16)