From d9d02dbc355670ac6a5dff9d731a1f028a197fd9 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Fri, 22 Jul 2022 07:50:59 -0400 Subject: [PATCH 01/31] Add tests and stub for thread cancellation and reuse. --- trio/_threads.py | 55 ++++++++++++++++++++- trio/tests/test_threads.py | 98 +++++++++++++++++++++++++++++++++++++- 2 files changed, 149 insertions(+), 4 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index 807212e0f9..af70ae194c 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -57,6 +57,24 @@ class ThreadPlaceholder: name = attr.ib() +# Types for the to_thread_run_sync message loop +@attr.s(frozen=True, eq=False) +class ThreadDone: + result = attr.ib() + + +@attr.s(frozen=True, eq=False) +class Run: + async_fn = attr.ib() + queue = attr.ib(init=False, factory=stdlib_queue.SimpleQueue) + + +@attr.s(frozen=True, eq=False) +class RunSync: + sync_fn = attr.ib() + queue = attr.ib(init=False, factory=stdlib_queue.SimpleQueue) + + @enable_ki_protection async def to_thread_run_sync( sync_fn, *args, thread_name: Optional[str] = None, cancellable=False, limiter=None @@ -171,7 +189,9 @@ def do_release_then_return_result(): result = outcome.capture(do_release_then_return_result) if task_register[0] is not None: - trio.lowlevel.reschedule(task_register[0], result) + trio.lowlevel.reschedule( + task_register[0], outcome.Value(ThreadDone(result)) + ) current_trio_token = trio.lowlevel.current_trio_token() @@ -224,7 +244,21 @@ def abort(_): else: return trio.lowlevel.Abort.FAILED - return await trio.lowlevel.wait_task_rescheduled(abort) + while True: + msg_from_thread = await trio.lowlevel.wait_task_rescheduled(abort) + if type(msg_from_thread) is ThreadDone: + return msg_from_thread.result.unwrap() + elif type(msg_from_thread) is Run: + result = await outcome.acapture(msg_from_thread.async_fn) + msg_from_thread.queue.put(result) + elif type(msg_from_thread) is RunSync: + result = outcome.capture(msg_from_thread.sync_fn) + msg_from_thread.queue.put(result) + else: + raise TypeError( + "trio.to_thread.run_sync received unrecognized thread message {!r}." + "".format(msg_from_thread) + ) def _run_fn_as_system_task(cb, fn, *args, context, trio_token=None): @@ -382,3 +416,20 @@ def unprotected_fn(): context=context, trio_token=trio_token, ) + + +def from_thread_check_cancelled(): + """Check if the Trio task that controls this thread has been cancelled. + + This check only works if the thread was spawned by `trio.to_thread.run_sync`. + + Returns: + bool: True if `Cancelled` has been raised from the corresponding call + to `trio.to_thread.run_sync`, False otherwise. + + Raises: + AttributeError: if this thread was not created with + `trio.to_thread.run_sync`. + """ + + return False diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index ce852d4612..bc38019f2d 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -21,6 +21,7 @@ from_thread_run, from_thread_run_sync, to_thread_run_sync, + from_thread_check_cancelled, ) from ..testing import wait_all_tasks_blocked @@ -849,8 +850,8 @@ async def main(): async def test_trio_token_weak_referenceable(): - token = current_trio_token() - assert isinstance(token, TrioToken) + token = _core.current_trio_token() + assert isinstance(token, _core.TrioToken) weak_reference = weakref.ref(token) assert token is weak_reference() @@ -864,3 +865,96 @@ def __bool__(self): with pytest.raises(NotImplementedError): await to_thread_run_sync(int, cancellable=BadBool()) + + +async def test_from_thread_reuses_task(): + task = _core.current_task() + + async def async_current_task(): + return _core.current_task() + + assert task is await to_thread_run_sync(from_thread_run_sync, _core.current_task) + assert task is await to_thread_run_sync(from_thread_run, async_current_task) + + +async def test_to_thread_reuses_thread(): + tid = None + + def get_tid_then_reenter(): + nonlocal tid + tid = threading.current_thread() + return from_thread_run(to_thread_run_sync, threading.current_thread) + + assert tid == await to_thread_run_sync(from_thread_run, get_tid_then_reenter) + + +@pytest.mark.parametrize("parties", [1, 2, 3]) +async def test_to_thread_reuses_thread_once_w_nursery(parties): + barrier = threading.Barrier(parties) + tids = [] + + def wait_then_get_tid(): + barrier.wait(1.0) + tids.append(threading.current_thread()) + + async def re_reenter(): + async with _core.open_nursery() as nursery: + for _ in range(barrier.parties): + nursery.start_soon(to_thread_run_sync, wait_then_get_tid) + + def reenter_then_get_tid(): + from_thread_run(re_reenter) + return threading.current_thread() + + main_tid = await to_thread_run_sync(reenter_then_get_tid) + assert tids.count(main_tid) == 1 + + +async def test_from_thread_check_cancelled(): + assert False is await to_thread_run_sync(from_thread_check_cancelled) + + +async def test_cancel_uncancellable_then_check(): + def in_thread(): + from_thread_run_sync(cancel_scope.cancel) + return from_thread_check_cancelled() + + with _core.CancelScope() as cancel_scope: + assert False is await to_thread_run_sync(in_thread) + + +async def test_cancel_cancellable_then_wait_then_check(): + q = stdlib_queue.SimpleQueue() + ev = threading.Event() + + async def foo(): + pass # pragma: no cover + + def in_thread(): + from_thread_run_sync(cancel_scope.cancel) + ev.wait(timeout=1.0) + q.put(from_thread_check_cancelled()) + try: + from_thread_run_sync(bool) + except _core.Cancelled: + q.put(True) + except BaseException as exc: # pragma: no cover + q.put(exc) + else: + q.put(False) + try: + from_thread_run(foo) + except _core.Cancelled: + q.put(True) + except BaseException as exc: # pragma: no cover + q.put(exc) + else: + q.put(False) + + with _core.CancelScope() as cancel_scope: + await to_thread_run_sync(in_thread, cancellable=True) + assert cancel_scope.cancelled_caught + ev.set() + assert True is q.get() + assert True is q.get() + assert True is q.get() From 0c0ff10e6ba2bfafae34cb58232b9ef7a7a206aa Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 4 Mar 2023 14:47:27 -0500 Subject: [PATCH 02/31] make from_thread tests pass --- trio/_threads.py | 147 +++++++++++++++++++++++++------------ trio/tests/test_threads.py | 15 ++-- 2 files changed, 110 insertions(+), 52 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index af70ae194c..1099c056c5 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -65,15 +65,58 @@ class ThreadDone: @attr.s(frozen=True, eq=False) class Run: - async_fn = attr.ib() + afn = attr.ib() + args = attr.ib() + context = attr.ib() queue = attr.ib(init=False, factory=stdlib_queue.SimpleQueue) + async def run(self): + @disable_ki_protection + async def unprotected_afn(): + coro = coroutine_or_error(self.afn, *self.args) + return await coro + + task = trio.lowlevel.current_task() + old_context = task.context + task.context = self.context.copy() + task.context.run(current_async_library_cvar.set, "trio") + try: + await trio.lowlevel.cancel_shielded_checkpoint() + result = await outcome.acapture(unprotected_afn) + self.queue.put(result) + finally: + task.context = old_context + await trio.lowlevel.cancel_shielded_checkpoint() + @attr.s(frozen=True, eq=False) class RunSync: - sync_fn = attr.ib() + fn = attr.ib() + args = attr.ib() + context = attr.ib() queue = attr.ib(init=False, factory=stdlib_queue.SimpleQueue) + def run_sync(self): + context = self.context.copy() + context.run(current_async_library_cvar.set, "trio") + + result = outcome.capture(context.run, _unprotected_fn, self.fn, self.args) + self.queue.put(result) + + +@disable_ki_protection +def _unprotected_fn(fn, args): + ret = fn(*args) + + if inspect.iscoroutine(ret): + # Manually close coroutine to avoid RuntimeWarnings + ret.close() + raise TypeError( + "Trio expected a sync function, but {!r} appears to be " + "asynchronous".format(getattr(fn, "__qualname__", fn)) + ) + + return ret @enable_ki_protection async def to_thread_run_sync( @@ -201,6 +244,7 @@ def do_release_then_return_result(): def worker_fn(): current_async_library_cvar.set(None) TOKEN_LOCAL.token = current_trio_token + TOKEN_LOCAL.task_register = task_register try: ret = sync_fn(*args) @@ -215,6 +259,7 @@ def worker_fn(): return ret finally: del TOKEN_LOCAL.token + del TOKEN_LOCAL.task_register context = contextvars.copy_context() contextvars_aware_worker_fn = functools.partial(context.run, worker_fn) @@ -246,40 +291,26 @@ def abort(_): while True: msg_from_thread = await trio.lowlevel.wait_task_rescheduled(abort) - if type(msg_from_thread) is ThreadDone: + if isinstance(msg_from_thread, ThreadDone): return msg_from_thread.result.unwrap() - elif type(msg_from_thread) is Run: - result = await outcome.acapture(msg_from_thread.async_fn) - msg_from_thread.queue.put(result) - elif type(msg_from_thread) is RunSync: - result = outcome.capture(msg_from_thread.sync_fn) - msg_from_thread.queue.put(result) - else: + elif isinstance(msg_from_thread, Run): + await msg_from_thread.run() + elif isinstance(msg_from_thread, RunSync): + msg_from_thread.run_sync() + else: # pragma: no cover, internal debugging guard raise TypeError( "trio.to_thread.run_sync received unrecognized thread message {!r}." "".format(msg_from_thread) ) + del msg_from_thread -def _run_fn_as_system_task(cb, fn, *args, context, trio_token=None): - """Helper function for from_thread.run and from_thread.run_sync. +def _raise_if_trio_run(): + """Raise a RuntimeError if this function is called within a trio run. - Since this internally uses TrioToken.run_sync_soon, all warnings about - raised exceptions canceling all tasks should be noted. + Avoids deadlock by making sure we're not called from inside a context + that we might be waiting for and blocking it. """ - - if trio_token and not isinstance(trio_token, TrioToken): - raise RuntimeError("Passed kwarg trio_token is not of type TrioToken") - - if not trio_token: - try: - trio_token = TOKEN_LOCAL.token - except AttributeError: - raise RuntimeError( - "this thread wasn't created by Trio, pass kwarg trio_token=..." - ) - - # Avoid deadlock by making sure we're not called from Trio thread try: trio.lowlevel.current_task() except RuntimeError: @@ -287,6 +318,34 @@ def _run_fn_as_system_task(cb, fn, *args, context, trio_token=None): else: raise RuntimeError("this is a blocking function; call it from a thread") + +def _send_message_to_host_task(message): + try: + token = TOKEN_LOCAL.token + except AttributeError: + raise RuntimeError( + "this thread wasn't created by Trio, pass kwarg trio_token=..." + ) + task_register = TOKEN_LOCAL.task_register + + def in_trio_thread(): + task = task_register[0] + if task is None: + message.queue.put(outcome.Error(trio.Cancelled._create())) + trio.lowlevel.reschedule(task, outcome.Value(message)) + + token.run_sync_soon(in_trio_thread) + + +def _run_fn_as_system_task(cb, fn, *args, context, trio_token): + """Helper function for from_thread.run and from_thread.run_sync. + + Since this internally uses TrioToken.run_sync_soon, all warnings about + raised exceptions canceling all tasks should be noted. + """ + if not isinstance(trio_token, TrioToken): + raise RuntimeError("Passed kwarg trio_token is not of type TrioToken") + q = stdlib_queue.SimpleQueue() trio_token.run_sync_soon(context.run, cb, q, fn, args) return q.get().unwrap() @@ -326,6 +385,13 @@ def from_thread_run(afn, *args, trio_token=None): "foreign" thread, spawned using some other framework, and still want to enter Trio. """ + _raise_if_trio_run() + context = contextvars.copy_context() + + if not trio_token: + message_to_trio = Run(afn, args, context) + _send_message_to_host_task(message_to_trio) + return message_to_trio.queue.get().unwrap() def callback(q, afn, args): @disable_ki_protection @@ -346,7 +412,6 @@ async def await_in_trio_thread_task(): outcome.Error(trio.RunFinishedError("system nursery is closed")) ) - context = contextvars.copy_context() return _run_fn_as_system_task( callback, afn, @@ -386,29 +451,19 @@ def from_thread_run_sync(fn, *args, trio_token=None): "foreign" thread, spawned using some other framework, and still want to enter Trio. """ + _raise_if_trio_run() + context = contextvars.copy_context() + + if not trio_token: + message_to_trio = RunSync(fn, args, context) + _send_message_to_host_task(message_to_trio) + return message_to_trio.queue.get().unwrap() def callback(q, fn, args): current_async_library_cvar.set("trio") - - @disable_ki_protection - def unprotected_fn(): - ret = fn(*args) - - if inspect.iscoroutine(ret): - # Manually close coroutine to avoid RuntimeWarnings - ret.close() - raise TypeError( - "Trio expected a sync function, but {!r} appears to be " - "asynchronous".format(getattr(fn, "__qualname__", fn)) - ) - - return ret - - res = outcome.capture(unprotected_fn) + res = outcome.capture(_unprotected_fn, fn, args) q.put_nowait(res) - context = contextvars.copy_context() - return _run_fn_as_system_task( callback, fn, diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index bc38019f2d..9bad281c9c 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -701,7 +701,7 @@ def thread_fn(token): assert callee_token == caller_token -async def test_from_thread_no_token(): +def test_from_thread_no_token(): # Test that a "raw call" to trio.from_thread.run() fails because no token # has been provided @@ -833,19 +833,22 @@ def test_from_thread_run_during_shutdown(): save = [] record = [] - async def agen(): + async def agen(token): try: yield finally: with pytest.raises(_core.RunFinishedError), _core.CancelScope(shield=True): - await to_thread_run_sync(from_thread_run, sleep, 0) + await to_thread_run_sync( + partial(from_thread_run, sleep, 0, trio_token=token) + ) record.append("ok") - async def main(): - save.append(agen()) + async def main(use_system_task): + save.append(agen(_core.current_trio_token() if use_system_task else None)) await save[-1].asend(None) - _core.run(main) + _core.run(main, True) # System nursery will be closed and raise RunFinishedError + _core.run(main, False) # host task will not be rescheduled assert record == ["ok"] From 5f86fb20eb2e2eb3ae94a3a36a12dfeabbb03dfd Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 4 Mar 2023 15:18:20 -0500 Subject: [PATCH 03/31] remove failing tests and untested feature --- trio/_threads.py | 18 +-------- trio/tests/test_threads.py | 83 ++------------------------------------ 2 files changed, 5 insertions(+), 96 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index 1099c056c5..f53ae60c29 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -118,6 +118,7 @@ def _unprotected_fn(fn, args): return ret + @enable_ki_protection async def to_thread_run_sync( sync_fn, *args, thread_name: Optional[str] = None, cancellable=False, limiter=None @@ -471,20 +472,3 @@ def callback(q, fn, args): context=context, trio_token=trio_token, ) - - -def from_thread_check_cancelled(): - """Check if the Trio task that controls this thread has been cancelled. - - This check only works if the thread was spawned by `trio.to_thread.run_sync`. - - Returns: - bool: True if `Cancelled` has been raised from the corresponding call - to `trio.to_thread.run_sync`, False otherwise. - - Raises: - AttributeError: if this thread was not created with - `trio.to_thread.run_sync`. - """ - - return False diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index 9bad281c9c..6b10d86a21 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -11,8 +11,6 @@ import pytest from sniffio import current_async_library_cvar -from trio._core import TrioToken, current_trio_token - from .. import CapacityLimiter, Event, _core, sleep from .._core.tests.test_ki import ki_self from .._core.tests.tutil import buggy_pypy_asyncgens @@ -21,7 +19,6 @@ from_thread_run, from_thread_run_sync, to_thread_run_sync, - from_thread_check_cancelled, ) from ..testing import wait_all_tasks_blocked @@ -880,84 +877,12 @@ async def async_current_task(): assert task is await to_thread_run_sync(from_thread_run, async_current_task) -async def test_to_thread_reuses_thread(): +async def test_recursive_to_thread(): tid = None def get_tid_then_reenter(): nonlocal tid - tid = threading.current_thread() - return from_thread_run(to_thread_run_sync, threading.current_thread) - - assert tid == await to_thread_run_sync(from_thread_run, get_tid_then_reenter) - - -@pytest.mark.parametrize("parties", [1, 2, 3]) -async def test_to_thread_reuses_thread_once_w_nursery(parties): - barrier = threading.Barrier(parties) - tids = [] - - def wait_then_get_tid(): - barrier.wait(1.0) - tids.append(threading.current_thread()) - - async def re_reenter(): - async with _core.open_nursery() as nursery: - for _ in range(barrier.parties): - nursery.start_soon(to_thread_run_sync, wait_then_get_tid) - - def reenter_then_get_tid(): - from_thread_run(re_reenter) - return threading.current_thread() - - main_tid = await to_thread_run_sync(reenter_then_get_tid) - assert tids.count(main_tid) == 1 - - -async def test_from_thread_check_cancelled(): - assert False is await to_thread_run_sync(from_thread_check_cancelled) + tid = threading.get_ident() + return from_thread_run(to_thread_run_sync, threading.get_ident) - -async def test_cancel_uncancellable_then_check(): - def in_thread(): - from_thread_run_sync(cancel_scope.cancel) - return from_thread_check_cancelled() - - with _core.CancelScope() as cancel_scope: - assert False is await to_thread_run_sync(in_thread) - - -async def test_cancel_cancellable_then_wait_then_check(): - q = stdlib_queue.SimpleQueue() - ev = threading.Event() - - async def foo(): - pass # pragma: no cover - - def in_thread(): - from_thread_run_sync(cancel_scope.cancel) - ev.wait(timeout=1.0) - q.put(from_thread_check_cancelled()) - try: - from_thread_run_sync(bool) - except _core.Cancelled: - q.put(True) - except BaseException as exc: # pragma: no cover - q.put(exc) - else: - q.put(False) - try: - from_thread_run(foo) - except _core.Cancelled: - q.put(True) - except BaseException as exc: # pragma: no cover - q.put(exc) - else: - q.put(False) - - with _core.CancelScope() as cancel_scope: - await to_thread_run_sync(in_thread, cancellable=True) - assert cancel_scope.cancelled_caught - ev.set() - assert True is q.get() - assert True is q.get() - assert True is q.get() + assert tid != await to_thread_run_sync(get_tid_then_reenter) From 5f75c86436833b9c0d954f4aadbf38003f2be302 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 4 Mar 2023 16:23:30 -0500 Subject: [PATCH 04/31] refactor system tasks to use messages as well --- trio/_threads.py | 128 ++++++++++++++++++++--------------------------- 1 file changed, 53 insertions(+), 75 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index f53ae60c29..b920c3bb49 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -70,24 +70,28 @@ class Run: context = attr.ib() queue = attr.ib(init=False, factory=stdlib_queue.SimpleQueue) - async def run(self): - @disable_ki_protection - async def unprotected_afn(): - coro = coroutine_or_error(self.afn, *self.args) - return await coro + @disable_ki_protection + async def unprotected_afn(self): + coro = coroutine_or_error(self.afn, *self.args) + return await coro + async def run(self): task = trio.lowlevel.current_task() old_context = task.context task.context = self.context.copy() task.context.run(current_async_library_cvar.set, "trio") try: await trio.lowlevel.cancel_shielded_checkpoint() - result = await outcome.acapture(unprotected_afn) - self.queue.put(result) + result = await outcome.acapture(self.unprotected_afn) + self.queue.put_nowait(result) finally: task.context = old_context await trio.lowlevel.cancel_shielded_checkpoint() + async def run_system(self): + result = await outcome.acapture(self.unprotected_afn) + self.queue.put_nowait(result) + @attr.s(frozen=True, eq=False) class RunSync: @@ -97,26 +101,24 @@ class RunSync: queue = attr.ib(init=False, factory=stdlib_queue.SimpleQueue) def run_sync(self): - context = self.context.copy() - context.run(current_async_library_cvar.set, "trio") - - result = outcome.capture(context.run, _unprotected_fn, self.fn, self.args) - self.queue.put(result) + @disable_ki_protection + def unprotected_fn(): + ret = self.fn(*self.args) + if inspect.iscoroutine(ret): + # Manually close coroutine to avoid RuntimeWarnings + ret.close() + raise TypeError( + "Trio expected a sync function, but {!r} appears to be " + "asynchronous".format(getattr(self.fn, "__qualname__", self.fn)) + ) -@disable_ki_protection -def _unprotected_fn(fn, args): - ret = fn(*args) + return ret - if inspect.iscoroutine(ret): - # Manually close coroutine to avoid RuntimeWarnings - ret.close() - raise TypeError( - "Trio expected a sync function, but {!r} appears to be " - "asynchronous".format(getattr(fn, "__qualname__", fn)) - ) + self.context.run(current_async_library_cvar.set, "trio") - return ret + result = outcome.capture(self.context.run, unprotected_fn) + self.queue.put_nowait(result) @enable_ki_protection @@ -332,24 +334,38 @@ def _send_message_to_host_task(message): def in_trio_thread(): task = task_register[0] if task is None: - message.queue.put(outcome.Error(trio.Cancelled._create())) + message.queue.put_nowait(outcome.Error(trio.Cancelled._create())) trio.lowlevel.reschedule(task, outcome.Value(message)) token.run_sync_soon(in_trio_thread) -def _run_fn_as_system_task(cb, fn, *args, context, trio_token): - """Helper function for from_thread.run and from_thread.run_sync. - - Since this internally uses TrioToken.run_sync_soon, all warnings about - raised exceptions canceling all tasks should be noted. - """ +def _send_message_to_system_task(message, trio_token): if not isinstance(trio_token, TrioToken): raise RuntimeError("Passed kwarg trio_token is not of type TrioToken") - q = stdlib_queue.SimpleQueue() - trio_token.run_sync_soon(context.run, cb, q, fn, args) - return q.get().unwrap() + if isinstance(message, RunSync): + run_sync = message.run_sync + elif isinstance(message, Run): + + def run_sync(): + try: + trio.lowlevel.spawn_system_task( + message.run_system, name=message.afn, context=message.context + ) + except RuntimeError: # system nursery is closed + message.queue.put_nowait( + outcome.Error(trio.RunFinishedError("system nursery is closed")) + ) + + else: # pragma: no cover, internal debugging guard + raise TypeError( + "trio.to_thread.run_sync received unrecognized thread message {!r}." + "".format(message) + ) + + trio_token.run_sync_soon(run_sync) + return message.queue.get().unwrap() def from_thread_run(afn, *args, trio_token=None): @@ -387,39 +403,13 @@ def from_thread_run(afn, *args, trio_token=None): to enter Trio. """ _raise_if_trio_run() - context = contextvars.copy_context() + message_to_trio = Run(afn, args, contextvars.copy_context()) if not trio_token: - message_to_trio = Run(afn, args, context) _send_message_to_host_task(message_to_trio) return message_to_trio.queue.get().unwrap() - def callback(q, afn, args): - @disable_ki_protection - async def unprotected_afn(): - coro = coroutine_or_error(afn, *args) - return await coro - - async def await_in_trio_thread_task(): - q.put_nowait(await outcome.acapture(unprotected_afn)) - - context = contextvars.copy_context() - try: - trio.lowlevel.spawn_system_task( - await_in_trio_thread_task, name=afn, context=context - ) - except RuntimeError: # system nursery is closed - q.put_nowait( - outcome.Error(trio.RunFinishedError("system nursery is closed")) - ) - - return _run_fn_as_system_task( - callback, - afn, - *args, - context=context, - trio_token=trio_token, - ) + return _send_message_to_system_task(message_to_trio, trio_token) def from_thread_run_sync(fn, *args, trio_token=None): @@ -453,22 +443,10 @@ def from_thread_run_sync(fn, *args, trio_token=None): to enter Trio. """ _raise_if_trio_run() - context = contextvars.copy_context() + message_to_trio = RunSync(fn, args, contextvars.copy_context()) if not trio_token: - message_to_trio = RunSync(fn, args, context) _send_message_to_host_task(message_to_trio) return message_to_trio.queue.get().unwrap() - def callback(q, fn, args): - current_async_library_cvar.set("trio") - res = outcome.capture(_unprotected_fn, fn, args) - q.put_nowait(res) - - return _run_fn_as_system_task( - callback, - fn, - *args, - context=context, - trio_token=trio_token, - ) + return _send_message_to_system_task(message_to_trio, trio_token) From 7ccf0f784e6382eec7730e062e477a1d9a947ba6 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Thu, 9 Mar 2023 07:36:55 -0500 Subject: [PATCH 05/31] factor out trio_token checking logic --- trio/_threads.py | 53 ++++++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index b920c3bb49..c0f0728873 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -308,12 +308,25 @@ def abort(_): del msg_from_thread -def _raise_if_trio_run(): +def _check_token(trio_token): """Raise a RuntimeError if this function is called within a trio run. Avoids deadlock by making sure we're not called from inside a context that we might be waiting for and blocking it. """ + + if trio_token and not isinstance(trio_token, TrioToken): + raise RuntimeError("Passed kwarg trio_token is not of type TrioToken") + + if not trio_token: + try: + trio_token = TOKEN_LOCAL.token + except AttributeError: + raise RuntimeError( + "this thread wasn't created by Trio, pass kwarg trio_token=..." + ) + + # Avoid deadlock by making sure we're not called from Trio thread try: trio.lowlevel.current_task() except RuntimeError: @@ -321,14 +334,10 @@ def _raise_if_trio_run(): else: raise RuntimeError("this is a blocking function; call it from a thread") + return trio_token -def _send_message_to_host_task(message): - try: - token = TOKEN_LOCAL.token - except AttributeError: - raise RuntimeError( - "this thread wasn't created by Trio, pass kwarg trio_token=..." - ) + +def _send_message_to_host_task(message, trio_token): task_register = TOKEN_LOCAL.task_register def in_trio_thread(): @@ -337,13 +346,11 @@ def in_trio_thread(): message.queue.put_nowait(outcome.Error(trio.Cancelled._create())) trio.lowlevel.reschedule(task, outcome.Value(message)) - token.run_sync_soon(in_trio_thread) + trio_token.run_sync_soon(in_trio_thread) + return message.queue.get().unwrap() def _send_message_to_system_task(message, trio_token): - if not isinstance(trio_token, TrioToken): - raise RuntimeError("Passed kwarg trio_token is not of type TrioToken") - if isinstance(message, RunSync): run_sync = message.run_sync elif isinstance(message, Run): @@ -402,14 +409,13 @@ def from_thread_run(afn, *args, trio_token=None): "foreign" thread, spawned using some other framework, and still want to enter Trio. """ - _raise_if_trio_run() + checked_token = _check_token(trio_token) message_to_trio = Run(afn, args, contextvars.copy_context()) - if not trio_token: - _send_message_to_host_task(message_to_trio) - return message_to_trio.queue.get().unwrap() - - return _send_message_to_system_task(message_to_trio, trio_token) + if trio_token: + return _send_message_to_system_task(message_to_trio, checked_token) + else: + return _send_message_to_host_task(message_to_trio, checked_token) def from_thread_run_sync(fn, *args, trio_token=None): @@ -442,11 +448,10 @@ def from_thread_run_sync(fn, *args, trio_token=None): "foreign" thread, spawned using some other framework, and still want to enter Trio. """ - _raise_if_trio_run() + checked_token = _check_token(trio_token) message_to_trio = RunSync(fn, args, contextvars.copy_context()) - if not trio_token: - _send_message_to_host_task(message_to_trio) - return message_to_trio.queue.get().unwrap() - - return _send_message_to_system_task(message_to_trio, trio_token) + if trio_token: + return _send_message_to_system_task(message_to_trio, checked_token) + else: + return _send_message_to_host_task(message_to_trio, checked_token) From 6d3b2d6169b1d6c0ba479ca61abef591c81128f1 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Mon, 8 Aug 2022 12:35:57 -0400 Subject: [PATCH 06/31] Add trio.from_thread.check_cancelled api to allow threads to efficiently poll for cancellation --- trio/_threads.py | 44 ++++++++++++++++---- trio/from_thread.py | 1 + trio/tests/test_threads.py | 84 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 120 insertions(+), 9 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index c0f0728873..a74fd17520 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -22,8 +22,8 @@ from ._sync import CapacityLimiter from ._util import coroutine_or_error -# Global due to Threading API, thread local storage for trio token -TOKEN_LOCAL = threading.local() +# Global due to Threading API, thread local storage for trio token and raise_cancel +THREAD_LOCAL = threading.local() _limiter_local = RunVar("limiter") # I pulled this number out of the air; it isn't based on anything. Probably we @@ -217,6 +217,9 @@ async def to_thread_run_sync( # for the result – or None if this function was cancelled and we should # discard the result. task_register = [trio.lowlevel.current_task()] + # Holds a reference to the raise_cancel function provided if a cancellation + # is attempted against this task - or None if no such delivery has happened. + cancel_register = [None] name = f"trio.to_thread.run_sync-{next(_thread_counter)}" placeholder = ThreadPlaceholder(name) @@ -246,8 +249,9 @@ def do_release_then_return_result(): def worker_fn(): current_async_library_cvar.set(None) - TOKEN_LOCAL.token = current_trio_token - TOKEN_LOCAL.task_register = task_register + THREAD_LOCAL.token = current_trio_token + THREAD_LOCAL.cancel_register = cancel_register + THREAD_LOCAL.task_register = task_register try: ret = sync_fn(*args) @@ -261,8 +265,9 @@ def worker_fn(): return ret finally: - del TOKEN_LOCAL.token - del TOKEN_LOCAL.task_register + del THREAD_LOCAL.token + del THREAD_LOCAL.cancel_register + del THREAD_LOCAL.task_register context = contextvars.copy_context() contextvars_aware_worker_fn = functools.partial(context.run, worker_fn) @@ -285,8 +290,11 @@ def deliver_worker_fn_result(result): limiter.release_on_behalf_of(placeholder) raise - def abort(_): + def abort(raise_cancel): + # fill so from_thread_check_cancelled can raise + cancel_register[0] = raise_cancel if cancellable: + # empty so report_back_in_trio_thread_fn cannot reschedule task_register[0] = None return trio.lowlevel.Abort.SUCCEEDED else: @@ -308,6 +316,24 @@ def abort(_): del msg_from_thread +def from_thread_check_cancelled(): + """Raise trio.Cancelled if the associated Trio task entered a cancelled status. + + Only applicable to threads spawned by `trio.to_thread.run_sync`. Poll to allow + ``cancellable=False`` threads to raise :exc:`trio.Cancelled` at a suitable + place, or to end abandoned ``cancellable=True`` sooner than they may otherwise. + + Raises: + Cancelled: If the corresponding call to `trio.to_thread.run_sync` has had a + delivery of cancellation attempted against it, regardless of the value of + ``cancellable`` supplied as an argument to it. + AttributeError: If this thread is not spawned from `trio.to_thread.run_sync`. + """ + raise_cancel = THREAD_LOCAL.cancel_register[0] + if raise_cancel is not None: + raise_cancel() + + def _check_token(trio_token): """Raise a RuntimeError if this function is called within a trio run. @@ -320,7 +346,7 @@ def _check_token(trio_token): if not trio_token: try: - trio_token = TOKEN_LOCAL.token + trio_token = THREAD_LOCAL.token except AttributeError: raise RuntimeError( "this thread wasn't created by Trio, pass kwarg trio_token=..." @@ -338,7 +364,7 @@ def _check_token(trio_token): def _send_message_to_host_task(message, trio_token): - task_register = TOKEN_LOCAL.task_register + task_register = THREAD_LOCAL.task_register def in_trio_thread(): task = task_register[0] diff --git a/trio/from_thread.py b/trio/from_thread.py index 296a5a89ea..93dffd31f4 100644 --- a/trio/from_thread.py +++ b/trio/from_thread.py @@ -5,3 +5,4 @@ from ._threads import from_thread_run as run from ._threads import from_thread_run_sync as run_sync +from ._threads import from_thread_check_cancelled as check_cancelled diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index 6b10d86a21..c453eefe1b 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -18,6 +18,7 @@ current_default_thread_limiter, from_thread_run, from_thread_run_sync, + from_thread_check_cancelled, to_thread_run_sync, ) from ..testing import wait_all_tasks_blocked @@ -886,3 +887,86 @@ def get_tid_then_reenter(): return from_thread_run(to_thread_run_sync, threading.get_ident) assert tid != await to_thread_run_sync(get_tid_then_reenter) + + +async def test_from_thread_check_cancelled(): + q = stdlib_queue.Queue() + + async def child(cancellable): + record.append("start") + try: + return await to_thread_run_sync(f, cancellable=cancellable) + except _core.Cancelled: + record.append("cancel") + finally: + record.append("exit") + + def f(): + try: + from_thread_check_cancelled() + except _core.Cancelled: # pragma: no cover, test failure path + q.put("Cancelled") + else: + q.put("Not Cancelled") + ev.wait() + return from_thread_check_cancelled() + + # Base case: nothing cancelled so we shouldn't see cancels anywhere + record = [] + ev = threading.Event() + async with _core.open_nursery() as nursery: + nursery.start_soon(child, False) + await wait_all_tasks_blocked() + assert record[0] == "start" + assert q.get(timeout=1) == "Not Cancelled" + ev.set() + # implicit assertion, Cancelled not raised via nursery + assert record[1] == "exit" + + # cancellable=False case: a cancel will pop out but be handled by + # the appropriate cancel scope + record = [] + ev = threading.Event() + async with _core.open_nursery() as nursery: + nursery.start_soon(child, False) + await wait_all_tasks_blocked() + assert record[0] == "start" + assert q.get(timeout=1) == "Not Cancelled" + nursery.cancel_scope.cancel() + ev.set() + assert nursery.cancel_scope.cancelled_caught + assert "cancel" in record + assert record[-1] == "exit" + + # cancellable=True case: slightly different thread behavior needed + # check thread is cancelled "soon" after abandonment + def f(): + ev.wait() + try: + from_thread_check_cancelled() + except _core.Cancelled: + q.put("Cancelled") + else: # pragma: no cover, test failure path + q.put("Not Cancelled") + + record = [] + ev = threading.Event() + async with _core.open_nursery() as nursery: + nursery.start_soon(child, True) + await wait_all_tasks_blocked() + assert record[0] == "start" + nursery.cancel_scope.cancel() + ev.set() + assert nursery.cancel_scope.cancelled_caught + assert "cancel" in record + assert record[-1] == "exit" + assert q.get(timeout=1) == "Cancelled" + + +async def test_from_thread_check_cancelled_raises_in_foreign_threads(): + with pytest.raises(AttributeError): + from_thread_check_cancelled() + q = stdlib_queue.Queue() + _core.start_thread_soon(from_thread_check_cancelled, lambda _: q.put(_)) + with pytest.raises(AttributeError): + q.get(timeout=1).unwrap() From 2d911ea767f8caff52123a031b5263ced77e0d80 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Mon, 8 Aug 2022 13:00:10 -0400 Subject: [PATCH 07/31] Document trio.from_thread.check_cancelled --- docs/source/reference-core.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 922ae4680e..055cd9dd18 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1808,6 +1808,11 @@ to spawn a child thread, and then use a :ref:`memory channel .. literalinclude:: reference-core/from-thread-example.py +You can also perform a non-blocking check for cancellation from threads spawned +by func:`trio.to_thread.run_sync`. + +.. autofunction:: trio.from_thread.check_cancelled + Threads and task-local storage ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ From bc6c82a3903bc1677adc14b30df7528c4ad47bb3 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Mon, 8 Aug 2022 13:00:19 -0400 Subject: [PATCH 08/31] Add Newsfragment --- newsfragments/2392.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/2392.feature.rst diff --git a/newsfragments/2392.feature.rst b/newsfragments/2392.feature.rst new file mode 100644 index 0000000000..06bf5792d4 --- /dev/null +++ b/newsfragments/2392.feature.rst @@ -0,0 +1 @@ +If called from a thread spawned by `trio.to_thread.run_sync`, `trio.from_thread.run` and `trio.from_thread.run_sync` now reuse the task and cancellation status of the host task and have `trio.from_thread.check_cancelled` to efficiently poll for cancellation. From 644b9137ef931db835bbd14a81f2896477d47c59 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Mon, 8 Aug 2022 13:43:04 -0400 Subject: [PATCH 09/31] Linting and fix docs --- trio/_threads.py | 4 ++-- trio/tests/test_threads.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index a74fd17520..b0c8bb807e 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -325,8 +325,8 @@ def from_thread_check_cancelled(): Raises: Cancelled: If the corresponding call to `trio.to_thread.run_sync` has had a - delivery of cancellation attempted against it, regardless of the value of - ``cancellable`` supplied as an argument to it. + delivery of cancellation attempted against it, regardless of the value of + ``cancellable`` supplied as an argument to it. AttributeError: If this thread is not spawned from `trio.to_thread.run_sync`. """ raise_cancel = THREAD_LOCAL.cancel_register[0] diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index c453eefe1b..50b15cc6f6 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -940,7 +940,7 @@ def f(): # cancellable=True case: slightly different thread behavior needed # check thread is cancelled "soon" after abandonment - def f(): + def f(): # noqa: F811 ev.wait() try: from_thread_check_cancelled() From b87456f41112066ea7391f68e0350f2172a33413 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Wed, 8 Mar 2023 23:04:33 -0500 Subject: [PATCH 10/31] use cancel_register in _send_message_to_host_task avoids creating a Cancelled manually and a double-reschedule race --- trio/_threads.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index b0c8bb807e..523247748c 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -365,12 +365,15 @@ def _check_token(trio_token): def _send_message_to_host_task(message, trio_token): task_register = THREAD_LOCAL.task_register + cancel_register = THREAD_LOCAL.cancel_register def in_trio_thread(): - task = task_register[0] - if task is None: - message.queue.put_nowait(outcome.Error(trio.Cancelled._create())) - trio.lowlevel.reschedule(task, outcome.Value(message)) + raise_cancel = cancel_register[0] + if raise_cancel is None: + task = task_register[0] + trio.lowlevel.reschedule(task, outcome.Value(message)) + else: + message.queue.put_nowait(outcome.capture(raise_cancel)) trio_token.run_sync_soon(in_trio_thread) return message.queue.get().unwrap() From f6b27b239dbf671b644c7dd6adaa9870160e23e8 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Thu, 9 Mar 2023 20:47:12 -0500 Subject: [PATCH 11/31] Document thread reuse semantics --- trio/_threads.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index 523247748c..1ce9fb03fb 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -418,25 +418,25 @@ def from_thread_run(afn, *args, trio_token=None): RunFinishedError: if the corresponding call to :func:`trio.run` has already completed, or if the run has started its final cleanup phase and can no longer spawn new system tasks. - Cancelled: if the corresponding call to :func:`trio.run` completes + Cancelled: if the corresponding task or call to :func:`trio.run` completes while ``afn(*args)`` is running, then ``afn`` is likely to raise - :exc:`trio.Cancelled`, and this will propagate out into + :exc:`trio.Cancelled`. RuntimeError: if you try calling this from inside the Trio thread, - which would otherwise cause a deadlock. - AttributeError: if no ``trio_token`` was provided, and we can't infer - one from context. + which would otherwise cause a deadlock, or if no ``trio_token`` was + provided, and we can't infer one from context. TypeError: if ``afn`` is not an asynchronous function. **Locating a Trio Token**: There are two ways to specify which `trio.run` loop to reenter: - Spawn this thread from `trio.to_thread.run_sync`. Trio will - automatically capture the relevant Trio token and use it when you - want to re-enter Trio. + automatically capture the relevant Trio token and use it + to re-enter the same Trio task. - Pass a keyword argument, ``trio_token`` specifying a specific `trio.run` loop to re-enter. This is useful in case you have a "foreign" thread, spawned using some other framework, and still want - to enter Trio. + to enter Trio, or if you want to avoid the cancellation context of + `trio.to_thread.run_sync`. """ checked_token = _check_token(trio_token) message_to_trio = Run(afn, args, contextvars.copy_context()) @@ -460,10 +460,11 @@ def from_thread_run_sync(fn, *args, trio_token=None): Raises: RunFinishedError: if the corresponding call to `trio.run` has already completed. + Cancelled: if the corresponding `trio.to_thread.run_sync` task is + cancellable and exits before this function is called RuntimeError: if you try calling this from inside the Trio thread, - which would otherwise cause a deadlock. - AttributeError: if no ``trio_token`` was provided, and we can't infer - one from context. + which would otherwise cause a deadlock or if no ``trio_token`` was + provided, and we can't infer one from context. TypeError: if ``fn`` is an async function. **Locating a Trio Token**: There are two ways to specify which @@ -475,7 +476,8 @@ def from_thread_run_sync(fn, *args, trio_token=None): - Pass a keyword argument, ``trio_token`` specifying a specific `trio.run` loop to re-enter. This is useful in case you have a "foreign" thread, spawned using some other framework, and still want - to enter Trio. + to enter Trio, or if you want to avoid the cancellation context of + `trio.to_thread.run_sync`. """ checked_token = _check_token(trio_token) message_to_trio = RunSync(fn, args, contextvars.copy_context()) From 2b088cc0ff13c7002b79239e962954add73c1522 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 11 Mar 2023 00:19:08 -0500 Subject: [PATCH 12/31] test coverage for cancelled host task --- trio/tests/test_threads.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index 50b15cc6f6..ba4f3d6f39 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -11,7 +11,7 @@ import pytest from sniffio import current_async_library_cvar -from .. import CapacityLimiter, Event, _core, sleep +from .. import CapacityLimiter, Event, _core, sleep, sleep_forever, fail_after from .._core.tests.test_ki import ki_self from .._core.tests.tutil import buggy_pypy_asyncgens from .._threads import ( @@ -889,6 +889,29 @@ def get_tid_then_reenter(): assert tid != await to_thread_run_sync(get_tid_then_reenter) +async def test_from_thread_host_cancelled(): + def sync_time_bomb(): + deadline = time.perf_counter() + 10 + while time.perf_counter() < deadline: + from_thread_run_sync(cancel_scope.cancel) + assert False # pragma: no cover + + with _core.CancelScope() as cancel_scope: + await to_thread_run_sync(sync_time_bomb) + + assert cancel_scope.cancelled_caught + + async def async_time_bomb(): + cancel_scope.cancel() + with fail_after(10): + await sleep_forever() + + with _core.CancelScope() as cancel_scope: + await to_thread_run_sync(from_thread_run, async_time_bomb) + + assert cancel_scope.cancelled_caught + + async def test_from_thread_check_cancelled(): q = stdlib_queue.Queue() From 832da418df16121ce22c0112ce276f36fa805a1a Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 11 Mar 2023 00:45:07 -0500 Subject: [PATCH 13/31] unnest unprotected_fn --- trio/_threads.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index 1ce9fb03fb..5556e50538 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -100,24 +100,23 @@ class RunSync: context = attr.ib() queue = attr.ib(init=False, factory=stdlib_queue.SimpleQueue) - def run_sync(self): - @disable_ki_protection - def unprotected_fn(): - ret = self.fn(*self.args) + @disable_ki_protection + def unprotected_fn(self): + ret = self.fn(*self.args) - if inspect.iscoroutine(ret): - # Manually close coroutine to avoid RuntimeWarnings - ret.close() - raise TypeError( - "Trio expected a sync function, but {!r} appears to be " - "asynchronous".format(getattr(self.fn, "__qualname__", self.fn)) - ) + if inspect.iscoroutine(ret): + # Manually close coroutine to avoid RuntimeWarnings + ret.close() + raise TypeError( + "Trio expected a sync function, but {!r} appears to be " + "asynchronous".format(getattr(self.fn, "__qualname__", self.fn)) + ) - return ret + return ret + def run_sync(self): self.context.run(current_async_library_cvar.set, "trio") - - result = outcome.capture(self.context.run, unprotected_fn) + result = outcome.capture(self.context.run, self.unprotected_fn) self.queue.put_nowait(result) From 5a10e9b4f2420c6b924acbcacabe82997e69270d Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 11 Mar 2023 09:20:22 -0500 Subject: [PATCH 14/31] flip _send_message_to_host_task.in_trio_thread semantics from_thread.run_sync now only raises in the cancellable=True case --- trio/_threads.py | 16 +++++++------ trio/tests/test_threads.py | 48 +++++++++++++++++++++++++++++++++----- 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index 5556e50538..cd76f1907a 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -367,12 +367,12 @@ def _send_message_to_host_task(message, trio_token): cancel_register = THREAD_LOCAL.cancel_register def in_trio_thread(): - raise_cancel = cancel_register[0] - if raise_cancel is None: - task = task_register[0] - trio.lowlevel.reschedule(task, outcome.Value(message)) - else: + task = task_register[0] + if task is None: + raise_cancel = cancel_register[0] message.queue.put_nowait(outcome.capture(raise_cancel)) + else: + trio.lowlevel.reschedule(task, outcome.Value(message)) trio_token.run_sync_soon(in_trio_thread) return message.queue.get().unwrap() @@ -417,7 +417,9 @@ def from_thread_run(afn, *args, trio_token=None): RunFinishedError: if the corresponding call to :func:`trio.run` has already completed, or if the run has started its final cleanup phase and can no longer spawn new system tasks. - Cancelled: if the corresponding task or call to :func:`trio.run` completes + Cancelled: if the corresponding `trio.to_thread.run_sync` task is + cancellable and exits before this function is called, or + if the task enters cancelled status or call to :func:`trio.run` completes while ``afn(*args)`` is running, then ``afn`` is likely to raise :exc:`trio.Cancelled`. RuntimeError: if you try calling this from inside the Trio thread, @@ -460,7 +462,7 @@ def from_thread_run_sync(fn, *args, trio_token=None): RunFinishedError: if the corresponding call to `trio.run` has already completed. Cancelled: if the corresponding `trio.to_thread.run_sync` task is - cancellable and exits before this function is called + cancellable and exits before this function is called. RuntimeError: if you try calling this from inside the Trio thread, which would otherwise cause a deadlock or if no ``trio_token`` was provided, and we can't infer one from context. diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index ba4f3d6f39..920b3d95f0 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -890,16 +890,52 @@ def get_tid_then_reenter(): async def test_from_thread_host_cancelled(): - def sync_time_bomb(): - deadline = time.perf_counter() + 10 - while time.perf_counter() < deadline: - from_thread_run_sync(cancel_scope.cancel) - assert False # pragma: no cover + queue = stdlib_queue.Queue() + + def sync_check(): + from_thread_run_sync(cancel_scope.cancel) + try: + from_thread_run_sync(bool) + except _core.Cancelled: + queue.put(True) + else: + queue.put(False) + + with _core.CancelScope() as cancel_scope: + await to_thread_run_sync(sync_check) + + assert not cancel_scope.cancelled_caught + assert not queue.get_nowait() + + with _core.CancelScope() as cancel_scope: + await to_thread_run_sync(sync_check, cancellable=True) + + assert cancel_scope.cancelled_caught + assert await to_thread_run_sync(partial(queue.get, timeout=1)) + + async def no_checkpoint(): + return True + + def async_check(): + from_thread_run_sync(cancel_scope.cancel) + try: + assert from_thread_run(no_checkpoint) + except _core.Cancelled: + queue.put(True) + else: + queue.put(False) + + with _core.CancelScope() as cancel_scope: + await to_thread_run_sync(async_check) + + assert not cancel_scope.cancelled_caught + assert not queue.get_nowait() with _core.CancelScope() as cancel_scope: - await to_thread_run_sync(sync_time_bomb) + await to_thread_run_sync(async_check, cancellable=True) assert cancel_scope.cancelled_caught + assert await to_thread_run_sync(partial(queue.get, timeout=1)) async def async_time_bomb(): cancel_scope.cancel() From 787d2862c54da56634b98e942fcda3a175421097 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sun, 3 Sep 2023 13:59:11 -0400 Subject: [PATCH 15/31] Aesthetic refactor of API functions --- trio/_threads.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index bf8ecd115e..313a4ec189 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -478,13 +478,14 @@ def from_thread_run( to enter Trio, or if you want to avoid the cancellation context of `trio.to_thread.run_sync`. """ - checked_token = _check_token(trio_token) + if trio_token is None: + send_message = _send_message_to_host_task + else: + send_message = _send_message_to_system_task + message_to_trio = Run(afn, args, contextvars.copy_context()) - if trio_token: - return _send_message_to_system_task(message_to_trio, checked_token) - else: - return _send_message_to_host_task(message_to_trio, checked_token) + return send_message(message_to_trio, _check_token(trio_token)) def from_thread_run_sync( @@ -523,10 +524,11 @@ def from_thread_run_sync( to enter Trio, or if you want to avoid the cancellation context of `trio.to_thread.run_sync`. """ - checked_token = _check_token(trio_token) + if trio_token is None: + send_message = _send_message_to_host_task + else: + send_message = _send_message_to_system_task + message_to_trio = RunSync(fn, args, contextvars.copy_context()) - if trio_token: - return _send_message_to_system_task(message_to_trio, checked_token) - else: - return _send_message_to_host_task(message_to_trio, checked_token) + return send_message(message_to_trio, _check_token(trio_token)) From 881180f114f934d4dc3338a067e8003c99b032aa Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sun, 3 Sep 2023 14:54:13 -0400 Subject: [PATCH 16/31] fix typos in docs --- docs/source/reference-core.rst | 2 +- trio/_threads.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 6d04d3ce4a..b50471646a 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1824,7 +1824,7 @@ to spawn a child thread, and then use a :ref:`memory channel .. literalinclude:: reference-core/from-thread-example.py You can also perform a non-blocking check for cancellation from threads spawned -by func:`trio.to_thread.run_sync`. +by `trio.to_thread.run_sync`. .. autofunction:: trio.from_thread.check_cancelled diff --git a/trio/_threads.py b/trio/_threads.py index 313a4ec189..8f6f849eda 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -351,7 +351,8 @@ def from_thread_check_cancelled() -> None: Only applicable to threads spawned by `trio.to_thread.run_sync`. Poll to allow ``cancellable=False`` threads to raise :exc:`trio.Cancelled` at a suitable - place, or to end abandoned ``cancellable=True`` sooner than they may otherwise. + place, or to end abandoned ``cancellable=True`` threads sooner than they may + otherwise. Raises: Cancelled: If the corresponding call to `trio.to_thread.run_sync` has had a From c0e11d403bc19ca1656d720c52d9ab91ea0297ed Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sun, 3 Sep 2023 17:28:29 -0400 Subject: [PATCH 17/31] remove extra cvar toggling --- trio/_threads.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index 8f6f849eda..155c10f4cf 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -95,7 +95,6 @@ async def run(self) -> None: task = trio.lowlevel.current_task() old_context = task.context task.context = self.context.copy() - task.context.run(current_async_library_cvar.set, "trio") try: await trio.lowlevel.cancel_shielded_checkpoint() result = await outcome.acapture(self.unprotected_afn) @@ -133,7 +132,6 @@ def unprotected_fn(self) -> RetT: return ret def run_sync(self) -> None: - self.context.run(current_async_library_cvar.set, "trio") result = outcome.capture(self.context.run, self.unprotected_fn) self.queue.put_nowait(result) From 1930cae14bcef0c2dbe5d964ec49ea4f457ebcd6 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sun, 3 Sep 2023 18:52:13 -0400 Subject: [PATCH 18/31] Transmute AttributeError to RuntimeError --- trio/_tests/test_threads.py | 4 ++-- trio/_threads.py | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/trio/_tests/test_threads.py b/trio/_tests/test_threads.py index 0be067da5b..5684e4a62d 100644 --- a/trio/_tests/test_threads.py +++ b/trio/_tests/test_threads.py @@ -1001,9 +1001,9 @@ def f(): # noqa: F811 async def test_from_thread_check_cancelled_raises_in_foreign_threads(): - with pytest.raises(AttributeError): + with pytest.raises(RuntimeError): from_thread_check_cancelled() q = stdlib_queue.Queue() _core.start_thread_soon(from_thread_check_cancelled, lambda _: q.put(_)) - with pytest.raises(AttributeError): + with pytest.raises(RuntimeError): q.get(timeout=1).unwrap() diff --git a/trio/_threads.py b/trio/_threads.py index 155c10f4cf..38931658d1 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -356,9 +356,14 @@ def from_thread_check_cancelled() -> None: Cancelled: If the corresponding call to `trio.to_thread.run_sync` has had a delivery of cancellation attempted against it, regardless of the value of ``cancellable`` supplied as an argument to it. - AttributeError: If this thread is not spawned from `trio.to_thread.run_sync`. + RuntimeError: If this thread is not spawned from `trio.to_thread.run_sync`. """ - raise_cancel = TOKEN_LOCAL.cancel_register[0] + try: + raise_cancel = TOKEN_LOCAL.cancel_register[0] + except AttributeError: + raise RuntimeError( + "this thread wasn't created by Trio, can't check for cancellation" + ) if raise_cancel is not None: raise_cancel() From b02dfe9a0e235e3e41d102b81ff92c47fc529a2d Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Tue, 5 Sep 2023 23:23:40 -0400 Subject: [PATCH 19/31] type consistency between from_thread_run and from_thread_run_sync --- trio/_threads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trio/_threads.py b/trio/_threads.py index 38931658d1..8df3a28518 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -494,7 +494,7 @@ def from_thread_run( def from_thread_run_sync( fn: Callable[..., RetT], - *args: tuple[object, ...], + *args: object, trio_token: TrioToken | None = None, ) -> RetT: """Run the given sync function in the parent Trio thread, blocking until it From 1bb9b79219f3cb303e2c59ecda74f96558ab9017 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Tue, 5 Sep 2023 23:26:11 -0400 Subject: [PATCH 20/31] split up nonblocking send of message from blocking reception of response --- trio/_threads.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index 8df3a28518..b9aeaf089d 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -399,7 +399,7 @@ def _check_token(trio_token: TrioToken | None) -> TrioToken: def _send_message_to_host_task( message: Run[RetT] | RunSync[RetT], trio_token: TrioToken -) -> RetT: +) -> None: task_register = TOKEN_LOCAL.task_register cancel_register = TOKEN_LOCAL.cancel_register @@ -412,12 +412,11 @@ def in_trio_thread() -> None: trio.lowlevel.reschedule(task, outcome.Value(message)) trio_token.run_sync_soon(in_trio_thread) - return message.queue.get().unwrap() # type: ignore[no-any-return] def _send_message_to_system_task( message: Run[RetT] | RunSync[RetT], trio_token: TrioToken -) -> RetT: +) -> None: if type(message) is RunSync: run_sync = message.run_sync elif type(message) is Run: @@ -437,9 +436,7 @@ def run_sync() -> None: "trio.to_thread.run_sync received unrecognized thread message {!r}." "".format(message) ) - trio_token.run_sync_soon(run_sync) - return message.queue.get().unwrap() # type: ignore[no-any-return] def from_thread_run( @@ -489,7 +486,8 @@ def from_thread_run( message_to_trio = Run(afn, args, contextvars.copy_context()) - return send_message(message_to_trio, _check_token(trio_token)) + send_message(message_to_trio, _check_token(trio_token)) + return message_to_trio.queue.get().unwrap() # type: ignore[no-any-return] def from_thread_run_sync( @@ -535,4 +533,5 @@ def from_thread_run_sync( message_to_trio = RunSync(fn, args, contextvars.copy_context()) - return send_message(message_to_trio, _check_token(trio_token)) + send_message(message_to_trio, _check_token(trio_token)) + return message_to_trio.queue.get().unwrap() # type: ignore[no-any-return] From 689d45ccc047681b08e756cdac13efb5363714be Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 16 Sep 2023 12:03:17 -0400 Subject: [PATCH 21/31] thread messages do not necessarily have RetT --- trio/_threads.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index b9aeaf089d..05c296a3b3 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -327,8 +327,8 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: while True: # wait_task_rescheduled return value cannot be typed - msg_from_thread: ThreadDone[RetT] | Run[RetT] | RunSync[ - RetT + msg_from_thread: ThreadDone[RetT] | Run[object] | RunSync[ + object ] = await trio.lowlevel.wait_task_rescheduled(abort) if type(msg_from_thread) is ThreadDone: return msg_from_thread.result.unwrap() # type: ignore[no-any-return] From c1990d4a36848b18bdab09c04c3508b6f34cbbfd Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 7 Oct 2023 11:27:17 -0400 Subject: [PATCH 22/31] Update docs based on review comments --- docs/source/reference-core.rst | 7 +++++-- newsfragments/2392.feature.rst | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index b50471646a..104746ddd0 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1823,8 +1823,11 @@ to spawn a child thread, and then use a :ref:`memory channel .. literalinclude:: reference-core/from-thread-example.py -You can also perform a non-blocking check for cancellation from threads spawned -by `trio.to_thread.run_sync`. +You can also use :func:`trio.from_thread.check_cancelled` to check for cancellation from +a thread that was spawned by :func:`trio.to_thread.run_sync`. If the call to +:func:`~trio.to_thread.run_sync` was cancelled, then +:func:`~trio.from_thread.check_cancelled` will raise :func:`trio.Cancelled`. +It's like ``trio.from_thread.run(trio.sleep, 0)``, but much faster. .. autofunction:: trio.from_thread.check_cancelled diff --git a/newsfragments/2392.feature.rst b/newsfragments/2392.feature.rst index 06bf5792d4..985d3235af 100644 --- a/newsfragments/2392.feature.rst +++ b/newsfragments/2392.feature.rst @@ -1 +1,5 @@ -If called from a thread spawned by `trio.to_thread.run_sync`, `trio.from_thread.run` and `trio.from_thread.run_sync` now reuse the task and cancellation status of the host task and have `trio.from_thread.check_cancelled` to efficiently poll for cancellation. +If called from a thread spawned by `trio.to_thread.run_sync`, `trio.from_thread.run` and +`trio.from_thread.run_sync` now reuse the task and cancellation status of the host task; +this means that context variables and cancel scopes naturally propagate 'through' +threads spawned by Trio. You can also use `trio.from_thread.check_cancelled` +to efficiently check for cancellation without reentering the Trio thread. From daed7bbee0c323fc0906fe47ad1e5115ed4d8482 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 7 Oct 2023 11:44:34 -0400 Subject: [PATCH 23/31] fiddle type completeness --- trio/_tests/verify_types_darwin.json | 2 +- trio/_tests/verify_types_linux.json | 2 +- trio/_tests/verify_types_windows.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/trio/_tests/verify_types_darwin.json b/trio/_tests/verify_types_darwin.json index e83a324714..6625368f20 100644 --- a/trio/_tests/verify_types_darwin.json +++ b/trio/_tests/verify_types_darwin.json @@ -40,7 +40,7 @@ ], "exportedSymbolCounts": { "withAmbiguousType": 0, - "withKnownType": 630, + "withKnownType": 631, "withUnknownType": 0 }, "ignoreUnknownTypesFromImports": true, diff --git a/trio/_tests/verify_types_linux.json b/trio/_tests/verify_types_linux.json index 7c9d745dba..73ee6f7855 100644 --- a/trio/_tests/verify_types_linux.json +++ b/trio/_tests/verify_types_linux.json @@ -28,7 +28,7 @@ ], "exportedSymbolCounts": { "withAmbiguousType": 0, - "withKnownType": 627, + "withKnownType": 628, "withUnknownType": 0 }, "ignoreUnknownTypesFromImports": true, diff --git a/trio/_tests/verify_types_windows.json b/trio/_tests/verify_types_windows.json index a58416fe76..f2c1f0dd6c 100644 --- a/trio/_tests/verify_types_windows.json +++ b/trio/_tests/verify_types_windows.json @@ -64,7 +64,7 @@ ], "exportedSymbolCounts": { "withAmbiguousType": 0, - "withKnownType": 630, + "withKnownType": 631, "withUnknownType": 0 }, "ignoreUnknownTypesFromImports": true, From 2be054aa50e7a6f694af27f24729641dd31d721e Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 7 Oct 2023 18:46:35 -0400 Subject: [PATCH 24/31] fix test_from_thread_run_during_shutdown --- trio/_tests/test_threads.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/trio/_tests/test_threads.py b/trio/_tests/test_threads.py index 5684e4a62d..6cf16b48a8 100644 --- a/trio/_tests/test_threads.py +++ b/trio/_tests/test_threads.py @@ -813,19 +813,23 @@ async def agen(token): try: yield finally: - with pytest.raises(_core.RunFinishedError), _core.CancelScope(shield=True): - await to_thread_run_sync( - partial(from_thread_run, sleep, 0, trio_token=token) - ) - record.append("ok") + with _core.CancelScope(shield=True): + try: + await to_thread_run_sync( + partial(from_thread_run, sleep, 0, trio_token=token) + ) + except _core.RunFinishedError: + record.append("finished") + else: + record.append("clean") async def main(use_system_task): save.append(agen(_core.current_trio_token() if use_system_task else None)) await save[-1].asend(None) _core.run(main, True) # System nursery will be closed and raise RunFinishedError - _core.run(main, False) # host task will not be rescheduled - assert record == ["ok"] + _core.run(main, False) # host task will be rescheduled as normal + assert record == ["finished", "clean"] async def test_trio_token_weak_referenceable(): From a667a52c78a7f38c9a0475f59a6b245c4710c57f Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sat, 7 Oct 2023 19:12:10 -0400 Subject: [PATCH 25/31] apply nits from code review --- trio/_tests/test_threads.py | 2 +- trio/_threads.py | 35 ++++++++++++++++++----------------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/trio/_tests/test_threads.py b/trio/_tests/test_threads.py index 6cf16b48a8..118d272ba1 100644 --- a/trio/_tests/test_threads.py +++ b/trio/_tests/test_threads.py @@ -644,7 +644,7 @@ async def async_fn(): # pragma: no cover def thread_fn(): from_thread_run_sync(async_fn) - with pytest.raises(TypeError, match="expected a sync function"): + with pytest.raises(TypeError, match="expected a synchronous function"): await to_thread_run_sync(thread_fn) diff --git a/trio/_threads.py b/trio/_threads.py index 05c296a3b3..d2559dbeb6 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -29,15 +29,16 @@ RetT = TypeVar("RetT") -class _TokenLocal(threading.local): - """Global due to Threading API, thread local storage for trio token.""" +class _ParentTaskData(threading.local): + """Global due to Threading API, thread local storage for data related to the + parent task of native Trio threads.""" token: TrioToken cancel_register: list[RaiseCancelT | None] task_register: list[trio.lowlevel.Task | None] -TOKEN_LOCAL = _TokenLocal() +PARENT_TASK_DATA = _ParentTaskData() _limiter_local: RunVar[CapacityLimiter] = RunVar("limiter") # I pulled this number out of the air; it isn't based on anything. Probably we @@ -125,7 +126,7 @@ def unprotected_fn(self) -> RetT: # Manually close coroutine to avoid RuntimeWarnings ret.close() raise TypeError( - "Trio expected a sync function, but {!r} appears to be " + "Trio expected a synchronous function, but {!r} appears to be " "asynchronous".format(getattr(self.fn, "__qualname__", self.fn)) ) @@ -273,9 +274,9 @@ def worker_fn() -> RetT: # the new thread sees that it's not running in async context. current_async_library_cvar.set(None) - TOKEN_LOCAL.token = current_trio_token - TOKEN_LOCAL.cancel_register = cancel_register - TOKEN_LOCAL.task_register = task_register + PARENT_TASK_DATA.token = current_trio_token + PARENT_TASK_DATA.cancel_register = cancel_register + PARENT_TASK_DATA.task_register = task_register try: ret = sync_fn(*args) @@ -289,9 +290,9 @@ def worker_fn() -> RetT: return ret finally: - del TOKEN_LOCAL.token - del TOKEN_LOCAL.cancel_register - del TOKEN_LOCAL.task_register + del PARENT_TASK_DATA.token + del PARENT_TASK_DATA.cancel_register + del PARENT_TASK_DATA.task_register context = contextvars.copy_context() # Partial confuses type checkers, coerce to a callable. @@ -330,11 +331,11 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: msg_from_thread: ThreadDone[RetT] | Run[object] | RunSync[ object ] = await trio.lowlevel.wait_task_rescheduled(abort) - if type(msg_from_thread) is ThreadDone: + if isinstance(msg_from_thread, ThreadDone): return msg_from_thread.result.unwrap() # type: ignore[no-any-return] - elif type(msg_from_thread) is Run: + elif isinstance(msg_from_thread, Run): await msg_from_thread.run() - elif type(msg_from_thread) is RunSync: + elif isinstance(msg_from_thread, RunSync): msg_from_thread.run_sync() else: # pragma: no cover, internal debugging guard raise TypeError( @@ -359,7 +360,7 @@ def from_thread_check_cancelled() -> None: RuntimeError: If this thread is not spawned from `trio.to_thread.run_sync`. """ try: - raise_cancel = TOKEN_LOCAL.cancel_register[0] + raise_cancel = PARENT_TASK_DATA.cancel_register[0] except AttributeError: raise RuntimeError( "this thread wasn't created by Trio, can't check for cancellation" @@ -380,7 +381,7 @@ def _check_token(trio_token: TrioToken | None) -> TrioToken: if trio_token is None: try: - trio_token = TOKEN_LOCAL.token + trio_token = PARENT_TASK_DATA.token except AttributeError: raise RuntimeError( "this thread wasn't created by Trio, pass kwarg trio_token=..." @@ -400,8 +401,8 @@ def _check_token(trio_token: TrioToken | None) -> TrioToken: def _send_message_to_host_task( message: Run[RetT] | RunSync[RetT], trio_token: TrioToken ) -> None: - task_register = TOKEN_LOCAL.task_register - cancel_register = TOKEN_LOCAL.cancel_register + task_register = PARENT_TASK_DATA.task_register + cancel_register = PARENT_TASK_DATA.cancel_register def in_trio_thread() -> None: task = task_register[0] From d6df308cdac5f205f0af376a86b89f407c346c34 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sun, 8 Oct 2023 18:48:50 -0400 Subject: [PATCH 26/31] document "extra" checkpoints needed to pick up context --- trio/_threads.py | 1 + 1 file changed, 1 insertion(+) diff --git a/trio/_threads.py b/trio/_threads.py index d2559dbeb6..3e6b388833 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -93,6 +93,7 @@ async def unprotected_afn(self) -> RetT: return await coro async def run(self) -> None: + # we use extra checkpoints to pick up and reset any context changes task = trio.lowlevel.current_task() old_context = task.context task.context = self.context.copy() From 9f4e79e4777af3f9b5dd5b25243e542a9b791a72 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sun, 8 Oct 2023 19:17:44 -0400 Subject: [PATCH 27/31] add TODOs for future assert_never type cleverness --- trio/_threads.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index 3e6b388833..dff4485342 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -338,7 +338,7 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: await msg_from_thread.run() elif isinstance(msg_from_thread, RunSync): msg_from_thread.run_sync() - else: # pragma: no cover, internal debugging guard + else: # pragma: no cover, internal debugging guard TODO: use assert_never raise TypeError( "trio.to_thread.run_sync received unrecognized thread message {!r}." "".format(msg_from_thread) @@ -433,7 +433,7 @@ def run_sync() -> None: outcome.Error(trio.RunFinishedError("system nursery is closed")) ) - else: # pragma: no cover, internal debugging guard + else: # pragma: no cover, internal debugging guard TODO: use assert_never raise TypeError( "trio.to_thread.run_sync received unrecognized thread message {!r}." "".format(message) From 0e18c93c9d4915ab33dda4ec2a5ae98e44fc4cff Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sun, 8 Oct 2023 19:19:46 -0400 Subject: [PATCH 28/31] implement cancellation semantics suggestions from code review --- trio/_tests/test_threads.py | 5 +++-- trio/_threads.py | 18 ++++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/trio/_tests/test_threads.py b/trio/_tests/test_threads.py index 118d272ba1..fb1682984b 100644 --- a/trio/_tests/test_threads.py +++ b/trio/_tests/test_threads.py @@ -879,6 +879,7 @@ def sync_check(): try: from_thread_run_sync(bool) except _core.Cancelled: + # pragma: no cover, sync functions don't raise Cancelled queue.put(True) else: queue.put(False) @@ -893,7 +894,7 @@ def sync_check(): await to_thread_run_sync(sync_check, cancellable=True) assert cancel_scope.cancelled_caught - assert await to_thread_run_sync(partial(queue.get, timeout=1)) + assert not await to_thread_run_sync(partial(queue.get, timeout=1)) async def no_checkpoint(): return True @@ -917,7 +918,7 @@ def async_check(): await to_thread_run_sync(async_check, cancellable=True) assert cancel_scope.cancelled_caught - assert await to_thread_run_sync(partial(queue.get, timeout=1)) + assert not await to_thread_run_sync(partial(queue.get, timeout=1)) async def async_time_bomb(): cancel_scope.cancel() diff --git a/trio/_threads.py b/trio/_threads.py index dff4485342..7620bef55c 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -17,6 +17,7 @@ from trio._core._traps import RaiseCancelT from ._core import ( + CancelScope, RunVar, TrioToken, disable_ki_protection, @@ -86,6 +87,7 @@ class Run(Generic[RetT]): queue: stdlib_queue.SimpleQueue[outcome.Outcome[RetT]] = attr.ib( init=False, factory=stdlib_queue.SimpleQueue ) + scope: CancelScope = attr.ib(init=False, factory=CancelScope) @disable_ki_protection async def unprotected_afn(self) -> RetT: @@ -106,7 +108,12 @@ async def run(self) -> None: await trio.lowlevel.cancel_shielded_checkpoint() async def run_system(self) -> None: - result = await outcome.acapture(self.unprotected_afn) + # NOTE: There is potential here to only conditionally enter a CancelScope + # when we need it, sparing some computation. But doing so adds substantial + # complexity, so we'll leave it until real need is demonstrated. + with self.scope: + result = await outcome.acapture(self.unprotected_afn) + assert not self.scope.cancelled_caught, "any Cancelled should go to our parent" self.queue.put_nowait(result) @@ -403,13 +410,14 @@ def _send_message_to_host_task( message: Run[RetT] | RunSync[RetT], trio_token: TrioToken ) -> None: task_register = PARENT_TASK_DATA.task_register - cancel_register = PARENT_TASK_DATA.cancel_register def in_trio_thread() -> None: task = task_register[0] if task is None: - raise_cancel = cancel_register[0] - message.queue.put_nowait(outcome.capture(raise_cancel)) + # Our parent task is gone! Punt to a system task. + if isinstance(message, Run): + message.scope.cancel() + _send_message_to_system_task(message, trio_token) else: trio.lowlevel.reschedule(task, outcome.Value(message)) @@ -509,8 +517,6 @@ def from_thread_run_sync( Raises: RunFinishedError: if the corresponding call to `trio.run` has already completed. - Cancelled: if the corresponding `trio.to_thread.run_sync` task is - cancellable and exits before this function is called. RuntimeError: if you try calling this from inside the Trio thread, which would otherwise cause a deadlock or if no ``trio_token`` was provided, and we can't infer one from context. From eab30c4b4ce9b0d1cae192e707bf01858d704e8f Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sun, 8 Oct 2023 19:31:47 -0400 Subject: [PATCH 29/31] adjust coverage pragma --- trio/_tests/test_threads.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/trio/_tests/test_threads.py b/trio/_tests/test_threads.py index fb1682984b..24b450cc59 100644 --- a/trio/_tests/test_threads.py +++ b/trio/_tests/test_threads.py @@ -878,9 +878,8 @@ def sync_check(): from_thread_run_sync(cancel_scope.cancel) try: from_thread_run_sync(bool) - except _core.Cancelled: - # pragma: no cover, sync functions don't raise Cancelled - queue.put(True) + except _core.Cancelled: # pragma: no cover + queue.put(True) # sync functions don't raise Cancelled else: queue.put(False) @@ -903,8 +902,8 @@ def async_check(): from_thread_run_sync(cancel_scope.cancel) try: assert from_thread_run(no_checkpoint) - except _core.Cancelled: - queue.put(True) + except _core.Cancelled: # pragma: no cover + queue.put(True) # async functions raise Cancelled at checkpoints else: queue.put(False) From 2f79f155783bf7d6d68165360f85f3bd54f9934a Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Sun, 15 Oct 2023 12:23:08 -0400 Subject: [PATCH 30/31] revise and document cancellation semantics in short, cancellable threads always use system tasks. normal threads use the host task, unless passed a token --- docs/source/reference-core.rst | 13 ++- trio/_tests/test_threads.py | 34 ++++--- trio/_threads.py | 165 ++++++++++++++++----------------- 3 files changed, 110 insertions(+), 102 deletions(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index db1a93f121..160fa0fe97 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1823,9 +1823,20 @@ to spawn a child thread, and then use a :ref:`memory channel .. literalinclude:: reference-core/from-thread-example.py +.. note:: + + The ``from_thread.run*`` functions reuse the host task that called + :func:`trio.to_thread.run_sync` to run your provided function in the typical case, + namely when ``cancellable=False`` so Trio can be sure that the task will always be + around to perform the work. If you pass ``cancellable=True`` at the outset, or if + you provide a :class:`~trio.lowlevel.TrioToken` when calling back in to Trio, your + functions will be executed in a new system task. Therefore, the + :func:`~trio.lowlevel.current_task`, :func:`current_effective_deadline`, or other + task-tree specific values may differ depending on keyword argument values. + You can also use :func:`trio.from_thread.check_cancelled` to check for cancellation from a thread that was spawned by :func:`trio.to_thread.run_sync`. If the call to -:func:`~trio.to_thread.run_sync` was cancelled, then +:func:`~trio.to_thread.run_sync` was cancelled (even if ``cancellable=False``!), then :func:`~trio.from_thread.check_cancelled` will raise :func:`trio.Cancelled`. It's like ``trio.from_thread.run(trio.sleep, 0)``, but much faster. diff --git a/trio/_tests/test_threads.py b/trio/_tests/test_threads.py index 24b450cc59..a151c03077 100644 --- a/trio/_tests/test_threads.py +++ b/trio/_tests/test_threads.py @@ -933,14 +933,16 @@ async def async_time_bomb(): async def test_from_thread_check_cancelled(): q = stdlib_queue.Queue() - async def child(cancellable): - record.append("start") - try: - return await to_thread_run_sync(f, cancellable=cancellable) - except _core.Cancelled: - record.append("cancel") - finally: - record.append("exit") + async def child(cancellable, scope): + with scope: + record.append("start") + try: + return await to_thread_run_sync(f, cancellable=cancellable) + except _core.Cancelled: + record.append("cancel") + raise + finally: + record.append("exit") def f(): try: @@ -956,7 +958,7 @@ def f(): record = [] ev = threading.Event() async with _core.open_nursery() as nursery: - nursery.start_soon(child, False) + nursery.start_soon(child, False, _core.CancelScope()) await wait_all_tasks_blocked() assert record[0] == "start" assert q.get(timeout=1) == "Not Cancelled" @@ -968,14 +970,15 @@ def f(): # the appropriate cancel scope record = [] ev = threading.Event() + scope = _core.CancelScope() # Nursery cancel scope gives false positives async with _core.open_nursery() as nursery: - nursery.start_soon(child, False) + nursery.start_soon(child, False, scope) await wait_all_tasks_blocked() assert record[0] == "start" assert q.get(timeout=1) == "Not Cancelled" - nursery.cancel_scope.cancel() + scope.cancel() ev.set() - assert nursery.cancel_scope.cancelled_caught + assert scope.cancelled_caught assert "cancel" in record assert record[-1] == "exit" @@ -992,13 +995,14 @@ def f(): # noqa: F811 record = [] ev = threading.Event() + scope = _core.CancelScope() async with _core.open_nursery() as nursery: - nursery.start_soon(child, True) + nursery.start_soon(child, True, scope) await wait_all_tasks_blocked() assert record[0] == "start" - nursery.cancel_scope.cancel() + scope.cancel() ev.set() - assert nursery.cancel_scope.cancelled_caught + assert scope.cancelled_caught assert "cancel" in record assert record[-1] == "exit" assert q.get(timeout=1) == "Cancelled" diff --git a/trio/_threads.py b/trio/_threads.py index 7620bef55c..a7ad9c5969 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -17,7 +17,6 @@ from trio._core._traps import RaiseCancelT from ._core import ( - CancelScope, RunVar, TrioToken, disable_ki_protection, @@ -35,6 +34,7 @@ class _ParentTaskData(threading.local): parent task of native Trio threads.""" token: TrioToken + abandon_on_cancel: bool cancel_register: list[RaiseCancelT | None] task_register: list[trio.lowlevel.Task | None] @@ -74,11 +74,6 @@ class ThreadPlaceholder: # Types for the to_thread_run_sync message loop -@attr.s(frozen=True, eq=False) -class ThreadDone(Generic[RetT]): - result: outcome.Outcome[RetT] = attr.ib() - - @attr.s(frozen=True, eq=False) class Run(Generic[RetT]): afn: Callable[..., Awaitable[RetT]] = attr.ib() @@ -87,7 +82,6 @@ class Run(Generic[RetT]): queue: stdlib_queue.SimpleQueue[outcome.Outcome[RetT]] = attr.ib( init=False, factory=stdlib_queue.SimpleQueue ) - scope: CancelScope = attr.ib(init=False, factory=CancelScope) @disable_ki_protection async def unprotected_afn(self) -> RetT: @@ -108,14 +102,32 @@ async def run(self) -> None: await trio.lowlevel.cancel_shielded_checkpoint() async def run_system(self) -> None: - # NOTE: There is potential here to only conditionally enter a CancelScope - # when we need it, sparing some computation. But doing so adds substantial - # complexity, so we'll leave it until real need is demonstrated. - with self.scope: - result = await outcome.acapture(self.unprotected_afn) - assert not self.scope.cancelled_caught, "any Cancelled should go to our parent" + result = await outcome.acapture(self.unprotected_afn) self.queue.put_nowait(result) + def run_in_host_task(self, token: TrioToken) -> None: + task_register = PARENT_TASK_DATA.task_register + + def in_trio_thread() -> None: + task = task_register[0] + assert task is not None, "guaranteed by abandon_on_cancel semantics" + trio.lowlevel.reschedule(task, outcome.Value(self)) + + token.run_sync_soon(in_trio_thread) + + def run_in_system_nursery(self, token: TrioToken) -> None: + def in_trio_thread() -> None: + try: + trio.lowlevel.spawn_system_task( + self.run, name=self.afn, context=self.context + ) + except RuntimeError: # system nursery is closed + self.queue.put_nowait( + outcome.Error(trio.RunFinishedError("system nursery is closed")) + ) + + token.run_sync_soon(in_trio_thread) + @attr.s(frozen=True, eq=False) class RunSync(Generic[RetT]): @@ -144,6 +156,19 @@ def run_sync(self) -> None: result = outcome.capture(self.context.run, self.unprotected_fn) self.queue.put_nowait(result) + def run_in_host_task(self, token: TrioToken) -> None: + task_register = PARENT_TASK_DATA.task_register + + def in_trio_thread() -> None: + task = task_register[0] + assert task is not None, "guaranteed by abandon_on_cancel semantics" + trio.lowlevel.reschedule(task, outcome.Value(self)) + + token.run_sync_soon(in_trio_thread) + + def run_in_system_nursery(self, token: TrioToken) -> None: + token.run_sync_soon(self.run_sync) + @enable_ki_protection # Decorator used on function with Coroutine[Any, Any, RetT] async def to_thread_run_sync( # type: ignore[misc] @@ -237,7 +262,7 @@ async def to_thread_run_sync( # type: ignore[misc] """ await trio.lowlevel.checkpoint_if_cancelled() - cancellable = bool(cancellable) # raise early if cancellable.__bool__ raises + abandon_on_cancel = bool(cancellable) # raise early if cancellable.__bool__ raises if limiter is None: limiter = current_default_thread_limiter() @@ -266,9 +291,7 @@ def do_release_then_return_result() -> RetT: result = outcome.capture(do_release_then_return_result) if task_register[0] is not None: - trio.lowlevel.reschedule( - task_register[0], outcome.Value(ThreadDone(result)) - ) + trio.lowlevel.reschedule(task_register[0], outcome.Value(result)) current_trio_token = trio.lowlevel.current_trio_token() @@ -283,6 +306,7 @@ def worker_fn() -> RetT: current_async_library_cvar.set(None) PARENT_TASK_DATA.token = current_trio_token + PARENT_TASK_DATA.abandon_on_cancel = abandon_on_cancel PARENT_TASK_DATA.cancel_register = cancel_register PARENT_TASK_DATA.task_register = task_register try: @@ -299,6 +323,7 @@ def worker_fn() -> RetT: return ret finally: del PARENT_TASK_DATA.token + del PARENT_TASK_DATA.abandon_on_cancel del PARENT_TASK_DATA.cancel_register del PARENT_TASK_DATA.task_register @@ -327,7 +352,7 @@ def deliver_worker_fn_result(result: outcome.Outcome[RetT]) -> None: def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: # fill so from_thread_check_cancelled can raise cancel_register[0] = raise_cancel - if cancellable: + if abandon_on_cancel: # empty so report_back_in_trio_thread_fn cannot reschedule task_register[0] = None return trio.lowlevel.Abort.SUCCEEDED @@ -336,11 +361,11 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: while True: # wait_task_rescheduled return value cannot be typed - msg_from_thread: ThreadDone[RetT] | Run[object] | RunSync[ + msg_from_thread: outcome.Outcome[RetT] | Run[object] | RunSync[ object ] = await trio.lowlevel.wait_task_rescheduled(abort) - if isinstance(msg_from_thread, ThreadDone): - return msg_from_thread.result.unwrap() # type: ignore[no-any-return] + if isinstance(msg_from_thread, outcome.Outcome): + return msg_from_thread.unwrap() # type: ignore[no-any-return] elif isinstance(msg_from_thread, Run): await msg_from_thread.run() elif isinstance(msg_from_thread, RunSync): @@ -354,10 +379,10 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: def from_thread_check_cancelled() -> None: - """Raise trio.Cancelled if the associated Trio task entered a cancelled status. + """Raise `trio.Cancelled` if the associated Trio task entered a cancelled status. Only applicable to threads spawned by `trio.to_thread.run_sync`. Poll to allow - ``cancellable=False`` threads to raise :exc:`trio.Cancelled` at a suitable + ``cancellable=False`` threads to raise :exc:`~trio.Cancelled` at a suitable place, or to end abandoned ``cancellable=True`` threads sooner than they may otherwise. @@ -366,6 +391,13 @@ def from_thread_check_cancelled() -> None: delivery of cancellation attempted against it, regardless of the value of ``cancellable`` supplied as an argument to it. RuntimeError: If this thread is not spawned from `trio.to_thread.run_sync`. + + .. note:: + + The check for cancellation attempts of ``cancellable=False`` threads is + interrupted while executing ``from_thread.run*`` functions, which can lead to + edge cases where this function may raise or not depending on the timing of + :class:`~trio.CancelScope` shields being raised or lowered in the Trio threads. """ try: raise_cancel = PARENT_TASK_DATA.cancel_register[0] @@ -406,49 +438,6 @@ def _check_token(trio_token: TrioToken | None) -> TrioToken: return trio_token -def _send_message_to_host_task( - message: Run[RetT] | RunSync[RetT], trio_token: TrioToken -) -> None: - task_register = PARENT_TASK_DATA.task_register - - def in_trio_thread() -> None: - task = task_register[0] - if task is None: - # Our parent task is gone! Punt to a system task. - if isinstance(message, Run): - message.scope.cancel() - _send_message_to_system_task(message, trio_token) - else: - trio.lowlevel.reschedule(task, outcome.Value(message)) - - trio_token.run_sync_soon(in_trio_thread) - - -def _send_message_to_system_task( - message: Run[RetT] | RunSync[RetT], trio_token: TrioToken -) -> None: - if type(message) is RunSync: - run_sync = message.run_sync - elif type(message) is Run: - - def run_sync() -> None: - try: - trio.lowlevel.spawn_system_task( - message.run_system, name=message.afn, context=message.context - ) - except RuntimeError: # system nursery is closed - message.queue.put_nowait( - outcome.Error(trio.RunFinishedError("system nursery is closed")) - ) - - else: # pragma: no cover, internal debugging guard TODO: use assert_never - raise TypeError( - "trio.to_thread.run_sync received unrecognized thread message {!r}." - "".format(message) - ) - trio_token.run_sync_soon(run_sync) - - def from_thread_run( afn: Callable[..., Awaitable[RetT]], *args: object, @@ -467,17 +456,15 @@ def from_thread_run( RunFinishedError: if the corresponding call to :func:`trio.run` has already completed, or if the run has started its final cleanup phase and can no longer spawn new system tasks. - Cancelled: if the corresponding `trio.to_thread.run_sync` task is - cancellable and exits before this function is called, or - if the task enters cancelled status or call to :func:`trio.run` completes - while ``afn(*args)`` is running, then ``afn`` is likely to raise + Cancelled: if the task enters cancelled status or call to :func:`trio.run` + completes while ``afn(*args)`` is running, then ``afn`` is likely to raise :exc:`trio.Cancelled`. RuntimeError: if you try calling this from inside the Trio thread, which would otherwise cause a deadlock, or if no ``trio_token`` was provided, and we can't infer one from context. TypeError: if ``afn`` is not an asynchronous function. - **Locating a Trio Token**: There are two ways to specify which + **Locating a TrioToken**: There are two ways to specify which `trio.run` loop to reenter: - Spawn this thread from `trio.to_thread.run_sync`. Trio will @@ -486,17 +473,20 @@ def from_thread_run( - Pass a keyword argument, ``trio_token`` specifying a specific `trio.run` loop to re-enter. This is useful in case you have a "foreign" thread, spawned using some other framework, and still want - to enter Trio, or if you want to avoid the cancellation context of - `trio.to_thread.run_sync`. + to enter Trio, or if you want to use a new system task to call ``afn``, + maybe to avoid the cancellation context of a corresponding + `trio.to_thread.run_sync` task. """ - if trio_token is None: - send_message = _send_message_to_host_task - else: - send_message = _send_message_to_system_task + token_provided = trio_token is not None + trio_token = _check_token(trio_token) message_to_trio = Run(afn, args, contextvars.copy_context()) - send_message(message_to_trio, _check_token(trio_token)) + if token_provided or PARENT_TASK_DATA.abandon_on_cancel: + message_to_trio.run_in_system_nursery(trio_token) + else: + message_to_trio.run_in_host_task(trio_token) + return message_to_trio.queue.get().unwrap() # type: ignore[no-any-return] @@ -522,7 +512,7 @@ def from_thread_run_sync( provided, and we can't infer one from context. TypeError: if ``fn`` is an async function. - **Locating a Trio Token**: There are two ways to specify which + **Locating a TrioToken**: There are two ways to specify which `trio.run` loop to reenter: - Spawn this thread from `trio.to_thread.run_sync`. Trio will @@ -531,15 +521,18 @@ def from_thread_run_sync( - Pass a keyword argument, ``trio_token`` specifying a specific `trio.run` loop to re-enter. This is useful in case you have a "foreign" thread, spawned using some other framework, and still want - to enter Trio, or if you want to avoid the cancellation context of - `trio.to_thread.run_sync`. + to enter Trio, or if you want to use a new system task to call ``fn``, + maybe to avoid the cancellation context of a corresponding + `trio.to_thread.run_sync` task. """ - if trio_token is None: - send_message = _send_message_to_host_task - else: - send_message = _send_message_to_system_task + token_provided = trio_token is not None + trio_token = _check_token(trio_token) message_to_trio = RunSync(fn, args, contextvars.copy_context()) - send_message(message_to_trio, _check_token(trio_token)) + if token_provided or PARENT_TASK_DATA.abandon_on_cancel: + message_to_trio.run_in_system_nursery(trio_token) + else: + message_to_trio.run_in_host_task(trio_token) + return message_to_trio.queue.get().unwrap() # type: ignore[no-any-return] From 96e45c5ffc5f9694dd9eb463c638794c04fa8947 Mon Sep 17 00:00:00 2001 From: richardsheridan Date: Tue, 17 Oct 2023 21:50:28 -0400 Subject: [PATCH 31/31] Apply suggestions from code review Documentation clarifications Fix function name typo leading to missed coverage Co-authored-by: Joshua Oreman --- docs/source/reference-core.rst | 4 ++-- trio/_threads.py | 21 +++++++++++++++------ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 160fa0fe97..333cb0537b 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1826,8 +1826,8 @@ to spawn a child thread, and then use a :ref:`memory channel .. note:: The ``from_thread.run*`` functions reuse the host task that called - :func:`trio.to_thread.run_sync` to run your provided function in the typical case, - namely when ``cancellable=False`` so Trio can be sure that the task will always be + :func:`trio.to_thread.run_sync` to run your provided function, as long as you're + using the default ``cancellable=False`` so Trio can be sure that the task will remain around to perform the work. If you pass ``cancellable=True`` at the outset, or if you provide a :class:`~trio.lowlevel.TrioToken` when calling back in to Trio, your functions will be executed in a new system task. Therefore, the diff --git a/trio/_threads.py b/trio/_threads.py index a7ad9c5969..7649587751 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -119,7 +119,7 @@ def run_in_system_nursery(self, token: TrioToken) -> None: def in_trio_thread() -> None: try: trio.lowlevel.spawn_system_task( - self.run, name=self.afn, context=self.context + self.run_system, name=self.afn, context=self.context ) except RuntimeError: # system nursery is closed self.queue.put_nowait( @@ -394,10 +394,16 @@ def from_thread_check_cancelled() -> None: .. note:: - The check for cancellation attempts of ``cancellable=False`` threads is - interrupted while executing ``from_thread.run*`` functions, which can lead to - edge cases where this function may raise or not depending on the timing of - :class:`~trio.CancelScope` shields being raised or lowered in the Trio threads. + To be precise, :func:`~trio.from_thread.check_cancelled` checks whether the task + running :func:`trio.to_thread.run_sync` has ever been cancelled since the last + time it was running a :func:`trio.from_thread.run` or :func:`trio.from_thread.run_sync` + function. It may raise `trio.Cancelled` even if a cancellation occurred that was + later hidden by a modification to `trio.CancelScope.shield` between the cancelled + `~trio.CancelScope` and :func:`trio.to_thread.run_sync`. This differs from the + behavior of normal Trio checkpoints, which raise `~trio.Cancelled` only if the + cancellation is still active when the checkpoint executes. The distinction here is + *exceedingly* unlikely to be relevant to your application, but we mention it + for completeness. """ try: raise_cancel = PARENT_TASK_DATA.cancel_register[0] @@ -456,7 +462,10 @@ def from_thread_run( RunFinishedError: if the corresponding call to :func:`trio.run` has already completed, or if the run has started its final cleanup phase and can no longer spawn new system tasks. - Cancelled: if the task enters cancelled status or call to :func:`trio.run` + Cancelled: If the original call to :func:`trio.to_thread.run_sync` is cancelled + (if *trio_token* is None) or the call to :func:`trio.run` completes + (if *trio_token* is not None) while ``afn(*args)`` is running, + then *afn* is likely to raise completes while ``afn(*args)`` is running, then ``afn`` is likely to raise :exc:`trio.Cancelled`. RuntimeError: if you try calling this from inside the Trio thread,