Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand cancellation usability from native trio threads #2392

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d9d02db
Add tests and stub for thread cancellation and reuse.
richardsheridan Jul 22, 2022
0c0ff10
make from_thread tests pass
richardsheridan Mar 4, 2023
5f86fb2
remove failing tests and untested feature
richardsheridan Mar 4, 2023
5f75c86
refactor system tasks to use messages as well
richardsheridan Mar 4, 2023
7ccf0f7
factor out trio_token checking logic
richardsheridan Mar 9, 2023
6d3b2d6
Add trio.from_thread.check_cancelled api to allow threads to efficien…
richardsheridan Aug 8, 2022
2d911ea
Document trio.from_thread.check_cancelled
richardsheridan Aug 8, 2022
bc6c82a
Add Newsfragment
richardsheridan Aug 8, 2022
644b913
Linting and fix docs
richardsheridan Aug 8, 2022
b87456f
use cancel_register in _send_message_to_host_task
richardsheridan Mar 9, 2023
f6b27b2
Document thread reuse semantics
richardsheridan Mar 10, 2023
2b088cc
test coverage for cancelled host task
richardsheridan Mar 11, 2023
832da41
unnest unprotected_fn
richardsheridan Mar 11, 2023
5a10e9b
flip _send_message_to_host_task.in_trio_thread semantics
richardsheridan Mar 11, 2023
a09aa84
Merge branch 'master' into from_thread_check_cancelled
richardsheridan Sep 3, 2023
787d286
Aesthetic refactor of API functions
richardsheridan Sep 3, 2023
881180f
fix typos in docs
richardsheridan Sep 3, 2023
c0e11d4
remove extra cvar toggling
richardsheridan Sep 3, 2023
1930cae
Transmute AttributeError to RuntimeError
richardsheridan Sep 3, 2023
b02dfe9
type consistency between from_thread_run and from_thread_run_sync
richardsheridan Sep 6, 2023
1bb9b79
split up nonblocking send of message from blocking reception of response
richardsheridan Sep 6, 2023
689d45c
thread messages do not necessarily have RetT
richardsheridan Sep 16, 2023
c1990d4
Update docs based on review comments
richardsheridan Oct 7, 2023
d4d10bb
Merge remote-tracking branch 'upstream/master' into from_thread_check…
richardsheridan Oct 7, 2023
daed7bb
fiddle type completeness
richardsheridan Oct 7, 2023
2be054a
fix test_from_thread_run_during_shutdown
richardsheridan Oct 7, 2023
a667a52
apply nits from code review
richardsheridan Oct 7, 2023
d6df308
document "extra" checkpoints needed to pick up context
richardsheridan Oct 8, 2023
9f4e79e
add TODOs for future assert_never type cleverness
richardsheridan Oct 8, 2023
0e18c93
implement cancellation semantics suggestions from code review
richardsheridan Oct 8, 2023
eab30c4
adjust coverage pragma
richardsheridan Oct 8, 2023
2f79f15
revise and document cancellation semantics
richardsheridan Oct 15, 2023
96e45c5
Apply suggestions from code review
richardsheridan Oct 18, 2023
ab092b0
Merge branch 'master' into from_thread_check_cancelled
richardsheridan Oct 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/source/reference-core.rst
Expand Up @@ -1823,6 +1823,11 @@ to spawn a child thread, and then use a :ref:`memory channel

.. literalinclude:: reference-core/from-thread-example.py

You can also perform a non-blocking check for cancellation from threads spawned
richardsheridan marked this conversation as resolved.
Show resolved Hide resolved
by `trio.to_thread.run_sync`.

.. autofunction:: trio.from_thread.check_cancelled

Threads and task-local storage
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
1 change: 1 addition & 0 deletions newsfragments/2392.feature.rst
@@ -0,0 +1 @@
If called from a thread spawned by `trio.to_thread.run_sync`, `trio.from_thread.run` and `trio.from_thread.run_sync` now reuse the task and cancellation status of the host task and have `trio.from_thread.check_cancelled` to efficiently poll for cancellation.
richardsheridan marked this conversation as resolved.
Show resolved Hide resolved
185 changes: 175 additions & 10 deletions trio/_tests/test_threads.py
Expand Up @@ -13,13 +13,12 @@
import pytest
import sniffio

from trio._core import TrioToken, current_trio_token

from .. import CapacityLimiter, Event, _core, sleep
from .. import CapacityLimiter, Event, _core, fail_after, sleep, sleep_forever
from .._core._tests.test_ki import ki_self
from .._core._tests.tutil import buggy_pypy_asyncgens
from .._threads import (
current_default_thread_limiter,
from_thread_check_cancelled,
from_thread_run,
from_thread_run_sync,
to_thread_run_sync,
Expand Down Expand Up @@ -810,25 +809,28 @@ def test_from_thread_run_during_shutdown():
save = []
record = []

async def agen():
async def agen(token):
try:
yield
finally:
with pytest.raises(_core.RunFinishedError), _core.CancelScope(shield=True):
await to_thread_run_sync(from_thread_run, sleep, 0)
await to_thread_run_sync(
partial(from_thread_run, sleep, 0, trio_token=token)
)
record.append("ok")

async def main():
save.append(agen())
async def main(use_system_task):
save.append(agen(_core.current_trio_token() if use_system_task else None))
await save[-1].asend(None)

_core.run(main)
_core.run(main, True) # System nursery will be closed and raise RunFinishedError
_core.run(main, False) # host task will not be rescheduled
assert record == ["ok"]
richardsheridan marked this conversation as resolved.
Show resolved Hide resolved


async def test_trio_token_weak_referenceable():
token = current_trio_token()
assert isinstance(token, TrioToken)
token = _core.current_trio_token()
assert isinstance(token, _core.TrioToken)
weak_reference = weakref.ref(token)
assert token is weak_reference()

Expand All @@ -842,3 +844,166 @@ def __bool__(self):

with pytest.raises(NotImplementedError):
await to_thread_run_sync(int, cancellable=BadBool())


async def test_from_thread_reuses_task():
task = _core.current_task()

async def async_current_task():
return _core.current_task()

assert task is await to_thread_run_sync(from_thread_run_sync, _core.current_task)
assert task is await to_thread_run_sync(from_thread_run, async_current_task)


async def test_recursive_to_thread():
tid = None

def get_tid_then_reenter():
nonlocal tid
tid = threading.get_ident()
return from_thread_run(to_thread_run_sync, threading.get_ident)

assert tid != await to_thread_run_sync(get_tid_then_reenter)


async def test_from_thread_host_cancelled():
queue = stdlib_queue.Queue()

def sync_check():
richardsheridan marked this conversation as resolved.
Show resolved Hide resolved
from_thread_run_sync(cancel_scope.cancel)
try:
from_thread_run_sync(bool)
except _core.Cancelled:
queue.put(True)
else:
queue.put(False)

with _core.CancelScope() as cancel_scope:
await to_thread_run_sync(sync_check)

assert not cancel_scope.cancelled_caught
assert not queue.get_nowait()

with _core.CancelScope() as cancel_scope:
await to_thread_run_sync(sync_check, cancellable=True)

assert cancel_scope.cancelled_caught
assert await to_thread_run_sync(partial(queue.get, timeout=1))
Copy link
Member

@oremanj oremanj Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this suggests that from_thread.run_sync does raise Cancelled if the thread was abandoned. Where is that coming from? It can't be from the cancel scope, because the cancel scope doesn't exist anymore (in the general case). If it's a general "this thread was abandoned" marker, then that's mildly backcompat-breaking and I think I disprefer it regardless - "sync functions have no checkpoints and therefore can't be the cause of a Cancelled" is a nice and easy rule to remember and works better if we don't have corner-case exceptions to it.

I'm not sure how async from_thread.run from an abandoned thread should work. Options I see are:

  • run it in a system task in a cancelled cancel scope (and raise Cancelled in the thread if the cancel scope catches a Cancelled)
  • run it in a system task that isn't cancelled, like we did before this PR
  • raise Cancelled immediately

I would prefer one of the first two, since the third is unlike anything done elsewhere in Trio (Cancelled is raised at checkpoints, entry to an async function is not a checkpoint) and makes it hard to hand off resources safely (if you 'own' a resource and you pass it to a task that begins [async] with resource:, then that's normally safe but wouldn't be with the new semantics because Cancelled could be raised before the recipient enters the context manager).

Regardless of what we choose, we need to document it carefully, because it's a subtle consideration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I could on board for "run in cancelled system task" but documenting the nuances of "if you abandon your thread, your async functions are going to run in a weird different task" seems really hard.

What do you think of re-using RunFinishedError or making a TaskFinishedError to raise immediately? This would at least not trample on the semantics of Cancelled, but it wouldn't help with the resource handoff issue (on the other hand, users must already handle RunFinishedError in the present case).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented the semantics I think you expected, and there were enough degrees of freedom in the test suite that it didn't need much adjustment. That makes me think that we should think of a way to (a) document the surprising abandon-to-system-task semantics and (b) anchor them in the test suite. I can't yet think of which behaviors really should be represented that way though.

Copy link
Member

@oremanj oremanj Oct 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, on further consideration I agree my previous proposal is fairly wonky and we can probably do better. I think my preference would actually be to have cancellable=True threads use the old semantics (whether they've actually been cancelled yet or not): don't pass cancellation, don't run from_thread.run reentrant calls in the to_thread.run_sync task, just use a system task (that is not cancelled) like we did before this PR. Rationale:

  • Making stuff fail that worked before isn't great for backcompat. If someone is using the explicit "abandon thread when I'm cancelled" flag, then they might specifically want the abandoned thread to continue operating; our attempt to interrupt it as soon as we regain control might be counterproductive.
  • Since the cancellable=True thread explicitly might outlive the original task, any attempt it makes to call back into Trio is probably for something unrelated to the original task's context, so it's strange for that attempt to reflect the cancellation status of the original task.
  • I imagine a conceptual distinction between cancellable=False threads being an "extension of the current task", vs cancellable=True being "fire-and-forget + retrieve the result if you're still around when it becomes available". It just feels wrong to propagate cancellation in the latter case.

This discussion is underscoring for me that cancellable=True is a bad name; cancellable=False threads are arguably more cancellable! My first thought for a replacement is detachable but maybe you have a better idea. Doesn't need to go in this PR but maybe should arrive in the same release to reduce confusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to suggest a new, overriding kwarg abandon_on_cancel. That tells you what happens and when, and my multiprocessing library could use kill_on_cancel instead. cancellable would then hang around indefinitely, deprecated but with unchanged semantics.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm equally happy with abandon_on_cancel as a name.


async def no_checkpoint():
return True

def async_check():
from_thread_run_sync(cancel_scope.cancel)
try:
assert from_thread_run(no_checkpoint)
except _core.Cancelled:
queue.put(True)
else:
queue.put(False)

with _core.CancelScope() as cancel_scope:
await to_thread_run_sync(async_check)

assert not cancel_scope.cancelled_caught
assert not queue.get_nowait()

with _core.CancelScope() as cancel_scope:
await to_thread_run_sync(async_check, cancellable=True)

assert cancel_scope.cancelled_caught
assert await to_thread_run_sync(partial(queue.get, timeout=1))

async def async_time_bomb():
cancel_scope.cancel()
with fail_after(10):
await sleep_forever()

with _core.CancelScope() as cancel_scope:
await to_thread_run_sync(from_thread_run, async_time_bomb)

assert cancel_scope.cancelled_caught


async def test_from_thread_check_cancelled():
q = stdlib_queue.Queue()

async def child(cancellable):
record.append("start")
try:
return await to_thread_run_sync(f, cancellable=cancellable)
except _core.Cancelled:
record.append("cancel")
finally:
record.append("exit")

def f():
try:
from_thread_check_cancelled()
except _core.Cancelled: # pragma: no cover, test failure path
q.put("Cancelled")
else:
q.put("Not Cancelled")
ev.wait()
return from_thread_check_cancelled()

# Base case: nothing cancelled so we shouldn't see cancels anywhere
record = []
ev = threading.Event()
async with _core.open_nursery() as nursery:
nursery.start_soon(child, False)
await wait_all_tasks_blocked()
assert record[0] == "start"
assert q.get(timeout=1) == "Not Cancelled"
ev.set()
# implicit assertion, Cancelled not raised via nursery
assert record[1] == "exit"

# cancellable=False case: a cancel will pop out but be handled by
# the appropriate cancel scope
record = []
ev = threading.Event()
async with _core.open_nursery() as nursery:
nursery.start_soon(child, False)
await wait_all_tasks_blocked()
assert record[0] == "start"
assert q.get(timeout=1) == "Not Cancelled"
nursery.cancel_scope.cancel()
ev.set()
assert nursery.cancel_scope.cancelled_caught
assert "cancel" in record
assert record[-1] == "exit"

# cancellable=True case: slightly different thread behavior needed
# check thread is cancelled "soon" after abandonment
def f(): # noqa: F811
ev.wait()
try:
from_thread_check_cancelled()
except _core.Cancelled:
q.put("Cancelled")
else: # pragma: no cover, test failure path
q.put("Not Cancelled")

record = []
ev = threading.Event()
async with _core.open_nursery() as nursery:
nursery.start_soon(child, True)
await wait_all_tasks_blocked()
assert record[0] == "start"
nursery.cancel_scope.cancel()
ev.set()
assert nursery.cancel_scope.cancelled_caught
assert "cancel" in record
assert record[-1] == "exit"
assert q.get(timeout=1) == "Cancelled"


async def test_from_thread_check_cancelled_raises_in_foreign_threads():
with pytest.raises(RuntimeError):
from_thread_check_cancelled()
q = stdlib_queue.Queue()
_core.start_thread_soon(from_thread_check_cancelled, lambda _: q.put(_))
with pytest.raises(RuntimeError):
q.get(timeout=1).unwrap()
2 changes: 1 addition & 1 deletion trio/_tests/verify_types_darwin.json
Expand Up @@ -40,7 +40,7 @@
],
"exportedSymbolCounts": {
"withAmbiguousType": 0,
"withKnownType": 631,
"withKnownType": 632,
"withUnknownType": 0
},
"ignoreUnknownTypesFromImports": true,
Expand Down
2 changes: 1 addition & 1 deletion trio/_tests/verify_types_linux.json
Expand Up @@ -28,7 +28,7 @@
],
"exportedSymbolCounts": {
"withAmbiguousType": 0,
"withKnownType": 628,
"withKnownType": 629,
"withUnknownType": 0
},
"ignoreUnknownTypesFromImports": true,
Expand Down
4 changes: 2 additions & 2 deletions trio/_tests/verify_types_windows.json
Expand Up @@ -7,7 +7,7 @@
"warningCount": 0
},
"typeCompleteness": {
"completenessScore": 0.9857369255150554,
"completenessScore": 0.9857594936708861,
"diagnostics": [
{
"message": "Return type annotation is missing",
Expand Down Expand Up @@ -144,7 +144,7 @@
],
"exportedSymbolCounts": {
"withAmbiguousType": 0,
"withKnownType": 622,
"withKnownType": 623,
"withUnknownType": 9
},
"ignoreUnknownTypesFromImports": true,
Expand Down