Fix dead worker draining shared task queue after restart_limit exhausted#946

Merged
jan-janssen merged 5 commits into pyiron:main from IlgarBaghishov:fix/dead-worker-queue-drain
Mar 9, 2026
Conversation

@IlgarBaghishov
Contributor

@IlgarBaghishov IlgarBaghishov commented Mar 6, 2026

Problem

Two bugs in _execute_multiple_tasks() in blockallocation.py:

Bug 1: Dead worker drains the shared task queue

When block_allocation=True and a worker process exhausts its restart_limit, the permanently dead worker thread sets interface.status = True (faking that the worker is alive) and stores an ExecutorlibSocketError as interface_initialization_exception, but continues the while True loop. On the next iteration, it falls into the else branch (since interface.status is now True), calls future_queue.get(), and immediately fails the task with the stored exception. This repeats indefinitely.

Since the dead worker "processes" tasks instantly (just sets an exception on the future), it pulls from the shared queue orders of magnitude faster than healthy workers doing real computation. A single dead worker effectively drains the entire queue, starving all healthy workers.

Evidence from a real run: 12 workers (2 nodes x 2 GPUs x 6 jobs/GPU), 159,782 total tasks. One worker crashed and exhausted its restart_limit. It drained the entire shared queue — each of the 11 healthy workers only completed ~1 task before the queue was empty. All 159,782 futures were reported as "FINISHED" (nearly all were errors from the dead worker).
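The failure mode can be reproduced with a self-contained simulation (simplified, hypothetical names such as SocketError and dead_worker_step; not executorlib's actual code): a dead worker that merely sets an exception on each future empties the queue with zero work per task, so nothing is left for healthy workers.

```python
import queue
from concurrent.futures import Future

# Hypothetical stand-in for the stored ExecutorlibSocketError.
class SocketError(RuntimeError):
    pass

def dead_worker_step(future_queue: queue.Queue, stored_exception: Exception) -> None:
    # The buggy branch: interface.status was faked to True, so the worker
    # pulls a real task and instantly fails it with the stored exception.
    task_dict = future_queue.get_nowait()
    task_dict["future"].set_exception(stored_exception)
    future_queue.task_done()

future_queue: queue.Queue = queue.Queue()
futures = [Future() for _ in range(1000)]
for f in futures:
    future_queue.put({"fn": lambda: 42, "future": f})

# With no real computation per task, a single dead worker drains the whole
# shared queue long before healthy workers can pick anything up.
err = SocketError("SocketInterface crashed during initialization.")
while not future_queue.empty():
    dead_worker_step(future_queue, err)

failed = sum(1 for f in futures if isinstance(f.exception(), SocketError))
print(failed)  # 1000 -- every task "finished" as an error
```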

Bug 2: Off-by-one in restart_limit

The condition was restart_counter > restart_limit, but restart_counter is incremented after bootup(). So with restart_limit=2:

Event               restart_counter before check   > 2?   Result
Initial start       -                              -      Normal boot
Crash -> restart 1  0                              No     Restarts
Crash -> restart 2  1                              No     Restarts
Crash -> restart 3  2                              No     Restarts (unexpected!)
Crash               3                              Yes    Permanently dead

restart_limit=N was allowing N+1 restarts. restart_limit=0 was allowing 1 restart instead of none.
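The two conditions can be compared with a small counting sketch (an illustration of the description above, not the library code):

```python
def count_restarts(restart_limit: int, fixed: bool) -> int:
    """Simulate a worker whose bootup() crashes every time and count restarts."""
    restart_counter = 0  # incremented after each failed bootup()
    restarts = 0
    while True:
        # Old buggy check: restart_counter > restart_limit
        # Fixed check:     restart_counter >= restart_limit
        dead = restart_counter >= restart_limit if fixed else restart_counter > restart_limit
        if dead:
            return restarts  # permanently dead, no further restarts
        restarts += 1        # attempt another bootup(), which crashes again
        restart_counter += 1

print(count_restarts(2, fixed=False))  # 3 -- buggy: N+1 restarts for restart_limit=2
print(count_restarts(2, fixed=True))   # 2 -- fixed: exactly N restarts
print(count_restarts(0, fixed=False))  # 1 -- buggy: one restart despite restart_limit=0
print(count_restarts(0, fixed=True))   # 0 -- fixed: no restarts
```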

Fix

Fix 1: Dead worker recycling

When a worker is permanently dead (restart_counter >= restart_limit), instead of faking the interface as alive and continuing the main loop, it now enters a dedicated inner loop where it:

  1. Recycles tasks back into the shared queue if healthy workers still exist (so they pick them up)
  2. Fails tasks immediately if all workers are dead (prevents infinite recycling/hang)
  3. Consumes its shutdown message and breaks cleanly (prevents queue.join() hang in shutdown())

A shared alive_workers counter (a single-element list [N] for pass-by-reference) protected by a threading.Lock tracks how many worker threads are still alive. Each permanently dead worker atomically decrements this counter and checks whether healthy workers remain.
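A minimal sketch of the drain loop described above (assumed names and a simplified SocketError stand-in; the merged _drain_dead_worker differs in detail):

```python
import queue
from concurrent.futures import Future
from threading import Lock

# Hypothetical stand-in for ExecutorlibSocketError.
class SocketError(RuntimeError):
    pass

def drain_dead_worker(future_queue, alive_workers, alive_workers_lock):
    # Atomically leave the pool and check whether healthy workers remain.
    with alive_workers_lock:
        alive_workers[0] -= 1
        has_healthy_workers = alive_workers[0] > 0
    while True:
        task_dict = future_queue.get()
        if task_dict.get("shutdown"):
            # Consume our own shutdown message so queue.join() can return.
            future_queue.task_done()
            break
        if has_healthy_workers:
            future_queue.put(task_dict)  # recycle for a healthy worker
        else:
            # All workers dead: fail the task instead of bouncing it forever.
            task_dict["future"].set_exception(SocketError("worker permanently dead"))
        future_queue.task_done()

# All-dead path: the last worker fails the pending task.
q = queue.Queue()
f = Future()
q.put({"fn": lambda: 42, "future": f})
q.put({"shutdown": True})
alive = [1]
drain_dead_worker(q, alive_workers=alive, alive_workers_lock=Lock())
print(alive[0], f.exception() is not None)  # 0 True

# Recycle path: with another worker alive, the task goes back onto the queue.
q2 = queue.Queue()
f2 = Future()
task = {"fn": lambda: 42, "future": f2}
q2.put(task)
q2.put({"shutdown": True})
alive2 = [2]
drain_dead_worker(q2, alive_workers=alive2, alive_workers_lock=Lock())
print(alive2[0], f2.done(), q2.get_nowait() is task)  # 1 False True
```

Note that computing has_healthy_workers once at entry is exactly the stale-snapshot race flagged in the review below; per the follow-up commits, the final version re-checks the counter under the lock.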

Why not just break?

shutdown() sends one {"shutdown": True} message per worker thread, then calls queue.join(). If a dead thread exits without consuming its shutdown message, queue.join() waits forever (unfinished_tasks never reaches 0). The fix ensures each thread — alive or dead — consumes exactly one shutdown message.
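The queue.join() mechanics behind this fit in a few lines: join() returns only once every put() has a matching task_done(), so a thread that exits without consuming its shutdown message leaves unfinished_tasks nonzero forever. A generic sketch (plain stdlib queue code, not executorlib's):

```python
import queue
import threading

q: queue.Queue = queue.Queue()
q.put({"shutdown": True})  # one shutdown message per worker thread

def worker() -> None:
    msg = q.get()           # a thread that exited WITHOUT this get()/task_done()
    assert msg["shutdown"]  # pair would leave unfinished_tasks at 1, and the
    q.task_done()           # q.join() below would block forever

t = threading.Thread(target=worker)
t.start()
q.join()   # returns only because the worker consumed its shutdown message
t.join()
print("shutdown complete")
```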

Why the counter?

Shutdown messages and real tasks share the same queue, so the dead worker must keep calling queue.get() to find its shutdown message. When it pulls a real task, it must decide: recycle (healthy workers exist to pick it up) or fail (all workers dead, prevent infinite bouncing). The alive_workers counter is the minimal mechanism to make this decision correctly in all cases.

The new parameters (alive_workers, alive_workers_lock) are Optional with None defaults for backward compatibility. When None, tasks are failed immediately (safe fallback).

Fix 2: Off-by-one

Changed restart_counter > restart_limit to restart_counter >= restart_limit. Now restart_limit=N means exactly N restarts.

Note: This is a behavior change — users who previously set restart_limit=0 and got 1 restart will now get 0 restarts. This is the correct/expected semantic.

Test plan

Tested on TACC Vista HPC (2 nodes, Flux scheduler):

  • Some-dead-workers: 1 of 4 workers dies after exhausting restart_limit -> all 20 tasks completed by the remaining 3 healthy workers (dead worker's tasks recycled)
  • All-workers-dead: All 4 workers die -> 10 tasks fail immediately with ExecutorlibSocketError, no hang
  • Normal operation (no worker deaths): unaffected — the fix code is only reached when restart_counter >= restart_limit
  • Full integration test with real workload (12 workers, 2 deliberately crashing): all non-crashing tasks completed successfully, dead workers correctly limited to restart_limit restarts

Summary by CodeRabbit

  • Bug Fixes

    • Improved handling of permanently dead worker threads: worker count is tracked and decremented safely, pending tasks are either recycled or explicitly failed to prevent hangs and task loss.
  • Documentation

    • Clarified docstrings and signatures describing worker-liveness and shutdown semantics.
  • Tests

    • Added a unit test verifying dead-worker draining behavior and that pending tasks are failed when no workers remain.

@coderabbitai
Contributor

coderabbitai bot commented Mar 6, 2026

📝 Walkthrough

Walkthrough

Adds cooperative worker-death tracking to the block-allocation scheduler: an alive_workers counter (list) and alive_workers_lock are introduced and passed to worker threads. _execute_multiple_tasks signature is extended and a new _drain_dead_worker reclaims or fails queued tasks when workers are permanently lost.

Changes

Cohort / File(s) Summary
Scheduler & worker wiring
src/executorlib/task_scheduler/interactive/blockallocation.py
Introduce alive_workers (list) and alive_workers_lock in scheduler init; pass both into worker thread kwargs; import Lock.
Execution & restart logic
src/executorlib/task_scheduler/interactive/blockallocation.py
Extend _execute_multiple_tasks(...) signature to accept alive_workers and alive_workers_lock; change restart logic to call _drain_dead_worker when restart limit reached instead of raising socket-init error.
Dead-worker handling
src/executorlib/task_scheduler/interactive/blockallocation.py
Add _drain_dead_worker(future_queue, alive_workers, alive_workers_lock) to atomically decrement alive_workers, consume shutdown sentinel(s), and either recycle queued tasks or fail them with ExecutorlibSocketError.
Tests
tests/unit/task_scheduler/interactive/test_blockallocation.py
New unit test covering _drain_dead_worker: ensures alive_workers is decremented and pending task futures fail with ExecutorlibSocketError.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Worker
    participant SharedState as "Shared State\n(alive_workers, lock)"
    participant Queue as "Task Queue"

    Client->>Queue: enqueue task (future)
    Queue->>Worker: deliver task
    Worker->>SharedState: check alive count (acquire lock)
    SharedState-->>Worker: allowed / denied (release lock)

    alt worker permanently dies / restart limit reached
        Worker->>SharedState: decrement alive_workers (with lock)
        SharedState-->>Worker: updated count
        alt alive_workers > 0
            Worker->>Queue: recycle queued tasks for other workers
        else no healthy workers
            Worker->>Queue: fail queued futures with ExecutorlibSocketError
        end
    else worker restarts normally
        Worker->>Client: complete task future
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes


🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 40.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately and specifically describes the main change: fixing dead worker behavior after restart_limit is exhausted, focusing on preventing queue draining when workers restart.


@codecov

codecov bot commented Mar 6, 2026

Codecov Report

❌ Patch coverage is 96.66667% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 94.13%. Comparing base (92f0391) to head (0c906b6).
⚠️ Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
...rlib/task_scheduler/interactive/blockallocation.py 96.66% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #946      +/-   ##
==========================================
+ Coverage   94.10%   94.13%   +0.02%     
==========================================
  Files          39       39              
  Lines        2054     2080      +26     
==========================================
+ Hits         1933     1958      +25     
- Misses        121      122       +1     

☔ View full report in Codecov by Sentry.
@jan-janssen
Member

@IlgarBaghishov Can you ask Claude to fix the issue?

PLR0915 Too many statements (56 > 50)
   --> src/executorlib/task_scheduler/interactive/blockallocation.py:218:5
    |
218 | def _execute_multiple_tasks(
    |     ^^^^^^^^^^^^^^^^^^^^^^^
219 |     future_queue: queue.Queue,
220 |     cores: int = 1,
    |

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/executorlib/task_scheduler/interactive/blockallocation.py`:
- Around line 86-87: The liveness counter alive_workers created at startup isn’t
kept in sync when max_workers is changed, so newly spawned workers don’t receive
the shared counter and scaled-down removals don’t update the count; update the
max_workers setter (the method that adds/removes threads) to modify the shared
counter stored on the scheduler (self._alive_workers[0]) whenever workers are
intentionally added or removed, ensure you hold the existing alive_workers_lock
while updating, and pass the same alive_workers (and its Lock) into any newly
spawned worker threads (the code that creates startup worker threads) so all
workers share and update the same counter.


📥 Commits

Reviewing files that changed from the base of the PR and between 92f0391 and f9cebdb.

📒 Files selected for processing (1)
  • src/executorlib/task_scheduler/interactive/blockallocation.py

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
src/executorlib/task_scheduler/interactive/blockallocation.py (1)

359-359: Simplify key check with dict.get().

As flagged by static analysis (RUF019), the pattern "shutdown" in task_dict and task_dict["shutdown"] can be simplified.

♻️ Proposed fix
-        if "shutdown" in task_dict and task_dict["shutdown"]:
+        if task_dict.get("shutdown"):


📥 Commits

Reviewing files that changed from the base of the PR and between f9cebdb and 5c0ec0f.

📒 Files selected for processing (1)
  • src/executorlib/task_scheduler/interactive/blockallocation.py

Comment on lines +354 to +371
    while True:
        try:
            task_dict = future_queue.get(timeout=1)
        except queue.Empty:
            continue
        if "shutdown" in task_dict and task_dict["shutdown"]:
            task_done(future_queue=future_queue)
            break
        elif "fn" in task_dict and "future" in task_dict:
            if has_healthy_workers:
                future_queue.put(task_dict)
                task_done(future_queue=future_queue)
            else:
                f = task_dict.pop("future")
                f.set_exception(
                    ExecutorlibSocketError("SocketInterface crashed during execution.")
                )
                task_done(future_queue=future_queue)

⚠️ Potential issue | 🟠 Major

Race condition can orphan tasks when all workers are dead.

When has_healthy_workers=True is captured at entry (Line 351), this value becomes stale if the remaining healthy workers subsequently die. A dead worker with stale has_healthy_workers=True will keep requeuing tasks, while the last dead worker (with has_healthy_workers=False) may consume its shutdown message and exit before processing all requeued items.

Example scenario with 2 workers both exhausting restarts:

  1. Worker A enters drain first: alive_workers 2→1, has_healthy_workers=True
  2. Worker B enters drain: alive_workers 1→0, has_healthy_workers=False
  3. Worker A requeues a task, Worker B fails another task
  4. Worker B gets its shutdown message and exits
  5. Worker A gets its shutdown message and exits
  6. The task requeued by A after B exited is orphaned (future never completes)

Consider re-checking has_healthy_workers under the lock on each iteration, or having workers with has_healthy_workers=True fail tasks if they detect they're the last active thread in the drain.

💡 Suggested fix direction
     while True:
         try:
             task_dict = future_queue.get(timeout=1)
         except queue.Empty:
             continue
-        if "shutdown" in task_dict and task_dict["shutdown"]:
+        if task_dict.get("shutdown"):
             task_done(future_queue=future_queue)
             break
         elif "fn" in task_dict and "future" in task_dict:
-            if has_healthy_workers:
+            # Re-check under lock to avoid stale reads
+            with alive_workers_lock:
+                current_healthy = alive_workers[0] > 0 if alive_workers else False
+            if current_healthy:
                 future_queue.put(task_dict)
                 task_done(future_queue=future_queue)
             else:

This requires the lock and list to be non-None in this path, which they will be for initial workers. For the fallback case (alive_workers is None), the current behavior (fail immediately) is correct.

🧰 Tools
🪛 Ruff (0.15.4)

[warning] 359-359: Unnecessary key check before dictionary access

Replace with dict.get

(RUF019)


@IlgarBaghishov
Contributor Author

@jan-janssen Fixed both "Too many statements (56 > 50)" and "Race condition can orphan tasks when all workers are dead."

Contributor

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
src/executorlib/task_scheduler/interactive/blockallocation.py (1)

86-87: ⚠️ Potential issue | 🟠 Major

Persist the shared liveness state on the scheduler.

alive_workers/alive_workers_lock only exist as locals here, so the max_workers setter later spawns threads from self._process_kwargs without the same shared state and never updates the count on intentional scale-downs. That leaves _drain_dead_worker() making recycle-vs-fail decisions from stale membership, which can still cause endless requeueing or premature task failure after resizing.

Also applies to: 104-105



📥 Commits

Reviewing files that changed from the base of the PR and between 5c0ec0f and 431be63.

📒 Files selected for processing (1)
  • src/executorlib/task_scheduler/interactive/blockallocation.py

@jan-janssen
Member

@IlgarBaghishov Could you also add a unit test in particular to the _drain_dead_worker() function?

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2



📥 Commits

Reviewing files that changed from the base of the PR and between 431be63 and 0c906b6.

📒 Files selected for processing (1)
  • tests/unit/task_scheduler/interactive/test_blockallocation.py

Comment on lines +12 to +33
    def test_fail_tasks_when_no_workers_remain(self):
        future_queue = queue.Queue()
        alive_workers = [1]
        alive_workers_lock = Lock()
        future = Future()

        # Add a task and then the shutdown sentinel
        future_queue.put({"fn": lambda: 42, "future": future})
        future_queue.put({"shutdown": True})

        _drain_dead_worker(
            future_queue=future_queue,
            alive_workers=alive_workers,
            alive_workers_lock=alive_workers_lock,
        )

        # Worker count should be decremented
        self.assertEqual(alive_workers[0], 0)

        # Task should fail with ExecutorlibSocketError
        with self.assertRaises(ExecutorlibSocketError):
            future.result()

⚠️ Potential issue | 🟠 Major

Add a test for the healthy-worker requeue path.

This only covers the alive_workers -> 0 branch. The regression fixed in src/executorlib/task_scheduler/interactive/blockallocation.py:335-374 was the case where a dead worker must put work back when another worker is still alive, and that behavior is still untested here. Without that case, the original queue-drain bug can come back while this file still passes.

🧪 Suggested follow-up test
 class TestDrainDeadWorker(unittest.TestCase):
+    def test_requeue_tasks_when_other_workers_remain(self):
+        future_queue = queue.Queue()
+        alive_workers = [2]
+        alive_workers_lock = Lock()
+        future = Future()
+        task = {"fn": lambda: 42, "future": future}
+
+        future_queue.put(task)
+        future_queue.put({"shutdown": True})
+
+        _drain_dead_worker(
+            future_queue=future_queue,
+            alive_workers=alive_workers,
+            alive_workers_lock=alive_workers_lock,
+        )
+
+        self.assertEqual(alive_workers[0], 1)
+        self.assertFalse(future.done())
+        self.assertEqual(future_queue.get_nowait(), task)
+        with self.assertRaises(queue.Empty):
+            future_queue.get_nowait()
+
     def test_fail_tasks_when_no_workers_remain(self):
         future_queue = queue.Queue()
         alive_workers = [1]

Comment on lines +31 to +33
        # Task should fail with ExecutorlibSocketError
        with self.assertRaises(ExecutorlibSocketError):
            future.result()

⚠️ Potential issue | 🟡 Minor


Add a timeout to the future assertion for test robustness.

While _drain_dead_worker() correctly completes the future via f.set_exception(), a timeout safeguards against regressions where this path might be skipped, which would cause the test to hang indefinitely instead of failing fast.

Suggested change
         # Task should fail with ExecutorlibSocketError
         with self.assertRaises(ExecutorlibSocketError):
-            future.result()
+            future.result(timeout=1)

@jan-janssen jan-janssen merged commit 847cae1 into pyiron:main Mar 9, 2026
34 checks passed