From 5f1322977fc11ce7bfeebd86d8fd2c875596bb5e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 31 Aug 2025 22:40:55 +0200 Subject: [PATCH 01/29] Interactive: Interrupt interface bootup when the executor is shutdown during bootup --- .../standalone/interactive/communication.py | 52 +++++++++-- executorlib/standalone/interactive/spawner.py | 17 +++- .../interactive/blockallocation.py | 86 ++++++++++++------- .../task_scheduler/interactive/onetoone.py | 33 +++---- .../task_scheduler/interactive/shared.py | 66 +++++++++++--- 5 files changed, 182 insertions(+), 72 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 1be6f13a..c2a34b12 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -1,7 +1,7 @@ import logging import sys from socket import gethostname -from typing import Any, Optional +from typing import Any, Callable, Optional import cloudpickle import zmq @@ -42,6 +42,8 @@ def __init__( if log_obj_size: self._logger = logging.getLogger("executorlib") self._spawner = spawner + self._command_lst: list[str] = [] + self._stop_function: Optional[Callable] = None def send_dict(self, input_dict: dict): """ @@ -85,7 +87,7 @@ def send_and_receive_dict(self, input_dict: dict) -> dict: Args: input_dict (dict): dictionary of commands to be communicated. The key "shutdown" is reserved to stop the - connected client from listening. + connected client from listening. Returns: dict: dictionary with response received from the connected client @@ -106,16 +108,43 @@ def bind_to_random_port(self) -> int: def bootup( self, command_lst: list[str], - ): + stop_function: Optional[Callable] = None, + ) -> bool: """ Boot up the client process to connect to the SocketInterface. Args: command_lst (list): list of strings to start the client process + stop_function (Callable): Function to stop the interface. + + Returns: + bool: Whether the interface was successfully started. """ - self._spawner.bootup( + self._command_lst = command_lst + self._stop_function = stop_function + if not self._spawner.bootup( command_lst=command_lst, - ) + stop_function=stop_function, + ): + self._reset_socket() + return False + else: + return True + + def restart(self) -> bool: + """ + Restart the client process to connect to the SocketInterface. + + Returns: + bool: Whether the interface was successfully started. + """ + if not self._spawner.bootup( + command_lst=self._command_lst, + stop_function=self._stop_function, + ): + self._reset_socket() + return False + return True def shutdown(self, wait: bool = True): """ @@ -159,7 +188,8 @@ def interface_bootup( hostname_localhost: Optional[bool] = None, log_obj_size: bool = False, worker_id: Optional[int] = None, -) -> SocketInterface: + stop_function: Optional[Callable] = None, +) -> Optional[SocketInterface]: """ Start interface for ZMQ communication @@ -177,6 +207,7 @@ def interface_bootup( log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects. worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource distribution. + stop_function (Callable): Function to stop the interface. Returns: executorlib.shared.communication.SocketInterface: socket interface for zmq communication @@ -198,10 +229,13 @@ def interface_bootup( "--zmqport", str(interface.bind_to_random_port()), ] - interface.bootup( + if interface.bootup( command_lst=command_lst, - ) - return interface + stop_function=stop_function, + ): + return interface + else: + return None def interface_connect(host: str, port: str) -> tuple[zmq.Context, zmq.Socket]: diff --git a/executorlib/standalone/interactive/spawner.py b/executorlib/standalone/interactive/spawner.py index 4a5cb390..0f957c1a 100644 --- a/executorlib/standalone/interactive/spawner.py +++ b/executorlib/standalone/interactive/spawner.py @@ -1,7 +1,7 @@ import os import subprocess from abc import ABC, abstractmethod -from typing import Optional +from typing import Callable, Optional MPI_COMMAND = "mpiexec" @@ -29,12 +29,17 @@ def __init__( def bootup( self, command_lst: list[str], - ): + stop_function: Optional[Callable] = None, + ) -> bool: """ Method to start the interface. Args: command_lst (list[str]): The command list to execute. + stop_function (Callable): Function to stop the interface. + + Returns: + bool: Whether the interface was successfully started. """ raise NotImplementedError @@ -87,12 +92,17 @@ def __init__( def bootup( self, command_lst: list[str], - ): + stop_function: Optional[Callable] = None, + ) -> bool: """ Method to start the subprocess interface. Args: command_lst (list[str]): The command list to execute. + stop_function (Callable): Function to stop the interface. + + Returns: + bool: Whether the interface was successfully started. """ if self._cwd is not None: os.makedirs(self._cwd, exist_ok=True) @@ -101,6 +111,7 @@ def bootup( cwd=self._cwd, stdin=subprocess.DEVNULL, ) + return self.poll() def generate_command(self, command_lst: list[str]) -> list[str]: """ diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 7955899d..d68ff4a1 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -12,7 +12,9 @@ from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.task_scheduler.base import TaskSchedulerBase -from executorlib.task_scheduler.interactive.shared import execute_task_dict, task_done +from executorlib.task_scheduler.interactive.shared import execute_task_dict, task_done, reset_task_dict + +_interrupt_bootup_dict: dict = {} class BlockAllocationTaskScheduler(TaskSchedulerBase): @@ -63,11 +65,17 @@ def __init__( executor_kwargs["queue_join_on_shutdown"] = False self._process_kwargs = executor_kwargs self._max_workers = max_workers + self_id = id(self) + self._self_id = self_id + _interrupt_bootup_dict[self._self_id] = False self._set_process( process=[ Thread( target=_execute_multiple_tasks, - kwargs=executor_kwargs | {"worker_id": worker_id}, + kwargs=executor_kwargs | { + "worker_id": worker_id, + "stop_function": lambda: _interrupt_bootup_dict[self_id], + }, ) for worker_id in range(self._max_workers) ], @@ -147,22 +155,22 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): methods can be called after this one. Args: - wait: If True then shutdown will not return until all running - futures have finished executing and the resources used by the - parallel_executors have been reclaimed. - cancel_futures: If True then shutdown will cancel all pending - futures. Futures that are completed or running will not be - cancelled. + wait (bool): If True then shutdown will not return until all running futures have finished executing and + the resources used by the parallel_executors have been reclaimed. + cancel_futures (bool): If True then shutdown will cancel all pending futures. Futures that are completed or + running will not be cancelled. """ if self._future_queue is not None: if cancel_futures: cancel_items_in_queue(que=self._future_queue) if isinstance(self._process, list): + _interrupt_bootup_dict[self._self_id] = True for _ in range(len(self._process)): self._future_queue.put({"shutdown": True, "wait": wait}) if wait: for process in self._process: process.join() + cancel_items_in_queue(que=self._future_queue) self._future_queue.join() self._process = None self._future_queue = None @@ -191,32 +199,34 @@ def _execute_multiple_tasks( log_obj_size: bool = False, error_log_file: Optional[str] = None, worker_id: Optional[int] = None, + stop_function: Optional[Callable] = None, **kwargs, ) -> None: """ Execute a single tasks in parallel using the message passing interface (MPI). Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - cores (int): defines the total number of MPI ranks to use - spawner (BaseSpawner): Spawner to start process on selected compute resources - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true - init_function (Callable): optional function to preset arguments for functions which are submitted later - cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". - cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be - overwritten by setting the cache_key. - queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True. - log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions - submitted to the Executor. - worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource - distribution. + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + cores (int): defines the total number of MPI ranks to use + spawner (BaseSpawner): Spawner to start process on selected compute resources + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + init_function (Callable): optional function to preset arguments for functions which are submitted later + cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". + cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be + overwritten by setting the cache_key. + queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True. + log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions + submitted to the Executor. + worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource + distribution. + stop_function (Callable): Function to stop the interface. """ interface = interface_bootup( command_lst=get_interactive_execute_command( @@ -226,22 +236,24 @@ def _execute_multiple_tasks( hostname_localhost=hostname_localhost, log_obj_size=log_obj_size, worker_id=worker_id, + stop_function=stop_function, ) - if init_function is not None: + if init_function is not None and interface is not None: interface.send_dict( input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} ) while True: task_dict = future_queue.get() if "shutdown" in task_dict and task_dict["shutdown"]: - interface.shutdown(wait=task_dict["wait"]) + if interface is not None: + interface.shutdown(wait=task_dict["wait"]) task_done(future_queue=future_queue) if queue_join_on_shutdown: future_queue.join() break elif "fn" in task_dict and "future" in task_dict: f = task_dict.pop("future") - execute_task_dict( + result_flag = execute_task_dict( task_dict=task_dict, future_obj=f, interface=interface, @@ -249,4 +261,14 @@ def _execute_multiple_tasks( cache_key=cache_key, error_log_file=error_log_file, ) - task_done(future_queue=future_queue) + if not result_flag: + task_done(future_queue=future_queue) + reset_task_dict( + future_obj=f, future_queue=future_queue, task_dict=task_dict + ) + if interface is not None: + interface.restart() + else: + break + else: + task_done(future_queue=future_queue) diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index 3b631565..fc4cfb28 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -81,19 +81,19 @@ def _execute_single_task( Execute a single tasks in parallel using the message passing interface (MPI). Args: - future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process - spawner (BaseSpawner): Interface to start process on selected compute resources - max_cores (int): defines the number cores which can be used in parallel - max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of - cores which can be used in parallel - just like the max_cores parameter. Using max_cores is - recommended, as computers have a limited number of compute cores. - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true + future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process + spawner (BaseSpawner): Interface to start process on selected compute resources + max_cores (int): defines the number cores which can be used in parallel + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this essential to be able to communicate to an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true """ active_task_dict: dict = {} process_lst: list = [] @@ -230,7 +230,7 @@ def _execute_task_in_thread( error_log_file: Optional[str] = None, worker_id: Optional[int] = None, **kwargs, -) -> None: +) -> bool: """ Execute a single tasks in parallel using the message passing interface (MPI). @@ -255,8 +255,11 @@ def _execute_task_in_thread( submitted to the Executor. worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource distribution. + + Returns: + bool: True if the task was submitted successfully, False otherwise. """ - execute_task_dict( + return execute_task_dict( task_dict=task_dict, future_obj=future_obj, interface=interface_bootup( diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 68dd68d6..504dc5e2 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -3,20 +3,24 @@ import queue import time from concurrent.futures import Future +from concurrent.futures._base import PENDING from typing import Optional -from executorlib.standalone.interactive.communication import SocketInterface +from executorlib.standalone.interactive.communication import ( + ExecutorlibSocketError, + SocketInterface, +) from executorlib.standalone.serialize import serialize_funct def execute_task_dict( task_dict: dict, future_obj: Future, - interface: SocketInterface, + interface: Optional[SocketInterface] = None, cache_directory: Optional[str] = None, cache_key: Optional[str] = None, error_log_file: Optional[str] = None, -): +) -> bool: """ Execute the task in the task_dict by communicating it via the interface. @@ -30,22 +34,29 @@ def execute_task_dict( overwritten by setting the cache_key. error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + + Returns: + bool: True if the task was submitted successfully, False otherwise. """ if not future_obj.done() and future_obj.set_running_or_notify_cancel(): if error_log_file is not None: task_dict["error_log_file"] = error_log_file - if cache_directory is None: - _execute_task_without_cache( + if cache_directory is None and interface is not None: + return _execute_task_without_cache( interface=interface, task_dict=task_dict, future_obj=future_obj ) - else: - _execute_task_with_cache( + elif cache_directory is not None and interface is not None: + return _execute_task_with_cache( interface=interface, task_dict=task_dict, cache_directory=cache_directory, cache_key=cache_key, future_obj=future_obj, ) + else: + return False + else: + return True def task_done(future_queue: queue.Queue): @@ -59,9 +70,27 @@ def task_done(future_queue: queue.Queue): future_queue.task_done() +def reset_task_dict(future_obj: Future, future_queue: queue.Queue, task_dict: dict): + """ + Reset the task dictionary for resubmission to the queue. + + Args: + future_obj (Future): A Future representing the given call. + future_queue (queue): Queue of task dictionaries waiting for execution. + task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys + {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} + + Returns: + bool: True if the task was submitted successfully, False otherwise. + """ + future_obj._state = PENDING + task_done(future_queue=future_queue) + future_queue.put(task_dict | {"future": future_obj}) + + def _execute_task_without_cache( interface: SocketInterface, task_dict: dict, future_obj: Future -): +) -> bool: """ Execute the task in the task_dict by communicating it via the interface. @@ -70,12 +99,19 @@ def _execute_task_without_cache( task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} future_obj (Future): A Future representing the given call. + + Returns: + bool: True if the task was submitted successfully, False otherwise. """ try: future_obj.set_result(interface.send_and_receive_dict(input_dict=task_dict)) except Exception as thread_exception: - interface.shutdown(wait=True) - future_obj.set_exception(exception=thread_exception) + if isinstance(thread_exception, ExecutorlibSocketError): + return False + else: + interface.shutdown(wait=True) + future_obj.set_exception(exception=thread_exception) + return True def _execute_task_with_cache( @@ -84,7 +120,7 @@ def _execute_task_with_cache( future_obj: Future, cache_directory: str, cache_key: Optional[str] = None, -): +) -> bool: """ Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory. @@ -116,8 +152,12 @@ def _execute_task_with_cache( dump(file_name=file_name, data_dict=data_dict) future_obj.set_result(result) except Exception as thread_exception: - interface.shutdown(wait=True) - future_obj.set_exception(exception=thread_exception) + if isinstance(thread_exception, ExecutorlibSocketError): + return False + else: + interface.shutdown(wait=True) + future_obj.set_exception(exception=thread_exception) else: _, _, result = get_output(file_name=file_name) future_obj.set_result(result) + return True From 924242964f5636faa6a51f8d26b38fbd019d4ac1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 31 Aug 2025 20:41:32 +0000 Subject: [PATCH 02/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/standalone/interactive/communication.py | 4 ++-- .../task_scheduler/interactive/blockallocation.py | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index c2a34b12..85e47997 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -139,8 +139,8 @@ def restart(self) -> bool: bool: Whether the interface was successfully started. """ if not self._spawner.bootup( - command_lst=self._command_lst, - stop_function=self._stop_function, + command_lst=self._command_lst, + stop_function=self._stop_function, ): self._reset_socket() return False diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index d68ff4a1..df6105cb 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -12,7 +12,11 @@ from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.task_scheduler.base import TaskSchedulerBase -from executorlib.task_scheduler.interactive.shared import execute_task_dict, task_done, reset_task_dict +from executorlib.task_scheduler.interactive.shared import ( + execute_task_dict, + reset_task_dict, + task_done, +) _interrupt_bootup_dict: dict = {} @@ -72,7 +76,8 @@ def __init__( process=[ Thread( target=_execute_multiple_tasks, - kwargs=executor_kwargs | { + kwargs=executor_kwargs + | { "worker_id": worker_id, "stop_function": lambda: _interrupt_bootup_dict[self_id], }, From 7ce8719556c1490a92e955f4e27f05e708623014 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 31 Aug 2025 22:47:47 +0200 Subject: [PATCH 03/29] fix flux interface --- .../task_scheduler/interactive/spawner_flux.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/executorlib/task_scheduler/interactive/spawner_flux.py b/executorlib/task_scheduler/interactive/spawner_flux.py index 5a35dd5c..3c061a41 100644 --- a/executorlib/task_scheduler/interactive/spawner_flux.py +++ b/executorlib/task_scheduler/interactive/spawner_flux.py @@ -1,5 +1,5 @@ import os -from typing import Optional +from typing import Callable, Optional import flux import flux.job @@ -75,14 +75,20 @@ def __init__( def bootup( self, command_lst: list[str], - ): + stop_function: Optional[Callable] = None, + ) -> bool: """ Boot up the client process to connect to the SocketInterface. Args: command_lst (list[str]): List of strings to start the client process. + stop_function (Callable): Function to stop the interface. + Raises: ValueError: If oversubscribing is not supported for the Flux adapter or if conda environments are not supported. + + Returns: + bool: Whether the interface was successfully started. """ if self._openmpi_oversubscribe: raise ValueError( @@ -126,6 +132,7 @@ def bootup( ) else: self._future = self._flux_executor.submit(jobspec=jobspec) + return self.poll() def shutdown(self, wait: bool = True): """ From f22591b84dab3e3bd1463148c5f3db7af36ec698 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 11:47:15 +0200 Subject: [PATCH 04/29] sync changes --- .../standalone/interactive/communication.py | 32 ++----- .../interactive/blockallocation.py | 88 +++++++++++-------- .../task_scheduler/interactive/onetoone.py | 2 +- .../task_scheduler/interactive/shared.py | 2 +- 4 files changed, 63 insertions(+), 61 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 85e47997..41052cc6 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -43,7 +43,6 @@ def __init__( self._logger = logging.getLogger("executorlib") self._spawner = spawner self._command_lst: list[str] = [] - self._stop_function: Optional[Callable] = None def send_dict(self, input_dict: dict): """ @@ -107,7 +106,7 @@ def bind_to_random_port(self) -> int: def bootup( self, - command_lst: list[str], + command_lst: Optional[list[str]] = None, stop_function: Optional[Callable] = None, ) -> bool: """ @@ -120,8 +119,10 @@ def bootup( Returns: bool: Whether the interface was successfully started. """ - self._command_lst = command_lst - self._stop_function = stop_function + if command_lst is None and len(self._command_lst) > 0: + command_lst = self._command_lst + else: + self._command_lst = command_lst if not self._spawner.bootup( command_lst=command_lst, stop_function=stop_function, @@ -131,21 +132,6 @@ def bootup( else: return True - def restart(self) -> bool: - """ - Restart the client process to connect to the SocketInterface. - - Returns: - bool: Whether the interface was successfully started. - """ - if not self._spawner.bootup( - command_lst=self._command_lst, - stop_function=self._stop_function, - ): - self._reset_socket() - return False - return True - def shutdown(self, wait: bool = True): """ Shutdown the SocketInterface and the connected client process. @@ -189,7 +175,7 @@ def interface_bootup( log_obj_size: bool = False, worker_id: Optional[int] = None, stop_function: Optional[Callable] = None, -) -> Optional[SocketInterface]: +) -> tuple[SocketInterface, bool]: """ Start interface for ZMQ communication @@ -210,7 +196,7 @@ def interface_bootup( stop_function (Callable): Function to stop the interface. Returns: - executorlib.shared.communication.SocketInterface: socket interface for zmq communication + executorlib.shared.communication.SocketInterface, bool: socket interface for zmq communication, success flag """ if hostname_localhost is None and sys.platform != "darwin": hostname_localhost = False @@ -233,9 +219,9 @@ def interface_bootup( command_lst=command_lst, stop_function=stop_function, ): - return interface + return interface, True else: - return None + return interface, False def interface_connect(host: str, port: str) -> tuple[zmq.Context, zmq.Socket]: diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index b44c1644..332bd8a2 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -8,7 +8,7 @@ check_resource_dict, check_resource_dict_is_empty, ) -from executorlib.standalone.interactive.communication import interface_bootup +from executorlib.standalone.interactive.communication import interface_bootup, SocketInterface, ExecutorlibSocketError from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.task_scheduler.base import TaskSchedulerBase @@ -234,7 +234,7 @@ def _execute_multiple_tasks( distribution. stop_function (Callable): Function to stop the interface. """ - interface = interface_bootup( + interface, interface_bootup_flag = interface_bootup( command_lst=get_interactive_execute_command( cores=cores, ), @@ -244,44 +244,60 @@ def _execute_multiple_tasks( worker_id=worker_id, stop_function=stop_function, ) + interface_initialization_exception = set_init_function( + interface=interface, + interface_bootup_flag=interface_bootup_flag, + init_function=init_function, + ) + restart_counter = 0 + restart_limit = 2 + while True: + if not interface_bootup_flag and restart_counter > restart_limit: + interface_bootup_flag = True # no more restarts + interface_initialization_exception = ExecutorlibSocketError() + elif not interface_bootup_flag: + interface_bootup_flag = interface.bootup( + stop_function=stop_function, + ) + interface_initialization_exception = set_init_function( + interface=interface, + interface_bootup_flag=interface_bootup_flag, + init_function=init_function, + ) + restart_counter += 1 + else: # interface_bootup_flag = True + task_dict = future_queue.get() + if "shutdown" in task_dict and task_dict["shutdown"]: + if interface is not None: + interface.shutdown(wait=task_dict["wait"]) + task_done(future_queue=future_queue) + if queue_join_on_shutdown: + future_queue.join() + break + elif "fn" in task_dict and "future" in task_dict: + f = task_dict.pop("future") + if interface_initialization_exception is not None: + f.set_exception(exception=interface_initialization_exception) + else: + # The interface failed during the execution + interface_bootup_flag = execute_task_dict( + task_dict=task_dict, + future_obj=f, + interface=interface, + cache_directory=cache_directory, + cache_key=cache_key, + error_log_file=error_log_file, + ) + task_done(future_queue=future_queue) + + +def set_init_function(interface: SocketInterface, interface_bootup_flag: bool, init_function: callable) -> Optional[Exception]: interface_initialization_exception = None - if init_function is not None and interface is not None: + if init_function is not None and interface_bootup_flag: try: _ = interface.send_and_receive_dict( input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} ) except Exception as init_exception: interface_initialization_exception = init_exception - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict and task_dict["shutdown"]: - if interface is not None: - interface.shutdown(wait=task_dict["wait"]) - task_done(future_queue=future_queue) - if queue_join_on_shutdown: - future_queue.join() - break - elif "fn" in task_dict and "future" in task_dict: - f = task_dict.pop("future") - if interface_initialization_exception is not None: - f.set_exception(exception=interface_initialization_exception) - else: - result_flag = execute_task_dict( - task_dict=task_dict, - future_obj=f, - interface=interface, - cache_directory=cache_directory, - cache_key=cache_key, - error_log_file=error_log_file, - ) - if not result_flag: - task_done(future_queue=future_queue) - reset_task_dict( - future_obj=f, future_queue=future_queue, task_dict=task_dict - ) - if interface is not None: - interface.restart() - else: - break - else: - task_done(future_queue=future_queue) + return interface_initialization_exception diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index daffcd59..b1900fb4 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -270,7 +270,7 @@ def _execute_task_in_thread( hostname_localhost=hostname_localhost, log_obj_size=log_obj_size, worker_id=worker_id, - ), + )[0], cache_directory=cache_directory, cache_key=cache_key, error_log_file=error_log_file, diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 504dc5e2..74093016 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -16,7 +16,7 @@ def execute_task_dict( task_dict: dict, future_obj: Future, - interface: Optional[SocketInterface] = None, + interface: SocketInterface, cache_directory: Optional[str] = None, cache_key: Optional[str] = None, error_log_file: Optional[str] = None, From 77c60a4b5640ff4f33278e13afda14dd7eba3f1b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 7 Sep 2025 09:47:24 +0000 Subject: [PATCH 05/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../interactive/blockallocation.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 332bd8a2..1daf1691 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -8,13 +8,16 @@ check_resource_dict, check_resource_dict_is_empty, ) -from executorlib.standalone.interactive.communication import interface_bootup, SocketInterface, ExecutorlibSocketError +from executorlib.standalone.interactive.communication import ( + ExecutorlibSocketError, + SocketInterface, + interface_bootup, +) from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.standalone.queue import cancel_items_in_queue from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.task_scheduler.interactive.shared import ( execute_task_dict, - reset_task_dict, task_done, ) @@ -245,8 +248,8 @@ def _execute_multiple_tasks( stop_function=stop_function, ) interface_initialization_exception = set_init_function( - interface=interface, - interface_bootup_flag=interface_bootup_flag, + interface=interface, + interface_bootup_flag=interface_bootup_flag, init_function=init_function, ) restart_counter = 0 @@ -260,8 +263,8 @@ def _execute_multiple_tasks( stop_function=stop_function, ) interface_initialization_exception = set_init_function( - interface=interface, - interface_bootup_flag=interface_bootup_flag, + interface=interface, + interface_bootup_flag=interface_bootup_flag, init_function=init_function, ) restart_counter += 1 @@ -291,7 +294,9 @@ def _execute_multiple_tasks( task_done(future_queue=future_queue) -def set_init_function(interface: SocketInterface, interface_bootup_flag: bool, init_function: callable) -> Optional[Exception]: +def set_init_function( + interface: SocketInterface, interface_bootup_flag: bool, init_function: callable +) -> Optional[Exception]: interface_initialization_exception = None if init_function is not None and interface_bootup_flag: try: From dd923388edf9b69f82846d156beebbe588b9bbc7 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 11:51:13 +0200 Subject: [PATCH 06/29] fix type hints --- executorlib/standalone/interactive/communication.py | 2 +- executorlib/task_scheduler/interactive/blockallocation.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 41052cc6..72b8ad31 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -123,7 +123,7 @@ def bootup( command_lst = self._command_lst else: self._command_lst = command_lst - if not self._spawner.bootup( + if command_lst is not None and not self._spawner.bootup( command_lst=command_lst, stop_function=stop_function, ): diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 1daf1691..33fb71bf 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -295,7 +295,7 @@ def _execute_multiple_tasks( def set_init_function( - interface: SocketInterface, interface_bootup_flag: bool, init_function: callable + interface: SocketInterface, interface_bootup_flag: bool, init_function: Callable ) -> Optional[Exception]: interface_initialization_exception = None if init_function is not None and interface_bootup_flag: From d25859d0ab7d777f6d5d91d43a45413b000dd69f Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 11:58:11 +0200 Subject: [PATCH 07/29] type fixes --- .../standalone/interactive/communication.py | 17 ++++++++++------- .../interactive/blockallocation.py | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 72b8ad31..a9909a5b 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -123,14 +123,17 @@ def bootup( command_lst = self._command_lst else: self._command_lst = command_lst - if command_lst is not None and not self._spawner.bootup( - command_lst=command_lst, - stop_function=stop_function, - ): - self._reset_socket() - return False + if command_lst is not None: + if not self._spawner.bootup( + command_lst=command_lst, + stop_function=stop_function, + ): + self._reset_socket() + return False + else: + return True else: - return True + return False def shutdown(self, wait: bool = True): """ diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 33fb71bf..8e2d6604 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -295,7 +295,7 @@ def _execute_multiple_tasks( def set_init_function( - interface: SocketInterface, interface_bootup_flag: bool, init_function: Callable + interface: SocketInterface, interface_bootup_flag: bool = True, init_function: Optional[Callable] = None ) -> Optional[Exception]: interface_initialization_exception = None if init_function is not None and interface_bootup_flag: From b2ee7ed9d5f43baf08599869c4e361e5fffa4b00 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 7 Sep 2025 09:58:33 +0000 Subject: [PATCH 08/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/interactive/blockallocation.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 8e2d6604..4a98c499 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -295,7 +295,9 @@ def _execute_multiple_tasks( def set_init_function( - interface: SocketInterface, interface_bootup_flag: bool = True, init_function: Optional[Callable] = None + interface: SocketInterface, + interface_bootup_flag: bool = True, + init_function: Optional[Callable] = None, ) -> Optional[Exception]: interface_initialization_exception = None if init_function is not None and interface_bootup_flag: From 8fc6d0436372639d6f357f2909959d16326c1be3 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 12:03:31 +0200 Subject: [PATCH 09/29] fixes --- .../standalone/interactive/communication.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index a9909a5b..f2785ade 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -121,19 +121,18 @@ def bootup( """ if command_lst is None and len(self._command_lst) > 0: command_lst = self._command_lst - else: + elif command_lst is not None: self._command_lst = command_lst - if command_lst is not None: - if not self._spawner.bootup( - command_lst=command_lst, - stop_function=stop_function, - ): - self._reset_socket() - return False - else: - return True else: + raise ValueError() + if command_lst is not None and not self._spawner.bootup( + command_lst=command_lst, + stop_function=stop_function, + ): + self._reset_socket() return False + else: + return True def shutdown(self, wait: bool = True): """ From 357263cd2a474cc1634497bc84560bf5bb32c56e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 12:44:08 +0200 Subject: [PATCH 10/29] remove shutdown --- executorlib/task_scheduler/interactive/shared.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 74093016..11416d98 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -109,7 +109,6 @@ def _execute_task_without_cache( if isinstance(thread_exception, ExecutorlibSocketError): return False else: - interface.shutdown(wait=True) future_obj.set_exception(exception=thread_exception) return True @@ -155,7 +154,6 @@ def _execute_task_with_cache( if isinstance(thread_exception, ExecutorlibSocketError): return False else: - interface.shutdown(wait=True) future_obj.set_exception(exception=thread_exception) else: _, _, result = get_output(file_name=file_name) From c1fbfc42e723cdfa0162c4595c4af0ca189a5848 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 16:20:03 +0200 Subject: [PATCH 11/29] fixes --- .../task_scheduler/interactive/blockallocation.py | 12 +++++++----- executorlib/task_scheduler/interactive/onetoone.py | 7 ++++--- executorlib/task_scheduler/interactive/shared.py | 6 ++---- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 4a98c499..d1c45991 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -19,6 +19,7 @@ from executorlib.task_scheduler.interactive.shared import ( execute_task_dict, task_done, + reset_task_dict, ) _interrupt_bootup_dict: dict = {} @@ -179,7 +180,6 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): if wait: for process in self._process: process.join() - cancel_items_in_queue(que=self._future_queue) self._future_queue.join() self._process = None self._future_queue = None @@ -247,7 +247,7 @@ def _execute_multiple_tasks( worker_id=worker_id, stop_function=stop_function, ) - interface_initialization_exception = set_init_function( + interface_initialization_exception = _set_init_function( interface=interface, interface_bootup_flag=interface_bootup_flag, init_function=init_function, @@ -262,7 +262,7 @@ def _execute_multiple_tasks( interface_bootup_flag = interface.bootup( stop_function=stop_function, ) - interface_initialization_exception = set_init_function( + interface_initialization_exception = _set_init_function( interface=interface, interface_bootup_flag=interface_bootup_flag, init_function=init_function, @@ -291,10 +291,12 @@ def _execute_multiple_tasks( cache_key=cache_key, error_log_file=error_log_file, ) - task_done(future_queue=future_queue) + if not interface_bootup_flag: + reset_task_dict(future_obj=f, future_queue=future_queue, task_dict=task_dict) + task_done(future_queue=future_queue) -def set_init_function( +def _set_init_function( interface: SocketInterface, interface_bootup_flag: bool = True, init_function: Optional[Callable] = None, diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index b1900fb4..2cf67cbc 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -4,7 +4,7 @@ from typing import Optional from executorlib.standalone.command import get_interactive_execute_command -from executorlib.standalone.interactive.communication import interface_bootup +from executorlib.standalone.interactive.communication import interface_bootup, ExecutorlibSocketError from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.task_scheduler.interactive.shared import execute_task_dict @@ -259,7 +259,7 @@ def _execute_task_in_thread( Returns: bool: True if the task was submitted successfully, False otherwise. """ - return execute_task_dict( + if not execute_task_dict( task_dict=task_dict, future_obj=future_obj, interface=interface_bootup( @@ -274,4 +274,5 @@ def _execute_task_in_thread( cache_directory=cache_directory, cache_key=cache_key, error_log_file=error_log_file, - ) + ): + future_obj.set_exception(ExecutorlibSocketError()) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 11416d98..e4084222 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -41,11 +41,11 @@ def execute_task_dict( if not future_obj.done() and future_obj.set_running_or_notify_cancel(): if error_log_file is not None: task_dict["error_log_file"] = error_log_file - if cache_directory is None and interface is not None: + if cache_directory is None: return _execute_task_without_cache( interface=interface, task_dict=task_dict, future_obj=future_obj ) - elif cache_directory is not None and interface is not None: + else: return _execute_task_with_cache( interface=interface, task_dict=task_dict, @@ -53,8 +53,6 @@ def execute_task_dict( cache_key=cache_key, future_obj=future_obj, ) - else: - return False else: return True From 37868801d6707492e8ea9d293dc8094567e789e6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 7 Sep 2025 14:20:13 +0000 Subject: [PATCH 12/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/interactive/blockallocation.py | 6 ++++-- executorlib/task_scheduler/interactive/onetoone.py | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index d1c45991..37621184 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -18,8 +18,8 @@ from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.task_scheduler.interactive.shared import ( execute_task_dict, - task_done, reset_task_dict, + task_done, ) _interrupt_bootup_dict: dict = {} @@ -292,7 +292,9 @@ def _execute_multiple_tasks( error_log_file=error_log_file, ) if not interface_bootup_flag: - reset_task_dict(future_obj=f, future_queue=future_queue, task_dict=task_dict) + reset_task_dict( + future_obj=f, future_queue=future_queue, task_dict=task_dict + ) task_done(future_queue=future_queue) diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index 2cf67cbc..27e064ef 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -4,7 +4,10 @@ from typing import Optional from executorlib.standalone.command import get_interactive_execute_command -from executorlib.standalone.interactive.communication import interface_bootup, ExecutorlibSocketError +from executorlib.standalone.interactive.communication import ( + ExecutorlibSocketError, + interface_bootup, +) from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.task_scheduler.interactive.shared import execute_task_dict From 7d07722182ae36808933cc3297821645ab9193a3 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 16:22:36 +0200 Subject: [PATCH 13/29] type fixes --- executorlib/task_scheduler/interactive/onetoone.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index 2cf67cbc..788fddf0 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -230,7 +230,7 @@ def _execute_task_in_thread( error_log_file: Optional[str] = None, worker_id: Optional[int] = None, **kwargs, -) -> bool: +): """ Execute a single tasks in parallel using the message passing interface (MPI). @@ -255,9 +255,6 @@ def _execute_task_in_thread( submitted to the Executor. worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource distribution. - - Returns: - bool: True if the task was submitted successfully, False otherwise. """ if not execute_task_dict( task_dict=task_dict, From 355d94152f52617b026da2e3e9139ab65a52da10 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 16:52:12 +0200 Subject: [PATCH 14/29] add tests --- .../test_task_scheduler_interactive_shared.py | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 tests/test_task_scheduler_interactive_shared.py diff --git a/tests/test_task_scheduler_interactive_shared.py b/tests/test_task_scheduler_interactive_shared.py new file mode 100644 index 00000000..2585ff90 --- /dev/null +++ b/tests/test_task_scheduler_interactive_shared.py @@ -0,0 +1,175 @@ +import shutil +from concurrent.futures import Future +from unittest import TestCase + +from executorlib.standalone.command import get_interactive_execute_command +from executorlib.standalone.interactive.communication import interface_bootup, ExecutorlibSocketError +from executorlib.standalone.interactive.spawner import SubprocessSpawner +from executorlib.task_scheduler.interactive.shared import execute_task_dict + + +def get_error(): + raise ExecutorlibSocketError() + + +class TestExecuteTaskDictWithoutCache(TestCase): + def test_execute_task_sum(self): + f = Future() + interface, success_flag = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(success_flag) + self.assertFalse(f.done()) + result = execute_task_dict( + task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory=None, + cache_key=None, + error_log_file=None, + ) + self.assertTrue(result) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 3) + + def test_execute_task_done(self): + f = Future() + f.set_result(5) + interface, success_flag = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(success_flag) + self.assertTrue(f.done()) + result = execute_task_dict( + task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory=None, + cache_key=None, + error_log_file=None, + ) + self.assertTrue(result) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 5) + + def test_execute_task_error(self): + f = Future() + interface, success_flag = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(success_flag) + self.assertFalse(f.done()) + result = execute_task_dict( + task_dict={"fn": get_error, "args": (), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory=None, + cache_key=None, + error_log_file=None, + ) + self.assertFalse(result) + self.assertFalse(f.done()) + + +class TestExecuteTaskDictWithCache(TestCase): + def tearDown(self): + shutil.rmtree("cache_execute_task", ignore_errors=True) + + def test_execute_task_sum(self): + f = Future() + interface, success_flag = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(success_flag) + self.assertFalse(f.done()) + result = execute_task_dict( + task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory="cache_execute_task", + cache_key=None, + error_log_file=None, + ) + self.assertTrue(result) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 3) + + def test_execute_task_done(self): + f = Future() + f.set_result(5) + interface, success_flag = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(success_flag) + self.assertTrue(f.done()) + result = execute_task_dict( + task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory="cache_execute_task", + cache_key=None, + error_log_file=None, + ) + self.assertTrue(result) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 5) + + def test_execute_task_error(self): + f = Future() + interface, success_flag = interface_bootup( + command_lst=get_interactive_execute_command( + cores=1, + ), + connections=SubprocessSpawner(), + hostname_localhost=True, + log_obj_size=False, + worker_id=1, + stop_function=None, + ) + self.assertTrue(success_flag) + self.assertFalse(f.done()) + result = execute_task_dict( + task_dict={"fn": get_error, "args": (), "kwargs": {}}, + future_obj=f, + interface=interface, + cache_directory="cache_execute_task", + cache_key=None, + error_log_file=None, + ) + self.assertFalse(result) + self.assertFalse(f.done()) \ No newline at end of file From 1ab9fca0e1380aecdafa57b4c510521ec2cd72ea Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 16:59:05 +0200 Subject: [PATCH 15/29] more tests --- ...st_standalone_interactive_communication.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/test_standalone_interactive_communication.py b/tests/test_standalone_interactive_communication.py index 2a1f410a..8b6ceaaf 100644 --- a/tests/test_standalone_interactive_communication.py +++ b/tests/test_standalone_interactive_communication.py @@ -3,6 +3,7 @@ import sys import unittest from time import sleep +from typing import Callable, Optional import numpy as np import zmq @@ -26,6 +27,10 @@ def calc(i): return np.array(i**2) +class BrokenSpawner(MpiExecSpawner): + def bootup(self, command_lst: list[str], stop_function: Optional[Callable] = None,): + return False + class TestInterface(unittest.TestCase): @unittest.skipIf( skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." @@ -132,6 +137,24 @@ def test_interface_serial_with_error(self): self.assertFalse(interface._spawner.poll()) interface.shutdown(wait=True) + def test_interface_serial_wrong_input(self): + cloudpickle_register(ind=1) + interface = SocketInterface( + spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), + log_obj_size=True, + ) + with self.assertRaises(ValueError): + interface.bootup(command_lst=None) + + def test_interface_serial_with_broken_spawner(self): + cloudpickle_register(ind=1) + interface = SocketInterface( + spawner=BrokenSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), + log_obj_size=True, + ) + success_flag = interface.bootup(command_lst=["bash", "exit"]) + self.assertFalse(success_flag) + def test_interface_serial_with_stopped_process(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}} From 5ddaf7ba405ca6803d0c5d38136f4a94b228bea9 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 17:05:05 +0200 Subject: [PATCH 16/29] fix minimal --- tests/test_task_scheduler_interactive_shared.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/test_task_scheduler_interactive_shared.py b/tests/test_task_scheduler_interactive_shared.py index 2585ff90..ae176e09 100644 --- a/tests/test_task_scheduler_interactive_shared.py +++ b/tests/test_task_scheduler_interactive_shared.py @@ -1,18 +1,25 @@ import shutil from concurrent.futures import Future -from unittest import TestCase +import unittest from executorlib.standalone.command import get_interactive_execute_command from executorlib.standalone.interactive.communication import interface_bootup, ExecutorlibSocketError from executorlib.standalone.interactive.spawner import SubprocessSpawner from executorlib.task_scheduler.interactive.shared import execute_task_dict +try: + import h5py + + skip_h5py_test = False +except ImportError: + skip_h5py_test = True + def get_error(): raise ExecutorlibSocketError() -class TestExecuteTaskDictWithoutCache(TestCase): +class TestExecuteTaskDictWithoutCache(unittest.TestCase): def test_execute_task_sum(self): f = Future() interface, success_flag = interface_bootup( @@ -92,7 +99,10 @@ def test_execute_task_error(self): self.assertFalse(f.done()) -class TestExecuteTaskDictWithCache(TestCase): +@unittest.skipIf( + skip_h5py_test, "h5py is not installed, so the h5io tests are skipped." +) +class TestExecuteTaskDictWithCache(unittest.TestCase): def tearDown(self): shutil.rmtree("cache_execute_task", ignore_errors=True) From c4d44b853870526ca0abe65bbdd105abc007974a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 17:32:17 +0200 Subject: [PATCH 17/29] fix interface --- .../standalone/interactive/communication.py | 34 +++++++++++-------- .../interactive/blockallocation.py | 14 ++++---- .../task_scheduler/interactive/onetoone.py | 2 +- ...st_standalone_interactive_communication.py | 24 ++++++------- 4 files changed, 39 insertions(+), 35 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index f2785ade..840dc844 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -43,6 +43,15 @@ def __init__( self._logger = logging.getLogger("executorlib") self._spawner = spawner self._command_lst: list[str] = [] + self._booted_sucessfully: bool = False + self._stop_function: Optional[Callable] = None + + @property + def status(self) -> bool: + return self._booted_sucessfully + + def overwrite_status(self, status: bool): + self._booted_sucessfully = status def send_dict(self, input_dict: dict): """ @@ -108,16 +117,13 @@ def bootup( self, command_lst: Optional[list[str]] = None, stop_function: Optional[Callable] = None, - ) -> bool: + ): """ Boot up the client process to connect to the SocketInterface. Args: command_lst (list): list of strings to start the client process stop_function (Callable): Function to stop the interface. - - Returns: - bool: Whether the interface was successfully started. """ if command_lst is None and len(self._command_lst) > 0: command_lst = self._command_lst @@ -125,14 +131,16 @@ def bootup( self._command_lst = command_lst else: raise ValueError() - if command_lst is not None and not self._spawner.bootup( - command_lst=command_lst, - stop_function=stop_function, + if stop_function is not None: + self._stop_function = stop_function + if not self._spawner.bootup( + command_lst=self._command_lst, + stop_function=self._stop_function, ): self._reset_socket() - return False + self._booted_sucessfully = False else: - return True + self._booted_sucessfully = True def shutdown(self, wait: bool = True): """ @@ -217,13 +225,11 @@ def interface_bootup( "--zmqport", str(interface.bind_to_random_port()), ] - if interface.bootup( + interface.bootup( command_lst=command_lst, stop_function=stop_function, - ): - return interface, True - else: - return interface, False + ) + return interface def interface_connect(host: str, port: str) -> tuple[zmq.Context, zmq.Socket]: diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 37621184..4e3f09bb 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -237,7 +237,7 @@ def _execute_multiple_tasks( distribution. stop_function (Callable): Function to stop the interface. """ - interface, interface_bootup_flag = interface_bootup( + interface = interface_bootup( command_lst=get_interactive_execute_command( cores=cores, ), @@ -249,19 +249,17 @@ def _execute_multiple_tasks( ) interface_initialization_exception = _set_init_function( interface=interface, - interface_bootup_flag=interface_bootup_flag, + interface_bootup_flag=interface.status, init_function=init_function, ) restart_counter = 0 restart_limit = 2 while True: - if not interface_bootup_flag and restart_counter > restart_limit: - interface_bootup_flag = True # no more restarts + if not interface.status and restart_counter > restart_limit: + interface.overwrite_status(status=True) # no more restarts interface_initialization_exception = ExecutorlibSocketError() - elif not interface_bootup_flag: - interface_bootup_flag = interface.bootup( - stop_function=stop_function, - ) + elif not interface.status: + interface.bootup() interface_initialization_exception = _set_init_function( interface=interface, interface_bootup_flag=interface_bootup_flag, diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index 945fcf33..f637f8b7 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -270,7 +270,7 @@ def _execute_task_in_thread( hostname_localhost=hostname_localhost, log_obj_size=log_obj_size, worker_id=worker_id, - )[0], + ), cache_directory=cache_directory, cache_key=cache_key, error_log_file=error_log_file, diff --git a/tests/test_standalone_interactive_communication.py b/tests/test_standalone_interactive_communication.py index 8b6ceaaf..a644a42d 100644 --- a/tests/test_standalone_interactive_communication.py +++ b/tests/test_standalone_interactive_communication.py @@ -41,7 +41,7 @@ def test_interface_mpi(self): interface = SocketInterface( spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False) ) - success_flag = interface.bootup( + interface.bootup( command_lst=[ sys.executable, os.path.abspath( @@ -58,7 +58,7 @@ def test_interface_mpi(self): str(interface.bind_to_random_port()), ] ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertEqual( interface.send_and_receive_dict(input_dict=task_dict), np.array(4) ) @@ -71,7 +71,7 @@ def test_interface_serial_without_debug(self): spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), log_obj_size=False, ) - success_flag = interface.bootup( + interface.bootup( command_lst=[ sys.executable, os.path.abspath( @@ -88,7 +88,7 @@ def test_interface_serial_without_debug(self): str(interface.bind_to_random_port()), ] ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertEqual( interface.send_and_receive_dict(input_dict=task_dict), np.array(4) ) @@ -101,7 +101,7 @@ def test_interface_serial_with_debug(self): spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), log_obj_size=True, ) - success_flag = interface.bootup( + interface.bootup( command_lst=[ sys.executable, os.path.abspath( @@ -118,7 +118,7 @@ def test_interface_serial_with_debug(self): str(interface.bind_to_random_port()), ] ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertEqual( interface.send_and_receive_dict(input_dict=task_dict), np.array(4) ) @@ -130,8 +130,8 @@ def test_interface_serial_with_error(self): spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), log_obj_size=True, ) - success_flag = interface.bootup(command_lst=["bash", "exit"]) - self.assertTrue(success_flag) + interface.bootup(command_lst=["bash", "exit"]) + self.assertTrue(interface.status) while interface._spawner.poll(): sleep(0.1) self.assertFalse(interface._spawner.poll()) @@ -152,8 +152,8 @@ def test_interface_serial_with_broken_spawner(self): spawner=BrokenSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), log_obj_size=True, ) - success_flag = interface.bootup(command_lst=["bash", "exit"]) - self.assertFalse(success_flag) + interface.bootup(command_lst=["bash", "exit"]) + self.assertFalse(interface.status) def test_interface_serial_with_stopped_process(self): cloudpickle_register(ind=1) @@ -162,7 +162,7 @@ def test_interface_serial_with_stopped_process(self): spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False), log_obj_size=True, ) - success_flag = interface.bootup( + interface.bootup( command_lst=[ sys.executable, os.path.abspath( @@ -179,7 +179,7 @@ def test_interface_serial_with_stopped_process(self): str(interface.bind_to_random_port()), ] ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) interface.send_dict(input_dict=task_dict) interface._spawner._process.terminate() with self.assertRaises(ExecutorlibSocketError): From 489aae9f519e56d3d40d25ac520763f68aa23e68 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 17:35:32 +0200 Subject: [PATCH 18/29] clean up interface --- executorlib/standalone/interactive/communication.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 840dc844..ac5fd467 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -125,14 +125,12 @@ def bootup( command_lst (list): list of strings to start the client process stop_function (Callable): Function to stop the interface. """ - if command_lst is None and len(self._command_lst) > 0: - command_lst = self._command_lst - elif command_lst is not None: + if command_lst is not None: self._command_lst = command_lst - else: - raise ValueError() if stop_function is not None: self._stop_function = stop_function + if len(self._command_lst) == 0: + raise ValueError() if not self._spawner.bootup( command_lst=self._command_lst, stop_function=self._stop_function, From 73b55f87acfb6dfe96a533e7ef449876fa3307c6 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 17:37:14 +0200 Subject: [PATCH 19/29] fixes --- executorlib/task_scheduler/interactive/blockallocation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 4e3f09bb..2a2c1758 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -262,7 +262,7 @@ def _execute_multiple_tasks( interface.bootup() interface_initialization_exception = _set_init_function( interface=interface, - interface_bootup_flag=interface_bootup_flag, + interface_bootup_flag=interface.status, init_function=init_function, ) restart_counter += 1 From cde4c07533f319406514e4012406a9a1cb3144a0 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 17:42:44 +0200 Subject: [PATCH 20/29] fix type hints --- executorlib/standalone/interactive/communication.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index ac5fd467..9153b52d 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -183,7 +183,7 @@ def interface_bootup( log_obj_size: bool = False, worker_id: Optional[int] = None, stop_function: Optional[Callable] = None, -) -> tuple[SocketInterface, bool]: +) -> SocketInterface: """ Start interface for ZMQ communication @@ -204,7 +204,7 @@ def interface_bootup( stop_function (Callable): Function to stop the interface. Returns: - executorlib.shared.communication.SocketInterface, bool: socket interface for zmq communication, success flag + executorlib.shared.communication.SocketInterface: socket interface for zmq communication """ if hostname_localhost is None and sys.platform != "darwin": hostname_localhost = False From 53cb830ab11f55d28d1572c95688554b73e86d5d Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 17:50:16 +0200 Subject: [PATCH 21/29] fix tests --- .../test_task_scheduler_interactive_shared.py | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/tests/test_task_scheduler_interactive_shared.py b/tests/test_task_scheduler_interactive_shared.py index ae176e09..bd691d3b 100644 --- a/tests/test_task_scheduler_interactive_shared.py +++ b/tests/test_task_scheduler_interactive_shared.py @@ -5,6 +5,7 @@ from executorlib.standalone.command import get_interactive_execute_command from executorlib.standalone.interactive.communication import interface_bootup, ExecutorlibSocketError from executorlib.standalone.interactive.spawner import SubprocessSpawner +from executorlib.standalone.serialize import cloudpickle_register from executorlib.task_scheduler.interactive.shared import execute_task_dict try: @@ -21,8 +22,9 @@ def get_error(): class TestExecuteTaskDictWithoutCache(unittest.TestCase): def test_execute_task_sum(self): + cloudpickle_register(ind=1) f = Future() - interface, success_flag = interface_bootup( + interface = interface_bootup( command_lst=get_interactive_execute_command( cores=1, ), @@ -32,7 +34,7 @@ def test_execute_task_sum(self): worker_id=1, stop_function=None, ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertFalse(f.done()) result = execute_task_dict( task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, @@ -47,9 +49,10 @@ def test_execute_task_sum(self): self.assertEqual(f.result(), 3) def test_execute_task_done(self): + cloudpickle_register(ind=1) f = Future() f.set_result(5) - interface, success_flag = interface_bootup( + interface = interface_bootup( command_lst=get_interactive_execute_command( cores=1, ), @@ -59,7 +62,7 @@ def test_execute_task_done(self): worker_id=1, stop_function=None, ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertTrue(f.done()) result = execute_task_dict( task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, @@ -74,8 +77,9 @@ def test_execute_task_done(self): self.assertEqual(f.result(), 5) def test_execute_task_error(self): + cloudpickle_register(ind=1) f = Future() - interface, success_flag = interface_bootup( + interface = interface_bootup( command_lst=get_interactive_execute_command( cores=1, ), @@ -85,7 +89,7 @@ def test_execute_task_error(self): worker_id=1, stop_function=None, ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertFalse(f.done()) result = execute_task_dict( task_dict={"fn": get_error, "args": (), "kwargs": {}}, @@ -107,8 +111,9 @@ def tearDown(self): shutil.rmtree("cache_execute_task", ignore_errors=True) def test_execute_task_sum(self): + cloudpickle_register(ind=1) f = Future() - interface, success_flag = interface_bootup( + interface = interface_bootup( command_lst=get_interactive_execute_command( cores=1, ), @@ -118,7 +123,7 @@ def test_execute_task_sum(self): worker_id=1, stop_function=None, ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertFalse(f.done()) result = execute_task_dict( task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, @@ -133,9 +138,10 @@ def test_execute_task_sum(self): self.assertEqual(f.result(), 3) def test_execute_task_done(self): + cloudpickle_register(ind=1) f = Future() f.set_result(5) - interface, success_flag = interface_bootup( + interface = interface_bootup( command_lst=get_interactive_execute_command( cores=1, ), @@ -145,7 +151,7 @@ def test_execute_task_done(self): worker_id=1, stop_function=None, ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertTrue(f.done()) result = execute_task_dict( task_dict={"fn": sum, "args": ([1, 2], ), "kwargs": {}}, @@ -160,8 +166,9 @@ def test_execute_task_done(self): self.assertEqual(f.result(), 5) def test_execute_task_error(self): + cloudpickle_register(ind=1) f = Future() - interface, success_flag = interface_bootup( + interface = interface_bootup( command_lst=get_interactive_execute_command( cores=1, ), @@ -171,7 +178,7 @@ def test_execute_task_error(self): worker_id=1, stop_function=None, ) - self.assertTrue(success_flag) + self.assertTrue(interface.status) self.assertFalse(f.done()) result = execute_task_dict( task_dict={"fn": get_error, "args": (), "kwargs": {}}, From 45b6eb0a6aaac01a1d7710a11a17e8aadae96e40 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 17:57:21 +0200 Subject: [PATCH 22/29] fix tests --- .../standalone/interactive/communication.py | 3 ++- .../task_scheduler/interactive/blockallocation.py | 14 +++++--------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 9153b52d..6aafde5d 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -50,7 +50,8 @@ def __init__( def status(self) -> bool: return self._booted_sucessfully - def overwrite_status(self, status: bool): + @status.setter + def status(self, status: bool): self._booted_sucessfully = status def send_dict(self, input_dict: dict): diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 2a2c1758..3ef1716f 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -249,28 +249,25 @@ def _execute_multiple_tasks( ) interface_initialization_exception = _set_init_function( interface=interface, - interface_bootup_flag=interface.status, init_function=init_function, ) restart_counter = 0 restart_limit = 2 while True: if not interface.status and restart_counter > restart_limit: - interface.overwrite_status(status=True) # no more restarts + interface.status = True # no more restarts interface_initialization_exception = ExecutorlibSocketError() elif not interface.status: interface.bootup() interface_initialization_exception = _set_init_function( interface=interface, - interface_bootup_flag=interface.status, init_function=init_function, ) restart_counter += 1 else: # interface_bootup_flag = True task_dict = future_queue.get() if "shutdown" in task_dict and task_dict["shutdown"]: - if interface is not None: - interface.shutdown(wait=task_dict["wait"]) + interface.shutdown(wait=task_dict["wait"]) task_done(future_queue=future_queue) if queue_join_on_shutdown: future_queue.join() @@ -281,7 +278,7 @@ def _execute_multiple_tasks( f.set_exception(exception=interface_initialization_exception) else: # The interface failed during the execution - interface_bootup_flag = execute_task_dict( + interface.status = status=execute_task_dict( task_dict=task_dict, future_obj=f, interface=interface, @@ -289,7 +286,7 @@ def _execute_multiple_tasks( cache_key=cache_key, error_log_file=error_log_file, ) - if not interface_bootup_flag: + if not interface.status: reset_task_dict( future_obj=f, future_queue=future_queue, task_dict=task_dict ) @@ -298,11 +295,10 @@ def _execute_multiple_tasks( def _set_init_function( interface: SocketInterface, - interface_bootup_flag: bool = True, init_function: Optional[Callable] = None, ) -> Optional[Exception]: interface_initialization_exception = None - if init_function is not None and interface_bootup_flag: + if init_function is not None and interface.status: try: _ = interface.send_and_receive_dict( input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} From 8e6027957193997a3682c72dfb8d1b5a0110b05b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 7 Sep 2025 15:57:30 +0000 Subject: [PATCH 23/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/interactive/blockallocation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 3ef1716f..80268839 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -278,7 +278,7 @@ def _execute_multiple_tasks( f.set_exception(exception=interface_initialization_exception) else: # The interface failed during the execution - interface.status = status=execute_task_dict( + interface.status = status = execute_task_dict( task_dict=task_dict, future_obj=f, interface=interface, From 71cca2fdf62bcc137b38b3df047952b457638748 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 17:57:46 +0200 Subject: [PATCH 24/29] update docstring --- executorlib/task_scheduler/interactive/blockallocation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 3ef1716f..79406238 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -264,7 +264,7 @@ def _execute_multiple_tasks( init_function=init_function, ) restart_counter += 1 - else: # interface_bootup_flag = True + else: # interface.status == True task_dict = future_queue.get() if "shutdown" in task_dict and task_dict["shutdown"]: interface.shutdown(wait=task_dict["wait"]) From 79aa4a32795bfdfdb18ee745f7c307792c0de1b5 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 17:58:40 +0200 Subject: [PATCH 25/29] fix --- executorlib/task_scheduler/interactive/blockallocation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 05c70429..7f2ad728 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -278,7 +278,7 @@ def _execute_multiple_tasks( f.set_exception(exception=interface_initialization_exception) else: # The interface failed during the execution - interface.status = status = execute_task_dict( + interface.status = execute_task_dict( task_dict=task_dict, future_obj=f, interface=interface, From 20d0c6ba220376c46db1bf1cf3ee0476a87f2820 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 18:10:31 +0200 Subject: [PATCH 26/29] Add restart_limit to resource_dict --- executorlib/executor/flux.py | 1 + executorlib/executor/single.py | 1 + executorlib/executor/slurm.py | 1 + executorlib/task_scheduler/interactive/blockallocation.py | 3 ++- 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py index c1e46c4d..be8fa27c 100644 --- a/executorlib/executor/flux.py +++ b/executorlib/executor/flux.py @@ -43,6 +43,7 @@ class FluxJobExecutor(BaseExecutor): compute notes. Defaults to False. - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + - restart_limit (int): The maximum number of restarting worker processes. Default: 0 pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index 8c8c5ba1..2f75d0c0 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -120,6 +120,7 @@ def __init__( only) - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + - restart_limit (int): The maximum number of restarting worker processes. Default: 0 hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py index 16747bd5..e8244d1b 100644 --- a/executorlib/executor/slurm.py +++ b/executorlib/executor/slurm.py @@ -43,6 +43,7 @@ class SlurmClusterExecutor(BaseExecutor): - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only) - error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions submitted to the Executor. + - restart_limit (int): The maximum number of restarting worker processes. Default: 0 pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 7f2ad728..98e198d1 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -209,6 +209,7 @@ def _execute_multiple_tasks( error_log_file: Optional[str] = None, worker_id: Optional[int] = None, stop_function: Optional[Callable] = None, + restart_limit: int = 0, **kwargs, ) -> None: """ @@ -236,6 +237,7 @@ def _execute_multiple_tasks( worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource distribution. stop_function (Callable): Function to stop the interface. + restart_limit (int): The maximum number of restarting worker processes. """ interface = interface_bootup( command_lst=get_interactive_execute_command( @@ -252,7 +254,6 @@ def _execute_multiple_tasks( init_function=init_function, ) restart_counter = 0 - restart_limit = 2 while True: if not interface.status and restart_counter > restart_limit: interface.status = True # no more restarts From 5e23ad5e77ff4671ef4f3834f4da9a8b1d1f9ad2 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 18:29:46 +0200 Subject: [PATCH 27/29] Add error messages --- executorlib/standalone/interactive/communication.py | 4 ++-- executorlib/task_scheduler/interactive/blockallocation.py | 2 +- executorlib/task_scheduler/interactive/onetoone.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 6aafde5d..0013b2fc 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -78,7 +78,7 @@ def receive_dict(self) -> dict: while len(response_lst) == 0: response_lst = self._poller.poll(self._time_out_ms) if not self._spawner.poll(): - raise ExecutorlibSocketError() + raise ExecutorlibSocketError("SocketInterface crashed during execution.") data = self._socket.recv(zmq.NOBLOCK) if self._logger is not None: self._logger.warning( @@ -131,7 +131,7 @@ def bootup( if stop_function is not None: self._stop_function = stop_function if len(self._command_lst) == 0: - raise ValueError() + raise ValueError("No command defined to boot up SocketInterface.") if not self._spawner.bootup( command_lst=self._command_lst, stop_function=self._stop_function, diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 98e198d1..d6dd3746 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -257,7 +257,7 @@ def _execute_multiple_tasks( while True: if not interface.status and restart_counter > restart_limit: interface.status = True # no more restarts - interface_initialization_exception = ExecutorlibSocketError() + interface_initialization_exception = ExecutorlibSocketError("SocketInterface crashed during execution.") elif not interface.status: interface.bootup() interface_initialization_exception = _set_init_function( diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index f637f8b7..1b23f30a 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -275,4 +275,4 @@ def _execute_task_in_thread( cache_key=cache_key, error_log_file=error_log_file, ): - future_obj.set_exception(ExecutorlibSocketError()) + future_obj.set_exception(ExecutorlibSocketError("SocketInterface crashed during execution.")) From 2a472b54306eeab21fcf512ecdb8ef9fd64d8c8e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 7 Sep 2025 16:29:55 +0000 Subject: [PATCH 28/29] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/standalone/interactive/communication.py | 4 +++- executorlib/task_scheduler/interactive/blockallocation.py | 4 +++- executorlib/task_scheduler/interactive/onetoone.py | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 0013b2fc..68c6379a 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -78,7 +78,9 @@ def receive_dict(self) -> dict: while len(response_lst) == 0: response_lst = self._poller.poll(self._time_out_ms) if not self._spawner.poll(): - raise ExecutorlibSocketError("SocketInterface crashed during execution.") + raise ExecutorlibSocketError( + "SocketInterface crashed during execution." + ) data = self._socket.recv(zmq.NOBLOCK) if self._logger is not None: self._logger.warning( diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index d6dd3746..e21f1be4 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -257,7 +257,9 @@ def _execute_multiple_tasks( while True: if not interface.status and restart_counter > restart_limit: interface.status = True # no more restarts - interface_initialization_exception = ExecutorlibSocketError("SocketInterface crashed during execution.") + interface_initialization_exception = ExecutorlibSocketError( + "SocketInterface crashed during execution." + ) elif not interface.status: interface.bootup() interface_initialization_exception = _set_init_function( diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py index 1b23f30a..d303ea94 100644 --- a/executorlib/task_scheduler/interactive/onetoone.py +++ b/executorlib/task_scheduler/interactive/onetoone.py @@ -275,4 +275,6 @@ def _execute_task_in_thread( cache_key=cache_key, error_log_file=error_log_file, ): - future_obj.set_exception(ExecutorlibSocketError("SocketInterface crashed during execution.")) + future_obj.set_exception( + ExecutorlibSocketError("SocketInterface crashed during execution.") + ) From 87118a286032e76c8af2ea27e5ae9e5accf56b65 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 7 Sep 2025 22:09:57 +0200 Subject: [PATCH 29/29] Use random hash --- executorlib/task_scheduler/interactive/blockallocation.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index e21f1be4..d093b77d 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -1,4 +1,5 @@ import queue +import random from concurrent.futures import Future from threading import Thread from typing import Callable, Optional @@ -73,7 +74,7 @@ def __init__( executor_kwargs["queue_join_on_shutdown"] = False self._process_kwargs = executor_kwargs self._max_workers = max_workers - self_id = id(self) + self_id = random.getrandbits(128) self._self_id = self_id _interrupt_bootup_dict[self._self_id] = False self._set_process( @@ -270,7 +271,8 @@ def _execute_multiple_tasks( else: # interface.status == True task_dict = future_queue.get() if "shutdown" in task_dict and task_dict["shutdown"]: - interface.shutdown(wait=task_dict["wait"]) + if interface.status: + interface.shutdown(wait=task_dict["wait"]) task_done(future_queue=future_queue) if queue_join_on_shutdown: future_queue.join()