diff --git a/pympipool/__init__.py b/pympipool/__init__.py index fb39ce25..aa165f21 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -9,8 +9,6 @@ from pympipool.interfaces.taskbroker import HPCExecutor from pympipool.interfaces.fluxbroker import PyFluxExecutor from pympipool.interfaces.taskexecutor import Executor -from pympipool.legacy.interfaces.executor import PoolExecutor -from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool from pympipool.shared.thread import RaisingThread from pympipool.shared.taskexecutor import cancel_items_in_queue diff --git a/pympipool/legacy/__init__.py b/pympipool/legacy/__init__.py deleted file mode 100644 index 19f7bf84..00000000 --- a/pympipool/legacy/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from pympipool.legacy.interfaces.executor import PoolExecutor -from pympipool.legacy.interfaces.pool import Pool, MPISpawnPool diff --git a/pympipool/legacy/backend/__init__.py b/pympipool/legacy/backend/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pympipool/legacy/backend/mpipool.py b/pympipool/legacy/backend/mpipool.py deleted file mode 100644 index 60230a57..00000000 --- a/pympipool/legacy/backend/mpipool.py +++ /dev/null @@ -1,63 +0,0 @@ -from os.path import abspath -import pickle -import sys - -import cloudpickle - -from pympipool.shared.communication import ( - interface_connect, - interface_send, - interface_shutdown, - interface_receive, -) -from pympipool.legacy.shared.backend import parse_socket_communication, parse_arguments - - -def main(): - from mpi4py import MPI - - MPI.pickle.__init__( - cloudpickle.dumps, - cloudpickle.loads, - pickle.HIGHEST_PROTOCOL, - ) - from mpi4py.futures import MPIPoolExecutor - - future_dict = {} - argument_dict = parse_arguments(argument_lst=sys.argv) - - # required for flux interface - otherwise the current path is not included in the python path - cwd = abspath(".") - if cwd not in sys.path: - sys.path.insert(1, cwd) - - with MPIPoolExecutor( - max_workers=int(argument_dict["total_cores"]), - path=sys.path, # required for flux interface - otherwise the current path is not included in the python path - ) as executor: - if executor is not None: - context, socket = interface_connect( - host=argument_dict["host"], port=argument_dict["zmqport"] - ) - while True: - output = parse_socket_communication( - executor=executor, - input_dict=interface_receive(socket=socket), - future_dict=future_dict, - cores_per_task=int(argument_dict["cores_per_task"]), - ) - if "exit" in output.keys() and output["exit"]: - if "result" in output.keys(): - interface_send( - socket=socket, result_dict={"result": output["result"]} - ) - else: - interface_send(socket=socket, result_dict={"result": True}) - interface_shutdown(socket=socket, context=context) - break - elif isinstance(output, dict): - interface_send(socket=socket, result_dict=output) - - -if __name__ == "__main__": - main() diff --git a/pympipool/legacy/interfaces/__init__.py b/pympipool/legacy/interfaces/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pympipool/legacy/interfaces/executor.py b/pympipool/legacy/interfaces/executor.py deleted file mode 100644 index 87dfeec9..00000000 --- a/pympipool/legacy/interfaces/executor.py +++ /dev/null @@ -1,72 +0,0 @@ -from pympipool.shared.base import ExecutorBase -from pympipool.shared.thread import RaisingThread -from pympipool.legacy.shared.interface import execute_serial_tasks -from pympipool.shared.taskexecutor import cloudpickle_register - - -class 
PoolExecutor(ExecutorBase): - """ - To combine the functionality of the pympipool.Pool and the pympipool.Executor the pympipool.PoolExecutor again - connects to the mpi4py.futures.MPIPoolExecutor. Still in contrast to the pympipool.Pool it does not implement the - map() and starmap() functions but rather the submit() function based on the concurrent.futures.Executor interface. - In this case the load balancing happens internally and the maximum number of workers max_workers defines the maximum - number of parallel tasks. But only serial python tasks can be executed in contrast to the pympipool.Executor which - can also execute MPI parallel python tasks. - - Args: - max_workers (int): defines the total number of MPI ranks to use - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - - Examples: - ``` - >>> from pympipool import PoolExecutor - >>> - >>> def calc(i, j): - >>> return i + j - >>> - >>> with PoolExecutor(max_workers=2) as p: - >>> fs1 = p.submit(calc, 1, 2) - >>> fs2 = p.submit(calc, 3, 4) - >>> fs3 = p.submit(calc, 5, 6) - >>> fs4 = p.submit(calc, 7, 8) - >>> print(fs1.result(), fs2.result(), fs3.result(), fs4.result() - ``` - """ - - def __init__( - self, - max_workers=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - cwd=None, - sleep_interval=0.1, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - self._process = RaisingThread( - target=execute_serial_tasks, - kwargs={ - "future_queue": self._future_queue, - "cores": max_workers, - "gpus_per_task": gpus_per_task, - "oversubscribe": oversubscribe, - "enable_flux_backend": enable_flux_backend, - "enable_slurm_backend": enable_slurm_backend, - "cwd": cwd, - "sleep_interval": sleep_interval, - "queue_adapter": queue_adapter, - "queue_adapter_kwargs": queue_adapter_kwargs, - }, - ) - self._process.start() - cloudpickle_register(ind=3) diff --git a/pympipool/legacy/interfaces/pool.py b/pympipool/legacy/interfaces/pool.py deleted file mode 100644 index da9b37fc..00000000 --- a/pympipool/legacy/interfaces/pool.py +++ /dev/null @@ -1,235 +0,0 @@ -from abc import ABC - -from pympipool.shared.communication import interface_bootup -from pympipool.shared.taskexecutor import cloudpickle_register -from pympipool.legacy.shared.interface import get_pool_command - - -class PoolBase(ABC): - """ - Base class for the Pool and MPISpawnPool classes defined below. The PoolBase class is not intended to be used - alone. Rather it implements the __enter__(), __exit__() and shutdown() function shared between the derived classes. 
- """ - - def __init__(self): - self._future_dict = {} - self._interface = None - cloudpickle_register(ind=3) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown(wait=True) - return False - - def shutdown(self, wait=True): - self._interface.shutdown(wait=wait) - - -class Pool(PoolBase): - """ - The pympipool.Pool behaves like the multiprocessing.Pool but it uses mpi4py to distribute tasks. In contrast to the - mpi4py.futures.MPIPoolExecutor the pympipool.Pool can be executed in a serial python process and does not require - the python script to be executed with MPI. Still internally the pympipool.Pool uses the - mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the - usability in particular when used in combination with Jupyter notebooks. - - Args: - max_workers (int): defines the total number of MPI ranks to use - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - enable_flux_backend (bool): use the flux-framework as backend - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - cwd (str/None): current working directory where the parallel python task is executed - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - - Examples: - ``` - >>> import numpy as np - >>> from pympipool import Pool - >>> - >>> def calc(i): - >>> return np.array(i ** 2) - >>> - >>> with Pool(cores=2) as p: - >>> print(p.map(func=calc, iterable=[1, 2, 3, 4])) - ``` - """ - - def __init__( - self, - max_workers=1, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - self._interface = interface_bootup( - command_lst=get_pool_command(cores_total=max_workers, ranks_per_task=1)[0], - cwd=cwd, - cores=max_workers, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - - def map(self, func, iterable, chunksize=None): - """ - Map a given function on a list of attributes. - - Args: - func: function to be applied to each element of the following list - iterable (list): list of arguments the function should be applied on - chunksize (int/None): - - Returns: - list: list of output generated from applying the function on the list of arguments - """ - # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults - if chunksize is None: - chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": True, - } - ) - - def starmap(self, func, iterable, chunksize=None): - """ - Map a given function on a list of attributes. 
- - Args: - func: function to be applied to each element of the following list - iterable (list): list of arguments the function should be applied on - chunksize (int/None): - - Returns: - list: list of output generated from applying the function on the list of arguments - """ - # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults - if chunksize is None: - chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": False, - } - ) - - -class MPISpawnPool(PoolBase): - """ - The pympipool.MPISpawnPool behaves like the multiprocessing.Pool but it uses mpi4py to distribute tasks. In contrast - to the mpi4py.futures.MPIPoolExecutor the pympipool.MPISpawnPool can be executed in a serial python process and does - not require the python script to be executed with MPI. Still internally the pympipool.Pool uses the - mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the - usability in particular when used in combination with Jupyter notebooks. - - Args: - max_ranks (int): defines the total number of MPI ranks to use - ranks_per_task (int): defines the number of MPI ranks per task - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - cwd (str/None): current working directory where the parallel python task is executed - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - - Simple example: - ``` - from pympipool import MPISpawnPool - - def calc(i, comm): - return i, comm.Get_size(), comm.Get_rank() - - with MPISpawnPool(max_ranks=4, ranks_per_task=2) as p: - print(p.map(func=calc, iterable=[1, 2, 3, 4])) - ``` - """ - - def __init__( - self, - max_ranks=1, - ranks_per_task=1, - gpus_per_task=0, - oversubscribe=False, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - command_lst, cores = get_pool_command( - cores_total=max_ranks, ranks_per_task=ranks_per_task - ) - self._interface = interface_bootup( - command_lst=command_lst, - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=False, - enable_slurm_backend=False, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - - def map(self, func, iterable, chunksize=None): - """ - Map a given function on a list of attributes. - - Args: - func: function to be applied to each element of the following list - iterable (list): list of arguments the function should be applied on - chunksize (int/None): - - Returns: - list: list of output generated from applying the function on the list of arguments - """ - # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults - if chunksize is None: - chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": True, - } - ) - - def starmap(self, func, iterable, chunksize=None): - """ - Map a given function on a list of attributes. 
- - Args: - func: function to be applied to each element of the following list - iterable (list): list of arguments the function should be applied on - chunksize (int/None): - - Returns: - list: list of output generated from applying the function on the list of arguments - """ - # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults - if chunksize is None: - chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": False, - } - ) diff --git a/pympipool/legacy/shared/__init__.py b/pympipool/legacy/shared/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pympipool/legacy/shared/backend.py b/pympipool/legacy/shared/backend.py deleted file mode 100644 index 6ab70d36..00000000 --- a/pympipool/legacy/shared/backend.py +++ /dev/null @@ -1,133 +0,0 @@ -from tqdm import tqdm - -from pympipool.shared.backend import call_funct, update_default_dict_from_arguments - - -def map_funct(executor, funct, lst, chunksize=1, cores_per_task=1, map_flag=True): - if cores_per_task == 1: - if map_flag: - results = executor.map(funct, lst, chunksize=chunksize) - else: - results = executor.starmap(funct, lst, chunksize=chunksize) - return list(tqdm(results, desc="Tasks", total=len(lst))) - else: - lst_parallel = [] - for input_parameter in lst: - for _ in range(cores_per_task): - lst_parallel.append(input_parameter) - if map_flag: - results = executor.map( - _wrap(funct=funct, number_of_cores_per_communicator=cores_per_task), - lst_parallel, - chunksize=chunksize, - ) - else: - results = executor.starmap( - _wrap(funct=funct, number_of_cores_per_communicator=cores_per_task), - lst_parallel, - chunksize=chunksize, - ) - return list(tqdm(results, desc="Tasks", total=len(lst_parallel)))[ - ::cores_per_task - ] - - -def parse_arguments(argument_lst): - """ - Simple function to parse command line arguments - - Args: - argument_lst (list): list of arguments as strings - - Returns: - dict: dictionary with the parsed arguments and their corresponding values - """ - return update_default_dict_from_arguments( - argument_lst=argument_lst, - argument_dict={ - "total_cores": "--cores-total", - "zmqport": "--zmqport", - "cores_per_task": "--cores-per-task", - "host": "--host", - }, - default_dict={"host": "localhost"}, - ) - - -def parse_socket_communication(executor, input_dict, future_dict, cores_per_task=1): - if "shutdown" in input_dict.keys() and input_dict["shutdown"]: - executor.shutdown(wait=input_dict["wait"]) - done_dict = _update_futures(future_dict=future_dict) - # If close "shutdown" is communicated the process is shutdown. - if done_dict is not None and len(done_dict) > 0: - return {"exit": True, "result": done_dict} - else: - return {"exit": True} - elif "fn" in input_dict.keys() and "iterable" in input_dict.keys(): - # If a function "fn" and a list or arguments "iterable" are communicated, - # pympipool uses the map() function to apply the function on the list. 
- try: - output = map_funct( - executor=executor, - funct=input_dict["fn"], - lst=input_dict["iterable"], - cores_per_task=cores_per_task, - chunksize=input_dict["chunksize"], - map_flag=input_dict["map"], - ) - except Exception as error: - return {"error": error, "error_type": str(type(error))} - else: - return {"result": output} - elif ( - "fn" in input_dict.keys() - and "args" in input_dict.keys() - and "kwargs" in input_dict.keys() - ): - # If a function "fn", arguments "args" and keyword arguments "kwargs" are - # communicated pympipool uses submit() to asynchronously apply the function - # on the arguments and or keyword arguments. - future = call_funct(input_dict=input_dict, funct=executor.submit) - future_hash = hash(future) - future_dict[future_hash] = future - return {"result": future_hash} - elif "update" in input_dict.keys(): - # If update "update" is communicated pympipool checks for asynchronously submitted - # functions which have completed in the meantime and communicates their results. - done_dict = _update_futures( - future_dict=future_dict, hash_lst=input_dict["update"] - ) - return {"result": done_dict} - elif "cancel" in input_dict.keys(): - for k in input_dict["cancel"]: - future_dict[k].cancel() - return {"result": True} - - -def _update_futures(future_dict, hash_lst=None): - if hash_lst is None: - hash_lst = list(future_dict.keys()) - done_dict = { - k: f.result() - for k, f in {k: future_dict[k] for k in hash_lst}.items() - if f.done() - } - for k in done_dict.keys(): - del future_dict[k] - return done_dict - - -def _wrap(funct, number_of_cores_per_communicator=1): - def functwrapped(*args, **kwargs): - from mpi4py import MPI - - MPI.COMM_WORLD.Barrier() - rank = MPI.COMM_WORLD.Get_rank() - comm_new = MPI.COMM_WORLD.Split( - rank // number_of_cores_per_communicator, - rank % number_of_cores_per_communicator, - ) - comm_new.Barrier() - return funct(*args, comm=comm_new, **kwargs) - - return functwrapped diff --git a/pympipool/legacy/shared/interface.py b/pympipool/legacy/shared/interface.py deleted file mode 100644 index c9cd6b9a..00000000 --- a/pympipool/legacy/shared/interface.py +++ /dev/null @@ -1,117 +0,0 @@ -import os -import queue -import sys -import time - -from pympipool.shared.communication import interface_bootup - - -def execute_serial_tasks( - future_queue, - cores, - gpus_per_task=0, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, - cwd=None, - sleep_interval=0.1, - queue_adapter=None, - queue_adapter_kwargs=None, -): - """ - Execute a single tasks in serial. 
- - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - gpus_per_task (int): number of GPUs per MPI rank - defaults to 0 - oversubscribe (bool): enable of disable the oversubscribe feature of OpenMPI - defaults to False - enable_flux_backend (bool): enable the flux-framework as backend - defaults to False - enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False - cwd (str/None): current working directory where the parallel python task is executed - sleep_interval (float): - queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems - queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter - """ - future_dict = {} - interface = interface_bootup( - command_lst=get_pool_command(cores_total=cores, ranks_per_task=1)[0], - cwd=cwd, - cores=cores, - gpus_per_core=gpus_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_slurm_backend=enable_slurm_backend, - queue_adapter=queue_adapter, - queue_adapter_kwargs=queue_adapter_kwargs, - ) - _execute_serial_tasks_loop( - interface=interface, - future_queue=future_queue, - future_dict=future_dict, - sleep_interval=sleep_interval, - ) - - -def get_pool_command(cores_total, ranks_per_task=1): - executable = os.path.abspath( - os.path.join(__file__, "..", "..", "backend", "mpipool.py") - ) - if ranks_per_task == 1: - command_lst = [sys.executable, "-m", "mpi4py.futures", executable] - cores = cores_total - else: - # Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn: - # https://github.com/mpi4py/mpi4py/issues/324 - command_lst = [sys.executable, executable] - cores = 1 - command_lst += [ - "--cores-per-task", - str(ranks_per_task), - "--cores-total", - str(cores_total), - ] - return command_lst, cores - - -def _execute_serial_tasks_loop( - interface, future_queue, future_dict, sleep_interval=0.1 -): - while True: - try: - task_dict = future_queue.get_nowait() - except queue.Empty: - pass - else: - if "shutdown" in task_dict.keys() and task_dict["shutdown"]: - done_dict = interface.shutdown(wait=task_dict["wait"]) - if isinstance(done_dict, dict): - for k, v in done_dict.items(): - if k in future_dict.keys() and not future_dict[k].cancelled(): - future_dict.pop(k).set_result(v) - future_queue.task_done() - break - elif "fn" in task_dict.keys() and "future" in task_dict.keys(): - f = task_dict.pop("future") - future_hash = interface.send_and_receive_dict(input_dict=task_dict) - future_dict[future_hash] = f - future_queue.task_done() - _update_future_dict( - interface=interface, future_dict=future_dict, sleep_interval=sleep_interval - ) - - -def _update_future_dict(interface, future_dict, sleep_interval=0.1): - time.sleep(sleep_interval) - hash_to_update = [h for h, f in future_dict.items() if not f.done()] - hash_to_cancel = [h for h, f in future_dict.items() if f.cancelled()] - if len(hash_to_update) > 0: - for k, v in interface.send_and_receive_dict( - input_dict={"update": hash_to_update} - ).items(): - future_dict.pop(k).set_result(v) - if len(hash_to_cancel) > 0 and interface.send_and_receive_dict( - input_dict={"cancel": hash_to_cancel} - ): - for h in hash_to_cancel: - del future_dict[h] diff --git a/tests/test_communicator_split.py b/tests/test_communicator_split.py deleted file mode 100644 index 9e09f65b..00000000 --- 
a/tests/test_communicator_split.py +++ /dev/null @@ -1,66 +0,0 @@ -import unittest -from pympipool import MPISpawnPool - - -def get_ranks(input_parameter, comm=None): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - if comm is not None: - size_new = comm.Get_size() - rank_new = comm.Get_rank() - else: - size_new = 0 - rank_new = 0 - return size, rank, size_new, rank_new, input_parameter - - -def get_ranks_multi_input(input_parameter1, input_parameter2, comm=None): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - if comm is not None: - size_new = comm.Get_size() - rank_new = comm.Get_rank() - else: - size_new = 0 - rank_new = 0 - return size, rank, size_new, rank_new, input_parameter1, input_parameter2 - - -class TestCommunicator(unittest.TestCase): - def test_map_serial(self): - with MPISpawnPool(max_ranks=2, ranks_per_task=1) as p: - output = p.map(func=get_ranks, iterable=[1, 2, 3]) - self.assertEqual(output[0], (2, 1, 0, 0, 1)) - self.assertEqual(output[1], (2, 1, 0, 0, 2)) - self.assertEqual(output[2], (2, 1, 0, 0, 3)) - - def test_map_parallel(self): - with MPISpawnPool(max_ranks=2, ranks_per_task=2) as p: - output = p.map(func=get_ranks, iterable=[1, 2, 3, 4]) - self.assertEqual(output[0][::2], (2, 2, 1)) - self.assertEqual(output[1][::2], (2, 2, 2)) - self.assertEqual(output[2][::2], (2, 2, 3)) - self.assertEqual(output[3][::2], (2, 2, 4)) - - def test_starmap_serial(self): - with MPISpawnPool(max_ranks=2, ranks_per_task=1) as p: - output = p.starmap( - func=get_ranks_multi_input, - iterable=[[1, 1], [2, 2], [3, 3]] - ) - self.assertEqual(output[0], (2, 1, 0, 0, 1, 1)) - self.assertEqual(output[1], (2, 1, 0, 0, 2, 2)) - self.assertEqual(output[2], (2, 1, 0, 0, 3, 3)) - - def test_starmap_parallel(self): - with MPISpawnPool(max_ranks=2, ranks_per_task=2) as p: - output = p.starmap( - func=get_ranks_multi_input, - iterable=[[1, 1], [2, 2], [3, 3], [4, 4]] - ) - self.assertEqual(output[0][::2], (2, 2, 1)) - self.assertEqual(output[1][::2], (2, 2, 2)) - self.assertEqual(output[2][::2], (2, 2, 3)) - self.assertEqual(output[3][::2], (2, 2, 4)) diff --git a/tests/test_executor.py b/tests/test_executor.py index e87a742d..1c39c79a 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -1,6 +1,4 @@ import unittest -from concurrent.futures import ThreadPoolExecutor -from pympipool.legacy.shared.backend import map_funct, parse_socket_communication from pympipool.shared.backend import call_funct @@ -9,131 +7,6 @@ def function_multi_args(a, b): class TestExecutor(unittest.TestCase): - def test_exec_funct_single_core_map(self): - with ThreadPoolExecutor(max_workers=1) as executor: - output = map_funct( - executor=executor, - funct=sum, - lst=[[1, 1], [2, 2]], - cores_per_task=1, - chunksize=1, - ) - self.assertEqual(output, [2, 4]) - - def test_exec_funct_single_core_starmap(self): - with self.assertRaises(AttributeError): - with ThreadPoolExecutor(max_workers=1) as executor: - map_funct( - executor=executor, - funct=sum, - lst=[[1, 1], [2, 2]], - cores_per_task=1, - chunksize=1, - map_flag=False - ) - - def test_parse_socket_communication_close(self): - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"shutdown": True, "wait": True}, - future_dict={}, - cores_per_task=1 - ) - self.assertEqual(output, {"exit": True}) - - def test_parse_socket_communication_execute(self): - with ThreadPoolExecutor(max_workers=1) as 
executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": sum, "iterable": [[1, 1]], "chunksize": 1, "map": True}, - future_dict={}, - cores_per_task=1 - ) - self.assertEqual(output, {"result": [2]}) - - def test_parse_socket_communication_error(self): - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": sum, "iterable": [["a", "b"]], "chunksize": 1, "map": True}, - future_dict={}, - cores_per_task=1 - ) - self.assertEqual(output["error_type"], "") - - def test_parse_socket_communication_submit_args(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}}, - future_dict=future_dict, - cores_per_task=1 - ) - future = future_dict[output['result']] - self.assertEqual(future.result(), 2) - - def test_parse_socket_communication_submit_kwargs(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": function_multi_args, "args": (), "kwargs": {"a": 1, "b": 2}}, - future_dict=future_dict, - cores_per_task=1 - ) - future = future_dict[output['result']] - self.assertEqual(future.result(), 3) - - def test_parse_socket_communication_submit_both(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": function_multi_args, "args": [1], "kwargs": {"b": 2}}, - future_dict=future_dict, - cores_per_task=1 - ) - future = future_dict[output['result']] - self.assertEqual(future.result(), 3) - - def test_parse_socket_communication_update(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}}, - future_dict=future_dict, - cores_per_task=1 - ) - future_hash = output["result"] - result = parse_socket_communication( - executor=executor, - input_dict={"update": [future_hash]}, - future_dict=future_dict, - cores_per_task=1 - ) - self.assertEqual(result, {"result": {future_hash: 2}}) - - def test_parse_socket_communication_cancel(self): - future_dict = {} - with ThreadPoolExecutor(max_workers=1) as executor: - output = parse_socket_communication( - executor=executor, - input_dict={"fn": sum, "args": [[1, 1]], "kwargs": {}}, - future_dict=future_dict, - cores_per_task=1 - ) - future_hash = output["result"] - result = parse_socket_communication( - executor=executor, - input_dict={"cancel": [future_hash]}, - future_dict=future_dict, - cores_per_task=1 - ) - self.assertEqual(result, {"result": True}) - def test_funct_call_default(self): self.assertEqual(call_funct(input_dict={ "fn": sum, diff --git a/tests/test_multitask.py b/tests/test_multitask.py deleted file mode 100644 index 9b5ea46a..00000000 --- a/tests/test_multitask.py +++ /dev/null @@ -1,99 +0,0 @@ -import numpy as np -import unittest -from queue import Queue -from time import sleep -from pympipool import PoolExecutor -from pympipool.legacy.shared.interface import execute_serial_tasks -from pympipool.shared.taskexecutor import cloudpickle_register -from concurrent.futures import Future - - -def calc(i): - return np.array(i ** 2) - - -def sleep_one(i): - sleep(1) - return i - - -def wait_and_calc(n): - sleep(1) - return n ** 2 - - -def call_back(future): - 
global_lst.append(future.result()) - - -global_lst = [] - - -class TestFuturePool(unittest.TestCase): - def test_pool_serial(self): - with PoolExecutor(max_workers=1) as p: - output = p.submit(calc, i=2) - self.assertEqual(len(p), 1) - self.assertTrue(isinstance(output, Future)) - self.assertFalse(output.done()) - sleep(1) - self.assertTrue(output.done()) - self.assertEqual(len(p), 0) - self.assertEqual(output.result(), np.array(4)) - - def test_execute_task(self): - f = Future() - q = Queue() - q.put({"fn": calc, 'args': (), "kwargs": {"i": 2}, "future": f}) - q.put({"shutdown": True, "wait": True}) - cloudpickle_register(ind=1) - execute_serial_tasks( - future_queue=q, - cores=1, - oversubscribe=False, - enable_flux_backend=False - ) - self.assertEqual(f.result(), np.array(4)) - q.join() - - def test_pool_cancel(self): - with PoolExecutor(max_workers=2, sleep_interval=0) as p: - fs1 = p.submit(sleep_one, i=2) - fs2 = p.submit(sleep_one, i=2) - fs3 = p.submit(sleep_one, i=2) - fs4 = p.submit(sleep_one, i=2) - sleep(1) - fs1.cancel() - fs2.cancel() - fs3.cancel() - fs4.cancel() - self.assertTrue(fs1.done()) - self.assertTrue(fs2.done()) - self.assertTrue(fs3.done()) - self.assertTrue(fs4.done()) - - def test_cancel_task(self): - fs1 = Future() - fs1.cancel() - q = Queue() - q.put({"fn": sleep_one, 'args': (), "kwargs": {"i": 1}, "future": fs1}) - q.put({"shutdown": True, "wait": True}) - cloudpickle_register(ind=1) - execute_serial_tasks( - future_queue=q, - cores=1, - oversubscribe=False, - enable_flux_backend=False - ) - self.assertTrue(fs1.done()) - self.assertTrue(fs1.cancelled()) - q.join() - - def test_waiting(self): - exe = PoolExecutor(max_workers=2) - f1 = exe.submit(wait_and_calc, 42) - f2 = exe.submit(wait_and_calc, 84) - f1.add_done_callback(call_back) - f2.add_done_callback(call_back) - exe.shutdown(wait=True) - self.assertTrue([42**2, 84**2], global_lst) diff --git a/tests/test_parse_legacy.py b/tests/test_parse_legacy.py deleted file mode 100644 index 2e35e148..00000000 --- a/tests/test_parse_legacy.py +++ /dev/null @@ -1,76 +0,0 @@ -import unittest -import os -import sys - -from pympipool.legacy.shared.backend import parse_arguments -from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface - - -class TestParser(unittest.TestCase): - def test_command_local(self): - result_dict = { - 'host': 'localhost', - 'total_cores': '2', - 'zmqport': '22', - 'cores_per_task': '1' - } - command_lst = [ - 'mpiexec', - '-n', result_dict['total_cores'], - '--oversubscribe', - sys.executable, '-m', 'mpi4py.futures', '/', - '--zmqport', result_dict['zmqport'], - '--cores-per-task', result_dict['cores_per_task'], - '--cores-total', result_dict['total_cores'] - ] - interface = MpiExecInterface( - cwd=None, - cores=2, - gpus_per_core=0, - oversubscribe=True - ) - self.assertEqual( - command_lst, - interface.generate_command( - command_lst=[ - sys.executable, '-m', 'mpi4py.futures', '/', - '--zmqport', result_dict['zmqport'], - '--cores-per-task', '1', '--cores-total', '2' - ] - ) - ) - self.assertEqual(result_dict, parse_arguments(command_lst)) - - def test_command_flux(self): - result_dict = { - 'host': "127.0.0.1", - 'total_cores': '2', - 'zmqport': '22', - 'cores_per_task': '2' - } - command_lst = [ - 'flux', 'run', '-n', '1', - "--cwd=" + os.path.abspath("."), - sys.executable, '/', - '--host', result_dict['host'], - '--zmqport', result_dict['zmqport'], - '--cores-per-task', result_dict['cores_per_task'], - '--cores-total', result_dict['total_cores'] - ] - interface = 
FluxCmdInterface( - cwd=os.path.abspath("."), - cores=1, - gpus_per_core=0, - oversubscribe=False - ) - self.assertEqual( - command_lst, - interface.generate_command( - command_lst=[ - sys.executable, '/', '--host', result_dict['host'], - '--zmqport', result_dict['zmqport'], - '--cores-per-task', '2', '--cores-total', '2' - ] - ) - ) - self.assertEqual(result_dict, parse_arguments(command_lst)) diff --git a/tests/test_pool.py b/tests/test_pool.py deleted file mode 100644 index f5a6e56c..00000000 --- a/tests/test_pool.py +++ /dev/null @@ -1,76 +0,0 @@ -import numpy as np -import unittest -from pympipool import Pool - - -def calc(i): - return np.array(i ** 2) - - -def calc_add(i, j): - return i + j - - -def calc_none(i): - return None - - -def calc_error_value_error(i): - raise ValueError("calc_error value error") - - -def calc_error_type_error(i): - raise TypeError("calc_error value error") - - -class TestPool(unittest.TestCase): - def test_map_serial(self): - with Pool(max_workers=1) as p: - output = p.map(func=calc, iterable=[1, 2, 3, 4]) - self.assertEqual(output[0], 1) - self.assertEqual(output[1], 4) - self.assertEqual(output[2], 9) - self.assertEqual(output[3], 16) - - def test_starmap_serial(self): - with Pool(max_workers=1) as p: - output = p.starmap(func=calc_add, iterable=[[1, 2], [3, 4]]) - self.assertEqual(output[0], 3) - self.assertEqual(output[1], 7) - - def test_map_parallel(self): - with Pool(max_workers=2) as p: - output = p.map(func=calc, iterable=[1, 2, 3, 4]) - self.assertEqual(output[0], 1) - self.assertEqual(output[1], 4) - self.assertEqual(output[2], 9) - self.assertEqual(output[3], 16) - - def test_starmap_parallel(self): - with Pool(max_workers=2) as p: - output = p.starmap(func=calc_add, iterable=[[1, 2], [3, 4]]) - self.assertEqual(output[0], 3) - self.assertEqual(output[1], 7) - - def test_pool_none(self): - with Pool(max_workers=2) as p: - output = p.map(func=calc_none, iterable=[1, 2, 3, 4]) - self.assertEqual(output, [None, None, None, None]) - - def test_pool_error(self): - with self.assertRaises(ValueError): - with Pool(max_workers=2) as p: - p.map(func=calc_error_value_error, iterable=[1, 2, 3, 4]) - with self.assertRaises(TypeError): - with Pool(max_workers=2) as p: - p.map(func=calc_error_type_error, iterable=[1, 2, 3, 4]) - - def test_shutdown(self): - p = Pool(max_workers=1) - output = p.map(func=calc, iterable=[1, 2, 3, 4]) - p.shutdown(wait=True) - p.shutdown(wait=False) - self.assertEqual(output[0], 1) - self.assertEqual(output[1], 4) - self.assertEqual(output[2], 9) - self.assertEqual(output[3], 16)
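
With the legacy Pool, MPISpawnPool, and PoolExecutor interfaces removed above, serial task submission goes through the remaining pympipool.Executor, which the package's top-level imports keep and which follows the concurrent.futures submit()/result() pattern referenced in the deleted docstrings. A minimal sketch, assuming the Executor constructor works with its defaults (a single worker) in this version; map()-style workloads of the kind the removed Pool wrapped can fall back on mpi4py.futures.MPIPoolExecutor directly.

```
# Hypothetical usage sketch of the non-legacy interface that remains after this change.
# Assumes pympipool.Executor can be constructed with its default arguments and
# implements the concurrent.futures.Executor submit()/result() protocol.
from pympipool import Executor

def calc(i, j):
    return i + j

with Executor() as exe:
    future = exe.submit(calc, 1, 2)   # returns a concurrent.futures.Future
    print(future.result())            # prints 3 once the task completes
```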