Conversation

@jan-janssen
Member

@jan-janssen jan-janssen commented Aug 31, 2025

Summary by CodeRabbit

  • New Features

    • Stoppable interactive clients with controlled boot/shutdown and optional stop callbacks.
    • Bounded automatic retry for failed worker startups and task re-queuing.
  • Bug Fixes

    • More resilient task submission: connection failures treated non-fatally and distinguished from other errors.
    • Safer shutdown and initialization to avoid spurious restarts and empty-queue errors.
    • Prevent resubmission when futures are already completed or canceled.
  • Documentation

    • Documented restart_limit resource option across executors.
  • Tests

    • Added unit tests covering interactive boot, stop behavior, and task submission paths.

@coderabbitai
Contributor

coderabbitai bot commented Aug 31, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

Adds stoppable boot semantics and boot-status tracking to the interactive SocketInterface and spawners; propagates a new optional stop_function through interface_bootup and spawner.bootup; changes the task submission helpers to return a boolean so that transient socket errors can be reported to the caller; adds a task reset/retry flow and per-instance boot interruption in block allocation; and updates tests and docstrings accordingly.

Changes

Cohort / File(s) | Summary
--- | ---
Interactive communication & spawners: executorlib/standalone/interactive/communication.py, executorlib/standalone/interactive/spawner.py, executorlib/task_scheduler/interactive/spawner_flux.py | Add Callable typing; SocketInterface stores last command_lst and optional stop_function, adds status property; receive_dict raises ExecutorlibSocketError on poll failure; SocketInterface.bootup signature now command_lst: Optional[list[str]] = None, stop_function: Optional[Callable] = None and persists inputs, validates presence of a command, updates _booted_sucessfully and resets socket on failure; spawner bootup signatures accept stop_function: Optional[Callable] = None and return bool; docstrings updated.
Task execution core & helpers: executorlib/task_scheduler/interactive/shared.py | execute_task_dict now accepts interface: Optional[SocketInterface] = None and returns bool; internal helpers _execute_task_without_cache and _execute_task_with_cache return bool; ExecutorlibSocketError treated as a non-fatal submission signal (return False); added reset_task_dict to re-enqueue tasks and mark futures back to PENDING; task_done guarded to suppress ValueError; imports and docstrings expanded.
Block allocation scheduler: executorlib/task_scheduler/interactive/blockallocation.py | Introduce module-level _interrupt_bootup_dict and per-instance self_id; provide per-worker stop_function that reads the instance interrupt flag; _execute_multiple_tasks signature extended with stop_function: Optional[Callable] = None and restart_limit: int = 0; on shutdown set interrupt flag before teardown; use reset_task_dict to handle failed submissions and permit bounded retries via restart_limit.
One-to-one scheduler thread: executorlib/task_scheduler/interactive/onetoone.py | _execute_task_in_thread updated to check execute_task_dict(...) return value and, on falsy result, set future_obj exception to ExecutorlibSocketError("SocketInterface crashed during execution."); signature adjusted (removed explicit return annotation).
Executors docstrings: executorlib/executor/flux.py, executorlib/executor/single.py, executorlib/executor/slurm.py | Docstring updates: add restart_limit key description to resource_dict (default 0). No behavioral/signature changes.
Tests: tests/test_standalone_interactive_communication.py, tests/test_task_scheduler_interactive_shared.py | Tests updated to use SocketInterface.status rather than bootup boolean; add BrokenSpawner that fails bootup; add tests for incorrect boot input and broken spawner; add comprehensive tests for execute_task_dict including error handling and optional cache paths; adjust typing imports in tests.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Caller
  participant Scheduler
  participant Interface as SocketInterface
  participant Spawner
  note over Scheduler,Interface: bootup with optional stop_function
  Caller->>Scheduler: interface_bootup(command_lst, ..., stop_function)
  Scheduler->>Interface: bootup(command_lst, stop_function)
  Interface->>Spawner: bootup(command_lst, stop_function)
  Spawner-->>Interface: bool (success?)
  alt success
    Interface-->>Scheduler: status=True (persisted cmd/stop)
    Scheduler-->>Caller: Interface
  else failure
    Interface-->>Scheduler: status=False (socket reset)
    Scheduler-->>Caller: Interface (status=False)
  end
sequenceDiagram
  autonumber
  actor Producer
  participant BlockScheduler as BlockAllocationWorker
  participant Shared as shared.execute_task_dict
  participant Interface as SocketInterface
  participant Queue as future_queue
  Producer->>BlockScheduler: submit task_dict + future
  BlockScheduler->>Interface: ensure init (if needed)
  BlockScheduler->>Shared: execute_task_dict(task_dict, future, Interface, ...)
  alt submission succeeded
    Shared-->>BlockScheduler: True
    BlockScheduler->>Queue: task_done()
  else transient socket error
    Shared-->>BlockScheduler: False
    BlockScheduler->>Shared: reset_task_dict(future, Queue, task_dict)
    opt restart allowed
      BlockScheduler->>Interface: bootup(stored_cmd, stored_stop)
      Interface-->>BlockScheduler: status(bool)
    end
  end
  note over BlockScheduler: on shutdown set interrupt flag before tearing down interface
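The retry branch of the second diagram can be sketched in plain Python. This is only an illustration under stated assumptions: `run_with_restart_limit`, `submit`, and `restart` are hypothetical names, and the real worker loop in executorlib may order these steps differently. The sketch shows how a failed submission puts the task back on the queue and how `restart_limit` bounds the number of restarts.

```python
import queue
from typing import Callable


def run_with_restart_limit(
    task_queue: queue.Queue,
    submit: Callable[[object], bool],
    restart: Callable[[], bool],
    restart_limit: int = 0,
) -> int:
    """Drain task_queue and return the number of restarts that were used.

    submit() returns False for a transient failure; restart() returns True
    when the (hypothetical) interface came back up.
    """
    restarts = 0
    while True:
        try:
            task = task_queue.get_nowait()
        except queue.Empty:
            return restarts
        if submit(task):
            task_queue.task_done()
            continue
        # Transient failure: re-queue the task, then mark this attempt done
        # so the queue's unfinished-task counter stays balanced.
        task_queue.put(task)
        task_queue.task_done()
        if restarts >= restart_limit or not restart():
            return restarts  # give up; the task stays queued
        restarts += 1
```

With `restart_limit=0` the loop stops at the first failed submission and leaves the re-queued task for someone else to handle, which matches the documented default of 0.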

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

I twitch my whiskers, press restart,
A tiny hop to mend the part.
If sockets snag, I nudge the queue,
Reset, retry — I'll see it through.
With carrot-debug and rabbit grace, I boot once more and win the race. 🐰

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Ruff (0.12.2)
executorlib/executor/flux.py

ruff: error while loading shared libraries: libpthread.so.0: cannot open shared object file: No such file or directory

executorlib/executor/single.py

ruff: error while loading shared libraries: libpthread.so.0: cannot open shared object file: No such file or directory

tests/test_standalone_interactive_communication.py

ruff: error while loading shared libraries: libpthread.so.0: cannot open shared object file: No such file or directory

  • 7 others

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9242429 and 87118a2.

📒 Files selected for processing (10)
  • executorlib/executor/flux.py (1 hunks)
  • executorlib/executor/single.py (1 hunks)
  • executorlib/executor/slurm.py (1 hunks)
  • executorlib/standalone/interactive/communication.py (7 hunks)
  • executorlib/task_scheduler/interactive/blockallocation.py (7 hunks)
  • executorlib/task_scheduler/interactive/onetoone.py (4 hunks)
  • executorlib/task_scheduler/interactive/shared.py (7 hunks)
  • executorlib/task_scheduler/interactive/spawner_flux.py (2 hunks)
  • tests/test_standalone_interactive_communication.py (10 hunks)
  • tests/test_task_scheduler_interactive_shared.py (1 hunks)

@codecov

codecov bot commented Aug 31, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.84%. Comparing base (3524942) to head (87118a2).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #801      +/-   ##
==========================================
+ Coverage   97.76%   97.84%   +0.07%     
==========================================
  Files          32       32              
  Lines        1479     1530      +51     
==========================================
+ Hits         1446     1497      +51     
  Misses         33       33              

@jan-janssen jan-janssen marked this pull request as draft August 31, 2025 20:51
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
executorlib/standalone/interactive/spawner.py (1)

128-141: shutdown(wait=False) still blocks — communicate() ignores wait and can hang.

communicate() is unnecessary here (stdout/stderr aren’t piped) and causes wait=False to block. Respect the wait flag and add a safe kill fallback.

-    if self._process is not None:
-        self._process.communicate()
-        self._process.terminate()
-        if wait:
-            self._process.wait()
-    self._process = None
+    if self._process is not None:
+        try:
+            if wait:
+                self._process.terminate()
+                self._process.wait()
+            else:
+                self._process.terminate()
+        finally:
+            self._process = None
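The comment mentions a kill fallback. A standalone sketch of one way to respect the `wait` flag and escalate to `kill()` is given here. `stop_process` and its `timeout` parameter are hypothetical and are not part of the spawner API; only standard `subprocess` calls are used.

```python
import subprocess
import sys
from typing import Optional


def stop_process(
    process: Optional[subprocess.Popen],
    wait: bool = True,
    timeout: float = 5.0,
) -> None:
    """Terminate a child process; block only when wait is True."""
    if process is None:
        return
    process.terminate()
    if not wait:
        return  # non-blocking shutdown: do not call wait() or communicate()
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # The child ignored the termination request: escalate and reap it.
        process.kill()
        process.wait()
```

The timeout value is an arbitrary choice for the sketch; a real spawner would likely make it configurable.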
executorlib/standalone/interactive/communication.py (1)

156-163: Guard against None socket after _reset_socket to avoid AttributeError.

shutdown() calls send_and_receive_dict unconditionally when the process is alive; this crashes when the socket was reset.

-        if self._spawner.poll():
+        if self._spawner.poll() and self._socket is not None:
             result = self.send_and_receive_dict(
                 input_dict={"shutdown": True, "wait": wait}
             )
             self._spawner.shutdown(wait=wait)
executorlib/task_scheduler/interactive/blockallocation.py (1)

106-115: New workers don’t receive stop_function/worker_id.

Scale-up path drops the interrupt semantics. Pass both, mirroring init.

-                new_process_lst = [
-                    Thread(
-                        target=_execute_multiple_tasks,
-                        kwargs=self._process_kwargs,
-                    )
-                    for _ in range(max_workers - self._max_workers)
-                ]
+                new_process_lst = []
+                for worker_id in range(self._max_workers, max_workers):
+                    new_process_lst.append(
+                        Thread(
+                            target=_execute_multiple_tasks,
+                            kwargs=self._process_kwargs
+                            | {
+                                "worker_id": worker_id,
+                                "stop_function": lambda: _interrupt_bootup_dict[self._self_id],
+                            },
+                        )
+                    )
🧹 Nitpick comments (9)
executorlib/standalone/interactive/spawner.py (1)

29-34: Define precise stop_function type and intent.

Use a callable signature to reflect the intended predicate and tighten the contract in both signature and docs.

-    def bootup(
-        self,
-        command_lst: list[str],
-        stop_function: Optional[Callable] = None,
-    ) -> bool:
+    def bootup(
+        self,
+        command_lst: list[str],
+        stop_function: Optional[Callable[[], bool]] = None,
+    ) -> bool:
@@
-            stop_function (Callable): Function to stop the interface.
+            stop_function (Callable[[], bool], optional): Predicate returning True if boot should be aborted.

Also applies to: 39-43

executorlib/task_scheduler/interactive/onetoone.py (1)

233-261: Thread target’s return value is ignored — bool result is misleading.

Thread doesn’t capture the returned bool; callers can’t observe it. Either remove the return type/statement or persist the flag via a queue/callback.

-def _execute_task_in_thread(..., **kwargs,) -> bool:
+def _execute_task_in_thread(..., **kwargs,) -> None:
@@
-    return execute_task_dict(
+    _ = execute_task_dict(
         ...
     )

If you need the flag, inject a results queue and put() it there.
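A minimal sketch of the results-queue idea, using only the standard library. `run_and_report` is a hypothetical wrapper, not the actual `_execute_task_in_thread`.

```python
import queue
from threading import Thread
from typing import Callable


def run_and_report(task: Callable[[], bool], result_queue: queue.Queue) -> None:
    """Thread target: the return value of task() would be lost, so queue it."""
    result_queue.put(task())


result_queue: queue.Queue = queue.Queue()
thread = Thread(
    target=run_and_report,
    kwargs={"task": lambda: False, "result_queue": result_queue},
)
thread.start()
thread.join()
submitted = result_queue.get_nowait()  # the flag the thread produced
```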

executorlib/standalone/interactive/communication.py (2)

169-176: Also clear the poller when resetting to avoid stale registrations.

-        if self._socket is not None:
+        if self._socket is not None:
             self._socket.close()
-        if self._context is not None:
+        if self._context is not None:
             self._context.term()
         self._process = None
         self._socket = None
         self._context = None
+        self._poller = None

212-214: Docstring return type is outdated.

interface_bootup now returns Optional[SocketInterface].

-    Returns:
-         executorlib.shared.communication.SocketInterface: socket interface for zmq communication
+    Returns:
+         Optional[SocketInterface]: socket interface for ZMQ communication if bootup succeeds, otherwise None.
executorlib/task_scheduler/interactive/blockallocation.py (4)

21-21: Global interrupt dict needs cleanup to prevent stale entries.

Delete the per-instance key during shutdown after threads join.


172-179: Cleanup interrupt flag on full shutdown.

Avoid leaking keys after threads stop.

                 if wait:
                     for process in self._process:
                         process.join()
                     cancel_items_in_queue(que=self._future_queue)
                     self._future_queue.join()
+            # Clear interrupt flag for this instance
+            _interrupt_bootup_dict.pop(self._self_id, None)

234-235: Doc clarity: specify predicate signature for stop_function.

-        stop_function (Callable): Function to stop the interface.
+        stop_function (Callable[[], bool]): Predicate returning True if boot should be aborted.

269-279: Avoid double task_done() calls on failure path.

reset_task_dict already marks the current task done; calling task_done() beforehand is redundant (currently suppressed) and obscures intent.

-            if not result_flag:
-                task_done(future_queue=future_queue)
-                reset_task_dict(
+            if not result_flag:
+                reset_task_dict(
                     future_obj=f, future_queue=future_queue, task_dict=task_dict
                 )
                 if interface is not None:
-                    interface.restart()
+                    _ = interface.restart()
                 else:
                     break
             else:
                 task_done(future_queue=future_queue)
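For context, `queue.Queue.task_done()` raises `ValueError` when it is called more often than items were taken from the queue, which is why a second call has to be suppressed. A guarded helper could look like the sketch below; the body is an assumption, only the name and the `future_queue` parameter come from the diff.

```python
import contextlib
import queue


def task_done(future_queue: queue.Queue) -> None:
    """Mark one task as done, ignoring the 'called too many times' error."""
    with contextlib.suppress(ValueError):
        future_queue.task_done()
```

Suppressing the error hides accounting mistakes, so calling the helper exactly once per retrieved item remains the cleaner option.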
executorlib/task_scheduler/interactive/shared.py (1)

73-89: Docstring claims a bool return but function returns None.

Align docs with behavior.

 def reset_task_dict(future_obj: Future, future_queue: queue.Queue, task_dict: dict):
@@
-    Returns:
-        bool: True if the task was submitted successfully, False otherwise.
+    Returns:
+        None
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 016a2ba and 9242429.

📒 Files selected for processing (5)
  • executorlib/standalone/interactive/communication.py (7 hunks)
  • executorlib/standalone/interactive/spawner.py (4 hunks)
  • executorlib/task_scheduler/interactive/blockallocation.py (5 hunks)
  • executorlib/task_scheduler/interactive/onetoone.py (3 hunks)
  • executorlib/task_scheduler/interactive/shared.py (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
executorlib/task_scheduler/interactive/onetoone.py (1)
executorlib/task_scheduler/interactive/shared.py (1)
  • execute_task_dict (16-59)
executorlib/standalone/interactive/communication.py (1)
executorlib/standalone/interactive/spawner.py (2)
  • bootup (29-44)
  • bootup (92-114)
executorlib/task_scheduler/interactive/blockallocation.py (4)
executorlib/task_scheduler/interactive/shared.py (3)
  • execute_task_dict (16-59)
  • reset_task_dict (73-88)
  • task_done (62-70)
executorlib/task_scheduler/base.py (3)
  • _set_process (171-179)
  • future_queue (63-70)
  • shutdown (146-169)
executorlib/standalone/queue.py (1)
  • cancel_items_in_queue (4-19)
executorlib/standalone/interactive/communication.py (3)
  • send_dict (48-59)
  • shutdown (149-163)
  • restart (134-147)
executorlib/task_scheduler/interactive/shared.py (2)
executorlib/standalone/interactive/communication.py (4)
  • ExecutorlibSocketError (10-11)
  • SocketInterface (14-182)
  • send_and_receive_dict (84-96)
  • shutdown (149-163)
executorlib/standalone/hdf.py (1)
  • get_output (71-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
  • GitHub Check: notebooks_integration
  • GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
  • GitHub Check: unittest_flux_mpich
  • GitHub Check: unittest_flux_openmpi
  • GitHub Check: notebooks
🔇 Additional comments (6)
executorlib/standalone/interactive/spawner.py (1)

142-149: LGTM: poll() semantics are clear and correct.

Returns True only while the process is alive.

executorlib/task_scheduler/interactive/onetoone.py (1)

262-277: Boot failure path may leave futures pending (interface can be None).

execute_task_dict will return False when interface is None, but in the OneProcess flow it’s not re-queued/reset like in BlockAllocation. Please verify this path doesn’t strand futures.

executorlib/task_scheduler/interactive/blockallocation.py (2)

72-75: Per-instance interrupt flag initialization looks good.

Capturing self_id avoids cross-instance clashes.


76-84: Nice: propagate stop_function to workers.

Ensures workers can observe shutdown intent during boot.

executorlib/task_scheduler/interactive/shared.py (2)

6-6: Private API usage (PENDING) — verify compatibility.

Future._state and PENDING are internal; confirm your supported Python versions, or consider an alternative reset strategy if this changes upstream.
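To make the concern concrete, here is a sketch of resetting a future through private attributes. `reset_future` is a hypothetical helper, and `concurrent.futures._base.PENDING`, `Future._state`, and `Future._condition` are CPython implementation details that may change between versions.

```python
from concurrent.futures import Future
from concurrent.futures._base import PENDING  # private module, not a public API


def reset_future(future: Future) -> None:
    """Put a running future back into the pending state (relies on internals)."""
    with future._condition:  # the lock Future uses for its own state changes
        future._state = PENDING
```

After the reset, `set_running_or_notify_cancel()` can be called again, which is what allows a re-queued task to be picked up a second time.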


106-115: Error handling semantics are solid.

ExecutorlibSocketError triggers a non-fatal False return (so the task can be retried); other errors set the exception on the future and shut down cleanly.
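The split between retryable and fatal errors can be sketched as below. `SocketError` is a local stand-in for ExecutorlibSocketError, `submit` is a hypothetical helper, and returning True after a fatal error is an assumption of this sketch (the task is resolved and should not be retried).

```python
from concurrent.futures import Future
from typing import Callable


class SocketError(RuntimeError):
    """Hypothetical stand-in for ExecutorlibSocketError."""


def submit(call: Callable[[], object], future: Future) -> bool:
    """Return False only for transient socket failures."""
    try:
        result = call()
    except SocketError:
        return False  # the future stays unresolved so the caller can retry
    except Exception as error:
        future.set_exception(error)  # fatal: report the error to the user
        return True
    future.set_result(result)
    return True
```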

Comment on lines 123 to 133
        self._command_lst = command_lst
        self._stop_function = stop_function
        if not self._spawner.bootup(
            command_lst=command_lst,
            stop_function=stop_function,
        ):
            self._reset_socket()
            return False
        else:
            return True

Contributor

🛠️ Refactor suggestion

Honor stop_function at the interface layer as well.

Early-abort here too so we don’t even delegate to the spawner when a shutdown is in progress.

-        self._command_lst = command_lst
-        self._stop_function = stop_function
+        self._command_lst = command_lst
+        self._stop_function = stop_function
+        if stop_function and stop_function():
+            return False
         if not self._spawner.bootup(
             command_lst=command_lst,
             stop_function=stop_function,
         ):
🤖 Prompt for AI Agents
In executorlib/standalone/interactive/communication.py around lines 123 to 133,
the method currently always delegates to the spawner even if a shutdown is
already in progress; call the provided stop_function() immediately after
assigning self._stop_function (and before calling self._spawner.bootup), and if
it indicates shutdown (truthy), call self._reset_socket() and return False so we
avoid delegating to the spawner; otherwise proceed to call self._spawner.bootup
as before.

Comment on lines 141 to 147
if not self._spawner.bootup(
command_lst=self._command_lst,
stop_function=self._stop_function,
):
self._reset_socket()
return False
return True
Contributor

🛠️ Refactor suggestion

restart() can’t recover after _reset_socket — reinit socket and port.

If _reset_socket was called earlier, _context/_socket are None; re-create them, bind a new port, and update the stored command list before spawning.

-        if not self._spawner.bootup(
+        # Reinitialize socket when it was reset previously.
+        if self._socket is None or self._context is None:
+            self._context = zmq.Context()
+            self._socket = self._context.socket(zmq.PAIR)
+            self._poller = zmq.Poller()
+            self._poller.register(self._socket, zmq.POLLIN)
+            port = self.bind_to_random_port()
+            try:
+                idx = self._command_lst.index("--zmqport")
+                self._command_lst[idx + 1] = str(port)
+            except ValueError:
+                self._command_lst += ["--zmqport", str(port)]
+        if not self._spawner.bootup(
             command_lst=self._command_lst,
             stop_function=self._stop_function,
         ):
🤖 Prompt for AI Agents
In executorlib/standalone/interactive/communication.py around lines 141-147,
restart() currently assumes _context and _socket exist after a prior
_reset_socket call; modify restart() to detect if _context or _socket is None
and, when so, recreate the ZMQ context and socket, bind the socket to a new
ephemeral/available port, update the stored command list to include the new
port/address as required, and only then call self._spawner.bootup(...); ensure
_reset_socket clears references so this re-initialization always runs before
spawning and return False on any failure to reinit.
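The command-list update in that suggestion can be isolated into a small pure function. `with_port` is a hypothetical helper; the `--zmqport` flag name is taken from the suggested diff, and the rest is an assumption. Unlike the diff, this sketch returns a new list instead of mutating the stored one.

```python
def with_port(command_lst: list[str], port: int, flag: str = "--zmqport") -> list[str]:
    """Return a copy of command_lst whose port flag carries the new port."""
    updated = list(command_lst)
    try:
        idx = updated.index(flag)
    except ValueError:
        # Flag not present yet: append it together with the port.
        return updated + [flag, str(port)]
    if idx + 1 < len(updated):
        updated[idx + 1] = str(port)
    else:
        updated.append(str(port))  # flag was the last element without a value
    return updated
```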

Comment on lines 92 to 97
    def bootup(
        self,
        command_lst: list[str],
        stop_function: Optional[Callable] = None,
    ) -> bool:
        """
Contributor

⚠️ Potential issue

Boot-interrupt is currently a no-op — stop_function is accepted but never used.

Without consulting stop_function, shutdown during bootup won’t be interrupted. Short-circuit before spawn, and abort immediately if the signal flips after spawn.

 def bootup(
     self,
     command_lst: list[str],
-    stop_function: Optional[Callable] = None,
+    stop_function: Optional[Callable[[], bool]] = None,
 ) -> bool:
@@
-        if self._cwd is not None:
+        if stop_function and stop_function():
+            return False
+        if self._cwd is not None:
             os.makedirs(self._cwd, exist_ok=True)
         self._process = subprocess.Popen(
             args=self.generate_command(command_lst=command_lst),
             cwd=self._cwd,
             stdin=subprocess.DEVNULL,
         )
-        return self.poll()
+        # Abort immediately if a stop is requested during/after spawn.
+        if stop_function and stop_function():
+            self.shutdown(wait=False)
+            return False
+        return self.poll()

Also applies to: 100-106, 114-114

🤖 Prompt for AI Agents
In executorlib/standalone/interactive/spawner.py around lines 92-97 (also check
100-106 and 114), the bootup method accepts stop_function but never calls it, so
shutdown signals during boot are ignored; modify bootup to check stop_function
immediately before starting the spawn and again right after spawn (and at any
other blocking points) and abort/return False if stop_function indicates a stop,
ensuring you short-circuit before launching the process and after spawn to clean
up; also propagate this check into the logic at lines 100-106 and 114 so any
in-progress boot can be interrupted and resources released.
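As a standalone illustration of the two checkpoints described above, the following sketch checks a stop predicate before and right after spawning. It is a free function that returns the process handle, which differs from the spawner method in the diff; the name and return type are assumptions.

```python
import subprocess
import sys
from typing import Callable, Optional


def bootup(
    command_lst: list[str],
    stop_function: Optional[Callable[[], bool]] = None,
) -> Optional[subprocess.Popen]:
    """Spawn command_lst unless stop_function() asks to abort."""
    if stop_function is not None and stop_function():
        return None  # stop requested before anything was spawned
    process = subprocess.Popen(command_lst, stdin=subprocess.DEVNULL)
    if stop_function is not None and stop_function():
        # Stop requested while spawning: clean up the child immediately.
        process.terminate()
        process.wait()
        return None
    return process
```

A long-running boot would need further checkpoints at every blocking step; two checks only cover the window around `Popen`.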

@jan-janssen jan-janssen marked this pull request as ready for review September 7, 2025 20:32
@jan-janssen jan-janssen merged commit d93e777 into main Sep 7, 2025
62 of 63 checks passed
@jan-janssen jan-janssen deleted the interrupt_bootup branch September 7, 2025 20:32