From 6d6b950d232f18fe40e7d67d2249bb411cefaa08 Mon Sep 17 00:00:00 2001 From: jan-janssen Date: Sat, 30 Aug 2025 13:53:32 +0200 Subject: [PATCH 01/14] all tasks are stopped with stop function --- executorlib/standalone/interactive/communication.py | 8 +++++++- executorlib/standalone/interactive/spawner.py | 2 ++ executorlib/task_scheduler/interactive/blockallocation.py | 4 +++- executorlib/task_scheduler/interactive/pysqaspawner.py | 4 ++++ executorlib/task_scheduler/interactive/shared.py | 4 ++++ 5 files changed, 20 insertions(+), 2 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index b5af3c56..8a37ebb3 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -43,6 +43,7 @@ def __init__( self._logger = logging.getLogger("executorlib") self._spawner = spawner self._command_lst: list[str] = [] + self._stop_function: Optional[callable] = None def send_dict(self, input_dict: dict): """ @@ -107,6 +108,7 @@ def bind_to_random_port(self) -> int: def bootup( self, command_lst: list[str], + stop_function: Optional[callable] = None, ): """ Boot up the client process to connect to the SocketInterface. @@ -115,8 +117,10 @@ def bootup( command_lst (list): list of strings to start the client process """ self._command_lst = command_lst + self._stop_function = stop_function self._spawner.bootup( command_lst=command_lst, + stop_function=stop_function, ) def restart(self): @@ -125,6 +129,7 @@ def restart(self): """ self._spawner.bootup( command_lst=self._command_lst, + stop_function=self._stop_function, ) def shutdown(self, wait: bool = True): @@ -163,6 +168,7 @@ def interface_bootup( hostname_localhost: Optional[bool] = None, log_obj_size: bool = False, worker_id: Optional[int] = None, + stop_function: Optional[callable] = None, ) -> SocketInterface: """ Start interface for ZMQ communication @@ -203,7 +209,7 @@ def interface_bootup( str(interface.bind_to_random_port()), ] interface.bootup( - command_lst=command_lst, + command_lst=command_lst, stop_function=stop_function, ) return interface diff --git a/executorlib/standalone/interactive/spawner.py b/executorlib/standalone/interactive/spawner.py index 4a5cb390..e01a3538 100644 --- a/executorlib/standalone/interactive/spawner.py +++ b/executorlib/standalone/interactive/spawner.py @@ -29,6 +29,7 @@ def __init__( def bootup( self, command_lst: list[str], + stop_function: Optional[callable] = None, ): """ Method to start the interface. @@ -87,6 +88,7 @@ def __init__( def bootup( self, command_lst: list[str], + stop_function: Optional[callable] = None, ): """ Method to start the subprocess interface. diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 96cec2c1..5ecaf26c 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -61,11 +61,12 @@ def __init__( executor_kwargs["queue_join_on_shutdown"] = False self._process_kwargs = executor_kwargs self._max_workers = max_workers + self._shutdown_flag = False self._set_process( process=[ Thread( target=execute_tasks, - kwargs=executor_kwargs | {"worker_id": worker_id}, + kwargs=executor_kwargs | {"worker_id": worker_id, "stop_function": lambda : self._shutdown_flag}, ) for worker_id in range(self._max_workers) ], @@ -155,6 +156,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): if self._future_queue is not None: if cancel_futures: cancel_items_in_queue(que=self._future_queue) + self._shutdown_flag = True if isinstance(self._process, list): for _ in range(len(self._process)): self._future_queue.put({"shutdown": True, "wait": wait}) diff --git a/executorlib/task_scheduler/interactive/pysqaspawner.py b/executorlib/task_scheduler/interactive/pysqaspawner.py index 73a8cb87..f65396a8 100644 --- a/executorlib/task_scheduler/interactive/pysqaspawner.py +++ b/executorlib/task_scheduler/interactive/pysqaspawner.py @@ -58,6 +58,7 @@ def __init__( def bootup( self, command_lst: list[str], + stop_function: Optional[callable] = None, ): """ Method to start the subprocess interface. @@ -77,6 +78,9 @@ def bootup( while True: if self._check_process_helper(command_lst=command_lst): break + elif stop_function is not None and stop_function(): + self.shutdown(wait=True) + break else: sleep(1) # Wait for the process to start diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 883c3dac..9ea8f52b 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -28,6 +28,7 @@ def execute_tasks( log_obj_size: bool = False, error_log_file: Optional[str] = None, worker_id: Optional[int] = None, + stop_function: Optional[callable] = None, **kwargs, ) -> None: """ @@ -63,7 +64,10 @@ def execute_tasks( hostname_localhost=hostname_localhost, log_obj_size=log_obj_size, worker_id=worker_id, + stop_function=stop_function, ) + if interface._spawner._process is None: + return if init_function is not None: interface.send_dict( input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} From 8dbca7abe692a2a499658ed494f687c2155b2458 Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Sat, 30 Aug 2025 11:54:05 +0000 Subject: [PATCH 02/14] Format black --- executorlib/standalone/interactive/communication.py | 3 ++- executorlib/task_scheduler/interactive/blockallocation.py | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 8a37ebb3..552201d7 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -209,7 +209,8 @@ def interface_bootup( str(interface.bind_to_random_port()), ] interface.bootup( - command_lst=command_lst, stop_function=stop_function, + command_lst=command_lst, + stop_function=stop_function, ) return interface diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index 5ecaf26c..d71bd049 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -66,7 +66,11 @@ def __init__( process=[ Thread( target=execute_tasks, - kwargs=executor_kwargs | {"worker_id": worker_id, "stop_function": lambda : self._shutdown_flag}, + kwargs=executor_kwargs + | { + "worker_id": worker_id, + "stop_function": lambda: self._shutdown_flag, + }, ) for worker_id in range(self._max_workers) ], From 118389e9205c5b97a463448c7887a77e30aebb85 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 30 Aug 2025 15:40:08 +0200 Subject: [PATCH 03/14] add additional break --- .../standalone/interactive/communication.py | 31 +++++++++++++------ executorlib/standalone/interactive/spawner.py | 5 +-- .../interactive/pysqaspawner.py | 4 +-- .../task_scheduler/interactive/shared.py | 18 ++++++----- 4 files changed, 37 insertions(+), 21 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index 552201d7..c57e8d56 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -109,7 +109,7 @@ def bootup( self, command_lst: list[str], stop_function: Optional[callable] = None, - ): + ) -> bool: """ Boot up the client process to connect to the SocketInterface. @@ -118,19 +118,25 @@ def bootup( """ self._command_lst = command_lst self._stop_function = stop_function - self._spawner.bootup( + if not self._spawner.bootup( command_lst=command_lst, stop_function=stop_function, - ) + ): + self._reset_socket() + return False + return True def restart(self): """ Restart the client process to onnect to the SocketInterface. """ - self._spawner.bootup( + if not self._spawner.bootup( command_lst=self._command_lst, stop_function=self._stop_function, - ) + ): + self._reset_socket() + return False + return True def shutdown(self, wait: bool = True): """ @@ -145,6 +151,10 @@ def shutdown(self, wait: bool = True): input_dict={"shutdown": True, "wait": wait} ) self._spawner.shutdown(wait=wait) + self._reset_socket() + return result + + def _reset_socket(self): if self._socket is not None: self._socket.close() if self._context is not None: @@ -152,7 +162,6 @@ def shutdown(self, wait: bool = True): self._process = None self._socket = None self._context = None - return result def __del__(self): """ @@ -169,7 +178,7 @@ def interface_bootup( log_obj_size: bool = False, worker_id: Optional[int] = None, stop_function: Optional[callable] = None, -) -> SocketInterface: +) -> Optional[SocketInterface]: """ Start interface for ZMQ communication @@ -208,11 +217,13 @@ def interface_bootup( "--zmqport", str(interface.bind_to_random_port()), ] - interface.bootup( + if interface.bootup( command_lst=command_lst, stop_function=stop_function, - ) - return interface + ): + return interface + else: + return None def interface_connect(host: str, port: str) -> tuple[zmq.Context, zmq.Socket]: diff --git a/executorlib/standalone/interactive/spawner.py b/executorlib/standalone/interactive/spawner.py index e01a3538..2abcd148 100644 --- a/executorlib/standalone/interactive/spawner.py +++ b/executorlib/standalone/interactive/spawner.py @@ -30,7 +30,7 @@ def bootup( self, command_lst: list[str], stop_function: Optional[callable] = None, - ): + ) -> bool: """ Method to start the interface. @@ -89,7 +89,7 @@ def bootup( self, command_lst: list[str], stop_function: Optional[callable] = None, - ): + ) -> bool: """ Method to start the subprocess interface. @@ -103,6 +103,7 @@ def bootup( cwd=self._cwd, stdin=subprocess.DEVNULL, ) + return True def generate_command(self, command_lst: list[str]) -> list[str]: """ diff --git a/executorlib/task_scheduler/interactive/pysqaspawner.py b/executorlib/task_scheduler/interactive/pysqaspawner.py index f65396a8..291621fc 100644 --- a/executorlib/task_scheduler/interactive/pysqaspawner.py +++ b/executorlib/task_scheduler/interactive/pysqaspawner.py @@ -77,10 +77,10 @@ def bootup( ) while True: if self._check_process_helper(command_lst=command_lst): - break + return True elif stop_function is not None and stop_function(): self.shutdown(wait=True) - break + return False else: sleep(1) # Wait for the process to start diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 9ea8f52b..da2f265f 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -66,7 +66,7 @@ def execute_tasks( worker_id=worker_id, stop_function=stop_function, ) - if interface._spawner._process is None: + if interface is None: return if init_function is not None: interface.send_dict( @@ -84,22 +84,24 @@ def execute_tasks( if error_log_file is not None: task_dict["error_log_file"] = error_log_file if cache_directory is None: - _execute_task_without_cache( + result_flag = _execute_task_without_cache( interface=interface, task_dict=task_dict, future_queue=future_queue ) else: - _execute_task_with_cache( + result_flag = _execute_task_with_cache( interface=interface, task_dict=task_dict, future_queue=future_queue, cache_directory=cache_directory, cache_key=cache_key, ) + if not result_flag: + break def _execute_task_without_cache( interface: SocketInterface, task_dict: dict, future_queue: queue.Queue -): +) -> bool: """ Execute the task in the task_dict by communicating it via the interface. @@ -118,13 +120,14 @@ def _execute_task_without_cache( _reset_task_dict( future_obj=f, future_queue=future_queue, task_dict=task_dict ) - interface.restart() + return interface.restart() else: interface.shutdown(wait=True) _task_done(future_queue=future_queue) f.set_exception(exception=thread_exception) else: _task_done(future_queue=future_queue) + return True def _execute_task_with_cache( @@ -133,7 +136,7 @@ def _execute_task_with_cache( future_queue: queue.Queue, cache_directory: str, cache_key: Optional[str] = None, -): +) -> bool: """ Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory. @@ -171,7 +174,7 @@ def _execute_task_with_cache( _reset_task_dict( future_obj=f, future_queue=future_queue, task_dict=task_dict ) - interface.restart() + return interface.restart() else: interface.shutdown(wait=True) _task_done(future_queue=future_queue) @@ -184,6 +187,7 @@ def _execute_task_with_cache( future = task_dict["future"] future.set_result(result) _task_done(future_queue=future_queue) + return True def _task_done(future_queue: queue.Queue): From 1c1f0f8015bf75c10ad703cef344bcf476647a9e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 30 Aug 2025 13:40:16 +0000 Subject: [PATCH 04/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/standalone/interactive/communication.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index c57e8d56..b8a70878 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -136,7 +136,7 @@ def restart(self): ): self._reset_socket() return False - return True + return True def shutdown(self, wait: bool = True): """ @@ -153,7 +153,7 @@ def shutdown(self, wait: bool = True): self._spawner.shutdown(wait=wait) self._reset_socket() return result - + def _reset_socket(self): if self._socket is not None: self._socket.close() From cf7e93bc8eaf5fb7439eaa103904c75cee43f1a5 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 30 Aug 2025 15:43:04 +0200 Subject: [PATCH 05/14] fix typing --- executorlib/standalone/interactive/communication.py | 8 ++++---- executorlib/standalone/interactive/spawner.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/executorlib/standalone/interactive/communication.py b/executorlib/standalone/interactive/communication.py index b8a70878..b0c4bc39 100644 --- a/executorlib/standalone/interactive/communication.py +++ b/executorlib/standalone/interactive/communication.py @@ -1,7 +1,7 @@ import logging import sys from socket import gethostname -from typing import Any, Optional +from typing import Any, Callable, Optional import cloudpickle import zmq @@ -43,7 +43,7 @@ def __init__( self._logger = logging.getLogger("executorlib") self._spawner = spawner self._command_lst: list[str] = [] - self._stop_function: Optional[callable] = None + self._stop_function: Optional[Callable] = None def send_dict(self, input_dict: dict): """ @@ -108,7 +108,7 @@ def bind_to_random_port(self) -> int: def bootup( self, command_lst: list[str], - stop_function: Optional[callable] = None, + stop_function: Optional[Callable] = None, ) -> bool: """ Boot up the client process to connect to the SocketInterface. @@ -177,7 +177,7 @@ def interface_bootup( hostname_localhost: Optional[bool] = None, log_obj_size: bool = False, worker_id: Optional[int] = None, - stop_function: Optional[callable] = None, + stop_function: Optional[Callable] = None, ) -> Optional[SocketInterface]: """ Start interface for ZMQ communication diff --git a/executorlib/standalone/interactive/spawner.py b/executorlib/standalone/interactive/spawner.py index 2abcd148..ce90052b 100644 --- a/executorlib/standalone/interactive/spawner.py +++ b/executorlib/standalone/interactive/spawner.py @@ -1,7 +1,7 @@ import os import subprocess from abc import ABC, abstractmethod -from typing import Optional +from typing import Callable, Optional MPI_COMMAND = "mpiexec" @@ -29,7 +29,7 @@ def __init__( def bootup( self, command_lst: list[str], - stop_function: Optional[callable] = None, + stop_function: Optional[Callable] = None, ) -> bool: """ Method to start the interface. @@ -88,7 +88,7 @@ def __init__( def bootup( self, command_lst: list[str], - stop_function: Optional[callable] = None, + stop_function: Optional[Callable] = None, ) -> bool: """ Method to start the subprocess interface. From 93c29fd933231d6e4459e46134ae58f7280267b9 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 30 Aug 2025 15:52:58 +0200 Subject: [PATCH 06/14] fixes --- executorlib/task_scheduler/interactive/fluxspawner.py | 6 ++++-- executorlib/task_scheduler/interactive/pysqaspawner.py | 2 +- executorlib/task_scheduler/interactive/shared.py | 5 ++++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/executorlib/task_scheduler/interactive/fluxspawner.py b/executorlib/task_scheduler/interactive/fluxspawner.py index 5a35dd5c..378bbe92 100644 --- a/executorlib/task_scheduler/interactive/fluxspawner.py +++ b/executorlib/task_scheduler/interactive/fluxspawner.py @@ -1,5 +1,5 @@ import os -from typing import Optional +from typing import Callable, Optional import flux import flux.job @@ -75,7 +75,8 @@ def __init__( def bootup( self, command_lst: list[str], - ): + stop_function: Optional[Callable] = None, + ) -> bool: """ Boot up the client process to connect to the SocketInterface. @@ -126,6 +127,7 @@ def bootup( ) else: self._future = self._flux_executor.submit(jobspec=jobspec) + return True def shutdown(self, wait: bool = True): """ diff --git a/executorlib/task_scheduler/interactive/pysqaspawner.py b/executorlib/task_scheduler/interactive/pysqaspawner.py index 291621fc..31f57c8b 100644 --- a/executorlib/task_scheduler/interactive/pysqaspawner.py +++ b/executorlib/task_scheduler/interactive/pysqaspawner.py @@ -58,7 +58,7 @@ def __init__( def bootup( self, command_lst: list[str], - stop_function: Optional[callable] = None, + stop_function: Optional[Callable] = None, ): """ Method to start the subprocess interface. diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index da2f265f..492c0309 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -28,7 +28,7 @@ def execute_tasks( log_obj_size: bool = False, error_log_file: Optional[str] = None, worker_id: Optional[int] = None, - stop_function: Optional[callable] = None, + stop_function: Optional[Callable] = None, **kwargs, ) -> None: """ @@ -96,6 +96,9 @@ def execute_tasks( cache_key=cache_key, ) if not result_flag: + _task_done(future_queue=future_queue) + if queue_join_on_shutdown: + future_queue.join() break From 41c15478737f25a96f54807b5e635b5ce7c95037 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 30 Aug 2025 16:00:52 +0200 Subject: [PATCH 07/14] shutdown --- executorlib/task_scheduler/interactive/shared.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 492c0309..9d11f1ed 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -67,6 +67,9 @@ def execute_tasks( stop_function=stop_function, ) if interface is None: + _task_done(future_queue=future_queue) + if queue_join_on_shutdown: + future_queue.join() return if init_function is not None: interface.send_dict( From b24afb1498336fd9c524573d50934ab74a300fe8 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 30 Aug 2025 16:15:54 +0200 Subject: [PATCH 08/14] restructure --- .../task_scheduler/interactive/shared.py | 64 +++++++++---------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 9d11f1ed..47229855 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -66,43 +66,41 @@ def execute_tasks( worker_id=worker_id, stop_function=stop_function, ) - if interface is None: - _task_done(future_queue=future_queue) - if queue_join_on_shutdown: - future_queue.join() - return - if init_function is not None: - interface.send_dict( - input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} - ) - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict and task_dict["shutdown"]: - interface.shutdown(wait=task_dict["wait"]) - _task_done(future_queue=future_queue) - if queue_join_on_shutdown: - future_queue.join() - break - elif "fn" in task_dict and "future" in task_dict: - if error_log_file is not None: - task_dict["error_log_file"] = error_log_file - if cache_directory is None: - result_flag = _execute_task_without_cache( - interface=interface, task_dict=task_dict, future_queue=future_queue - ) - else: - result_flag = _execute_task_with_cache( - interface=interface, - task_dict=task_dict, - future_queue=future_queue, - cache_directory=cache_directory, - cache_key=cache_key, - ) - if not result_flag: + if interface is not None: + if init_function is not None: + interface.send_dict( + input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} + ) + while True: + task_dict = future_queue.get() + if "shutdown" in task_dict and task_dict["shutdown"]: + interface.shutdown(wait=task_dict["wait"]) _task_done(future_queue=future_queue) if queue_join_on_shutdown: future_queue.join() break + elif "fn" in task_dict and "future" in task_dict: + if error_log_file is not None: + task_dict["error_log_file"] = error_log_file + if cache_directory is None: + result_flag = _execute_task_without_cache( + interface=interface, task_dict=task_dict, future_queue=future_queue + ) + else: + result_flag = _execute_task_with_cache( + interface=interface, + task_dict=task_dict, + future_queue=future_queue, + cache_directory=cache_directory, + cache_key=cache_key, + ) + if not result_flag: + if queue_join_on_shutdown: + future_queue.join() + break + else: + if queue_join_on_shutdown: + future_queue.join() def _execute_task_without_cache( From aec02c174fbb837486212d551f05fe288875ac59 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 30 Aug 2025 14:16:03 +0000 Subject: [PATCH 09/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/interactive/shared.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 47229855..c55a2cd5 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -84,7 +84,9 @@ def execute_tasks( task_dict["error_log_file"] = error_log_file if cache_directory is None: result_flag = _execute_task_without_cache( - interface=interface, task_dict=task_dict, future_queue=future_queue + interface=interface, + task_dict=task_dict, + future_queue=future_queue, ) else: result_flag = _execute_task_with_cache( @@ -98,9 +100,8 @@ def execute_tasks( if queue_join_on_shutdown: future_queue.join() break - else: - if queue_join_on_shutdown: - future_queue.join() + elif queue_join_on_shutdown: + future_queue.join() def _execute_task_without_cache( From cfb44ab260586cc974969c40285026a88f458d7b Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 30 Aug 2025 16:51:56 +0200 Subject: [PATCH 10/14] the interface can only be none when it was cancelled before it started --- .../task_scheduler/interactive/shared.py | 62 +++++++++---------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index c55a2cd5..5cfdae77 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -66,42 +66,40 @@ def execute_tasks( worker_id=worker_id, stop_function=stop_function, ) - if interface is not None: - if init_function is not None: - interface.send_dict( - input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} - ) - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict and task_dict["shutdown"]: + if init_function is not None and interface is not None: + interface.send_dict( + input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}} + ) + while True: + task_dict = future_queue.get() + if "shutdown" in task_dict and task_dict["shutdown"]: + if interface is not None: interface.shutdown(wait=task_dict["wait"]) - _task_done(future_queue=future_queue) + _task_done(future_queue=future_queue) + if queue_join_on_shutdown: + future_queue.join() + break + elif "fn" in task_dict and "future" in task_dict: + if error_log_file is not None: + task_dict["error_log_file"] = error_log_file + if cache_directory is None: + result_flag = _execute_task_without_cache( + interface=interface, + task_dict=task_dict, + future_queue=future_queue, + ) + else: + result_flag = _execute_task_with_cache( + interface=interface, + task_dict=task_dict, + future_queue=future_queue, + cache_directory=cache_directory, + cache_key=cache_key, + ) + if not result_flag: if queue_join_on_shutdown: future_queue.join() break - elif "fn" in task_dict and "future" in task_dict: - if error_log_file is not None: - task_dict["error_log_file"] = error_log_file - if cache_directory is None: - result_flag = _execute_task_without_cache( - interface=interface, - task_dict=task_dict, - future_queue=future_queue, - ) - else: - result_flag = _execute_task_with_cache( - interface=interface, - task_dict=task_dict, - future_queue=future_queue, - cache_directory=cache_directory, - cache_key=cache_key, - ) - if not result_flag: - if queue_join_on_shutdown: - future_queue.join() - break - elif queue_join_on_shutdown: - future_queue.join() def _execute_task_without_cache( From b48623916cc8a4b082fb6fd077563f8d85492eb1 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 30 Aug 2025 16:55:36 +0200 Subject: [PATCH 11/14] fix type hints --- executorlib/task_scheduler/interactive/shared.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 5cfdae77..95a81eba 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -82,13 +82,13 @@ def execute_tasks( elif "fn" in task_dict and "future" in task_dict: if error_log_file is not None: task_dict["error_log_file"] = error_log_file - if cache_directory is None: + if cache_directory is None and interface is not None: result_flag = _execute_task_without_cache( interface=interface, task_dict=task_dict, future_queue=future_queue, ) - else: + elif interface is not None: result_flag = _execute_task_with_cache( interface=interface, task_dict=task_dict, @@ -96,6 +96,8 @@ def execute_tasks( cache_directory=cache_directory, cache_key=cache_key, ) + else: + raise ValueError() if not result_flag: if queue_join_on_shutdown: future_queue.join() From 06626a8f593825d7d597cc5aaf975a33843ce3c5 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 30 Aug 2025 17:43:02 +0200 Subject: [PATCH 12/14] fixes --- .../task_scheduler/interactive/blockallocation.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index d71bd049..eec242b1 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -13,6 +13,9 @@ from executorlib.task_scheduler.interactive.shared import execute_tasks +_task_schedulder_dict: dict = {} + + class BlockAllocationTaskScheduler(TaskSchedulerBase): """ The executorlib.interactive.executor.InteractiveExecutor leverages the exeutorlib executor to distribute python @@ -61,7 +64,9 @@ def __init__( executor_kwargs["queue_join_on_shutdown"] = False self._process_kwargs = executor_kwargs self._max_workers = max_workers - self._shutdown_flag = False + self_id = id(self) + self._self_id = self_id + _task_schedulder_dict[self._self_id] = False self._set_process( process=[ Thread( @@ -69,7 +74,7 @@ def __init__( kwargs=executor_kwargs | { "worker_id": worker_id, - "stop_function": lambda: self._shutdown_flag, + "stop_function": lambda: _task_schedulder_dict[self_id], }, ) for worker_id in range(self._max_workers) @@ -162,6 +167,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): cancel_items_in_queue(que=self._future_queue) self._shutdown_flag = True if isinstance(self._process, list): + _task_schedulder_dict[self._self_id] = True for _ in range(len(self._process)): self._future_queue.put({"shutdown": True, "wait": wait}) if wait: From ab97c08f3cafa9e46dbbe0b724fd663da5ecdad0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 30 Aug 2025 15:43:16 +0000 Subject: [PATCH 13/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/task_scheduler/interactive/blockallocation.py | 1 - 1 file changed, 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py index eec242b1..2e1d1f02 100644 --- a/executorlib/task_scheduler/interactive/blockallocation.py +++ b/executorlib/task_scheduler/interactive/blockallocation.py @@ -12,7 +12,6 @@ from executorlib.task_scheduler.base import TaskSchedulerBase from executorlib.task_scheduler.interactive.shared import execute_tasks - _task_schedulder_dict: dict = {} From 9e13b09932f2d93a48346bbec177ae4f60ef9839 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 30 Aug 2025 17:52:15 +0200 Subject: [PATCH 14/14] be more explizit with types --- executorlib/task_scheduler/interactive/shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 95a81eba..fea9f86a 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -88,7 +88,7 @@ def execute_tasks( task_dict=task_dict, future_queue=future_queue, ) - elif interface is not None: + elif cache_directory is not None and interface is not None: result_flag = _execute_task_with_cache( interface=interface, task_dict=task_dict,