Skip to content

Commit

Permalink
Merge pull request #3332 from bdarnell/selector-thread-atexit
Browse files Browse the repository at this point in the history
Revert "asyncio: Remove atexit hook"
  • Loading branch information
bdarnell committed Oct 29, 2023
2 parents f5df43f + 3340c39 commit ec59fa0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 13 deletions.
28 changes: 28 additions & 0 deletions tornado/platform/asyncio.py
Expand Up @@ -23,6 +23,7 @@
"""

import asyncio
import atexit
import concurrent.futures
import errno
import functools
Expand Down Expand Up @@ -59,6 +60,31 @@ def fileno(self) -> int:
_T = TypeVar("_T")


# Collection of selector thread event loops to shut down on exit.
_selector_loops: Set["SelectorThread"] = set()


def _atexit_callback() -> None:
for loop in _selector_loops:
with loop._select_cond:
loop._closing_selector = True
loop._select_cond.notify()
try:
loop._waker_w.send(b"a")
except BlockingIOError:
pass
if loop._thread is not None:
# If we don't join our (daemon) thread here, we may get a deadlock
# during interpreter shutdown. I don't really understand why. This
# deadlock happens every time in CI (both travis and appveyor) but
# I've never been able to reproduce locally.
loop._thread.join()
_selector_loops.clear()


atexit.register(_atexit_callback)


class BaseAsyncIOLoop(IOLoop):
def initialize( # type: ignore
self, asyncio_loop: asyncio.AbstractEventLoop, **kwargs: Any
Expand Down Expand Up @@ -453,6 +479,7 @@ async def thread_manager_anext() -> None:
self._waker_r, self._waker_w = socket.socketpair()
self._waker_r.setblocking(False)
self._waker_w.setblocking(False)
_selector_loops.add(self)
self.add_reader(self._waker_r, self._consume_waker)

def close(self) -> None:
Expand All @@ -464,6 +491,7 @@ def close(self) -> None:
self._wake_selector()
if self._thread is not None:
self._thread.join()
_selector_loops.discard(self)
self.remove_reader(self._waker_r)
self._waker_r.close()
self._waker_w.close()
Expand Down
26 changes: 13 additions & 13 deletions tornado/test/circlerefs_test.py
Expand Up @@ -197,21 +197,21 @@ def test_run_on_executor(self):
# and tornado.concurrent.chain_future.
import concurrent.futures

thread_pool = concurrent.futures.ThreadPoolExecutor(1)
with concurrent.futures.ThreadPoolExecutor(1) as thread_pool:

class Factory(object):
executor = thread_pool
class Factory(object):
executor = thread_pool

@tornado.concurrent.run_on_executor
def run(self):
return None
@tornado.concurrent.run_on_executor
def run(self):
return None

factory = Factory()
factory = Factory()

async def main():
# The cycle is not reported on the first call. It's not clear why.
for i in range(2):
await factory.run()
async def main():
# The cycle is not reported on the first call. It's not clear why.
for i in range(2):
await factory.run()

with assert_no_cycle_garbage():
asyncio.run(main())
with assert_no_cycle_garbage():
asyncio.run(main())

0 comments on commit ec59fa0

Please sign in to comment.