From 5d236cafd7126e640fb25541fcc7e0a494450143 Mon Sep 17 00:00:00 2001 From: Julien Palard Date: Sun, 4 Nov 2018 23:40:32 +0100 Subject: [PATCH] bpo-19675: Terminate processes if construction of a pool is failing. (GH-5614) --- Lib/multiprocessing/pool.py | 12 +++++- Lib/test/_test_multiprocessing.py | 43 +++++++++++++++++++ .../2018-02-10-23-41-05.bpo-19675.-dj35-.rst | 1 + 3 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 574b5db5afb684..7a6d014901463e 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -174,7 +174,15 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._processes = processes self._pool = [] - self._repopulate_pool() + try: + self._repopulate_pool() + except Exception: + for p in self._pool: + if p.exitcode is None: + p.terminate() + for p in self._pool: + p.join() + raise self._worker_handler = threading.Thread( target=Pool._handle_workers, @@ -251,10 +259,10 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, initargs, maxtasksperchild, wrap_exception) ) - pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() + pool.append(w) util.debug('added worker') @staticmethod diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index dc59e9fd740a0f..7993fcb08e465a 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3,6 +3,7 @@ # import unittest +import unittest.mock import queue as pyqueue import contextlib import time @@ -4635,6 +4636,48 @@ def test_empty(self): proc.join() +class TestPoolNotLeakOnFailure(unittest.TestCase): + + def test_release_unused_processes(self): + # Issue #19675: During pool creation, if we can't create a process, + # don't leak already created ones. + will_fail_in = 3 + forked_processes = [] + + class FailingForkProcess: + def __init__(self, **kwargs): + self.name = 'Fake Process' + self.exitcode = None + self.state = None + forked_processes.append(self) + + def start(self): + nonlocal will_fail_in + if will_fail_in <= 0: + raise OSError("Manually induced OSError") + will_fail_in -= 1 + self.state = 'started' + + def terminate(self): + self.state = 'stopping' + + def join(self): + if self.state == 'stopping': + self.state = 'stopped' + + def is_alive(self): + return self.state == 'started' or self.state == 'stopping' + + with self.assertRaisesRegex(OSError, 'Manually induced OSError'): + p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( + Process=FailingForkProcess)) + p.close() + p.join() + self.assertFalse( + any(process.is_alive() for process in forked_processes)) + + + class MiscTestCase(unittest.TestCase): def test__all__(self): # Just make sure names in blacklist are excluded diff --git a/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst b/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst new file mode 100644 index 00000000000000..958550d3e8d74a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-02-10-23-41-05.bpo-19675.-dj35-.rst @@ -0,0 +1 @@ +``multiprocessing.Pool`` no longer leaks processes if its initialization fails.