Skip to content

Commit

Permalink
pythongh-104090: Add exit code to multiprocessing ResourceTracker (pythonGH-115410)
Browse files Browse the repository at this point in the history

This builds on python#106807, which adds
a return code to ResourceTracker, to make future debugging easier.
Testing this “in situ” proved difficult, since the global ResourceTracker is
involved in test infrastructure. So, the tests here create a new instance and
feed it fake data.

---------

Co-authored-by: Yonatan Bitton <yonatan.bitton@perception-point.io>
Co-authored-by: Yonatan Bitton <bityob@gmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
4 people committed Feb 21, 2024
1 parent b052fa3 commit 4a9e649
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 7 deletions.
38 changes: 32 additions & 6 deletions Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

def cleanup_noop(name):
    """Cleanup handler registered for the 'noop' resource type.

    A call to this function means a 'noop' resource reached the cleanup
    path, which is treated as an error rather than silently ignored.

    Args:
        name: Name of the resource being cleaned up (unused).

    Raises:
        RuntimeError: Always.
    """
    raise RuntimeError('noop should never be registered or cleaned up')

_CLEANUP_FUNCS = {
'noop': lambda: None,
'noop': cleanup_noop,
'dummy': lambda name: None, # Dummy resource used in tests
}

if os.name == 'posix':
Expand Down Expand Up @@ -61,6 +65,7 @@ def __init__(self):
self._lock = threading.RLock()
self._fd = None
self._pid = None
self._exitcode = None

def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
Expand All @@ -84,9 +89,16 @@ def _stop(self):
os.close(self._fd)
self._fd = None

os.waitpid(self._pid, 0)
_, status = os.waitpid(self._pid, 0)

self._pid = None

try:
self._exitcode = os.waitstatus_to_exitcode(status)
except ValueError:
# os.waitstatus_to_exitcode may raise an exception for invalid values
self._exitcode = None

def getfd(self):
self.ensure_running()
return self._fd
Expand Down Expand Up @@ -119,6 +131,7 @@ def ensure_running(self):
pass
self._fd = None
self._pid = None
self._exitcode = None

warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')
Expand Down Expand Up @@ -221,6 +234,8 @@ def main(fd):
pass

cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
exit_code = 0

try:
# keep track of registered/unregistered resources
with open(fd, 'rb') as f:
Expand All @@ -242,6 +257,7 @@ def main(fd):
else:
raise RuntimeError('unrecognized command %r' % cmd)
except Exception:
exit_code = 3
try:
sys.excepthook(*sys.exc_info())
except:
Expand All @@ -251,10 +267,17 @@ def main(fd):
for rtype, rtype_cache in cache.items():
if rtype_cache:
try:
warnings.warn(
f'resource_tracker: There appear to be {len(rtype_cache)} '
f'leaked {rtype} objects to clean up at shutdown: {rtype_cache}'
)
exit_code = 1
if rtype == 'dummy':
# The test 'dummy' resource is expected to leak.
# We skip the warning (and *only* the warning) for it.
pass
else:
warnings.warn(
f'resource_tracker: There appear to be '
f'{len(rtype_cache)} leaked {rtype} objects to '
f'clean up at shutdown: {rtype_cache}'
)
except Exception:
pass
for name in rtype_cache:
Expand All @@ -265,6 +288,9 @@ def main(fd):
try:
_CLEANUP_FUNCS[rtype](name)
except Exception as e:
exit_code = 2
warnings.warn('resource_tracker: %r: %s' % (name, e))
finally:
pass

sys.exit(exit_code)
35 changes: 34 additions & 1 deletion Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5609,8 +5609,9 @@ def create_and_register_resource(rtype):
'''
for rtype in resource_tracker._CLEANUP_FUNCS:
with self.subTest(rtype=rtype):
if rtype == "noop":
if rtype in ("noop", "dummy"):
# Artefact resource type used by the resource_tracker
# or tests
continue
r, w = os.pipe()
p = subprocess.Popen([sys.executable,
Expand Down Expand Up @@ -5730,6 +5731,38 @@ def test_too_long_name_resource(self):
with self.assertRaises(ValueError):
resource_tracker.register(too_long_name_resource, rtype)

def _test_resource_tracker_leak_resources(self, cleanup):
    """Check the exit code of a private ResourceTracker instance.

    Registers a 'dummy' resource and, when *cleanup* is true, unregisters
    it again before stopping the tracker. The expected exit code is 0
    when the resource was unregistered and 1 when it was left registered.
    """
    # We use a separate instance for testing, since the main global
    # _resource_tracker may be used to watch test infrastructure.
    from multiprocessing.resource_tracker import ResourceTracker
    tracker = ResourceTracker()
    tracker.ensure_running()
    try:
        self.assertTrue(tracker._check_alive())

        self.assertIsNone(tracker._exitcode)
        tracker.register('somename', 'dummy')
        if cleanup:
            tracker.unregister('somename', 'dummy')
            expected_exit_code = 0
        else:
            expected_exit_code = 1

        self.assertTrue(tracker._check_alive())
        self.assertIsNone(tracker._exitcode)
    finally:
        # Always stop the tracker, even if a check above failed, so the
        # process launched by ensure_running() is not left behind.
        tracker._stop()
    self.assertEqual(tracker._exitcode, expected_exit_code)

def test_resource_tracker_exit_code(self):
    """
    Test the exit code of the resource tracker.

    If no leaked resources were found, the exit code should be 0,
    otherwise 1.
    """
    # Run both scenarios as sub-tests so that a failure in one does not
    # hide the result of the other.
    for cleanup in [True, False]:
        with self.subTest(cleanup=cleanup):
            self._test_resource_tracker_leak_resources(
                cleanup=cleanup,
            )

class TestSimpleQueue(unittest.TestCase):

Expand Down
26 changes: 26 additions & 0 deletions Lib/test/test_concurrent_futures/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import queue
import time
import unittest
import sys
from concurrent.futures._base import BrokenExecutor
from logging.handlers import QueueHandler

Expand Down Expand Up @@ -109,6 +110,31 @@ def _assert_logged(self, msg):
create_executor_tests(globals(), FailingInitializerMixin)


@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
class FailingInitializerResourcesTest(unittest.TestCase):
    """
    Run the failing-initializer executor tests and check that the
    resource tracker afterwards exits with code 0.

    Source: https://github.com/python/cpython/issues/104090
    """

    def _test(self, test_class):
        """Run test_class's 'test_initializer', then check the tracker exit code."""
        runner = unittest.TextTestRunner()
        runner.run(test_class('test_initializer'))

        # GH-104090:
        # Stop the resource tracker manually now, so we can verify that
        # no resources were leaked by checking the process exit code.
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker._stop()

        self.assertEqual(_resource_tracker._exitcode, 0)

    def test_spawn(self):
        self._test(ProcessPoolSpawnFailingInitializerTest)

    def test_forkserver(self):
        self._test(ProcessPoolForkserverFailingInitializerTest)


def setUpModule():
setup_module()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The multiprocessing resource tracker now exits with a non-zero status code if a resource
leak was detected. It still exits with status code 0 otherwise.

0 comments on commit 4a9e649

Please sign in to comment.