From 909d8f133d6c9c573057c0cd52f7aaaa8fa6d666 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Thu, 7 May 2020 16:00:30 -0700 Subject: [PATCH] Clean up threading book-keeping at fork when monkey-patched Previously, if we patched threading then forked (or, in some cases, used the subprocess module), Python would log an ignored exception like Exception ignored in: Traceback (most recent call last): File "/usr/lib/python3.7/threading.py", line 1335, in _after_fork assert len(_active) == 1 AssertionError: This comes down to threading in Python 3.7+ having an import side-effect of registering an at-fork callback. When we re-import threading to patch it, the old (but still registered) callback still points to the old thread-tracking dict, rather than the new dict that's actually doing the tracking. Now, register our own at_fork hook that will fix up the dict reference before threading's _at_fork runs and put it back afterwards. Closes #592 --- eventlet/patcher.py | 20 +++++++++++++++++ tests/patcher_test.py | 52 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/eventlet/patcher.py b/eventlet/patcher.py index bfc7faaa5d..a5786379fb 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -312,6 +312,26 @@ def monkey_patch(**on): for attr_name in deleted: if hasattr(orig_mod, attr_name): delattr(orig_mod, attr_name) + + if name == 'threading' and sys.version_info >= (3, 7): + def fix_threading_active( + _os=original('os'), + _global_dict=original('threading').current_thread.__globals__, + _patched=orig_mod + ): + _prefork_active = [None] + + def before_fork(): + _prefork_active[0] = _global_dict['_active'] + _global_dict['_active'] = _patched._active + + def after_fork(): + _global_dict['_active'] = _prefork_active[0] + + _os.register_at_fork( + before=before_fork, + after_in_parent=after_fork) + fix_threading_active() finally: imp.release_lock() diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 8c60bf14ee..b6e5b562df 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -294,6 +294,58 @@ def test_subprocess_after_monkey_patch(): assert output.rstrip() == b'pass' +def test_fork_after_monkey_patch(): + code = '''\ +import eventlet +import eventlet.patcher +eventlet.monkey_patch() +import os +import sys +import threading +_threading = eventlet.patcher.original('threading') +import eventlet.green.threading +def target(): + eventlet.sleep(0.1) +def check(n, mod, tag): + assert len(mod._active) == n, 'Expected {} {} threads, got {}'.format(n, tag, mod._active) +threads = [ + threading.Thread(target=target, name='patched'), + _threading.Thread(target=target, name='original-1'), + _threading.Thread(target=target, name='original-2'), + eventlet.green.threading.Thread(target=target, name='green-1'), + eventlet.green.threading.Thread(target=target, name='green-2'), + eventlet.green.threading.Thread(target=target, name='green-3'), +] +for t in threads: t.start() +check(2, threading, 'pre-fork patched') +check(3, _threading, 'pre-fork original') +check(4, eventlet.green.threading, 'pre-fork green') +if os.fork() == 0: + # Inside the child, we should only have a main thread, + # but old pythons make it difficult to ensure + if sys.version_info >= (3, 7): + check(1, threading, 'child post-fork patched') + check(1, _threading, 'child post-fork original') + check(1, eventlet.green.threading, 'child post-fork green') + exit() +else: + os.wait() +check(2, threading, 'post-fork patched') +check(3, _threading, 'post-fork original') +check(4, eventlet.green.threading, 'post-fork green') +for t in threads: t.join() +check(1, threading, 'post-join patched') +check(1, _threading, 'post-join original') +check(1, eventlet.green.threading, 'post-join green') +print('pass') +''' + output = tests.run_python( + path=None, + args=['-c', code], + expect_pass=True, + ) + + class Threading(ProcessBase): def test_orig_thread(self): new_mod = """import eventlet