Skip to content

Commit

Permalink
bpo-19675: Terminate processes if construction of a pool is failing. (G…
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienPalard committed Nov 4, 2018
1 parent b4db249 commit 5d236ca
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 2 deletions.
12 changes: 10 additions & 2 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,15 @@ def __init__(self, processes=None, initializer=None, initargs=(),

self._processes = processes
self._pool = []
self._repopulate_pool()
try:
self._repopulate_pool()
except Exception:
for p in self._pool:
if p.exitcode is None:
p.terminate()
for p in self._pool:
p.join()
raise

self._worker_handler = threading.Thread(
target=Pool._handle_workers,
Expand Down Expand Up @@ -251,10 +259,10 @@ def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
initargs, maxtasksperchild,
wrap_exception)
)
pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
pool.append(w)
util.debug('added worker')

@staticmethod
Expand Down
43 changes: 43 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import unittest
import unittest.mock
import queue as pyqueue
import contextlib
import time
Expand Down Expand Up @@ -4635,6 +4636,48 @@ def test_empty(self):
proc.join()


class TestPoolNotLeakOnFailure(unittest.TestCase):

def test_release_unused_processes(self):
# Issue #19675: During pool creation, if we can't create a process,
# don't leak already created ones.
will_fail_in = 3
forked_processes = []

class FailingForkProcess:
def __init__(self, **kwargs):
self.name = 'Fake Process'
self.exitcode = None
self.state = None
forked_processes.append(self)

def start(self):
nonlocal will_fail_in
if will_fail_in <= 0:
raise OSError("Manually induced OSError")
will_fail_in -= 1
self.state = 'started'

def terminate(self):
self.state = 'stopping'

def join(self):
if self.state == 'stopping':
self.state = 'stopped'

def is_alive(self):
return self.state == 'started' or self.state == 'stopping'

with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
Process=FailingForkProcess))
p.close()
p.join()
self.assertFalse(
any(process.is_alive() for process in forked_processes))



class MiscTestCase(unittest.TestCase):
def test__all__(self):
# Just make sure names in blacklist are excluded
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
``multiprocessing.Pool`` no longer leaks processes if its initialization fails.

0 comments on commit 5d236ca

Please sign in to comment.