From f6c332592af7912ac1910160d5e4b84cfb05180b Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 30 Sep 2020 22:07:21 -0800 Subject: [PATCH 1/2] Make trio.from_thread.run() raise RunFinishedError if the system nursery is closed --- newsfragments/1738.bugfix.rst | 6 ++++++ trio/_core/_run.py | 9 +++++++++ trio/_threads.py | 12 ++++++++++-- trio/tests/test_threads.py | 22 ++++++++++++++++++++++ 4 files changed, 47 insertions(+), 2 deletions(-) create mode 100644 newsfragments/1738.bugfix.rst diff --git a/newsfragments/1738.bugfix.rst b/newsfragments/1738.bugfix.rst new file mode 100644 index 0000000000..ddb42050b4 --- /dev/null +++ b/newsfragments/1738.bugfix.rst @@ -0,0 +1,6 @@ +:func:`trio.from_thread.run` no longer crashes the Trio run if it is +executed after the system nursery has been closed but before the run +has finished. Calls made at this time will now raise +`trio.RunFinishedError`. This fixes a regression introduced in +Trio 0.17.0. The window in question is only one scheduler tick long in +most cases, but may be longer if async generators need to be cleaned up. diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 7012000ac3..fe51f7bfc8 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1546,6 +1546,15 @@ def spawn_system_task(self, async_fn, *args, name=None): * System tasks do not inherit context variables from their creator. + Towards the end of a call to :meth:`trio.run`, after the main + task and all system tasks have exited, the system nursery + becomes closed. At this point, new calls to + :func:`spawn_system_task` will raise ``RuntimeError("Nursery + is closed to new arrivals")`` instead of creating a system + task. It's possible to encounter this state either in + a ``finally`` block in an async generator, or in a callback + passed to :meth:`TrioToken.run_sync_soon` at the right moment. + Args: async_fn: An async callable. args: Positional arguments for ``async_fn``. If you want to pass diff --git a/trio/_threads.py b/trio/_threads.py index 1f90268aa0..648b87d801 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -1,3 +1,5 @@ +# coding: utf-8 + import threading import queue as stdlib_queue from itertools import count @@ -248,7 +250,8 @@ def from_thread_run(afn, *args, trio_token=None): Raises: RunFinishedError: if the corresponding call to :func:`trio.run` has - already completed. + already completed, or if the run has started its final cleanup phase + and can no longer spawn new system tasks. Cancelled: if the corresponding call to :func:`trio.run` completes while ``afn(*args)`` is running, then ``afn`` is likely to raise :exc:`trio.Cancelled`, and this will propagate out into @@ -279,7 +282,12 @@ async def unprotected_afn(): async def await_in_trio_thread_task(): q.put_nowait(await outcome.acapture(unprotected_afn)) - trio.lowlevel.spawn_system_task(await_in_trio_thread_task, name=afn) + try: + trio.lowlevel.spawn_system_task(await_in_trio_thread_task, name=afn) + except RuntimeError: # system nursery is closed + q.put_nowait( + outcome.Error(trio.RunFinishedError("system nursery is closed")) + ) return _run_fn_as_system_task(callback, afn, *args, trio_token=trio_token) diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index e90a856a59..9da2838cbd 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -7,6 +7,7 @@ from .. import _core from .. import Event, CapacityLimiter, sleep from ..testing import wait_all_tasks_blocked +from .._core.tests.tutil import buggy_pypy_asyncgens from .._threads import ( to_thread_run_sync, current_default_thread_limiter, @@ -554,3 +555,24 @@ def not_called(): # pragma: no cover trio_token = _core.current_trio_token() with pytest.raises(RuntimeError): from_thread_run_sync(not_called, trio_token=trio_token) + + +@pytest.mark.skipif(buggy_pypy_asyncgens, reason="pypy 7.2.0 is buggy") +def test_from_thread_run_during_shutdown(): + save = [] + record = [] + + async def agen(): + try: + yield + finally: + with pytest.raises(_core.RunFinishedError), _core.CancelScope(shield=True): + await to_thread_run_sync(from_thread_run, sleep, 0) + record.append("ok") + + async def main(): + save.append(agen()) + await save[-1].asend(None) + + _core.run(main) + assert record == ["ok"] From 5d940e01ea9afed99db255edc9b7e2e5e2de8bf7 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 30 Sep 2020 22:16:28 -0800 Subject: [PATCH 2/2] Rerun gen_exports --- trio/_core/_generated_run.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/trio/_core/_generated_run.py b/trio/_core/_generated_run.py index 8c03bf2e09..0da2c4f48e 100644 --- a/trio/_core/_generated_run.py +++ b/trio/_core/_generated_run.py @@ -126,6 +126,15 @@ def spawn_system_task(async_fn, *args, name=None): * System tasks do not inherit context variables from their creator. + Towards the end of a call to :meth:`trio.run`, after the main + task and all system tasks have exited, the system nursery + becomes closed. At this point, new calls to + :func:`spawn_system_task` will raise ``RuntimeError("Nursery + is closed to new arrivals")`` instead of creating a system + task. It's possible to encounter this state either in + a ``finally`` block in an async generator, or in a callback + passed to :meth:`TrioToken.run_sync_soon` at the right moment. + Args: async_fn: An async callable. args: Positional arguments for ``async_fn``. If you want to pass