diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index ff4c7375..c34a5fa8 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -3,14 +3,16 @@ from threading import Thread from typing import Callable, Optional +from executorlib.standalone.command import get_interactive_execute_command from executorlib.standalone.inputcheck import ( check_resource_dict, check_resource_dict_is_empty, ) +from executorlib.standalone.interactive.communication import interface_bootup from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.task_scheduler.base import TaskSchedulerBase -from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks +from executorlib.task_scheduler.interactive.shared import execute_task_dict, task_done class BlockAllocationTaskScheduler(TaskSchedulerBase): @@ -64,7 +66,7 @@ def __init__( self._set_process( process=[ Thread( - target=execute_multiple_tasks, + target=_execute_multiple_tasks, kwargs=executor_kwargs | {"worker_id": worker_id}, ) for worker_id in range(self._max_workers) @@ -90,7 +92,7 @@ def max_workers(self, max_workers: int): elif self._max_workers < max_workers: new_process_lst = [ Thread( - target=execute_multiple_tasks, + target=_execute_multiple_tasks, kwargs=self._process_kwargs, ) for _ in range(max_workers - self._max_workers) @@ -175,3 +177,74 @@ def _set_process(self, process: list[Thread]): # type: ignore self._process = process for process_instance in self._process: process_instance.start() + + +def _execute_multiple_tasks( + future_queue: queue.Queue, + cores: int = 1, + spawner: type[BaseSpawner] = MpiExecSpawner, + hostname_localhost: Optional[bool] = None, + init_function: Optional[Callable] = None, + cache_directory: Optional[str] = None, + 
cache_key: Optional[str] = None, + queue_join_on_shutdown: bool = True, + log_obj_size: bool = False, + error_log_file: Optional[str] = None, + worker_id: Optional[int] = None, + **kwargs, +) -> None: + """ + Execute multiple tasks in parallel using the message passing interface (MPI). + + Args: + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + cores (int): defines the total number of MPI ranks to use + spawner (BaseSpawner): Spawner to start process on selected compute resources + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this is essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + init_function (Callable): optional function to preset arguments for functions which are submitted later + cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". + cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be + overwritten by setting the cache_key. + queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True. + log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions + submitted to the Executor. + worker_id (int): Communicate to the worker which ID was assigned to it for future reference and resource + distribution. 
+ """ + interface = interface_bootup( + command_lst=get_interactive_execute_command( + cores=cores, + ), + connections=spawner(cores=cores, **kwargs), + hostname_localhost=hostname_localhost, + log_obj_size=log_obj_size, + worker_id=worker_id, + ) + if init_function is not None: + interface.send_dict( + input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} + ) + while True: + task_dict = future_queue.get() + if "shutdown" in task_dict and task_dict["shutdown"]: + interface.shutdown(wait=task_dict["wait"]) + task_done(future_queue=future_queue) + if queue_join_on_shutdown: + future_queue.join() + break + elif "fn" in task_dict and "future" in task_dict: + execute_task_dict( + task_dict=task_dict, + interface=interface, + cache_directory=cache_directory, + cache_key=cache_key, + error_log_file=error_log_file, + ) + task_done(future_queue=future_queue) diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index e97fa4bc..02a63814 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -2,9 +2,11 @@ from threading import Thread from typing import Optional +from executorlib.standalone.command import get_interactive_execute_command +from executorlib.standalone.interactive.communication import interface_bootup from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.task_scheduler.base import TaskSchedulerBase -from executorlib.task_scheduler.interactive.shared import execute_single_task +from executorlib.task_scheduler.interactive.shared import execute_task_dict class OneProcessTaskScheduler(TaskSchedulerBase): @@ -60,13 +62,13 @@ def __init__( self._process_kwargs = executor_kwargs self._set_process( Thread( - target=_execute_task_in_separate_process, + target=_execute_single_task, kwargs=executor_kwargs, ) ) -def _execute_task_in_separate_process( +def _execute_single_task( 
future_queue: queue.Queue, spawner: type[BaseSpawner] = MpiExecSpawner, max_cores: Optional[int] = None, @@ -166,7 +168,6 @@ def _wrap_execute_task_in_separate_process( task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} active_task_dict (dict): Dictionary containing the future objects and the number of cores they require - qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function spawner (BaseSpawner): Interface to start process on selected compute resources executor_kwargs (dict): keyword parameters used to initialize the Executor max_cores (int): defines the number cores which can be used in parallel @@ -207,8 +208,61 @@ def _wrap_execute_task_in_separate_process( } ) process = Thread( - target=execute_single_task, + target=_execute_task_in_thread, kwargs=task_kwargs, ) process.start() return process, active_task_dict + + +def _execute_task_in_thread( + task_dict: dict, + cores: int = 1, + spawner: type[BaseSpawner] = MpiExecSpawner, + hostname_localhost: Optional[bool] = None, + cache_directory: Optional[str] = None, + cache_key: Optional[str] = None, + log_obj_size: bool = False, + error_log_file: Optional[str] = None, + worker_id: Optional[int] = None, + **kwargs, +) -> None: + """ + Execute a single tasks in parallel using the message passing interface (MPI). + + Args: + task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys + {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} + cores (int): defines the total number of MPI ranks to use + spawner (BaseSpawner): Spawner to start process on selected compute resources + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. 
In the + context of an HPC cluster this is essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". + cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be + overwritten by setting the cache_key. + log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions + submitted to the Executor. + worker_id (int): Communicate to the worker which ID was assigned to it for future reference and resource + distribution. 
+ """ + execute_task_dict( + task_dict=task_dict, + interface=interface_bootup( + command_lst=get_interactive_execute_command( + cores=cores, + ), + connections=spawner(cores=cores, **kwargs), + hostname_localhost=hostname_localhost, + log_obj_size=log_obj_size, + worker_id=worker_id, + ), + cache_directory=cache_directory, + cache_key=cache_key, + error_log_file=error_log_file, + ) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 0b46324d..8be9076f 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -2,143 +2,13 @@ import os import queue import time -from typing import Callable, Optional +from typing import Optional -from executorlib.standalone.command import get_interactive_execute_command -from executorlib.standalone.interactive.communication import ( - SocketInterface, - interface_bootup, -) -from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner +from executorlib.standalone.interactive.communication import SocketInterface from executorlib.standalone.serialize import serialize_funct -def execute_multiple_tasks( - future_queue: queue.Queue, - cores: int = 1, - spawner: type[BaseSpawner] = MpiExecSpawner, - hostname_localhost: Optional[bool] = None, - init_function: Optional[Callable] = None, - cache_directory: Optional[str] = None, - cache_key: Optional[str] = None, - queue_join_on_shutdown: bool = True, - log_obj_size: bool = False, - error_log_file: Optional[str] = None, - worker_id: Optional[int] = None, - **kwargs, -) -> None: - """ - Execute a single tasks in parallel using the message passing interface (MPI). 
- - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - spawner (BaseSpawner): Spawner to start process on selected compute resources - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true - init_function (Callable): optional function to preset arguments for functions which are submitted later - cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". - cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be - overwritten by setting the cache_key. - queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True. - log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions - submitted to the Executor. - worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource - distribution. 
- """ - interface = interface_bootup( - command_lst=get_interactive_execute_command( - cores=cores, - ), - connections=spawner(cores=cores, **kwargs), - hostname_localhost=hostname_localhost, - log_obj_size=log_obj_size, - worker_id=worker_id, - ) - if init_function is not None: - interface.send_dict( - input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} - ) - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict and task_dict["shutdown"]: - interface.shutdown(wait=task_dict["wait"]) - _task_done(future_queue=future_queue) - if queue_join_on_shutdown: - future_queue.join() - break - elif "fn" in task_dict and "future" in task_dict: - _execute_task_dict( - task_dict=task_dict, - interface=interface, - cache_directory=cache_directory, - cache_key=cache_key, - error_log_file=error_log_file, - ) - _task_done(future_queue=future_queue) - - -def execute_single_task( - task_dict: dict, - cores: int = 1, - spawner: type[BaseSpawner] = MpiExecSpawner, - hostname_localhost: Optional[bool] = None, - cache_directory: Optional[str] = None, - cache_key: Optional[str] = None, - log_obj_size: bool = False, - error_log_file: Optional[str] = None, - worker_id: Optional[int] = None, - **kwargs, -) -> None: - """ - Execute a single tasks in parallel using the message passing interface (MPI). - - Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - spawner (BaseSpawner): Spawner to start process on selected compute resources - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. 
Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true - init_function (Callable): optional function to preset arguments for functions which are submitted later - cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". - cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be - overwritten by setting the cache_key. - queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True. - log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions - submitted to the Executor. - worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource - distribution. - """ - _execute_task_dict( - task_dict=task_dict, - interface=interface_bootup( - command_lst=get_interactive_execute_command( - cores=cores, - ), - connections=spawner(cores=cores, **kwargs), - hostname_localhost=hostname_localhost, - log_obj_size=log_obj_size, - worker_id=worker_id, - ), - cache_directory=cache_directory, - cache_key=cache_key, - error_log_file=error_log_file, - ) - - -def _execute_task_dict( +def execute_task_dict( task_dict: dict, interface: SocketInterface, cache_directory: Optional[str] = None, @@ -171,6 +41,11 @@ def _execute_task_dict( ) +def task_done(future_queue: queue.Queue): + with contextlib.suppress(ValueError): + future_queue.task_done() + + def _execute_task_without_cache(interface: SocketInterface, task_dict: dict): """ Execute the task in the task_dict by communicating it via the interface. 
@@ -233,8 +108,3 @@ def _execute_task_with_cache( _, _, result = get_output(file_name=file_name) future = task_dict["future"] future.set_result(result) - - -def _task_done(future_queue: queue.Queue): - with contextlib.suppress(ValueError): - future_queue.task_done() diff --git a/tests/test_fluxpythonspawner.py b/tests/test_fluxpythonspawner.py index 7ec10fcc..264f378f 100644 --- a/tests/test_fluxpythonspawner.py +++ b/tests/test_fluxpythonspawner.py @@ -5,8 +5,7 @@ import numpy as np -from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks -from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler +from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler, _execute_multiple_tasks from executorlib.standalone.serialize import cloudpickle_register @@ -112,7 +111,7 @@ def test_execute_task(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=q, cores=1, flux_executor=self.flux_executor, @@ -127,7 +126,7 @@ def test_execute_task_threads(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=q, cores=1, threads_per_core=1, diff --git a/tests/test_mpiexecspawner.py b/tests/test_mpiexecspawner.py index 9ebf02de..eb367e86 100644 --- a/tests/test_mpiexecspawner.py +++ b/tests/test_mpiexecspawner.py @@ -10,8 +10,7 @@ from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.standalone.interactive.spawner import MpiExecSpawner -from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks -from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler +from 
executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler, _execute_multiple_tasks from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler from executorlib.standalone.interactive.backend import call_funct from executorlib.standalone.serialize import cloudpickle_register @@ -261,7 +260,7 @@ def test_execute_task(self): q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -443,7 +442,7 @@ def test_execute_task_failed_no_argument(self): q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -459,7 +458,7 @@ def test_execute_task_failed_wrong_argument(self): q.put({"fn": calc_array, "args": (), "kwargs": {"j": 4}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -475,7 +474,7 @@ def test_execute_task(self): q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -493,7 +492,7 @@ def test_execute_task_parallel(self): q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=q, cores=2, openmpi_oversubscribe=False, @@ -516,7 +515,7 @@ def test_execute_task_cache(self): q.put({"fn": calc, "args": (), "kwargs": {"i": 1}, "future": f}) q.put({"shutdown": True, "wait": 
True}) cloudpickle_register(ind=1) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, @@ -535,7 +534,7 @@ def test_execute_task_cache_failed_no_argument(self): q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f}) q.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=q, cores=1, openmpi_oversubscribe=False, diff --git a/tests/test_singlenodeexecutor_shell_executor.py b/tests/test_singlenodeexecutor_shell_executor.py index 9dae5e99..1e60f338 100644 --- a/tests/test_singlenodeexecutor_shell_executor.py +++ b/tests/test_singlenodeexecutor_shell_executor.py @@ -5,7 +5,7 @@ from executorlib import SingleNodeExecutor from executorlib.standalone.serialize import cloudpickle_register -from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks +from executorlib.task_scheduler.interactive.blockallocation import _execute_multiple_tasks from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -32,7 +32,7 @@ def test_execute_single_task(self): test_queue.put({"shutdown": True, "wait": True}) cloudpickle_register(ind=1) self.assertFalse(f.done()) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, @@ -58,7 +58,7 @@ def test_wrong_error(self): ) cloudpickle_register(ind=1) with self.assertRaises(TypeError): - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, @@ -85,7 +85,7 @@ def test_broken_executable(self): ) cloudpickle_register(ind=1) with self.assertRaises(FileNotFoundError): - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False, diff --git a/tests/test_singlenodeexecutor_shell_interactive.py b/tests/test_singlenodeexecutor_shell_interactive.py index ed1f4f68..d9f42622 100644 --- 
a/tests/test_singlenodeexecutor_shell_interactive.py +++ b/tests/test_singlenodeexecutor_shell_interactive.py @@ -6,7 +6,7 @@ from executorlib import SingleNodeExecutor from executorlib.standalone.serialize import cloudpickle_register -from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks +from executorlib.task_scheduler.interactive.blockallocation import _execute_multiple_tasks from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -88,7 +88,7 @@ def test_execute_single_task(self): cloudpickle_register(ind=1) self.assertFalse(future_lines.done()) self.assertFalse(future_pattern.done()) - execute_multiple_tasks( + _execute_multiple_tasks( future_queue=test_queue, cores=1, openmpi_oversubscribe=False,