Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5f13229
Interactive: Interrupt interface bootup when the executor is shutdown…
jan-janssen Aug 31, 2025
9242429
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2025
7ce8719
fix flux interface
jan-janssen Aug 31, 2025
bd94aa1
Merge remote-tracking branch 'origin/main' into interrupt_bootup
jan-janssen Sep 1, 2025
0cf4154
Merge commit '92cd615b41bd33fc1480cb093a4b71075ee1a094' into interrup…
jan-janssen Sep 7, 2025
539556f
Merge commit 'faff4cecd04641f13e0402413c98340a5a25f897' into interrup…
jan-janssen Sep 7, 2025
47be5ce
Merge commit 'e05297e5abf9f7efefb534164dec4fac3edf7941' into interrup…
jan-janssen Sep 7, 2025
f22591b
sync changes
jan-janssen Sep 7, 2025
77c60a4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2025
dd92338
fix type hints
jan-janssen Sep 7, 2025
d25859d
type fixes
jan-janssen Sep 7, 2025
b2ee7ed
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2025
8fc6d04
fixes
jan-janssen Sep 7, 2025
357263c
remove shutdown
jan-janssen Sep 7, 2025
026eb4c
Merge commit '3524942234ea82e8a0f93348a4572186361a8b94' into interrup…
jan-janssen Sep 7, 2025
c1fbfc4
fixes
jan-janssen Sep 7, 2025
3786880
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2025
7d07722
type fixes
jan-janssen Sep 7, 2025
3d1de42
Merge commit '37868801d6707492e8ea9d293dc8094567e789e6' into interrup…
jan-janssen Sep 7, 2025
355d941
add tests
jan-janssen Sep 7, 2025
1ab9fca
more tests
jan-janssen Sep 7, 2025
5ddaf7b
fix minimal
jan-janssen Sep 7, 2025
c4d44b8
fix interface
jan-janssen Sep 7, 2025
489aae9
clean up interface
jan-janssen Sep 7, 2025
73b55f8
fixes
jan-janssen Sep 7, 2025
cde4c07
fix type hints
jan-janssen Sep 7, 2025
53cb830
fix tests
jan-janssen Sep 7, 2025
45b6eb0
fix tests
jan-janssen Sep 7, 2025
8e60279
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2025
71cca2f
update docstring
jan-janssen Sep 7, 2025
7c4d8b2
Merge remote-tracking branch 'origin/interrupt_bootup' into interrupt…
jan-janssen Sep 7, 2025
79aa4a3
fix
jan-janssen Sep 7, 2025
20d0c6b
Add restart_limit to resource_dict
jan-janssen Sep 7, 2025
5e23ad5
Add error messages
jan-janssen Sep 7, 2025
2a472b5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 7, 2025
87118a2
Use random hash
jan-janssen Sep 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class FluxJobExecutor(BaseExecutor):
compute notes. Defaults to False.
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
Expand Down
1 change: 1 addition & 0 deletions executorlib/executor/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(
only)
- error_log_file (str): Name of the error log file to use for storing exceptions
raised by the Python functions submitted to the Executor.
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
Expand Down
1 change: 1 addition & 0 deletions executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class SlurmClusterExecutor(BaseExecutor):
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
- error_log_file (str): Name of the error log file to use for storing exceptions raised
by the Python functions submitted to the Executor.
- restart_limit (int): The maximum number of restarting worker processes. Default: 0
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
Expand Down
46 changes: 36 additions & 10 deletions executorlib/standalone/interactive/communication.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys
from socket import gethostname
from typing import Any, Optional
from typing import Any, Callable, Optional

import cloudpickle
import zmq
Expand Down Expand Up @@ -42,6 +42,17 @@ def __init__(
if log_obj_size:
self._logger = logging.getLogger("executorlib")
self._spawner = spawner
self._command_lst: list[str] = []
self._booted_sucessfully: bool = False
self._stop_function: Optional[Callable] = None

@property
def status(self) -> bool:
return self._booted_sucessfully

@status.setter
def status(self, status: bool):
self._booted_sucessfully = status

def send_dict(self, input_dict: dict):
"""
Expand All @@ -67,7 +78,9 @@ def receive_dict(self) -> dict:
while len(response_lst) == 0:
response_lst = self._poller.poll(self._time_out_ms)
if not self._spawner.poll():
raise ExecutorlibSocketError()
raise ExecutorlibSocketError(
"SocketInterface crashed during execution."
)
data = self._socket.recv(zmq.NOBLOCK)
if self._logger is not None:
self._logger.warning(
Expand Down Expand Up @@ -105,20 +118,30 @@ def bind_to_random_port(self) -> int:

def bootup(
self,
command_lst: list[str],
) -> bool:
command_lst: Optional[list[str]] = None,
stop_function: Optional[Callable] = None,
):
"""
Boot up the client process to connect to the SocketInterface.

Args:
command_lst (list): list of strings to start the client process

Returns:
bool: Whether the interface was successfully started.
stop_function (Callable): Function to stop the interface.
"""
return self._spawner.bootup(
command_lst=command_lst,
)
if command_lst is not None:
self._command_lst = command_lst
if stop_function is not None:
self._stop_function = stop_function
if len(self._command_lst) == 0:
raise ValueError("No command defined to boot up SocketInterface.")
if not self._spawner.bootup(
command_lst=self._command_lst,
stop_function=self._stop_function,
):
self._reset_socket()
self._booted_sucessfully = False
else:
self._booted_sucessfully = True

def shutdown(self, wait: bool = True):
"""
Expand Down Expand Up @@ -162,6 +185,7 @@ def interface_bootup(
hostname_localhost: Optional[bool] = None,
log_obj_size: bool = False,
worker_id: Optional[int] = None,
stop_function: Optional[Callable] = None,
) -> SocketInterface:
"""
Start interface for ZMQ communication
Expand All @@ -180,6 +204,7 @@ def interface_bootup(
log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
distribution.
stop_function (Callable): Function to stop the interface.

Returns:
executorlib.shared.communication.SocketInterface: socket interface for zmq communication
Expand All @@ -203,6 +228,7 @@ def interface_bootup(
]
interface.bootup(
command_lst=command_lst,
stop_function=stop_function,
)
return interface

Expand Down
16 changes: 13 additions & 3 deletions executorlib/standalone/interactive/spawner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import subprocess
from abc import ABC, abstractmethod
from typing import Optional
from typing import Callable, Optional

MPI_COMMAND = "mpiexec"

Expand Down Expand Up @@ -29,12 +29,17 @@ def __init__(
def bootup(
self,
command_lst: list[str],
):
stop_function: Optional[Callable] = None,
) -> bool:
"""
Method to start the interface.

Args:
command_lst (list[str]): The command list to execute.
stop_function (Callable): Function to stop the interface.

Returns:
bool: Whether the interface was successfully started.
"""
raise NotImplementedError

Expand Down Expand Up @@ -87,12 +92,17 @@ def __init__(
def bootup(
self,
command_lst: list[str],
):
stop_function: Optional[Callable] = None,
) -> bool:
"""
Comment on lines 92 to 97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Boot-interrupt is currently a no-op — stop_function is accepted but never used.

Without consulting stop_function, shutdown during bootup won’t be interrupted. Short-circuit before spawn, and abort immediately if the signal flips after spawn.

 def bootup(
     self,
     command_lst: list[str],
-    stop_function: Optional[Callable] = None,
+    stop_function: Optional[Callable[[], bool]] = None,
 ) -> bool:
@@
-        if self._cwd is not None:
+        if stop_function and stop_function():
+            return False
+        if self._cwd is not None:
             os.makedirs(self._cwd, exist_ok=True)
         self._process = subprocess.Popen(
             args=self.generate_command(command_lst=command_lst),
             cwd=self._cwd,
             stdin=subprocess.DEVNULL,
         )
-        return self.poll()
+        # Abort immediately if a stop is requested during/after spawn.
+        if stop_function and stop_function():
+            self.shutdown(wait=False)
+            return False
+        return self.poll()

Also applies to: 100-106, 114-114

🤖 Prompt for AI Agents
In executorlib/standalone/interactive/spawner.py around lines 92-97 (also check
100-106 and 114), the bootup method accepts stop_function but never calls it, so
shutdown signals during boot are ignored; modify bootup to check stop_function
immediately before starting the spawn and again right after spawn (and at any
other blocking points) and abort/return False if stop_function indicates a stop,
ensuring you short-circuit before launching the process and after spawn to clean
up; also propagate this check into the logic at lines 100-106 and 114 so any
in-progress boot can be interrupted and resources released.

Method to start the subprocess interface.

Args:
command_lst (list[str]): The command list to execute.
stop_function (Callable): Function to stop the interface.

Returns:
bool: Whether the interface was successfully started.
"""
if self._cwd is not None:
os.makedirs(self._cwd, exist_ok=True)
Expand Down
107 changes: 81 additions & 26 deletions executorlib/task_scheduler/interactive/blockallocation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import queue
import random
from concurrent.futures import Future
from threading import Thread
from typing import Callable, Optional
Expand All @@ -8,11 +9,21 @@
check_resource_dict,
check_resource_dict_is_empty,
)
from executorlib.standalone.interactive.communication import interface_bootup
from executorlib.standalone.interactive.communication import (
ExecutorlibSocketError,
SocketInterface,
interface_bootup,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.task_scheduler.base import TaskSchedulerBase
from executorlib.task_scheduler.interactive.shared import execute_task_dict, task_done
from executorlib.task_scheduler.interactive.shared import (
execute_task_dict,
reset_task_dict,
task_done,
)

_interrupt_bootup_dict: dict = {}


class BlockAllocationTaskScheduler(TaskSchedulerBase):
Expand Down Expand Up @@ -63,11 +74,18 @@ def __init__(
executor_kwargs["queue_join_on_shutdown"] = False
self._process_kwargs = executor_kwargs
self._max_workers = max_workers
self_id = random.getrandbits(128)
self._self_id = self_id
_interrupt_bootup_dict[self._self_id] = False
self._set_process(
process=[
Thread(
target=_execute_multiple_tasks,
kwargs=executor_kwargs | {"worker_id": worker_id},
kwargs=executor_kwargs
| {
"worker_id": worker_id,
"stop_function": lambda: _interrupt_bootup_dict[self_id],
},
)
for worker_id in range(self._max_workers)
],
Expand Down Expand Up @@ -157,6 +175,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
if cancel_futures:
cancel_items_in_queue(que=self._future_queue)
if isinstance(self._process, list):
_interrupt_bootup_dict[self._self_id] = True
for _ in range(len(self._process)):
self._future_queue.put({"shutdown": True, "wait": wait})
if wait:
Expand Down Expand Up @@ -190,6 +209,8 @@ def _execute_multiple_tasks(
log_obj_size: bool = False,
error_log_file: Optional[str] = None,
worker_id: Optional[int] = None,
stop_function: Optional[Callable] = None,
restart_limit: int = 0,
**kwargs,
) -> None:
"""
Expand All @@ -216,6 +237,8 @@ def _execute_multiple_tasks(
submitted to the Executor.
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
distribution.
stop_function (Callable): Function to stop the interface.
restart_limit (int): The maximum number of restarting worker processes.
"""
interface = interface_bootup(
command_lst=get_interactive_execute_command(
Expand All @@ -225,34 +248,66 @@ def _execute_multiple_tasks(
hostname_localhost=hostname_localhost,
log_obj_size=log_obj_size,
worker_id=worker_id,
stop_function=stop_function,
)
interface_initialization_exception = _set_init_function(
interface=interface,
init_function=init_function,
)
restart_counter = 0
while True:
if not interface.status and restart_counter > restart_limit:
interface.status = True # no more restarts
interface_initialization_exception = ExecutorlibSocketError(
"SocketInterface crashed during execution."
)
elif not interface.status:
interface.bootup()
interface_initialization_exception = _set_init_function(
interface=interface,
init_function=init_function,
)
restart_counter += 1
else: # interface.status == True
task_dict = future_queue.get()
if "shutdown" in task_dict and task_dict["shutdown"]:
if interface.status:
interface.shutdown(wait=task_dict["wait"])
task_done(future_queue=future_queue)
if queue_join_on_shutdown:
future_queue.join()
break
elif "fn" in task_dict and "future" in task_dict:
f = task_dict.pop("future")
if interface_initialization_exception is not None:
f.set_exception(exception=interface_initialization_exception)
else:
# The interface failed during the execution
interface.status = execute_task_dict(
task_dict=task_dict,
future_obj=f,
interface=interface,
cache_directory=cache_directory,
cache_key=cache_key,
error_log_file=error_log_file,
)
if not interface.status:
reset_task_dict(
future_obj=f, future_queue=future_queue, task_dict=task_dict
)
task_done(future_queue=future_queue)


def _set_init_function(
interface: SocketInterface,
init_function: Optional[Callable] = None,
) -> Optional[Exception]:
interface_initialization_exception = None
if init_function is not None:
if init_function is not None and interface.status:
try:
_ = interface.send_and_receive_dict(
input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
)
except Exception as init_exception:
interface_initialization_exception = init_exception
while True:
task_dict = future_queue.get()
if "shutdown" in task_dict and task_dict["shutdown"]:
interface.shutdown(wait=task_dict["wait"])
task_done(future_queue=future_queue)
if queue_join_on_shutdown:
future_queue.join()
break
elif "fn" in task_dict and "future" in task_dict:
f = task_dict.pop("future")
if interface_initialization_exception is not None:
f.set_exception(exception=interface_initialization_exception)
else:
execute_task_dict(
task_dict=task_dict,
future_obj=f,
interface=interface,
cache_directory=cache_directory,
cache_key=cache_key,
error_log_file=error_log_file,
)
task_done(future_queue=future_queue)
return interface_initialization_exception
14 changes: 10 additions & 4 deletions executorlib/task_scheduler/interactive/onetoone.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
from typing import Optional

from executorlib.standalone.command import get_interactive_execute_command
from executorlib.standalone.interactive.communication import interface_bootup
from executorlib.standalone.interactive.communication import (
ExecutorlibSocketError,
interface_bootup,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.task_scheduler.base import TaskSchedulerBase
from executorlib.task_scheduler.interactive.shared import execute_task_dict
Expand Down Expand Up @@ -230,7 +233,7 @@ def _execute_task_in_thread(
error_log_file: Optional[str] = None,
worker_id: Optional[int] = None,
**kwargs,
) -> None:
):
"""
Execute a single tasks in parallel using the message passing interface (MPI).

Expand All @@ -256,7 +259,7 @@ def _execute_task_in_thread(
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
distribution.
"""
execute_task_dict(
if not execute_task_dict(
task_dict=task_dict,
future_obj=future_obj,
interface=interface_bootup(
Expand All @@ -271,4 +274,7 @@ def _execute_task_in_thread(
cache_directory=cache_directory,
cache_key=cache_key,
error_log_file=error_log_file,
)
):
future_obj.set_exception(
ExecutorlibSocketError("SocketInterface crashed during execution.")
)
Loading
Loading