Skip to content

Commit 6754a50

Browse files
[3.14] gh-132969: Fix error/hang when shutdown(wait=False) and task exited abnormally (GH-133222) (GH-135344)
gh-132969: Fix error/hang when shutdown(wait=False) and task exited abnormally (GH-133222). When shutdown is called with wait=False, the executor thread keeps running even after the ProcessPoolExecutor's state is reset. The executor then tries to replenish the pool of worker processes, resulting in an error and a potential hang when it comes across a worker that has died. Fixed the issue by having _adjust_process_count() return without doing anything if the ProcessPoolExecutor's state has been reset. Added unit tests to validate two scenarios: max_workers < num_tasks (exception) and max_workers > num_tasks (exception + hang). (cherry picked from commit 598aa7c) Co-authored-by: Ajay Kamdar <140011370+ogbiggles@users.noreply.github.com>
1 parent 8a78ee6 commit 6754a50

File tree

3 files changed

+70
-0
lines changed

3 files changed

+70
-0
lines changed

Lib/concurrent/futures/process.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,11 @@ def _start_executor_manager_thread(self):
755755
self._executor_manager_thread_wakeup
756756

757757
def _adjust_process_count(self):
758+
# gh-132969: avoid error when state is reset and executor is still running,
759+
# which will happen when shutdown(wait=False) is called.
760+
if self._processes is None:
761+
return
762+
758763
# if there's an idle process, we don't need to spawn a new one.
759764
if self._idle_worker_semaphore.acquire(blocking=False):
760765
return

Lib/test/test_concurrent_futures/test_shutdown.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,64 @@ def test_shutdown_no_wait(self):
330330
# shutdown.
331331
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
332332

333+
@classmethod
334+
def _failing_task_gh_132969(cls, n):
335+
raise ValueError("failing task")
336+
337+
@classmethod
338+
def _good_task_gh_132969(cls, n):
339+
time.sleep(0.1 * n)
340+
return n
341+
342+
def _run_test_issue_gh_132969(self, max_workers):
343+
# max_workers=2 will repro exception
344+
# max_workers=4 will repro exception and then hang
345+
346+
# Repro conditions
347+
# max_tasks_per_child=1
348+
# a task ends abnormally
349+
# shutdown(wait=False) is called
350+
start_method = self.get_context().get_start_method()
351+
if (start_method == "fork" or
352+
(start_method == "forkserver" and sys.platform.startswith("win"))):
353+
self.skipTest(f"Skipping test for {start_method = }")
354+
executor = futures.ProcessPoolExecutor(
355+
max_workers=max_workers,
356+
max_tasks_per_child=1,
357+
mp_context=self.get_context())
358+
f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1)
359+
f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2)
360+
f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3)
361+
result = 0
362+
try:
363+
result += f1.result()
364+
result += f2.result()
365+
result += f3.result()
366+
except ValueError:
367+
# stop processing results upon first exception
368+
pass
369+
370+
# Ensure that the executor cleans up after calling
371+
# shutdown with wait=False
372+
executor_manager_thread = executor._executor_manager_thread
373+
executor.shutdown(wait=False)
374+
time.sleep(0.2)
375+
executor_manager_thread.join()
376+
return result
377+
378+
def test_shutdown_gh_132969_case_1(self):
379+
# gh-132969: test that exception "object of type 'NoneType' has no len()"
380+
# is not raised when shutdown(wait=False) is called.
381+
result = self._run_test_issue_gh_132969(2)
382+
self.assertEqual(result, 1)
383+
384+
def test_shutdown_gh_132969_case_2(self):
385+
# gh-132969: test that process does not hang and
386+
# exception "object of type 'NoneType' has no len()" is not raised
387+
# when shutdown(wait=False) is called.
388+
result = self._run_test_issue_gh_132969(4)
389+
self.assertEqual(result, 1)
390+
333391

334392
create_executor_tests(globals(), ProcessPoolShutdownTest,
335393
executor_mixins=(ProcessPoolForkMixin,
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread,
2+
which remains running after :meth:`shutdown(wait=False)
3+
<concurrent.futures.Executor.shutdown>`, from
4+
attempting to adjust the pool's worker processes after the object state has already been reset during shutdown.
5+
A combination of conditions, including a worker process having terminated abnormally,
6+
resulted in an exception and a potential hang when the still-running executor thread
7+
attempted to replace dead workers within the pool.

0 commit comments

Comments
 (0)