diff --git a/pympipool/legacy/__init__.py b/pympipool/legacy/__init__.py
deleted file mode 100644
index 24948aa0..00000000
--- a/pympipool/legacy/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-from pympipool.legacy.interfaces.poolexecutor import PoolExecutor
-from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool
diff --git a/pympipool/legacy/backend/__init__.py b/pympipool/legacy/backend/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/pympipool/legacy/backend/mpipool.py b/pympipool/legacy/backend/mpipool.py
deleted file mode 100644
index 60230a57..00000000
--- a/pympipool/legacy/backend/mpipool.py
+++ /dev/null
@@ -1,63 +0,0 @@
-from os.path import abspath
-import pickle
-import sys
-
-import cloudpickle
-
-from pympipool.shared.communication import (
-    interface_connect,
-    interface_send,
-    interface_shutdown,
-    interface_receive,
-)
-from pympipool.legacy.shared.backend import parse_socket_communication, parse_arguments
-
-
-def main():
-    from mpi4py import MPI
-
-    MPI.pickle.__init__(
-        cloudpickle.dumps,
-        cloudpickle.loads,
-        pickle.HIGHEST_PROTOCOL,
-    )
-    from mpi4py.futures import MPIPoolExecutor
-
-    future_dict = {}
-    argument_dict = parse_arguments(argument_lst=sys.argv)
-
-    # required for flux interface - otherwise the current path is not included in the python path
-    cwd = abspath(".")
-    if cwd not in sys.path:
-        sys.path.insert(1, cwd)
-
-    with MPIPoolExecutor(
-        max_workers=int(argument_dict["total_cores"]),
-        path=sys.path,  # required for flux interface - otherwise the current path is not included in the python path
-    ) as executor:
-        if executor is not None:
-            context, socket = interface_connect(
-                host=argument_dict["host"], port=argument_dict["zmqport"]
-            )
-            while True:
-                output = parse_socket_communication(
-                    executor=executor,
-                    input_dict=interface_receive(socket=socket),
-                    future_dict=future_dict,
-                    cores_per_task=int(argument_dict["cores_per_task"]),
-                )
-                if "exit" in output.keys() and output["exit"]:
-                    if "result" in output.keys():
-                        interface_send(
-                            socket=socket, result_dict={"result": output["result"]}
-                        )
-                    else:
-                        interface_send(socket=socket, result_dict={"result": True})
-                    interface_shutdown(socket=socket, context=context)
-                    break
-                elif isinstance(output, dict):
-                    interface_send(socket=socket, result_dict=output)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/pympipool/legacy/interfaces/__init__.py b/pympipool/legacy/interfaces/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py
deleted file mode 100644
index 120e4708..00000000
--- a/pympipool/legacy/interfaces/pool.py
+++ /dev/null
@@ -1,235 +0,0 @@
-from abc import ABC
-
-from pympipool.shared.executorbase import cloudpickle_register
-from pympipool.legacy.shared.interface import get_pool_command
-from pympipool.legacy.shared.connections import interface_bootup
-
-
-class PoolBase(ABC):
-    """
-    Base class for the Pool and MPISpawnPool classes defined below. The PoolBase class is not intended to be used
-    alone. Rather it implements the __enter__(), __exit__() and shutdown() function shared between the derived classes.
- """ - - def __init__(self): - self._future_dict = {} - self._interface = None - cloudpickle_register(ind=3) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown(wait=True) - return False - - def shutdown(self, wait=True): - self._interface.shutdown(wait=wait) - - -class Pool(PoolBase): - """ - The pympipool.Pool behaves like the multiprocessing.Pool but it uses mpi4py to distribute tasks. In contrast to the - mpi4py.futures.MPIPoolExecutor the pympipool.Pool can be executed in a serial python process and does not require - the python script to be executed with MPI. Still internally the pympipool.Pool uses the - mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the - usability in particular when used in combination with Jupyter notebooks. - - Args: - max_workers (int): defines the total number of MPI ranks to use - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - enable_flux_backend (bool): use the flux-framework as backend - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - cwd (str/None): current working directory where the parallel python task is executed - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - - Examples: - ``` - >>> import numpy as np - >>> from pympipool import Pool - >>> - >>> def calc(i): - >>> return np.array(i ** 2) - >>> - >>> with Pool(cores=2) as p: - >>> print(p.map(func=calc, iterable=[1, 2, 3, 4])) - ``` - """ - - def __init__( - self, - max_workers=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - self._interface = interface_bootup( - command_lst=get_pool_command(cores_total=max_workers, ranks_per_task=1)[0], - cwd=cwd, - cores=max_workers, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - - def map(self, func, iterable, chunksize=None): - """ - Map a given function on a list of attributes. - - Args: - func: function to be applied to each element of the following list - iterable (list): list of arguments the function should be applied on - chunksize (int/None): - - Returns: - list: list of output generated from applying the function on the list of arguments - """ - # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults - if chunksize is None: - chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": True, - } - ) - - def starmap(self, func, iterable, chunksize=None): - """ - Map a given function on a list of attributes. 
- - Args: - func: function to be applied to each element of the following list - iterable (list): list of arguments the function should be applied on - chunksize (int/None): - - Returns: - list: list of output generated from applying the function on the list of arguments - """ - # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults - if chunksize is None: - chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": False, - } - ) - - -class MPISpawnPool(PoolBase): - """ - The pympipool.MPISpawnPool behaves like the multiprocessing.Pool but it uses mpi4py to distribute tasks. In contrast - to the mpi4py.futures.MPIPoolExecutor the pympipool.MPISpawnPool can be executed in a serial python process and does - not require the python script to be executed with MPI. Still internally the pympipool.Pool uses the - mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the - usability in particular when used in combination with Jupyter notebooks. - - Args: - max_ranks (int): defines the total number of MPI ranks to use - ranks_per_task (int): defines the number of MPI ranks per task - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - cwd (str/None): current working directory where the parallel python task is executed - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - - Simple example: - ``` - from pympipool import MPISpawnPool - - def calc(i, comm): - return i, comm.Get_size(), comm.Get_rank() - - with MPISpawnPool(max_ranks=4, ranks_per_task=2) as p: - print(p.map(func=calc, iterable=[1, 2, 3, 4])) - ``` - """ - - def __init__( - self, - max_ranks=1, - ranks_per_task=1, - gpus_per_task=0, - oversubscribe=False, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - command_lst, cores = get_pool_command( - cores_total=max_ranks, ranks_per_task=ranks_per_task - ) - self._interface = interface_bootup( - command_lst=command_lst, - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=False, - enable_slurm_backend=False, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - - def map(self, func, iterable, chunksize=None): - """ - Map a given function on a list of attributes. - - Args: - func: function to be applied to each element of the following list - iterable (list): list of arguments the function should be applied on - chunksize (int/None): - - Returns: - list: list of output generated from applying the function on the list of arguments - """ - # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults - if chunksize is None: - chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": True, - } - ) - - def starmap(self, func, iterable, chunksize=None): - """ - Map a given function on a list of attributes. 
- - Args: - func: function to be applied to each element of the following list - iterable (list): list of arguments the function should be applied on - chunksize (int/None): - - Returns: - list: list of output generated from applying the function on the list of arguments - """ - # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults - if chunksize is None: - chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": False, - } - ) diff --git a/pympipool/legacy/interfaces/poolexecutor.py b/pympipool/legacy/interfaces/poolexecutor.py deleted file mode 100644 index d94b8b25..00000000 --- a/pympipool/legacy/interfaces/poolexecutor.py +++ /dev/null @@ -1,122 +0,0 @@ -from pympipool.shared.executorbase import cloudpickle_register, ExecutorBase -from pympipool.shared.thread import RaisingThread -from pympipool.legacy.shared.connections import interface_bootup -from pympipool.legacy.shared.interface import ( - get_pool_command, - _execute_serial_tasks_loop, -) - - -class PoolExecutor(ExecutorBase): - """ - To combine the functionality of the pympipool.Pool and the pympipool.Executor the pympipool.PoolExecutor again - connects to the mpi4py.futures.MPIPoolExecutor. Still in contrast to the pympipool.Pool it does not implement the - map() and starmap() functions but rather the submit() function based on the concurrent.futures.Executor interface. - In this case the load balancing happens internally and the maximum number of workers max_workers defines the maximum - number of parallel tasks. But only serial python tasks can be executed in contrast to the pympipool.Executor which - can also execute MPI parallel python tasks. - - Args: - max_workers (int): defines the total number of MPI ranks to use - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - - Examples: - ``` - >>> from pympipool import PoolExecutor - >>> - >>> def calc(i, j): - >>> return i + j - >>> - >>> with PoolExecutor(max_workers=2) as p: - >>> fs1 = p.submit(calc, 1, 2) - >>> fs2 = p.submit(calc, 3, 4) - >>> fs3 = p.submit(calc, 5, 6) - >>> fs4 = p.submit(calc, 7, 8) - >>> print(fs1.result(), fs2.result(), fs3.result(), fs4.result() - ``` - """ - - def __init__( - self, - max_workers=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - cwd=None, - sleep_interval=0.1, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - self._process = RaisingThread( - target=execute_serial_tasks, - kwargs={ - "future_queue": self._future_queue, - "cores": max_workers, - "gpus_per_task": gpus_per_task, - "oversubscribe": oversubscribe, - "enable_flux_backend": enable_flux_backend, - "enable_slurm_backend": enable_slurm_backend, - "cwd": cwd, - "sleep_interval": sleep_interval, - "queue_adapter": queue_adapter, - "queue_adapter_kwargs": 
queue_adapter_kwargs, - }, - ) - self._process.start() - cloudpickle_register(ind=3) - - -def execute_serial_tasks( - future_queue, - cores, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - cwd=None, - sleep_interval=0.1, - queue_adapter=None, - queue_adapter_kwargs=None, -): - """ - Execute a single tasks in serial. - - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - """ - future_dict = {} - interface = interface_bootup( - command_lst=get_pool_command(cores_total=cores, ranks_per_task=1)[0], - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - _execute_serial_tasks_loop( - interface=interface, - future_queue=future_queue, - future_dict=future_dict, - sleep_interval=sleep_interval, - ) diff --git a/pympipool/legacy/shared/__init__.py b/pympipool/legacy/shared/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pympipool/legacy/shared/backend.py b/pympipool/legacy/shared/backend.py deleted file mode 100644 index 6ab70d36..00000000 --- a/pympipool/legacy/shared/backend.py +++ /dev/null @@ -1,133 +0,0 @@ -from tqdm import tqdm - -from pympipool.shared.backend import call_funct, update_default_dict_from_arguments - - -def map_funct(executor, funct, lst, chunksize=1, cores_per_task=1, map_flag=True): - if cores_per_task == 1: - if map_flag: - results = executor.map(funct, lst, chunksize=chunksize) - else: - results = executor.starmap(funct, lst, chunksize=chunksize) - return list(tqdm(results, desc="Tasks", total=len(lst))) - else: - lst_parallel = [] - for input_parameter in lst: - for _ in range(cores_per_task): - lst_parallel.append(input_parameter) - if map_flag: - results = executor.map( - _wrap(funct=funct, number_of_cores_per_communicator=cores_per_task), - lst_parallel, - chunksize=chunksize, - ) - else: - results = executor.starmap( - _wrap(funct=funct, number_of_cores_per_communicator=cores_per_task), - lst_parallel, - chunksize=chunksize, - ) - return list(tqdm(results, desc="Tasks", total=len(lst_parallel)))[ - ::cores_per_task - ] - - -def parse_arguments(argument_lst): - """ - Simple function to parse command line arguments - - Args: - argument_lst (list): list of arguments as strings - - Returns: - dict: dictionary with the parsed arguments and their corresponding values - """ - return update_default_dict_from_arguments( - argument_lst=argument_lst, - argument_dict={ - "total_cores": "--cores-total", - "zmqport": "--zmqport", - "cores_per_task": "--cores-per-task", - "host": "--host", - }, - default_dict={"host": 
"localhost"}, - ) - - -def parse_socket_communication(executor, input_dict, future_dict, cores_per_task=1): - if "shutdown" in input_dict.keys() and input_dict["shutdown"]: - executor.shutdown(wait=input_dict["wait"]) - done_dict = _update_futures(future_dict=future_dict) - # If close "shutdown" is communicated the process is shutdown. - if done_dict is not None and len(done_dict) > 0: - return {"exit": True, "result": done_dict} - else: - return {"exit": True} - elif "fn" in input_dict.keys() and "iterable" in input_dict.keys(): - # If a function "fn" and a list or arguments "iterable" are communicated, - # pympipool uses the map() function to apply the function on the list. - try: - output = map_funct( - executor=executor, - funct=input_dict["fn"], - lst=input_dict["iterable"], - cores_per_task=cores_per_task, - chunksize=input_dict["chunksize"], - map_flag=input_dict["map"], - ) - except Exception as error: - return {"error": error, "error_type": str(type(error))} - else: - return {"result": output} - elif ( - "fn" in input_dict.keys() - and "args" in input_dict.keys() - and "kwargs" in input_dict.keys() - ): - # If a function "fn", arguments "args" and keyword arguments "kwargs" are - # communicated pympipool uses submit() to asynchronously apply the function - # on the arguments and or keyword arguments. - future = call_funct(input_dict=input_dict, funct=executor.submit) - future_hash = hash(future) - future_dict[future_hash] = future - return {"result": future_hash} - elif "update" in input_dict.keys(): - # If update "update" is communicated pympipool checks for asynchronously submitted - # functions which have completed in the meantime and communicates their results. - done_dict = _update_futures( - future_dict=future_dict, hash_lst=input_dict["update"] - ) - return {"result": done_dict} - elif "cancel" in input_dict.keys(): - for k in input_dict["cancel"]: - future_dict[k].cancel() - return {"result": True} - - -def _update_futures(future_dict, hash_lst=None): - if hash_lst is None: - hash_lst = list(future_dict.keys()) - done_dict = { - k: f.result() - for k, f in {k: future_dict[k] for k in hash_lst}.items() - if f.done() - } - for k in done_dict.keys(): - del future_dict[k] - return done_dict - - -def _wrap(funct, number_of_cores_per_communicator=1): - def functwrapped(*args, **kwargs): - from mpi4py import MPI - - MPI.COMM_WORLD.Barrier() - rank = MPI.COMM_WORLD.Get_rank() - comm_new = MPI.COMM_WORLD.Split( - rank // number_of_cores_per_communicator, - rank % number_of_cores_per_communicator, - ) - comm_new.Barrier() - return funct(*args, comm=comm_new, **kwargs) - - return functwrapped diff --git a/pympipool/legacy/shared/connections.py b/pympipool/legacy/shared/connections.py deleted file mode 100644 index 0b0a3577..00000000 --- a/pympipool/legacy/shared/connections.py +++ /dev/null @@ -1,181 +0,0 @@ -from socket import gethostname - -from pympipool.shared.interface import ( - BaseInterface, - MpiExecInterface, - SlurmSubprocessInterface, - SubprocessInterface, - generate_mpiexec_command, - generate_slurm_command, -) -from pympipool.shared.communication import SocketInterface - - -class PysqaInterface(BaseInterface): - def __init__( - self, - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, - ): - super().__init__( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - self._queue_adapter = queue_adapter - self._queue_type = queue_type - 
self._queue_adapter_kwargs = queue_adapter_kwargs - self._queue_id = None - - def bootup(self, command_lst): - if self._queue_type.lower() == "slurm": - command_prepend_lst = generate_slurm_command( - cores=self._cores, - cwd=self._cwd, - gpus_per_core=self._gpus_per_core, - oversubscribe=self._oversubscribe, - ) - else: - command_prepend_lst = generate_mpiexec_command( - cores=self._cores, - gpus_per_core=self._gpus_per_core, - oversubscribe=self._oversubscribe, - ) - self._queue_id = self._queue_adapter.submit_job( - working_directory=self._cwd, - cores=self._cores, - command=" ".join(command_prepend_lst + command_lst), - **self._queue_adapter_kwargs - ) - - def shutdown(self, wait=True): - self._queue_adapter.delete_job(process_id=self._queue_id) - - def poll(self): - return self._queue_adapter is not None - - -class FluxCmdInterface(SubprocessInterface): - def generate_command(self, command_lst): - command_prepend_lst = [ - "flux", - "run", - "-n", - str(self._cores), - ] - if self._cwd is not None: - command_prepend_lst += [ - "--cwd=" + self._cwd, - ] - if self._threads_per_core > 1: - command_prepend_lst += ["--cores-per-task=" + str(self._threads_per_core)] - if self._gpus_per_core > 0: - command_prepend_lst += ["--gpus-per-task=" + str(self._gpus_per_core)] - return super().generate_command( - command_lst=command_prepend_lst + command_lst, - ) - - -def get_connection_interface( - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, -): - """ - Backwards compatibility adapter to get the connection interface - - Args: - cwd (str/None): current working directory where the parallel python task is executed - cores (int): defines the total number of MPI ranks to use - gpus_per_core (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_type (str): type of the queuing system - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - - Returns: - pympipool.shared.interface.BaseInterface: Connection interface - """ - if queue_adapter is not None: - connections = PysqaInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - queue_adapter=queue_adapter, - queue_type=queue_type, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - elif enable_flux_backend: - connections = FluxCmdInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - elif enable_slurm_backend: - connections = SlurmSubprocessInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - else: - connections = MpiExecInterface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - return connections - - -def interface_bootup( - command_lst, - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, -): - if enable_flux_backend or enable_slurm_backend or 
queue_adapter is not None: - command_lst += [ - "--host", - gethostname(), - ] - connections = get_connection_interface( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - queue_adapter=queue_adapter, - queue_type=queue_type, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - interface = SocketInterface(interface=connections) - command_lst += [ - "--zmqport", - str(interface.bind_to_random_port()), - ] - interface.bootup(command_lst=command_lst) - return interface diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py deleted file mode 100644 index edd5e778..00000000 --- a/pympipool/legacy/shared/interface.py +++ /dev/null @@ -1,68 +0,0 @@ -import os -import queue -import sys -import time - - -def get_pool_command(cores_total, ranks_per_task=1): - executable = os.path.abspath( - os.path.join(__file__, "..", "..", "backend", "mpipool.py") - ) - if ranks_per_task == 1: - command_lst = [sys.executable, "-m", "mpi4py.futures", executable] - cores = cores_total - else: - # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: - # https://github.com/mpi4py/mpi4py/issues/324 - command_lst = [sys.executable, executable] - cores = 1 - command_lst += [ - "--cores-per-task", - str(ranks_per_task), - "--cores-total", - str(cores_total), - ] - return command_lst, cores - - -def _execute_serial_tasks_loop( - interface, future_queue, future_dict, sleep_interval=0.1 -): - while True: - try: - task_dict = future_queue.get_nowait() - except queue.Empty: - pass - else: - if "shutdown" in task_dict.keys() and task_dict["shutdown"]: - done_dict = interface.shutdown(wait=task_dict["wait"]) - if isinstance(done_dict, dict): - for k, v in done_dict.items(): - if k in future_dict.keys() and not future_dict[k].cancelled(): - future_dict.pop(k).set_result(v) - future_queue.task_done() - break - elif "fn" in task_dict.keys() and "future" in task_dict.keys(): - f = task_dict.pop("future") - future_hash = interface.send_and_receive_dict(input_dict=task_dict) - future_dict[future_hash] = f - future_queue.task_done() - _update_future_dict( - interface=interface, future_dict=future_dict, sleep_interval=sleep_interval - ) - - -def _update_future_dict(interface, future_dict, sleep_interval=0.1): - time.sleep(sleep_interval) - hash_to_update = [h for h, f in future_dict.items() if not f.done()] - hash_to_cancel = [h for h, f in future_dict.items() if f.cancelled()] - if len(hash_to_update) > 0: - for k, v in interface.send_and_receive_dict( - input_dict={"update": hash_to_update} - ).items(): - future_dict.pop(k).set_result(v) - if len(hash_to_cancel) > 0 and interface.send_and_receive_dict( - input_dict={"cancel": hash_to_cancel} - ): - for h in hash_to_cancel: - del future_dict[h] diff --git a/tests/test_communicator_split.py b/tests/test_communicator_split.py deleted file mode 100644 index 44ab60ed..00000000 --- a/tests/test_communicator_split.py +++ /dev/null @@ -1,66 +0,0 @@ -import unittest -from pympipool.legacy.interfaces.pool import MPISpawnPool - - -def get_ranks(input_parameter, comm=None): - from mpi4py import MPI - - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - if comm is not None: - size_new = comm.Get_size() - rank_new = comm.Get_rank() - else: - size_new = 0 - rank_new = 0 - return size, rank, size_new, rank_new, input_parameter - - -def get_ranks_multi_input(input_parameter1, input_parameter2, 
comm=None): - from mpi4py import MPI - - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - if comm is not None: - size_new = comm.Get_size() - rank_new = comm.Get_rank() - else: - size_new = 0 - rank_new = 0 - return size, rank, size_new, rank_new, input_parameter1, input_parameter2 - - -class TestCommunicator(unittest.TestCase): - def test_map_serial(self): - with MPISpawnPool(max_ranks=2, ranks_per_task=1) as p: - output = p.map(func=get_ranks, iterable=[1, 2, 3]) - self.assertEqual(output[0], (2, 1, 0, 0, 1)) - self.assertEqual(output[1], (2, 1, 0, 0, 2)) - self.assertEqual(output[2], (2, 1, 0, 0, 3)) - - def test_map_parallel(self): - with MPISpawnPool(max_ranks=2, ranks_per_task=2) as p: - output = p.map(func=get_ranks, iterable=[1, 2, 3, 4]) - self.assertEqual(output[0][::2], (2, 2, 1)) - self.assertEqual(output[1][::2], (2, 2, 2)) - self.assertEqual(output[2][::2], (2, 2, 3)) - self.assertEqual(output[3][::2], (2, 2, 4)) - - def test_starmap_serial(self): - with MPISpawnPool(max_ranks=2, ranks_per_task=1) as p: - output = p.starmap( - func=get_ranks_multi_input, iterable=[[1, 1], [2, 2], [3, 3]] - ) - self.assertEqual(output[0], (2, 1, 0, 0, 1, 1)) - self.assertEqual(output[1], (2, 1, 0, 0, 2, 2)) - self.assertEqual(output[2], (2, 1, 0, 0, 3, 3)) - - def test_starmap_parallel(self): - with MPISpawnPool(max_ranks=2, ranks_per_task=2) as p: - output = p.starmap( - func=get_ranks_multi_input, iterable=[[1, 1], [2, 2], [3, 3], [4, 4]] - ) - self.assertEqual(output[0][::2], (2, 2, 1)) - self.assertEqual(output[1][::2], (2, 2, 2)) - self.assertEqual(output[2][::2], (2, 2, 3)) - self.assertEqual(output[3][::2], (2, 2, 4)) diff --git a/tests/test_connection.py b/tests/test_connection.py deleted file mode 100644 index 5533eef8..00000000 --- a/tests/test_connection.py +++ /dev/null @@ -1,95 +0,0 @@ -import unittest -from pympipool.shared.interface import BaseInterface, SlurmSubprocessInterface -from pympipool.legacy.shared.connections import ( - MpiExecInterface, - PysqaInterface, - FluxCmdInterface, - get_connection_interface, -) - - -class Interface(BaseInterface): - def __init__(self, cwd, cores=1, gpus_per_core=0, oversubscribe=False): - super().__init__( - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_core, - oversubscribe=oversubscribe, - ) - - -class TestExecutor(unittest.TestCase): - def setUp(self): - self.interface = Interface( - cwd=None, cores=1, gpus_per_core=0, oversubscribe=False - ) - - def test_bootup(self): - with self.assertRaises(NotImplementedError): - self.interface.bootup(command_lst=[]) - - def test_shutdown(self): - with self.assertRaises(NotImplementedError): - self.interface.shutdown(wait=True) - - def test_poll(self): - with self.assertRaises(NotImplementedError): - self.interface.poll() - - -class TestInterfaceConnection(unittest.TestCase): - def test_mpiexec(self): - interface = get_connection_interface( - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, - ) - self.assertIsInstance(interface, MpiExecInterface) - - def test_slurm(self): - interface = get_connection_interface( - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=True, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, - ) - self.assertIsInstance(interface, SlurmSubprocessInterface) - - def test_pysqa(self): - interface = get_connection_interface( - 
cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - queue_adapter=True, - queue_type=None, - queue_adapter_kwargs=None, - ) - self.assertIsInstance(interface, PysqaInterface) - - def test_flux_cmd(self): - interface = get_connection_interface( - cwd=None, - cores=1, - gpus_per_core=0, - oversubscribe=False, - enable_flux_backend=True, - enable_slurm_backend=False, - queue_adapter=None, - queue_type=None, - queue_adapter_kwargs=None, - ) - self.assertIsInstance(interface, FluxCmdInterface) diff --git a/tests/test_executor.py b/tests/test_executor.py deleted file mode 100644 index eb88a26c..00000000 --- a/tests/test_executor.py +++ /dev/null @@ -1,154 +0,0 @@ -import unittest -from concurrent.futures import ThreadPoolExecutor -from pympipool.legacy.shared.backend import map_funct, parse_socket_communication -from pympipool.shared.backend import call_funct - - -def function_multi_args(a, b): - return a + b - - -class TestExecutor(unittest.TestCase): - def test_exec_funct_single_core_map(self): - with ThreadPoolExecutor(max_workers=1) as executor: - output = map_funct( - executor=executor, - funct=sum, - lst=[[1, 1], [2, 2]], - cores_per_task=1, - chunksize=1, - ) - self.assertEqual(output, [2, 4]) - - def test_exec_funct_single_core_starmap(self): - with self.assertRaises(AttributeError): - with ThreadPoolExecutor(max_workers=1) as executor: - map_funct( - executor=executor, - funct=sum, - lst=[[1, 1], [2, 2]], - cores_per_task=1, - chunksize=1, - map_flag=False, - ) - - def test_parse_socket_communication_close(self): - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"shutdown": True, "wait": True}, - future_dict={}, - cores_per_task=1, - ) - self.assertEqual(output, {"exit": True}) - - def test_parse_socket_communication_execute(self): - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={ - "fn": sum, - "iterable": [[1, 1]], - "chunksize": 1, - "map": True, - }, - future_dict={}, - cores_per_task=1, - ) - self.assertEqual(output, {"result": [2]}) - - def test_parse_socket_communication_error(self): - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={ - "fn": sum, - "iterable": [["a", "b"]], - "chunksize": 1, - "map": True, - }, - future_dict={}, - cores_per_task=1, - ) - self.assertEqual(output["error_type"], "") - - def test_parse_socket_communication_submit_args(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}}, - future_dict=future_dict, - cores_per_task=1, - ) - future = future_dict[output["result"]] - self.assertEqual(future.result(), 2) - - def test_parse_socket_communication_submit_kwargs(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={ - "fn": function_multi_args, - "args": (), - "kwargs": {"a": 1, "b": 2}, - }, - future_dict=future_dict, - cores_per_task=1, - ) - future = future_dict[output["result"]] - self.assertEqual(future.result(), 3) - - def test_parse_socket_communication_submit_both(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - 
input_dict={"fn": function_multi_args, "args": [1], "kwargs": {"b": 2}}, - future_dict=future_dict, - cores_per_task=1, - ) - future = future_dict[output["result"]] - self.assertEqual(future.result(), 3) - - def test_parse_socket_communication_update(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}}, - future_dict=future_dict, - cores_per_task=1, - ) - future_hash = output["result"] - result = parse_socket_communication( - executor=executor, - input_dict={"update": [future_hash]}, - future_dict=future_dict, - cores_per_task=1, - ) - self.assertEqual(result, {"result": {future_hash: 2}}) - - def test_parse_socket_communication_cancel(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}}, - future_dict=future_dict, - cores_per_task=1, - ) - future_hash = output["result"] - result = parse_socket_communication( - executor=executor, - input_dict={"cancel": [future_hash]}, - future_dict=future_dict, - cores_per_task=1, - ) - self.assertEqual(result, {"result": True}) - - def test_funct_call_default(self): - self.assertEqual( - call_funct(input_dict={"fn": sum, "args": [[1, 2, 3]], "kwargs": {}}), 6 - ) diff --git a/tests/test_interface.py b/tests/test_interface.py index 61fb6a97..149e5f03 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -5,7 +5,7 @@ import unittest from pympipool.shared.communication import SocketInterface from pympipool.shared.executorbase import cloudpickle_register -from pympipool.legacy.shared.connections import MpiExecInterface +from pympipool.shared.interface import MpiExecInterface def calc(i): diff --git a/tests/test_multitask.py b/tests/test_multitask.py deleted file mode 100644 index a9bdb17d..00000000 --- a/tests/test_multitask.py +++ /dev/null @@ -1,92 +0,0 @@ -import numpy as np -import unittest -from queue import Queue -from time import sleep -from pympipool.legacy.interfaces.poolexecutor import PoolExecutor, execute_serial_tasks -from pympipool.shared.executorbase import cloudpickle_register -from concurrent.futures import Future - - -def calc(i): - return np.array(i**2) - - -def sleep_one(i): - sleep(1) - return i - - -def wait_and_calc(n): - sleep(1) - return n**2 - - -def call_back(future): - global_lst.append(future.result()) - - -global_lst = [] - - -class TestFuturePool(unittest.TestCase): - def test_pool_serial(self): - with PoolExecutor(max_workers=1) as p: - output = p.submit(calc, i=2) - self.assertEqual(len(p), 1) - self.assertTrue(isinstance(output, Future)) - self.assertFalse(output.done()) - sleep(1) - self.assertTrue(output.done()) - self.assertEqual(len(p), 0) - self.assertEqual(output.result(), np.array(4)) - - def test_execute_task(self): - f = Future() - q = Queue() - q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) - q.put({"shutdown": True, "wait": True}) - cloudpickle_register(ind=1) - execute_serial_tasks( - future_queue=q, cores=1, oversubscribe=False, enable_flux_backend=False - ) - self.assertEqual(f.result(), np.array(4)) - q.join() - - def test_pool_cancel(self): - with PoolExecutor(max_workers=2, sleep_interval=0) as p: - fs1 = p.submit(sleep_one, i=2) - fs2 = p.submit(sleep_one, i=2) - fs3 = p.submit(sleep_one, i=2) - fs4 = p.submit(sleep_one, i=2) - sleep(1) - fs1.cancel() - fs2.cancel() - fs3.cancel() - 
fs4.cancel() - self.assertTrue(fs1.done()) - self.assertTrue(fs2.done()) - self.assertTrue(fs3.done()) - self.assertTrue(fs4.done()) - - def test_cancel_task(self): - fs1 = Future() - fs1.cancel() - q = Queue() - q.put({"fn": sleep_one, "args": (), "kwargs": {"i": 1}, "future": fs1}) - q.put({"shutdown": True, "wait": True}) - cloudpickle_register(ind=1) - execute_serial_tasks( - future_queue=q, cores=1, oversubscribe=False, enable_flux_backend=False - ) - self.assertTrue(fs1.done()) - self.assertTrue(fs1.cancelled()) - q.join() - - def test_waiting(self): - exe = PoolExecutor(max_workers=2) - f1 = exe.submit(wait_and_calc, 42) - f2 = exe.submit(wait_and_calc, 84) - f1.add_done_callback(call_back) - f2.add_done_callback(call_back) - exe.shutdown(wait=True) - self.assertTrue([42**2, 84**2], global_lst) diff --git a/tests/test_parse.py b/tests/test_parse.py index f44ef613..32123162 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -2,8 +2,7 @@ import sys import unittest from pympipool.shared.backend import parse_arguments -from pympipool.shared.interface import SlurmSubprocessInterface -from pympipool.legacy.shared.connections import MpiExecInterface, FluxCmdInterface +from pympipool.shared.interface import SlurmSubprocessInterface, MpiExecInterface class TestParser(unittest.TestCase): @@ -33,43 +32,6 @@ def test_command_local(self): ) self.assertEqual(result_dict, parse_arguments(command_lst)) - def test_command_flux(self): - result_dict = { - "host": "127.0.0.1", - "zmqport": "22", - } - command_lst = [ - "flux", - "run", - "-n", - "2", - "--cwd=" + os.path.abspath("."), - "--gpus-per-task=1", - sys.executable, - "/", - "--host", - result_dict["host"], - "--zmqport", - result_dict["zmqport"], - ] - interface = FluxCmdInterface( - cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=False - ) - self.assertEqual( - command_lst, - interface.generate_command( - command_lst=[ - sys.executable, - "/", - "--host", - result_dict["host"], - "--zmqport", - result_dict["zmqport"], - ] - ), - ) - self.assertEqual(result_dict, parse_arguments(command_lst)) - def test_mpiexec_gpu(self): interface = MpiExecInterface( cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True diff --git a/tests/test_parse_legacy.py b/tests/test_parse_legacy.py deleted file mode 100644 index 46b4c041..00000000 --- a/tests/test_parse_legacy.py +++ /dev/null @@ -1,99 +0,0 @@ -import unittest -import os -import sys - -from pympipool.legacy.shared.backend import parse_arguments -from pympipool.legacy.shared.connections import MpiExecInterface, FluxCmdInterface - - -class TestParser(unittest.TestCase): - def test_command_local(self): - result_dict = { - "host": "localhost", - "total_cores": "2", - "zmqport": "22", - "cores_per_task": "1", - } - command_lst = [ - "mpiexec", - "-n", - result_dict["total_cores"], - "--oversubscribe", - sys.executable, - "-m", - "mpi4py.futures", - "/", - "--zmqport", - result_dict["zmqport"], - "--cores-per-task", - result_dict["cores_per_task"], - "--cores-total", - result_dict["total_cores"], - ] - interface = MpiExecInterface( - cwd=None, cores=2, gpus_per_core=0, oversubscribe=True - ) - self.assertEqual( - command_lst, - interface.generate_command( - command_lst=[ - sys.executable, - "-m", - "mpi4py.futures", - "/", - "--zmqport", - result_dict["zmqport"], - "--cores-per-task", - "1", - "--cores-total", - "2", - ] - ), - ) - self.assertEqual(result_dict, parse_arguments(command_lst)) - - def test_command_flux(self): - result_dict = { - "host": "127.0.0.1", 
- "total_cores": "2", - "zmqport": "22", - "cores_per_task": "2", - } - command_lst = [ - "flux", - "run", - "-n", - "1", - "--cwd=" + os.path.abspath("."), - sys.executable, - "/", - "--host", - result_dict["host"], - "--zmqport", - result_dict["zmqport"], - "--cores-per-task", - result_dict["cores_per_task"], - "--cores-total", - result_dict["total_cores"], - ] - interface = FluxCmdInterface( - cwd=os.path.abspath("."), cores=1, gpus_per_core=0, oversubscribe=False - ) - self.assertEqual( - command_lst, - interface.generate_command( - command_lst=[ - sys.executable, - "/", - "--host", - result_dict["host"], - "--zmqport", - result_dict["zmqport"], - "--cores-per-task", - "2", - "--cores-total", - "2", - ] - ), - ) - self.assertEqual(result_dict, parse_arguments(command_lst)) diff --git a/tests/test_pool.py b/tests/test_pool.py deleted file mode 100644 index f574b055..00000000 --- a/tests/test_pool.py +++ /dev/null @@ -1,76 +0,0 @@ -import numpy as np -import unittest -from pympipool.legacy.interfaces.pool import Pool - - -def calc(i): - return np.array(i**2) - - -def calc_add(i, j): - return i + j - - -def calc_none(i): - return None - - -def calc_error_value_error(i): - raise ValueError("calc_error value error") - - -def calc_error_type_error(i): - raise TypeError("calc_error value error") - - -class TestPool(unittest.TestCase): - def test_map_serial(self): - with Pool(max_workers=1) as p: - output = p.map(func=calc, iterable=[1, 2, 3, 4]) - self.assertEqual(output[0], 1) - self.assertEqual(output[1], 4) - self.assertEqual(output[2], 9) - self.assertEqual(output[3], 16) - - def test_starmap_serial(self): - with Pool(max_workers=1) as p: - output = p.starmap(func=calc_add, iterable=[[1, 2], [3, 4]]) - self.assertEqual(output[0], 3) - self.assertEqual(output[1], 7) - - def test_map_parallel(self): - with Pool(max_workers=2) as p: - output = p.map(func=calc, iterable=[1, 2, 3, 4]) - self.assertEqual(output[0], 1) - self.assertEqual(output[1], 4) - self.assertEqual(output[2], 9) - self.assertEqual(output[3], 16) - - def test_starmap_parallel(self): - with Pool(max_workers=2) as p: - output = p.starmap(func=calc_add, iterable=[[1, 2], [3, 4]]) - self.assertEqual(output[0], 3) - self.assertEqual(output[1], 7) - - def test_pool_none(self): - with Pool(max_workers=2) as p: - output = p.map(func=calc_none, iterable=[1, 2, 3, 4]) - self.assertEqual(output, [None, None, None, None]) - - def test_pool_error(self): - with self.assertRaises(ValueError): - with Pool(max_workers=2) as p: - p.map(func=calc_error_value_error, iterable=[1, 2, 3, 4]) - with self.assertRaises(TypeError): - with Pool(max_workers=2) as p: - p.map(func=calc_error_type_error, iterable=[1, 2, 3, 4]) - - def test_shutdown(self): - p = Pool(max_workers=1) - output = p.map(func=calc, iterable=[1, 2, 3, 4]) - p.shutdown(wait=True) - p.shutdown(wait=False) - self.assertEqual(output[0], 1) - self.assertEqual(output[1], 4) - self.assertEqual(output[2], 9) - self.assertEqual(output[3], 16)