Skip to content

Commit

Permalink
Merge pull request #3029 from minrk/reuse-selector
Browse files Browse the repository at this point in the history
separate SelectorThread into its own object
  • Loading branch information
bdarnell committed May 15, 2023
2 parents b5dad63 + 91bddc0 commit f28b245
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 44 deletions.
115 changes: 71 additions & 44 deletions tornado/platform/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def fileno(self) -> int:


# Collection of selector thread event loops to shut down on exit.
_selector_loops = set() # type: Set[AddThreadSelectorEventLoop]
_selector_loops = set() # type: Set[SelectorThread]


def _atexit_callback() -> None:
Expand Down Expand Up @@ -427,53 +427,18 @@ def get_event_loop(self) -> asyncio.AbstractEventLoop:
return loop


class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
"""Wrap an event loop to add implementations of the ``add_reader`` method family.
class SelectorThread:
"""Define ``add_reader`` methods to be called in a background select thread.
Instances of this class start a second thread to run a selector.
This thread is completely hidden from the user; all callbacks are
run on the wrapped event loop's thread.
This class is used automatically by Tornado; applications should not need
to refer to it directly.
It is safe to wrap any event loop with this class, although it only makes sense
for event loops that do not implement the ``add_reader`` family of methods
themselves (i.e. ``WindowsProactorEventLoop``)
Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
This thread is completely hidden from the user;
all callbacks are run on the wrapped event loop's thread.
Typically used via ``AddThreadSelectorEventLoop``,
but can be attached to a running asyncio loop.
"""

# This class is a __getattribute__-based proxy. All attributes other than those
# in this set are proxied through to the underlying loop.
MY_ATTRIBUTES = {
"_consume_waker",
"_select_cond",
"_select_args",
"_closing_selector",
"_thread",
"_handle_event",
"_readers",
"_real_loop",
"_start_select",
"_run_select",
"_handle_select",
"_wake_selector",
"_waker_r",
"_waker_w",
"_writers",
"add_reader",
"add_writer",
"close",
"remove_reader",
"remove_writer",
}

def __getattribute__(self, name: str) -> Any:
if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
return super().__getattribute__(name)
return getattr(self._real_loop, name)
_closed = False

def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
Expand Down Expand Up @@ -519,6 +484,8 @@ def __del__(self) -> None:
self._waker_w.close()

def close(self) -> None:
if self._closed:
return
with self._select_cond:
self._closing_selector = True
self._select_cond.notify()
Expand All @@ -527,7 +494,7 @@ def close(self) -> None:
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
self._real_loop.close()
self._closed = True

def _wake_selector(self) -> None:
try:
Expand Down Expand Up @@ -661,3 +628,63 @@ def remove_writer(self, fd: "_FileDescriptorLike") -> bool:
return False
self._wake_selector()
return True


class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
"""Wrap an event loop to add implementations of the ``add_reader`` method family.
Instances of this class start a second thread to run a selector.
This thread is completely hidden from the user; all callbacks are
run on the wrapped event loop's thread.
This class is used automatically by Tornado; applications should not need
to refer to it directly.
It is safe to wrap any event loop with this class, although it only makes sense
for event loops that do not implement the ``add_reader`` family of methods
themselves (i.e. ``WindowsProactorEventLoop``)
Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
"""

# This class is a __getattribute__-based proxy. All attributes other than those
# in this set are proxied through to the underlying loop.
MY_ATTRIBUTES = {
"_real_loop",
"_selector",
"add_reader",
"add_writer",
"close",
"remove_reader",
"remove_writer",
}

def __getattribute__(self, name: str) -> Any:
if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
return super().__getattribute__(name)
return getattr(self._real_loop, name)

def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
self._selector = SelectorThread(real_loop)

def close(self) -> None:
self._selector.close()
self._real_loop.close()

def add_reader(
self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
) -> None:
return self._selector.add_reader(fd, callback, *args)

def add_writer(
self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
) -> None:
return self._selector.add_writer(fd, callback, *args)

def remove_reader(self, fd: "_FileDescriptorLike") -> bool:
return self._selector.remove_reader(fd)

def remove_writer(self, fd: "_FileDescriptorLike") -> bool:
return self._selector.remove_writer(fd)
6 changes: 6 additions & 0 deletions tornado/test/asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
AsyncIOLoop,
to_asyncio_future,
AnyThreadEventLoopPolicy,
AddThreadSelectorEventLoop,
)
from tornado.testing import AsyncTestCase, gen_test

Expand Down Expand Up @@ -105,6 +106,11 @@ async def native_coroutine_with_adapter2():
42,
)

def test_add_thread_close_idempotent(self):
loop = AddThreadSelectorEventLoop(asyncio.get_event_loop()) # type: ignore
loop.close()
loop.close()


class LeakTest(unittest.TestCase):
def setUp(self):
Expand Down

0 comments on commit f28b245

Please sign in to comment.