diff --git a/executorlib/interactive/blockallocation.py b/executorlib/interactive/blockallocation.py
new file mode 100644
index 00000000..fb1e79e4
--- /dev/null
+++ b/executorlib/interactive/blockallocation.py
@@ -0,0 +1,145 @@
+from concurrent.futures import Future
+from threading import Thread
+from typing import Callable, Optional
+
+from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
+from executorlib.interactive.shared import execute_tasks
+from executorlib.standalone.inputcheck import (
+    check_resource_dict,
+    check_resource_dict_is_empty,
+)
+from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
+
+
+class BlockAllocationExecutor(ExecutorBase):
+    """
+    The executorlib.interactive.blockallocation.BlockAllocationExecutor leverages the executorlib interfaces to
+    distribute python tasks on a workstation or inside a queuing system allocation. In contrast to the
+    mpi4py.futures.MPIPoolExecutor the executorlib.interactive.blockallocation.BlockAllocationExecutor can be
+    executed in a serial python process and does not require the python script to be executed with MPI.
+    Consequently, it is primarily an abstraction of its functionality to improve the usability in particular when
+    used in combination with Jupyter notebooks.
+
+    Args:
+        max_workers (int): defines the number of workers which can execute functions in parallel
+        executor_kwargs (dict): keyword arguments for the executor
+        spawner (BaseSpawner): interface class to initiate python processes
+
+    Examples:
+
+        >>> import numpy as np
+        >>> from executorlib.interactive.blockallocation import BlockAllocationExecutor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> def init_k():
+        >>>     return {"k": 3}
+        >>>
+        >>> with BlockAllocationExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
+        >>>     fs = p.submit(calc, 2, j=4)
+        >>>     print(fs.result())
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+
+    """
+
+    def __init__(
+        self,
+        max_workers: int = 1,
+        executor_kwargs: Optional[dict] = None,
+        spawner: type[BaseSpawner] = MpiExecSpawner,
+    ):
+        if executor_kwargs is None:
+            executor_kwargs = {}
+        super().__init__(max_cores=executor_kwargs.get("max_cores"))
+        executor_kwargs["future_queue"] = self._future_queue
+        executor_kwargs["spawner"] = spawner
+        executor_kwargs["queue_join_on_shutdown"] = False
+        self._process_kwargs = executor_kwargs
+        self._set_process(
+            process=[
+                Thread(
+                    target=execute_tasks,
+                    kwargs=executor_kwargs,
+                )
+                for _ in range(max_workers)
+            ],
+        )
+
+    def submit(  # type: ignore
+        self, fn: Callable, *args, resource_dict: Optional[dict] = None, **kwargs
+    ) -> Future:
+        """
+        Submits a callable to be executed with the given arguments.
+
+        Schedules the callable to be executed as fn(*args, **kwargs) and returns
+        a Future instance representing the execution of the callable.
+
+        Args:
+            fn (Callable): function to submit for execution
+            args: arguments for the submitted function
+            kwargs: keyword arguments for the submitted function
+            resource_dict (dict): resource dictionary, which defines the resources used for the execution of the
+                                  function. Example resource dictionary: {
+                                      cores: 1,
+                                      threads_per_core: 1,
+                                      gpus_per_worker: 0,
+                                      oversubscribe: False,
+                                      cwd: None,
+                                      executor: None,
+                                      hostname_localhost: False,
+                                  }
+
+        Returns:
+            Future: A Future representing the given call.
+        """
+        if resource_dict is None:
+            resource_dict = {}
+        check_resource_dict_is_empty(resource_dict=resource_dict)
+        check_resource_dict(function=fn)
+        f: Future = Future()
+        if self._future_queue is not None:
+            self._future_queue.put(
+                {"fn": fn, "args": args, "kwargs": kwargs, "future": f}
+            )
+        return f
+
+    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
+        """Clean-up the resources associated with the Executor.
+
+        It is safe to call this method several times. Otherwise, no other
+        methods can be called after this one.
+
+        Args:
+            wait: If True then shutdown will not return until all running
+                futures have finished executing and the resources used by the
+                parallel_executors have been reclaimed.
+            cancel_futures: If True then shutdown will cancel all pending
+                futures. Futures that are completed or running will not be
+                cancelled.
+        """
+        if self._future_queue is not None:
+            if cancel_futures:
+                cancel_items_in_queue(que=self._future_queue)
+            if isinstance(self._process, list):
+                for _ in range(len(self._process)):
+                    self._future_queue.put({"shutdown": True, "wait": wait})
+                if wait:
+                    for process in self._process:
+                        process.join()
+                    self._future_queue.join()
+        self._process = None
+        self._future_queue = None
+
+    def _set_process(self, process: list[Thread]):  # type: ignore
+        """
+        Set the worker processes for the executor.
+
+        Args:
+            process (list[Thread]): The processes for the executor.
+        """
+        self._process = process
+        for process_instance in self._process:
+            process_instance.start()
diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/dependency.py
similarity index 51%
rename from executorlib/interactive/executor.py
rename to executorlib/interactive/dependency.py
index 105799b8..9cbbfb35 100644
--- a/executorlib/interactive/executor.py
+++ b/executorlib/interactive/dependency.py
@@ -1,9 +1,16 @@
+import queue
 from concurrent.futures import Future
 from threading import Thread
+from time import sleep
 from typing import Any, Callable, Optional
 
 from executorlib.base.executor import ExecutorBase
-from executorlib.interactive.shared import execute_tasks_with_dependencies
+from executorlib.standalone.interactive.arguments import (
+    check_exception_was_raised,
+    get_exception_lst,
+    get_future_objects_from_input,
+    update_futures_in_input,
+)
 from executorlib.standalone.plot import (
     draw,
     generate_nodes_and_edges,
@@ -11,7 +18,7 @@
 )
 
 
-class ExecutorWithDependencies(ExecutorBase):
+class DependencyExecutor(ExecutorBase):
     """
     ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with
     dependencies.
@@ -20,8 +27,6 @@ class ExecutorWithDependencies(ExecutorBase):
         refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01.
         plot_dependency_graph (bool, optional): Whether to generate and plot the dependency graph. Defaults to False.
         plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
-        *args: Variable length argument list.
-        **kwargs: Arbitrary keyword arguments.
 
     Attributes:
         _future_hash_dict (Dict[str, Future]): A dictionary mapping task hash to future object.
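Both the BlockAllocationExecutor above and the DependencyExecutor renamed here hand work to their background threads through the same task-dict protocol: submit() places a dictionary carrying the callable, its arguments, and the Future on a queue.Queue, and shutdown() enqueues a {"shutdown": True} sentinel per worker. The sketch below is a toy illustration of that protocol only; the names toy_worker and q are hypothetical, and the real execute_tasks() additionally forwards each task over a zmq socket to a separately spawned Python process.

```python
from concurrent.futures import Future
from queue import Queue
from threading import Thread


def toy_worker(future_queue: Queue) -> None:
    # Hypothetical stand-in for execute_tasks(): consume task dictionaries
    # until the {"shutdown": True} sentinel arrives.
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown"):
            future_queue.task_done()
            break
        try:
            result = task_dict["fn"](*task_dict["args"], **task_dict["kwargs"])
            task_dict["future"].set_result(result)
        except Exception as exc:
            task_dict["future"].set_exception(exc)
        future_queue.task_done()


q: Queue = Queue()
Thread(target=toy_worker, kwargs={"future_queue": q}).start()
f: Future = Future()
q.put({"fn": sum, "args": ([1, 2, 3],), "kwargs": {}, "future": f})
q.put({"shutdown": True, "wait": True})
print(f.result())  # prints 6
q.join()
```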
@@ -48,7 +53,7 @@ def __init__(
         }
         self._set_process(
             Thread(
-                target=execute_tasks_with_dependencies,
+                target=_execute_tasks_with_dependencies,
                 kwargs=self._process_kwargs,
             )
         )
@@ -132,3 +137,93 @@ def __exit__(
             edge_lst=edge_lst,
             filename=self._plot_dependency_graph_filename,
         )
+
+
+def _execute_tasks_with_dependencies(
+    future_queue: queue.Queue,
+    executor_queue: queue.Queue,
+    executor: ExecutorBase,
+    refresh_rate: float = 0.01,
+):
+    """
+    Resolve the dependencies of multiple tasks by analysing which tasks require concurrent.futures.Future objects
+    from other tasks.
+
+    Args:
+        future_queue (Queue): Queue for receiving new tasks.
+        executor_queue (Queue): Queue for the internal executor.
+        executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved.
+        refresh_rate (float): The refresh rate in seconds, i.e. how frequently the input queue is checked.
+    """
+    wait_lst = []
+    while True:
+        try:
+            task_dict = future_queue.get_nowait()
+        except queue.Empty:
+            task_dict = None
+        if (  # shutdown the executor
+            task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]
+        ):
+            executor.shutdown(wait=task_dict["wait"])
+            future_queue.task_done()
+            future_queue.join()
+            break
+        elif (  # handle function submitted to the executor
+            task_dict is not None and "fn" in task_dict and "future" in task_dict
+        ):
+            future_lst, ready_flag = get_future_objects_from_input(
+                args=task_dict["args"], kwargs=task_dict["kwargs"]
+            )
+            exception_lst = get_exception_lst(future_lst=future_lst)
+            if not check_exception_was_raised(future_obj=task_dict["future"]):
+                if len(exception_lst) > 0:
+                    task_dict["future"].set_exception(exception_lst[0])
+                elif len(future_lst) == 0 or ready_flag:
+                    # No future objects are used in the input or all future objects are already done
+                    task_dict["args"], task_dict["kwargs"] = update_futures_in_input(
+                        args=task_dict["args"], kwargs=task_dict["kwargs"]
+                    )
+                    executor_queue.put(task_dict)
+                else:  # Otherwise add the function to the wait list
+                    task_dict["future_lst"] = future_lst
+                    wait_lst.append(task_dict)
+            future_queue.task_done()
+        elif len(wait_lst) > 0:
+            number_waiting = len(wait_lst)
+            # Check functions in the wait list and execute them if all future objects are now ready
+            wait_lst = _update_waiting_task(
+                wait_lst=wait_lst, executor_queue=executor_queue
+            )
+            # if no job is ready, sleep for a moment
+            if len(wait_lst) == number_waiting:
+                sleep(refresh_rate)
+        else:
+            # If there is nothing else to do, sleep for a moment
+            sleep(refresh_rate)
+
+
+def _update_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> list:
+    """
+    Submit the waiting tasks whose future inputs have been completed to the executor.
+
+    Args:
+        wait_lst (list): List of waiting tasks
+        executor_queue (Queue): Queue of the internal executor
+
+    Returns:
+        list: list of tasks whose future inputs have not yet been completed
+    """
+    wait_tmp_lst = []
+    for task_wait_dict in wait_lst:
+        exception_lst = get_exception_lst(future_lst=task_wait_dict["future_lst"])
+        if len(exception_lst) > 0:
+            task_wait_dict["future"].set_exception(exception_lst[0])
+        elif all(future.done() for future in task_wait_dict["future_lst"]):
+            del task_wait_dict["future_lst"]
+            task_wait_dict["args"], task_wait_dict["kwargs"] = update_futures_in_input(
+                args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
+            )
+            executor_queue.put(task_wait_dict)
+        else:
+            wait_tmp_lst.append(task_wait_dict)
+    return wait_tmp_lst
diff --git a/executorlib/interactive/flux.py b/executorlib/interactive/fluxspawner.py
similarity index 100%
rename from executorlib/interactive/flux.py
rename to executorlib/interactive/fluxspawner.py
diff --git a/executorlib/interactive/onetoone.py b/executorlib/interactive/onetoone.py
new file mode 100644
index 00000000..ca38ca52
--- /dev/null
+++ b/executorlib/interactive/onetoone.py
@@ -0,0 +1,222 @@
+import queue
+from threading import Thread
+from typing import Optional
+
+from executorlib.base.executor import ExecutorBase
+from executorlib.interactive.shared import execute_tasks
+from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
+
+
+class OneTaskPerProcessExecutor(ExecutorBase):
+    """
+    The executorlib.interactive.onetoone.OneTaskPerProcessExecutor leverages the executorlib interfaces to distribute
+    python tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the
+    executorlib.interactive.onetoone.OneTaskPerProcessExecutor can be executed in a serial python process and does
+    not require the python script to be executed with MPI. Consequently, it is primarily an abstraction of its
+    functionality to improve the usability in particular when used in combination with Jupyter notebooks.
+
+    Args:
+        max_cores (int): defines the number of cores which can be used to execute functions in parallel
+        executor_kwargs (dict): keyword arguments for the executor
+        spawner (BaseSpawner): interface class to initiate python processes
+
+    Examples:
+
+        >>> import numpy as np
+        >>> from executorlib.interactive.onetoone import OneTaskPerProcessExecutor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> with OneTaskPerProcessExecutor(max_cores=2) as p:
+        >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
+        >>>     print(fs.result())
+
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+
+    """
+
+    def __init__(
+        self,
+        max_cores: Optional[int] = None,
+        max_workers: Optional[int] = None,
+        executor_kwargs: Optional[dict] = None,
+        spawner: type[BaseSpawner] = MpiExecSpawner,
+    ):
+        if executor_kwargs is None:
+            executor_kwargs = {}
+        super().__init__(max_cores=executor_kwargs.get("max_cores"))
+        executor_kwargs.update(
+            {
+                "future_queue": self._future_queue,
+                "spawner": spawner,
+                "max_cores": max_cores,
+                "max_workers": max_workers,
+            }
+        )
+        self._process_kwargs = executor_kwargs
+        self._set_process(
+            Thread(
+                target=_execute_task_in_separate_process,
+                kwargs=executor_kwargs,
+            )
+        )
+
+
+def _execute_task_in_separate_process(
+    future_queue: queue.Queue,
+    spawner: type[BaseSpawner] = MpiExecSpawner,
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
+    hostname_localhost: Optional[bool] = None,
+    **kwargs,
+):
+    """
+    Execute single tasks in parallel using the message passing interface (MPI).
+
+    Args:
+        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
+        spawner (BaseSpawner): Interface to start process on selected compute resources
+        max_cores (int): defines the number of cores which can be used in parallel
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
+                           of cores which can be used in parallel - just like the max_cores parameter. Using
+                           max_cores is recommended, as computers have a limited number of compute cores.
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection.
+                                      In the context of an HPC cluster this is essential to be able to communicate
+                                      with an Executor running on a different compute node within the same
+                                      allocation. In principle any computer should be able to resolve that its own
+                                      hostname points to the same address as localhost. Still, macOS >= 12 seems to
+                                      disable this lookup for security reasons, so on macOS it is required to set
+                                      this option to true.
+    """
+    active_task_dict: dict = {}
+    process_lst: list = []
+    qtask_lst: list = []
+    if "cores" not in kwargs:
+        kwargs["cores"] = 1
+    while True:
+        task_dict = future_queue.get()
+        if "shutdown" in task_dict and task_dict["shutdown"]:
+            if task_dict["wait"]:
+                _ = [process.join() for process in process_lst]
+            future_queue.task_done()
+            future_queue.join()
+            break
+        elif "fn" in task_dict and "future" in task_dict:
+            qtask: queue.Queue = queue.Queue()
+            process, active_task_dict = _wrap_execute_task_in_separate_process(
+                task_dict=task_dict,
+                qtask=qtask,
+                active_task_dict=active_task_dict,
+                spawner=spawner,
+                executor_kwargs=kwargs,
+                max_cores=max_cores,
+                max_workers=max_workers,
+                hostname_localhost=hostname_localhost,
+            )
+            qtask_lst.append(qtask)
+            process_lst.append(process)
+            future_queue.task_done()
+
+
+def _wait_for_free_slots(
+    active_task_dict: dict,
+    cores_requested: int,
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
+) -> dict:
+    """
+    Wait for computing resources to become available.
+
+    Args:
+        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
+        cores_requested (int): Number of cores required for executing the next task
+        max_cores (int): Maximum number of cores which can be used
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
+                           of cores which can be used in parallel - just like the max_cores parameter. Using
+                           max_cores is recommended, as computers have a limited number of compute cores.
+
+    Returns:
+        dict: Dictionary containing the future objects and the number of cores they require
+    """
+    if max_cores is not None:
+        while sum(active_task_dict.values()) + cores_requested > max_cores:
+            active_task_dict = {
+                k: v for k, v in active_task_dict.items() if not k.done()
+            }
+    elif max_workers is not None and max_cores is None:
+        while len(active_task_dict.values()) + 1 > max_workers:
+            active_task_dict = {
+                k: v for k, v in active_task_dict.items() if not k.done()
+            }
+    return active_task_dict
+
+
+def _wrap_execute_task_in_separate_process(
+    task_dict: dict,
+    active_task_dict: dict,
+    qtask: queue.Queue,
+    spawner: type[BaseSpawner],
+    executor_kwargs: dict,
+    max_cores: Optional[int] = None,
+    max_workers: Optional[int] = None,
+    hostname_localhost: Optional[bool] = None,
+):
+    """
+    Submit a function to be executed in a separate Python process.
+
+    Args:
+        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
+                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
+        active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
+        qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
+        spawner (BaseSpawner): Interface to start process on selected compute resources
+        executor_kwargs (dict): keyword parameters used to initialize the Executor
+        max_cores (int): defines the number of cores which can be used in parallel
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
+                           of cores which can be used in parallel - just like the max_cores parameter. Using
+                           max_cores is recommended, as computers have a limited number of compute cores.
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate with an
+                                      Executor running on a different compute node within the same allocation. In
+                                      principle any computer should be able to resolve that its own hostname points
+                                      to the same address as localhost. Still, macOS >= 12 seems to disable this
+                                      lookup for security reasons, so on macOS it is required to set this option to
+                                      true.
+
+    Returns:
+        Thread, dict: thread for communicating with the python process which is executing the function and a
+                      dictionary containing the future objects and the number of cores they require
+    """
+    resource_dict = task_dict.pop("resource_dict").copy()
+    qtask.put(task_dict)
+    qtask.put({"shutdown": True, "wait": True})
+    if "cores" not in resource_dict or (
+        resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
+    ):
+        resource_dict["cores"] = executor_kwargs["cores"]
+    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
+    active_task_dict = _wait_for_free_slots(
+        active_task_dict=active_task_dict,
+        cores_requested=slots_required,
+        max_cores=max_cores,
+        max_workers=max_workers,
+    )
+    active_task_dict[task_dict["future"]] = slots_required
+    task_kwargs = executor_kwargs.copy()
+    task_kwargs.update(resource_dict)
+    task_kwargs.update(
+        {
+            "future_queue": qtask,
+            "spawner": spawner,
+            "hostname_localhost": hostname_localhost,
+            "init_function": None,
+        }
+    )
+    process = Thread(
+        target=execute_tasks,
+        kwargs=task_kwargs,
+    )
+    process.start()
+    return process, active_task_dict
diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py
index 4078e89b..8c134bba 100644
--- a/executorlib/interactive/shared.py
+++ b/executorlib/interactive/shared.py
@@ -3,23 +3,9 @@
 import queue
 import sys
 import time
-from concurrent.futures import Future
-from threading import Thread
-from time import sleep
 from typing import Callable, Optional
 
-from executorlib.base.executor import ExecutorBase, cancel_items_in_queue
 from executorlib.standalone.command import get_command_path
-from executorlib.standalone.inputcheck import (
-    check_resource_dict,
-    check_resource_dict_is_empty,
-)
-from executorlib.standalone.interactive.arguments import (
-    check_exception_was_raised,
-    get_exception_lst,
-    get_future_objects_from_input,
-    update_futures_in_input,
-)
 from executorlib.standalone.interactive.communication import (
     SocketInterface,
     interface_bootup,
@@ -28,202 +14,7 @@
 from executorlib.standalone.serialize import serialize_funct_h5
 
 
-class ExecutorBroker(ExecutorBase):
-    def submit(  # type: ignore
-        self, fn:
Callable, *args, resource_dict: Optional[dict] = None, **kwargs - ) -> Future: - """ - Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Args: - fn (Callable): function to submit for execution - args: arguments for the submitted function - kwargs: keyword arguments for the submitted function - resource_dict (dict): resource dictionary, which defines the resources used for the execution of the - function. Example resource dictionary: { - cores: 1, - threads_per_core: 1, - gpus_per_worker: 0, - oversubscribe: False, - cwd: None, - executor: None, - hostname_localhost: False, - } - - Returns: - Future: A Future representing the given call. - """ - if resource_dict is None: - resource_dict = {} - check_resource_dict_is_empty(resource_dict=resource_dict) - check_resource_dict(function=fn) - f: Future = Future() - if self._future_queue is not None: - self._future_queue.put( - {"fn": fn, "args": args, "kwargs": kwargs, "future": f} - ) - return f - - def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): - """Clean-up the resources associated with the Executor. - - It is safe to call this method several times. Otherwise, no other - methods can be called after this one. - - Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - parallel_executors have been reclaimed. - cancel_futures: If True then shutdown will cancel all pending - futures. Futures that are completed or running will not be - cancelled. - """ - if self._future_queue is not None: - if cancel_futures: - cancel_items_in_queue(que=self._future_queue) - if isinstance(self._process, list): - for _ in range(len(self._process)): - self._future_queue.put({"shutdown": True, "wait": wait}) - if wait: - for process in self._process: - process.join() - self._future_queue.join() - self._process = None - self._future_queue = None - - def _set_process(self, process: list[Thread]): # type: ignore - """ - Set the process for the executor. - - Args: - process (List[RaisingThread]): The process for the executor. - """ - self._process = process - for process_instance in self._process: - process_instance.start() - - -class InteractiveExecutor(ExecutorBroker): - """ - The executorlib.interactive.executor.InteractiveExecutor leverages the exeutorlib interfaces to distribute python - tasks on a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the - executorlib.interactive.executor.InteractiveExecutor can be executed in a serial python process and does not require - the python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to - improves the usability in particular when used in combination with Jupyter notebooks. 
- - Args: - max_workers (int): defines the number workers which can execute functions in parallel - executor_kwargs (dict): keyword arguments for the executor - spawner (BaseSpawner): interface class to initiate python processes - - Examples: - - >>> import numpy as np - >>> from executorlib.interactive.shared import InteractiveExecutor - >>> - >>> def calc(i, j, k): - >>> from mpi4py import MPI - >>> size = MPI.COMM_WORLD.Get_size() - >>> rank = MPI.COMM_WORLD.Get_rank() - >>> return np.array([i, j, k]), size, rank - >>> - >>> def init_k(): - >>> return {"k": 3} - >>> - >>> with InteractiveExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p: - >>> fs = p.submit(calc, 2, j=4) - >>> print(fs.result()) - [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] - - """ - - def __init__( - self, - max_workers: int = 1, - executor_kwargs: Optional[dict] = None, - spawner: type[BaseSpawner] = MpiExecSpawner, - ): - if executor_kwargs is None: - executor_kwargs = {} - super().__init__(max_cores=executor_kwargs.get("max_cores")) - executor_kwargs["future_queue"] = self._future_queue - executor_kwargs["spawner"] = spawner - executor_kwargs["queue_join_on_shutdown"] = False - self._process_kwargs = executor_kwargs - self._set_process( - process=[ - Thread( - target=execute_parallel_tasks, - kwargs=executor_kwargs, - ) - for _ in range(max_workers) - ], - ) - - -class InteractiveStepExecutor(ExecutorBase): - """ - The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python - tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor - can be executed in a serial python process and does not require the python script to be executed with MPI. - Consequently, it is primarily an abstraction of its functionality to improve the usability in particular when used - in combination with Jupyter notebooks. 
- - Args: - max_cores (int): defines the number workers which can execute functions in parallel - executor_kwargs (dict): keyword arguments for the executor - spawner (BaseSpawner): interface class to initiate python processes - - Examples: - - >>> import numpy as np - >>> from executorlib.interactive.shared import InteractiveStepExecutor - >>> - >>> def calc(i, j, k): - >>> from mpi4py import MPI - >>> size = MPI.COMM_WORLD.Get_size() - >>> rank = MPI.COMM_WORLD.Get_rank() - >>> return np.array([i, j, k]), size, rank - >>> - >>> with InteractiveStepExecutor(max_cores=2) as p: - >>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2}) - >>> print(fs.result()) - - [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] - - """ - - def __init__( - self, - max_cores: Optional[int] = None, - max_workers: Optional[int] = None, - executor_kwargs: Optional[dict] = None, - spawner: type[BaseSpawner] = MpiExecSpawner, - ): - if executor_kwargs is None: - executor_kwargs = {} - super().__init__(max_cores=executor_kwargs.get("max_cores")) - executor_kwargs.update( - { - "future_queue": self._future_queue, - "spawner": spawner, - "max_cores": max_cores, - "max_workers": max_workers, - } - ) - self._process_kwargs = executor_kwargs - self._set_process( - Thread( - target=execute_separate_tasks, - kwargs=executor_kwargs, - ) - ) - - -def execute_parallel_tasks( +def execute_tasks( future_queue: queue.Queue, cores: int = 1, spawner: type[BaseSpawner] = MpiExecSpawner, @@ -272,7 +63,7 @@ def execute_parallel_tasks( break elif "fn" in task_dict and "future" in task_dict: if cache_directory is None: - _execute_task( + _execute_task_without_cache( interface=interface, task_dict=task_dict, future_queue=future_queue ) else: @@ -284,125 +75,6 @@ def execute_parallel_tasks( ) -def execute_separate_tasks( - future_queue: queue.Queue, - spawner: type[BaseSpawner] = MpiExecSpawner, - max_cores: Optional[int] = None, - max_workers: Optional[int] = None, - hostname_localhost: Optional[bool] = None, - **kwargs, -): - """ - Execute a single tasks in parallel using the message passing interface (MPI). - - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - spawner (BaseSpawner): Interface to start process on selected compute resources - max_cores (int): defines the number cores which can be used in parallel - max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of - cores which can be used in parallel - just like the max_cores parameter. Using max_cores is - recommended, as computers have a limited number of compute cores. - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. 
So on MacOS it is required to set this - option to true - """ - active_task_dict: dict = {} - process_lst: list = [] - qtask_lst: list = [] - if "cores" not in kwargs: - kwargs["cores"] = 1 - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict and task_dict["shutdown"]: - if task_dict["wait"]: - _ = [process.join() for process in process_lst] - future_queue.task_done() - future_queue.join() - break - elif "fn" in task_dict and "future" in task_dict: - qtask: queue.Queue = queue.Queue() - process, active_task_dict = _submit_function_to_separate_process( - task_dict=task_dict, - qtask=qtask, - active_task_dict=active_task_dict, - spawner=spawner, - executor_kwargs=kwargs, - max_cores=max_cores, - max_workers=max_workers, - hostname_localhost=hostname_localhost, - ) - qtask_lst.append(qtask) - process_lst.append(process) - future_queue.task_done() - - -def execute_tasks_with_dependencies( - future_queue: queue.Queue, - executor_queue: queue.Queue, - executor: ExecutorBase, - refresh_rate: float = 0.01, -): - """ - Resolve the dependencies of multiple tasks, by analysing which task requires concurrent.future.Futures objects from - other tasks. - - Args: - future_queue (Queue): Queue for receiving new tasks. - executor_queue (Queue): Queue for the internal executor. - executor (ExecutorBase): Executor to execute the tasks with after the dependencies are resolved. - refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. - """ - wait_lst = [] - while True: - try: - task_dict = future_queue.get_nowait() - except queue.Empty: - task_dict = None - if ( # shutdown the executor - task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"] - ): - executor.shutdown(wait=task_dict["wait"]) - future_queue.task_done() - future_queue.join() - break - elif ( # handle function submitted to the executor - task_dict is not None and "fn" in task_dict and "future" in task_dict - ): - future_lst, ready_flag = get_future_objects_from_input( - args=task_dict["args"], kwargs=task_dict["kwargs"] - ) - exception_lst = get_exception_lst(future_lst=future_lst) - if not check_exception_was_raised(future_obj=task_dict["future"]): - if len(exception_lst) > 0: - task_dict["future"].set_exception(exception_lst[0]) - elif len(future_lst) == 0 or ready_flag: - # No future objects are used in the input or all future objects are already done - task_dict["args"], task_dict["kwargs"] = update_futures_in_input( - args=task_dict["args"], kwargs=task_dict["kwargs"] - ) - executor_queue.put(task_dict) - else: # Otherwise add the function to the wait list - task_dict["future_lst"] = future_lst - wait_lst.append(task_dict) - future_queue.task_done() - elif len(wait_lst) > 0: - number_waiting = len(wait_lst) - # Check functions in the wait list and execute them if all future objects are now ready - wait_lst = _submit_waiting_task( - wait_lst=wait_lst, executor_queue=executor_queue - ) - # if no job is ready, sleep for a moment - if len(wait_lst) == number_waiting: - sleep(refresh_rate) - else: - # If there is nothing else to do, sleep for a moment - sleep(refresh_rate) - - def _get_backend_path( cores: int, ) -> list: @@ -427,134 +99,7 @@ def _get_backend_path( return command_lst -def _wait_for_free_slots( - active_task_dict: dict, - cores_requested: int, - max_cores: Optional[int] = None, - max_workers: Optional[int] = None, -) -> dict: - """ - Wait for available computing resources to become available. 
- - Args: - active_task_dict (dict): Dictionary containing the future objects and the number of cores they require - cores_requested (int): Number of cores required for executing the next task - max_cores (int): Maximum number cores which can be used - max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of - cores which can be used in parallel - just like the max_cores parameter. Using max_cores is - recommended, as computers have a limited number of compute cores. - - Returns: - dict: Dictionary containing the future objects and the number of cores they require - """ - if max_cores is not None: - while sum(active_task_dict.values()) + cores_requested > max_cores: - active_task_dict = { - k: v for k, v in active_task_dict.items() if not k.done() - } - elif max_workers is not None and max_cores is None: - while len(active_task_dict.values()) + 1 > max_workers: - active_task_dict = { - k: v for k, v in active_task_dict.items() if not k.done() - } - return active_task_dict - - -def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> list: - """ - Submit the waiting tasks, which future inputs have been completed, to the executor - - Args: - wait_lst (list): List of waiting tasks - executor_queue (Queue): Queue of the internal executor - - Returns: - list: list tasks which future inputs have not been completed - """ - wait_tmp_lst = [] - for task_wait_dict in wait_lst: - exception_lst = get_exception_lst(future_lst=task_wait_dict["future_lst"]) - if len(exception_lst) > 0: - task_wait_dict["future"].set_exception(exception_lst[0]) - elif all(future.done() for future in task_wait_dict["future_lst"]): - del task_wait_dict["future_lst"] - task_wait_dict["args"], task_wait_dict["kwargs"] = update_futures_in_input( - args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"] - ) - executor_queue.put(task_wait_dict) - else: - wait_tmp_lst.append(task_wait_dict) - return wait_tmp_lst - - -def _submit_function_to_separate_process( - task_dict: dict, - active_task_dict: dict, - qtask: queue.Queue, - spawner: type[BaseSpawner], - executor_kwargs: dict, - max_cores: Optional[int] = None, - max_workers: Optional[int] = None, - hostname_localhost: Optional[bool] = None, -): - """ - Submit function to be executed in separate Python process - Args: - task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys - {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} - active_task_dict (dict): Dictionary containing the future objects and the number of cores they require - qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function - spawner (BaseSpawner): Interface to start process on selected compute resources - executor_kwargs (dict): keyword parameters used to initialize the Executor - max_cores (int): defines the number cores which can be used in parallel - max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of - cores which can be used in parallel - just like the max_cores parameter. Using max_cores is - recommended, as computers have a limited number of compute cores. - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. 
And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true - Returns: - RaisingThread, dict: thread for communicating with the python process which is executing the function and - dictionary containing the future objects and the number of cores they require - """ - resource_dict = task_dict.pop("resource_dict").copy() - qtask.put(task_dict) - qtask.put({"shutdown": True, "wait": True}) - if "cores" not in resource_dict or ( - resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1 - ): - resource_dict["cores"] = executor_kwargs["cores"] - slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1) - active_task_dict = _wait_for_free_slots( - active_task_dict=active_task_dict, - cores_requested=slots_required, - max_cores=max_cores, - max_workers=max_workers, - ) - active_task_dict[task_dict["future"]] = slots_required - task_kwargs = executor_kwargs.copy() - task_kwargs.update(resource_dict) - task_kwargs.update( - { - "future_queue": qtask, - "spawner": spawner, - "hostname_localhost": hostname_localhost, - "init_function": None, - } - ) - process = Thread( - target=execute_parallel_tasks, - kwargs=task_kwargs, - ) - process.start() - return process, active_task_dict - - -def _execute_task( +def _execute_task_without_cache( interface: SocketInterface, task_dict: dict, future_queue: queue.Queue ): """ diff --git a/executorlib/interactive/slurm.py b/executorlib/interactive/slurmspawner.py similarity index 100% rename from executorlib/interactive/slurm.py rename to executorlib/interactive/slurmspawner.py diff --git a/executorlib/interfaces/flux.py b/executorlib/interfaces/flux.py index 35814074..b6b1c058 100644 --- a/executorlib/interfaces/flux.py +++ b/executorlib/interfaces/flux.py @@ -1,10 +1,9 @@ +import contextlib from typing import Callable, Optional, Union -from executorlib.interactive.executor import ExecutorWithDependencies -from executorlib.interactive.shared import ( - InteractiveExecutor, - InteractiveStepExecutor, -) +from executorlib.interactive.blockallocation import BlockAllocationExecutor +from executorlib.interactive.dependency import DependencyExecutor +from executorlib.interactive.onetoone import OneTaskPerProcessExecutor from executorlib.standalone.inputcheck import ( check_command_line_argument_lst, check_init_function, @@ -15,13 +14,11 @@ validate_number_of_cores, ) -try: # The PyFluxExecutor requires flux-base to be installed. 
- from executorlib.interactive.flux import FluxPythonSpawner - from executorlib.interactive.flux import ( - validate_max_workers as validate_max_workers_flux, +with contextlib.suppress(ImportError): + from executorlib.interactive.fluxspawner import ( + FluxPythonSpawner, + validate_max_workers, ) -except ImportError: - pass class FluxJobExecutor: @@ -189,7 +186,7 @@ def __new__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not disable_dependencies: - return ExecutorWithDependencies( + return DependencyExecutor( executor=create_flux_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -397,7 +394,7 @@ def __new__( disable_dependencies=disable_dependencies, ) else: - return ExecutorWithDependencies( + return DependencyExecutor( executor=create_flux_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -430,7 +427,7 @@ def create_flux_executor( hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[Callable] = None, -) -> Union[InteractiveStepExecutor, InteractiveExecutor]: +) -> Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]: """ Create a flux executor @@ -496,18 +493,18 @@ def create_flux_executor( cores_per_worker=cores_per_worker, set_local_cores=False, ) - validate_max_workers_flux( + validate_max_workers( max_workers=max_workers, cores=cores_per_worker, threads_per_core=resource_dict.get("threads_per_core", 1), ) - return InteractiveExecutor( + return BlockAllocationExecutor( max_workers=max_workers, executor_kwargs=resource_dict, spawner=FluxPythonSpawner, ) else: - return InteractiveStepExecutor( + return OneTaskPerProcessExecutor( max_cores=max_cores, max_workers=max_workers, executor_kwargs=resource_dict, diff --git a/executorlib/interfaces/single.py b/executorlib/interfaces/single.py index ec594c44..e551d55b 100644 --- a/executorlib/interfaces/single.py +++ b/executorlib/interfaces/single.py @@ -1,10 +1,8 @@ from typing import Callable, Optional, Union -from executorlib.interactive.executor import ExecutorWithDependencies -from executorlib.interactive.shared import ( - InteractiveExecutor, - InteractiveStepExecutor, -) +from executorlib.interactive.blockallocation import BlockAllocationExecutor +from executorlib.interactive.dependency import DependencyExecutor +from executorlib.interactive.onetoone import OneTaskPerProcessExecutor from executorlib.standalone.inputcheck import ( check_command_line_argument_lst, check_gpus_per_worker, @@ -164,7 +162,7 @@ def __new__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not disable_dependencies: - return ExecutorWithDependencies( + return DependencyExecutor( executor=create_single_node_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -201,7 +199,7 @@ def create_single_node_executor( hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[Callable] = None, -) -> Union[InteractiveStepExecutor, InteractiveExecutor]: +) -> Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]: """ Create a single node executor @@ -255,7 +253,7 @@ def create_single_node_executor( del resource_dict["slurm_cmd_args"] if block_allocation: resource_dict["init_function"] = init_function - return InteractiveExecutor( + return BlockAllocationExecutor( max_workers=validate_number_of_cores( max_cores=max_cores, max_workers=max_workers, @@ -266,7 +264,7 @@ def create_single_node_executor( spawner=MpiExecSpawner, ) else: - return 
InteractiveStepExecutor( + return OneTaskPerProcessExecutor( max_cores=max_cores, max_workers=max_workers, executor_kwargs=resource_dict, diff --git a/executorlib/interfaces/slurm.py b/executorlib/interfaces/slurm.py index daca5ec5..bdb2ef82 100644 --- a/executorlib/interfaces/slurm.py +++ b/executorlib/interfaces/slurm.py @@ -1,14 +1,9 @@ from typing import Callable, Optional, Union -from executorlib.interactive.executor import ExecutorWithDependencies -from executorlib.interactive.shared import ( - InteractiveExecutor, - InteractiveStepExecutor, -) -from executorlib.interactive.slurm import SrunSpawner -from executorlib.interactive.slurm import ( - validate_max_workers as validate_max_workers_slurm, -) +from executorlib.interactive.blockallocation import BlockAllocationExecutor +from executorlib.interactive.dependency import DependencyExecutor +from executorlib.interactive.onetoone import OneTaskPerProcessExecutor +from executorlib.interactive.slurmspawner import SrunSpawner, validate_max_workers from executorlib.standalone.inputcheck import ( check_init_function, check_plot_dependency_graph, @@ -188,7 +183,7 @@ def __new__( disable_dependencies=disable_dependencies, ) else: - return ExecutorWithDependencies( + return DependencyExecutor( executor=create_slurm_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -361,7 +356,7 @@ def __new__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not disable_dependencies: - return ExecutorWithDependencies( + return DependencyExecutor( executor=create_slurm_executor( max_workers=max_workers, cache_directory=cache_directory, @@ -398,7 +393,7 @@ def create_slurm_executor( hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[Callable] = None, -) -> Union[InteractiveStepExecutor, InteractiveExecutor]: +) -> Union[OneTaskPerProcessExecutor, BlockAllocationExecutor]: """ Create a SLURM executor @@ -451,18 +446,18 @@ def create_slurm_executor( cores_per_worker=cores_per_worker, set_local_cores=False, ) - validate_max_workers_slurm( + validate_max_workers( max_workers=max_workers, cores=cores_per_worker, threads_per_core=resource_dict.get("threads_per_core", 1), ) - return InteractiveExecutor( + return BlockAllocationExecutor( max_workers=max_workers, executor_kwargs=resource_dict, spawner=SrunSpawner, ) else: - return InteractiveStepExecutor( + return OneTaskPerProcessExecutor( max_cores=max_cores, max_workers=max_workers, executor_kwargs=resource_dict, diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 774df6d3..3650845a 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -1,13 +1,12 @@ from concurrent.futures import Future import unittest -import sys from time import sleep from queue import Queue from threading import Thread from executorlib import SingleNodeExecutor from executorlib.interfaces.single import create_single_node_executor -from executorlib.interactive.shared import execute_tasks_with_dependencies +from executorlib.interactive.dependency import _execute_tasks_with_dependencies from executorlib.standalone.serialize import cloudpickle_register @@ -91,7 +90,7 @@ def test_dependency_steps(self): }, ) process = Thread( - target=execute_tasks_with_dependencies, + target=_execute_tasks_with_dependencies, kwargs={ "future_queue": q, "executor_queue": executor._future_queue, @@ -143,7 +142,7 @@ def test_dependency_steps_error(self): }, ) process = Thread( - 
target=execute_tasks_with_dependencies, + target=_execute_tasks_with_dependencies, kwargs={ "future_queue": q, "executor_queue": executor._future_queue, @@ -197,7 +196,7 @@ def test_dependency_steps_error_before(self): }, ) process = Thread( - target=execute_tasks_with_dependencies, + target=_execute_tasks_with_dependencies, kwargs={ "future_queue": q, "executor_queue": executor._future_queue, diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 0c7c7ce8..d5505e68 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -8,7 +8,7 @@ try: import flux.job - from executorlib.interactive.flux import FluxPythonSpawner + from executorlib.interactive.fluxspawner import FluxPythonSpawner skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("EXECUTORLIB_PMIX", None) diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 90475fbf..95e7bfe9 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -5,14 +5,14 @@ import numpy as np -from executorlib.interactive.shared import InteractiveExecutor +from executorlib.interactive.shared import execute_tasks +from executorlib.interactive.blockallocation import BlockAllocationExecutor from executorlib.standalone.serialize import cloudpickle_register -from executorlib.interactive.shared import execute_parallel_tasks try: import flux.job - from executorlib.interactive.flux import FluxPythonSpawner + from executorlib.interactive.fluxspawner import FluxPythonSpawner skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("EXECUTORLIB_PMIX", None) @@ -48,7 +48,7 @@ def setUp(self): self.flux_executor = flux.job.FluxExecutor() def test_flux_executor_serial(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=2, executor_kwargs={"flux_executor": self.flux_executor}, spawner=FluxPythonSpawner, @@ -61,7 +61,7 @@ def test_flux_executor_serial(self): self.assertTrue(fs_2.done()) def test_flux_executor_threads(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={ "flux_executor": self.flux_executor, @@ -77,7 +77,7 @@ def test_flux_executor_threads(self): self.assertTrue(fs_2.done()) def test_flux_executor_parallel(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={ "flux_executor": self.flux_executor, @@ -91,7 +91,7 @@ def test_flux_executor_parallel(self): self.assertTrue(fs_1.done()) def test_single_task(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={ "flux_executor": self.flux_executor, @@ -112,7 +112,7 @@ def test_execute_task(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( + execute_tasks( future_queue=q, cores=1, flux_executor=self.flux_executor, @@ -127,7 +127,7 @@ def test_execute_task_threads(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( + execute_tasks( future_queue=q, cores=1, threads_per_core=1, @@ -138,7 +138,7 @@ def test_execute_task_threads(self): q.join() def test_internal_memory(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={ "flux_executor": self.flux_executor, diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index be453da0..a52a1327 
100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -9,11 +9,9 @@ from executorlib.base.executor import ExecutorBase from executorlib.standalone.interactive.spawner import MpiExecSpawner -from executorlib.interactive.shared import ( - InteractiveExecutor, - InteractiveStepExecutor, - execute_parallel_tasks, -) +from executorlib.interactive.shared import execute_tasks +from executorlib.interactive.blockallocation import BlockAllocationExecutor +from executorlib.interactive.onetoone import OneTaskPerProcessExecutor from executorlib.standalone.interactive.backend import call_funct from executorlib.standalone.serialize import cloudpickle_register @@ -66,7 +64,7 @@ def sleep_one(i): class TestPyMpiExecutorSerial(unittest.TestCase): def test_pympiexecutor_two_workers(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=2, executor_kwargs={}, spawner=MpiExecSpawner, @@ -80,7 +78,7 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_2.done()) def test_pympiexecutor_one_worker(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={}, spawner=MpiExecSpawner, @@ -96,7 +94,7 @@ def test_pympiexecutor_one_worker(self): class TestPyMpiExecutorStepSerial(unittest.TestCase): def test_pympiexecutor_two_workers(self): - with InteractiveStepExecutor( + with OneTaskPerProcessExecutor( max_cores=2, executor_kwargs={}, spawner=MpiExecSpawner, @@ -110,7 +108,7 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_2.done()) def test_pympiexecutor_one_worker(self): - with InteractiveStepExecutor( + with OneTaskPerProcessExecutor( max_cores=1, executor_kwargs={}, spawner=MpiExecSpawner, @@ -129,7 +127,7 @@ def test_pympiexecutor_one_worker(self): ) class TestPyMpiExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 2}, spawner=MpiExecSpawner, @@ -140,7 +138,7 @@ def test_pympiexecutor_one_worker_with_mpi(self): self.assertTrue(fs_1.done()) def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 2}, spawner=MpiExecSpawner, @@ -160,7 +158,7 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): ) def test_pympiexecutor_one_worker_with_mpi_echo(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 2}, spawner=MpiExecSpawner, @@ -175,7 +173,7 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): ) class TestPyMpiStepExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): - with InteractiveStepExecutor( + with OneTaskPerProcessExecutor( max_cores=2, executor_kwargs={"cores": 2}, spawner=MpiExecSpawner, @@ -186,7 +184,7 @@ def test_pympiexecutor_one_worker_with_mpi(self): self.assertTrue(fs_1.done()) def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): - with InteractiveStepExecutor( + with OneTaskPerProcessExecutor( max_cores=2, executor_kwargs={"cores": 2}, spawner=MpiExecSpawner, @@ -206,7 +204,7 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): ) def test_pympiexecutor_one_worker_with_mpi_echo(self): - with InteractiveStepExecutor( + with OneTaskPerProcessExecutor( max_cores=2, executor_kwargs={"cores": 2}, spawner=MpiExecSpawner, @@ -218,7 +216,7 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): class 
TestPyMpiExecutorInitFunction(unittest.TestCase): def test_internal_memory(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={ "cores": 1, @@ -246,7 +244,7 @@ def test_execute_task(self): q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( + execute_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -259,7 +257,7 @@ def test_execute_task(self): class TestFuturePool(unittest.TestCase): def test_pool_serial(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 1}, spawner=MpiExecSpawner, @@ -274,7 +272,7 @@ def test_pool_serial(self): self.assertEqual(output.result(), np.array(4)) def test_executor_multi_submission(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 1}, spawner=MpiExecSpawner, @@ -287,7 +285,7 @@ def test_executor_multi_submission(self): self.assertTrue(fs_2.done()) def test_shutdown(self): - p = InteractiveExecutor( + p = BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 1}, spawner=MpiExecSpawner, @@ -303,7 +301,7 @@ def test_shutdown(self): fs2.result() def test_pool_serial_map(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 1}, spawner=MpiExecSpawner, @@ -313,7 +311,7 @@ def test_pool_serial_map(self): def test_executor_exception(self): with self.assertRaises(RuntimeError): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 1}, spawner=MpiExecSpawner, @@ -323,7 +321,7 @@ def test_executor_exception(self): def test_executor_exception_future(self): with self.assertRaises(RuntimeError): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 1}, spawner=MpiExecSpawner, @@ -344,7 +342,7 @@ def test_meta(self): "openmpi_oversubscribe": False, "max_workers": 1, } - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={ "cores": 2, @@ -372,7 +370,7 @@ def test_meta_step(self): "openmpi_oversubscribe": False, "max_cores": 2, } - with InteractiveStepExecutor( + with OneTaskPerProcessExecutor( max_cores=2, executor_kwargs={ "cores": 2, @@ -392,7 +390,7 @@ def test_meta_step(self): skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) def test_pool_multi_core(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 2}, spawner=MpiExecSpawner, @@ -410,7 +408,7 @@ def test_pool_multi_core(self): skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." 
) def test_pool_multi_core_map(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 2}, spawner=MpiExecSpawner, @@ -428,7 +426,7 @@ def test_execute_task_failed_no_argument(self): q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_parallel_tasks( + execute_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -444,7 +442,7 @@ def test_execute_task_failed_wrong_argument(self): q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_parallel_tasks( + execute_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -459,7 +457,7 @@ def test_execute_task(self): q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( + execute_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -477,7 +475,7 @@ def test_execute_task_parallel(self): q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( + execute_tasks( future_queue=q, cores=2, openmpi_oversubscribe=False, @@ -500,7 +498,7 @@ def test_execute_task_cache(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 1}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_parallel_tasks( + execute_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -519,7 +517,7 @@ def test_execute_task_cache_failed_no_argument(self): q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f}) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_parallel_tasks( + execute_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, diff --git a/tests/test_local_executor_future.py b/tests/test_local_executor_future.py index 7ab3f1cd..24ed3c04 100644 --- a/tests/test_local_executor_future.py +++ b/tests/test_local_executor_future.py @@ -5,7 +5,7 @@ import numpy as np -from executorlib.interactive.shared import InteractiveExecutor +from executorlib.interactive.blockallocation import BlockAllocationExecutor from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -18,7 +18,7 @@ def calc(i): class TestFuture(unittest.TestCase): def test_pool_serial(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 1}, spawner=MpiExecSpawner, @@ -34,7 +34,7 @@ def test_pool_serial(self): skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." 
) def test_pool_serial_multi_core(self): - with InteractiveExecutor( + with BlockAllocationExecutor( max_workers=1, executor_kwargs={"cores": 2}, spawner=MpiExecSpawner, @@ -67,7 +67,7 @@ def callback(future): def submit(): # Executor only exists in this scope and can get garbage collected after # this function is exits - future = InteractiveExecutor( + future = BlockAllocationExecutor( max_workers=1, executor_kwargs={}, spawner=MpiExecSpawner, @@ -108,7 +108,7 @@ def __init__(self): def run(self): self.running = True - future = InteractiveExecutor( + future = BlockAllocationExecutor( max_workers=1, executor_kwargs={}, spawner=MpiExecSpawner, diff --git a/tests/test_plot_dependency_flux.py b/tests/test_plot_dependency_flux.py index 7c1e2e58..1cc5ce54 100644 --- a/tests/test_plot_dependency_flux.py +++ b/tests/test_plot_dependency_flux.py @@ -10,7 +10,7 @@ try: import pygraphviz import flux.job - from executorlib.interactive.flux import FluxPythonSpawner + from executorlib.interactive.fluxspawner import FluxPythonSpawner skip_graphviz_flux_test = "FLUX_URI" not in os.environ except ImportError: diff --git a/tests/test_pysqa_subprocess.py b/tests/test_pysqa_subprocess.py index 15334c05..147e871d 100644 --- a/tests/test_pysqa_subprocess.py +++ b/tests/test_pysqa_subprocess.py @@ -1,5 +1,5 @@ import unittest -from executorlib.interactive.slurm import generate_slurm_command +from executorlib.interactive.slurmspawner import generate_slurm_command try: from executorlib.cache.queue_spawner import _pysqa_execute_command diff --git a/tests/test_shared_backend.py b/tests/test_shared_backend.py index fd374561..40bda2e1 100644 --- a/tests/test_shared_backend.py +++ b/tests/test_shared_backend.py @@ -4,7 +4,7 @@ from executorlib.standalone.interactive.backend import parse_arguments from executorlib.standalone.interactive.spawner import MpiExecSpawner -from executorlib.interactive.slurm import SrunSpawner +from executorlib.interactive.slurmspawner import SrunSpawner class TestParser(unittest.TestCase): diff --git a/tests/test_shell_executor.py b/tests/test_shell_executor.py index caea210e..7a69cf22 100644 --- a/tests/test_shell_executor.py +++ b/tests/test_shell_executor.py @@ -5,7 +5,7 @@ from executorlib import SingleNodeExecutor from executorlib.standalone.serialize import cloudpickle_register -from executorlib.interactive.shared import execute_parallel_tasks +from executorlib.interactive.shared import execute_tasks from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -32,7 +32,7 @@ def test_execute_single_task(self): test_queue.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) self.assertFalse(f.done()) - execute_parallel_tasks( + execute_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, @@ -58,7 +58,7 @@ def test_wrong_error(self): ) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_parallel_tasks( + execute_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, @@ -85,7 +85,7 @@ def test_broken_executable(self): ) cloudpickle_register(ind=1) with self.assertRaises(FileNotFoundError): - execute_parallel_tasks( + execute_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, diff --git a/tests/test_shell_interactive.py b/tests/test_shell_interactive.py index 553639af..6b4e2d77 100644 --- a/tests/test_shell_interactive.py +++ b/tests/test_shell_interactive.py @@ -6,7 +6,7 @@ from executorlib import SingleNodeExecutor from executorlib.standalone.serialize import cloudpickle_register -from 
executorlib.interactive.shared import execute_parallel_tasks +from executorlib.interactive.shared import execute_tasks from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -88,7 +88,7 @@ def test_execute_single_task(self): cloudpickle_register(ind=1) self.assertFalse(future_lines.done()) self.assertFalse(future_pattern.done()) - execute_parallel_tasks( + execute_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False,
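In summary, the refactoring splits executorlib.interactive.shared into dedicated modules and renames the public classes accordingly: InteractiveExecutor becomes BlockAllocationExecutor (executorlib.interactive.blockallocation), InteractiveStepExecutor becomes OneTaskPerProcessExecutor (executorlib.interactive.onetoone), ExecutorWithDependencies becomes DependencyExecutor (executorlib.interactive.dependency), and execute_parallel_tasks becomes execute_tasks. The following usage sketch mirrors the updated tests against the renamed API; it is illustrative rather than part of this diff, and it assumes a working MPI installation for the MpiExecSpawner.

```python
from executorlib.interactive.blockallocation import BlockAllocationExecutor
from executorlib.interactive.onetoone import OneTaskPerProcessExecutor
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.standalone.serialize import cloudpickle_register


def calc(i):
    return 2 * i


cloudpickle_register(ind=1)  # serialize locally defined functions, as in the tests

# Block allocation: a fixed pool of max_workers long-lived worker processes.
with BlockAllocationExecutor(
    max_workers=1, executor_kwargs={"cores": 1}, spawner=MpiExecSpawner
) as exe:
    print(exe.submit(calc, 21).result())  # 42

# One task per process: a fresh process per task, limited by max_cores.
with OneTaskPerProcessExecutor(
    max_cores=1, executor_kwargs={}, spawner=MpiExecSpawner
) as exe:
    print(exe.submit(calc, 21).result())  # 42
```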