Skip to content

Commit

Permalink
asyncio: Manage our own thread instead of an executor
Browse files Browse the repository at this point in the history
Python 3.9 changed the behavior of ThreadPoolExecutor at interpreter
shutdown (after the already-tricky import-order issues around
atexit hooks). Avoid these issues by managing the thread by hand.
  • Loading branch information
bdarnell committed Oct 24, 2020
1 parent c938ddb commit 15832bc
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 71 deletions.
2 changes: 1 addition & 1 deletion appveyor.yml
Expand Up @@ -54,7 +54,7 @@ environment:
PYTHON_VERSION: "3.8.x"
PYTHON_ARCH: "32"
TOX_ENV: "py38"
TOX_ARGS: "tornado.test.websocket_test"
TOX_ARGS: "--fail-if-logs=false tornado.test.websocket_test"

- PYTHON: "C:\\Python38-x64"
PYTHON_VERSION: "3.8.x"
Expand Down
167 changes: 97 additions & 70 deletions tornado/platform/asyncio.py
Expand Up @@ -50,26 +50,27 @@ def fileno(self) -> int:
_T = TypeVar("_T")


# Collection of sockets to write to at shutdown to wake up any selector threads.
_waker_sockets = set() # type: Set[socket.socket]
# Collection of selector thread event loops to shut down on exit.
_selector_loops = set() # type: Set[AddThreadSelectorEventLoop]


def _atexit_callback() -> None:
for fd in _waker_sockets:
for loop in _selector_loops:
with loop._select_cond:
loop._closing_selector = True
loop._select_cond.notify()
try:
fd.send(b"a")
loop._waker_w.send(b"a")
except BlockingIOError:
pass
# If we don't join our (daemon) thread here, we may get a deadlock
# during interpreter shutdown. I don't really understand why. This
# deadlock happens every time in CI (both Travis and AppVeyor), but
# I've never been able to reproduce it locally.
loop._thread.join()
_selector_loops.clear()


# atexit callbacks are run in LIFO order. Our callback must run before
# ThreadPoolExecutor's or it will deadlock (the pool's threads can't
# finish their work items until we write to their waker sockets). In
# recent versions of Python the thread pool atexit callback is
# registered in a getattr hook the first time TPE is *referenced*
# (unlike older versions of Python, where it was registered when
# concurrent.futures was imported).
concurrent.futures.ThreadPoolExecutor
atexit.register(_atexit_callback)


Expand Down Expand Up @@ -422,7 +423,10 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
# in this set are proxied through to the underlying loop.
MY_ATTRIBUTES = {
"_consume_waker",
"_executor",
"_select_cond",
"_select_args",
"_closing_selector",
"_thread",
"_handle_event",
"_readers",
"_real_loop",
Expand All @@ -447,10 +451,21 @@ def __getattribute__(self, name: str) -> Any:

def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
# Create our own executor to ensure we always have a thread
# available (we'll keep it 100% busy) instead of contending
# with the application for a thread in the default executor.
self._executor = concurrent.futures.ThreadPoolExecutor(1)

# Create a thread to run the select system call. We manage this thread
# manually so we can trigger a clean shutdown from an atexit hook. Note
# that due to the order of operations at shutdown, only daemon threads
# can be shut down in this way (non-daemon threads would require the
# introduction of a new hook: https://bugs.python.org/issue41962)
self._select_cond = threading.Condition()
self._select_args = (
None
) # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]]
self._closing_selector = False
self._thread = threading.Thread(
name="Tornado selector", daemon=True, target=self._run_select,
)
self._thread.start()
# Start the select loop once the loop is started.
self._real_loop.call_soon(self._start_select)

Expand All @@ -462,7 +477,7 @@ def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._waker_r, self._waker_w = socket.socketpair()
self._waker_r.setblocking(False)
self._waker_w.setblocking(False)
_waker_sockets.add(self._waker_w)
_selector_loops.add(self)
self.add_reader(self._waker_r, self._consume_waker)

def __del__(self) -> None:
Expand All @@ -471,14 +486,17 @@ def __del__(self) -> None:
# can get a clean shutdown notification. If we're just left to
# be GC'd, we must explicitly close our sockets to avoid
# logging warnings.
_waker_sockets.discard(self._waker_w)
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()

def close(self) -> None:
with self._select_cond:
self._closing_selector = True
self._select_cond.notify()
self._wake_selector()
self._executor.shutdown()
_waker_sockets.discard(self._waker_w)
self._thread.join()
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
self._real_loop.close()
Expand All @@ -499,55 +517,64 @@ def _start_select(self) -> None:
# Capture reader and writer sets here in the event loop
# thread to avoid any problems with concurrent
# modification while the select loop uses them.
f = self.run_in_executor(
self._executor,
self._run_select,
list(self._readers.keys()),
list(self._writers.keys()),
)
asyncio.ensure_future(f).add_done_callback(self._handle_select)

def _run_select(
self, to_read: List[int], to_write: List[int]
) -> Tuple[List[int], List[int]]:
# We use the simpler interface of the select module instead of
# the more stateful interface in the selectors module because
# this class is only intended for use on Windows, where
# select.select is the only option. The selector interface
# does not have well-documented thread-safety semantics that
# we can rely on, so ensuring proper synchronization would be
# tricky.
try:
# On Windows, selecting on a socket for write will not
# return the socket when there is an error (but selecting
# for reads works). So we also select for errors when
# selecting for writes, and merge the results.
#
# This pattern is also used in
# https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
rs, ws, xs = select.select(to_read, to_write, to_write)
ws = ws + xs
except OSError as e:
# After remove_reader or remove_writer is called, the file
# descriptor may subsequently be closed on the event loop
# thread. It's possible that this select thread hasn't
# gotten into the select system call by the time that
# happens, in which case (at least on macOS) select may
# raise a "bad file descriptor" error. If we get that
# error, check and see if we're also being woken up by
# polling the waker alone. If we are, just return to the
# event loop and we'll get the updated set of file
# descriptors on the next iteration. Otherwise, raise the
# original error.
if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
if rs:
return rs, []
raise
return rs, ws

def _handle_select(self, f: "asyncio.Future[Tuple[List[int], List[int]]]") -> None:
rs, ws = f.result()
with self._select_cond:
assert self._select_args is None
self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
self._select_cond.notify()

def _run_select(self) -> None:
while True:
with self._select_cond:
while self._select_args is None and not self._closing_selector:
self._select_cond.wait()
if self._closing_selector:
return
assert self._select_args is not None
to_read, to_write = self._select_args
self._select_args = None

# We use the simpler interface of the select module instead of
# the more stateful interface in the selectors module because
# this class is only intended for use on Windows, where
# select.select is the only option. The selector interface
# does not have well-documented thread-safety semantics that
# we can rely on, so ensuring proper synchronization would be
# tricky.
try:
# On Windows, selecting on a socket for write will not
# return the socket when there is an error (but selecting
# for reads works). So we also select for errors when
# selecting for writes, and merge the results.
#
# This pattern is also used in
# https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
rs, ws, xs = select.select(to_read, to_write, to_write)
ws = ws + xs
except OSError as e:
# After remove_reader or remove_writer is called, the file
# descriptor may subsequently be closed on the event loop
# thread. It's possible that this select thread hasn't
# gotten into the select system call by the time that
# happens, in which case (at least on macOS) select may
# raise a "bad file descriptor" error. If we get that
# error, check and see if we're also being woken up by
# polling the waker alone. If we are, just return to the
# event loop and we'll get the updated set of file
# descriptors on the next iteration. Otherwise, raise the
# original error.
if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
if rs:
ws = []
else:
raise
else:
raise
self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)

def _handle_select(
self, rs: List["_FileDescriptorLike"], ws: List["_FileDescriptorLike"]
) -> None:
for r in rs:
self._handle_event(r, self._readers)
for w in ws:
Expand Down

0 comments on commit 15832bc

Please sign in to comment.