Skip to content

Commit

Permalink
gh-91231: Add shutdown_timeout to multiprocessing BaseManager (#32112)
Browse files Browse the repository at this point in the history
Add an optional keyword 'shutdown_timeout' parameter to the
multiprocessing.BaseManager constructor. Kill the process if
terminate() takes longer than the timeout.

Multiprocessing tests pass test.support.SHORT_TIMEOUT
to BaseManager.shutdown_timeout.
  • Loading branch information
vstinner committed Apr 19, 2022
1 parent 7407008 commit 061a8bf
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 20 deletions.
16 changes: 15 additions & 1 deletion Doc/library/multiprocessing.rst
Expand Up @@ -1676,7 +1676,7 @@ Manager processes will be shutdown as soon as they are garbage collected or
their parent process exits. The manager classes are defined in the
:mod:`multiprocessing.managers` module:

.. class:: BaseManager([address[, authkey]])
.. class:: BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

Create a BaseManager object.

Expand All @@ -1691,6 +1691,20 @@ their parent process exits. The manager classes are defined in the
*authkey* is ``None`` then ``current_process().authkey`` is used.
Otherwise *authkey* is used and it must be a byte string.

*serializer* must be ``'pickle'`` (use :mod:`pickle` serialization) or
``'xmlrpclib'`` (use :mod:`xmlrpc.client` serialization).

*ctx* is a context object, or ``None`` (use the current context). See the
:func:`get_context` function.

*shutdown_timeout* is a timeout in seconds used to wait until the process
used by the manager completes in the :meth:`shutdown` method. If the
shutdown times out, the process is terminated. If terminating the process
also times out, the process is killed.

.. versionchanged: 3.11
Added the *shutdown_timeout* parameter.
.. method:: start([initializer[, initargs]])

Start a subprocess to start the manager. If *initializer* is not ``None``
Expand Down
16 changes: 10 additions & 6 deletions Lib/multiprocessing/managers.py
Expand Up @@ -497,7 +497,7 @@ class BaseManager(object):
_Server = Server

def __init__(self, address=None, authkey=None, serializer='pickle',
ctx=None):
ctx=None, *, shutdown_timeout=1.0):
if authkey is None:
authkey = process.current_process().authkey
self._address = address # XXX not final address if eg ('', 0)
Expand All @@ -507,6 +507,7 @@ def __init__(self, address=None, authkey=None, serializer='pickle',
self._serializer = serializer
self._Listener, self._Client = listener_client[serializer]
self._ctx = ctx or get_context()
self._shutdown_timeout = shutdown_timeout

def get_server(self):
'''
Expand Down Expand Up @@ -570,8 +571,8 @@ def start(self, initializer=None, initargs=()):
self._state.value = State.STARTED
self.shutdown = util.Finalize(
self, type(self)._finalize_manager,
args=(self._process, self._address, self._authkey,
self._state, self._Client),
args=(self._process, self._address, self._authkey, self._state,
self._Client, self._shutdown_timeout),
exitpriority=0
)

Expand Down Expand Up @@ -656,7 +657,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()

@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
def _finalize_manager(process, address, authkey, state, _Client,
shutdown_timeout):
'''
Shutdown the manager process; will be registered as a finalizer
'''
Expand All @@ -671,15 +673,17 @@ def _finalize_manager(process, address, authkey, state, _Client):
except Exception:
pass

process.join(timeout=1.0)
process.join(timeout=shutdown_timeout)
if process.is_alive():
util.info('manager still alive')
if hasattr(process, 'terminate'):
util.info('trying to `terminate()` manager process')
process.terminate()
process.join(timeout=0.1)
process.join(timeout=shutdown_timeout)
if process.is_alive():
util.info('manager still alive after terminate')
process.kill()
process.join()

state.value = State.SHUTDOWN
try:
Expand Down
34 changes: 21 additions & 13 deletions Lib/test/_test_multiprocessing.py
Expand Up @@ -119,6 +119,9 @@ def _resource_unlink(name, rtype):
else:
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# BaseManager.shutdown_timeout
SHUTDOWN_TIMEOUT = support.SHORT_TIMEOUT

HAVE_GETVALUE = not getattr(_multiprocessing,
'HAVE_BROKEN_SEM_GETVALUE', False)

Expand Down Expand Up @@ -2897,7 +2900,7 @@ class _TestMyManager(BaseTestCase):
ALLOWED_TYPES = ('manager',)

def test_mymanager(self):
manager = MyManager()
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
self.common(manager)
manager.shutdown()
Expand All @@ -2908,15 +2911,16 @@ def test_mymanager(self):
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

def test_mymanager_context(self):
with MyManager() as manager:
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
with manager:
self.common(manager)
# bpo-30356: BaseManager._finalize_manager() sends SIGTERM
# to the manager process if it takes longer than 1 second to stop,
# which happens on slow buildbots.
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

def test_mymanager_context_prestarted(self):
manager = MyManager()
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
with manager:
self.common(manager)
Expand Down Expand Up @@ -2978,8 +2982,8 @@ class _TestRemoteManager(BaseTestCase):
@classmethod
def _putter(cls, address, authkey):
manager = QueueManager2(
address=address, authkey=authkey, serializer=SERIALIZER
)
address=address, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.connect()
queue = manager.get_queue()
# Note that xmlrpclib will deserialize object as a list not a tuple
Expand All @@ -2989,8 +2993,8 @@ def test_remote(self):
authkey = os.urandom(32)

manager = QueueManager(
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
)
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
self.addCleanup(manager.shutdown)

Expand All @@ -2999,8 +3003,8 @@ def test_remote(self):
p.start()

manager2 = QueueManager2(
address=manager.address, authkey=authkey, serializer=SERIALIZER
)
address=manager.address, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager2.connect()
queue = manager2.get_queue()

Expand All @@ -3020,15 +3024,17 @@ class _TestManagerRestart(BaseTestCase):
@classmethod
def _putter(cls, address, authkey):
manager = QueueManager(
address=address, authkey=authkey, serializer=SERIALIZER)
address=address, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.connect()
queue = manager.get_queue()
queue.put('hello world')

def test_rapid_restart(self):
authkey = os.urandom(32)
manager = QueueManager(
address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
address=(socket_helper.HOST, 0), authkey=authkey,
serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT)
try:
srvr = manager.get_server()
addr = srvr.address
Expand All @@ -3048,7 +3054,8 @@ def test_rapid_restart(self):
manager.shutdown()

manager = QueueManager(
address=addr, authkey=authkey, serializer=SERIALIZER)
address=addr, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
try:
manager.start()
self.addCleanup(manager.shutdown)
Expand All @@ -3059,7 +3066,8 @@ def test_rapid_restart(self):
# (sporadic failure on buildbots)
time.sleep(1.0)
manager = QueueManager(
address=addr, authkey=authkey, serializer=SERIALIZER)
address=addr, authkey=authkey, serializer=SERIALIZER,
shutdown_timeout=SHUTDOWN_TIMEOUT)
if hasattr(manager, "shutdown"):
self.addCleanup(manager.shutdown)

Expand Down
@@ -0,0 +1,3 @@
Add an optional keyword *shutdown_timeout* parameter to the
:class:`multiprocessing.BaseManager` constructor. Kill the process if
terminate() takes longer than the timeout. Patch by Victor Stinner.

0 comments on commit 061a8bf

Please sign in to comment.