diff --git a/tornado/platform/asyncio.py b/tornado/platform/asyncio.py index 8d16aa6d15..79e60848b4 100644 --- a/tornado/platform/asyncio.py +++ b/tornado/platform/asyncio.py @@ -23,6 +23,7 @@ """ import asyncio +import atexit import concurrent.futures import errno import functools @@ -59,6 +60,31 @@ def fileno(self) -> int: _T = TypeVar("_T") +# Collection of selector thread event loops to shut down on exit. +_selector_loops: Set["SelectorThread"] = set() + + +def _atexit_callback() -> None: + for loop in _selector_loops: + with loop._select_cond: + loop._closing_selector = True + loop._select_cond.notify() + try: + loop._waker_w.send(b"a") + except BlockingIOError: + pass + if loop._thread is not None: + # If we don't join our (daemon) thread here, we may get a deadlock + # during interpreter shutdown. I don't really understand why. This + # deadlock happens every time in CI (both travis and appveyor) but + # I've never been able to reproduce locally. + loop._thread.join() + _selector_loops.clear() + + +atexit.register(_atexit_callback) + + class BaseAsyncIOLoop(IOLoop): def initialize( # type: ignore self, asyncio_loop: asyncio.AbstractEventLoop, **kwargs: Any @@ -453,6 +479,7 @@ async def thread_manager_anext() -> None: self._waker_r, self._waker_w = socket.socketpair() self._waker_r.setblocking(False) self._waker_w.setblocking(False) + _selector_loops.add(self) self.add_reader(self._waker_r, self._consume_waker) def close(self) -> None: @@ -464,6 +491,7 @@ def close(self) -> None: self._wake_selector() if self._thread is not None: self._thread.join() + _selector_loops.discard(self) self.remove_reader(self._waker_r) self._waker_r.close() self._waker_w.close() diff --git a/tornado/test/circlerefs_test.py b/tornado/test/circlerefs_test.py index 5c71858ff7..5c25adffd8 100644 --- a/tornado/test/circlerefs_test.py +++ b/tornado/test/circlerefs_test.py @@ -197,21 +197,21 @@ def test_run_on_executor(self): # and tornado.concurrent.chain_future. import concurrent.futures - thread_pool = concurrent.futures.ThreadPoolExecutor(1) + with concurrent.futures.ThreadPoolExecutor(1) as thread_pool: - class Factory(object): - executor = thread_pool + class Factory(object): + executor = thread_pool - @tornado.concurrent.run_on_executor - def run(self): - return None + @tornado.concurrent.run_on_executor + def run(self): + return None - factory = Factory() + factory = Factory() - async def main(): - # The cycle is not reported on the first call. It's not clear why. - for i in range(2): - await factory.run() + async def main(): + # The cycle is not reported on the first call. It's not clear why. + for i in range(2): + await factory.run() - with assert_no_cycle_garbage(): - asyncio.run(main()) + with assert_no_cycle_garbage(): + asyncio.run(main())