Skip to content

Commit

Permalink
Merge pull request #732 from njsmith/detach-coroutine
Browse files Browse the repository at this point in the history
Support handing off coroutines between Trio and other runners
  • Loading branch information
njsmith committed Oct 16, 2018
2 parents 7f58c05 + c503b2e commit 6d1f968
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 4 deletions.
38 changes: 38 additions & 0 deletions docs/source/reference-hazmat.rst
Expand Up @@ -552,3 +552,41 @@ Task API
used to share data between the different tasks involved in
putting a task to sleep and then waking it up again. (See
:func:`wait_task_rescheduled` for details.)


.. _live-coroutine-handoff:

Handing off live coroutine objects between coroutine runners
------------------------------------------------------------

Internally, Python's async/await syntax is built around the idea of
"coroutine objects" and "coroutine runners". A coroutine object
represents the state of an async callstack. But by itself, this is
just a static object that sits there. If you want it to do anything,
you need a coroutine runner to push it forward. Every Trio task has an
associated coroutine object (see :data:`Task.coro`), and the Trio
scheduler acts as their coroutine runner.

But of course, Trio isn't the only coroutine runner in Python –
:mod:`asyncio` has one, other event loops have them, you can even
define your own.

And in some very, very unusual circumstances, it even makes sense to
transfer a single coroutine object back and forth between different
coroutine runners. That's what this section is about. This is an
*extremely* exotic use case, and assumes a lot of expertise in how
Python async/await works internally. For motivating examples, see
`trio-asyncio issue #42
<https://github.com/python-trio/trio-asyncio/issues/42>`__, and `trio
issue #649 <https://github.com/python-trio/trio/issues/649>`__. For
more details on how coroutines work, we recommend André Caron's `A
tale of event loops
<https://github.com/AndreLouisCaron/a-tale-of-event-loops>`__, or
going straight to `PEP 492
<https://www.python.org/dev/peps/pep-0492/>`__ for the full details.

.. autofunction:: permanently_detach_coroutine_object

.. autofunction:: temporarily_detach_coroutine_object

.. autofunction:: reattach_detached_coroutine_object
6 changes: 6 additions & 0 deletions newsfragments/649.feature.rst
@@ -0,0 +1,6 @@
New :mod:`trio.hazmat` features to allow cleanly switching live
coroutine objects between Trio and other coroutine runners. Frankly,
we're not even sure this is a good idea, but we want to `try it out in
trio-asyncio
<https://github.com/python-trio/trio-asyncio/issues/42>`__, so here we
are. For details see :ref:`live-coroutine-handoff`.
9 changes: 7 additions & 2 deletions trio/_core/__init__.py
Expand Up @@ -21,15 +21,20 @@ def _public(fn):

from ._multierror import MultiError

from ._traps import cancel_shielded_checkpoint, Abort, wait_task_rescheduled

from ._ki import (
enable_ki_protection, disable_ki_protection, currently_ki_protected
)

# TODO: make the _run namespace a lot less magical
from ._run import *

# Has to come after _run to resolve a circular import
from ._traps import (
cancel_shielded_checkpoint, Abort, wait_task_rescheduled,
temporarily_detach_coroutine_object, permanently_detach_coroutine_object,
reattach_detached_coroutine_object
)

from ._entry_queue import TrioToken

from ._parking_lot import ParkingLot
Expand Down
4 changes: 4 additions & 0 deletions trio/_core/_run.py
Expand Up @@ -31,6 +31,7 @@
Abort,
wait_task_rescheduled,
CancelShieldedCheckpoint,
PermanentlyDetachCoroutineObject,
WaitTaskRescheduled,
)
from .. import _core
Expand Down Expand Up @@ -1481,6 +1482,9 @@ def run_impl(runner, async_fn, args):
if runner.ki_pending and task is runner.main_task:
task._attempt_delivery_of_pending_ki()
task._attempt_delivery_of_any_pending_cancel()
elif type(msg) is PermanentlyDetachCoroutineObject:
# Pretend the task just exited with the given outcome
runner.task_exited(task, msg.final_outcome)
else:
exc = TypeError(
"trio.run received unrecognized yield message {!r}. "
Expand Down
109 changes: 107 additions & 2 deletions trio/_core/_traps.py
@@ -1,12 +1,13 @@
# These are the only 2 functions that ever yield back to the task runner.
# These are the only functions that ever yield back to the task runner.

import types
import enum
from functools import wraps

import attr
import outcome

__all__ = ["cancel_shielded_checkpoint", "Abort", "wait_task_rescheduled"]
from . import _run


# Helper for the bottommost 'yield'. You can't use 'yield' inside an async
Expand Down Expand Up @@ -163,3 +164,107 @@ def abort(inner_raise_cancel):
"""
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()


# Not exported in the trio._core namespace, but imported directly by _run.
@attr.s(frozen=True)
class PermanentlyDetachCoroutineObject:
final_outcome = attr.ib()


async def permanently_detach_coroutine_object(final_outcome):
"""Permanently detach the current task from the Trio scheduler.
Normally, a Trio task doesn't exit until its coroutine object exits. When
you call this function, Trio acts like the coroutine object just exited
and the task terminates with the given outcome. This is useful if you want
to permanently switch the coroutine object over to a different coroutine
runner.
When the calling coroutine enters this function it's running under Trio,
and when the function returns it's running under the foreign coroutine
runner.
You should make sure that the coroutine object has released any
Trio-specific resources it has acquired (e.g. nurseries).
Args:
final_outcome (outcome.Outcome): Trio acts as if the current task exited
with the given return value or exception.
Returns or raises whatever value or exception the new coroutine runner
uses to resume the coroutine.
"""
if _run.current_task().child_nurseries:
raise RuntimeError(
"can't permanently detach a coroutine object with open nurseries"
)
return await _async_yield(PermanentlyDetachCoroutineObject(final_outcome))


async def temporarily_detach_coroutine_object(abort_func):
"""Temporarily detach the current coroutine object from the Trio
scheduler.
When the calling coroutine enters this function it's running under Trio,
and when the function returns it's running under the foreign coroutine
runner.
The Trio :class:`Task` will continue to exist, but will be suspended until
you use :func:`reattach_detached_coroutine_object` to resume it. In the
mean time, you can use another coroutine runner to schedule the coroutine
object. In fact, you have to – the function doesn't return until the
coroutine is advanced from outside.
Note that you'll need to save the current :class:`Task` object to later
resume; you can retrieve it with :func:`current_task`. You can also use
this :class:`Task` object to retrieve the coroutine object – see
:data:`Task.coro`.
Args:
abort_func: Same as for :func:`wait_task_rescheduled`, except that it
must return :data:`Abort.FAILED`. (If it returned
:data:`Abort.SUCCEEDED`, then Trio would attempt to reschedule the
detached task directly without going through
:func:`reattach_detached_coroutine_object`, which would be bad.)
Your ``abort_func`` should still arrange for whatever the coroutine
object is doing to be cancelled, and then reattach to Trio and call
the ``raise_cancel`` callback, if possible.
Returns or raises whatever value or exception the new coroutine runner
uses to resume the coroutine.
"""
return await _async_yield(WaitTaskRescheduled(abort_func))


async def reattach_detached_coroutine_object(task, yield_value):
"""Reattach a coroutine object that was detached using
:func:`temporarily_detach_coroutine_object`.
When the calling coroutine enters this function it's running under the
foreign coroutine runner, and when the function returns it's running under
Trio.
This must be called from inside the coroutine being resumed, and yields
whatever value you pass in. (Presumably you'll pass a value that will
cause the current coroutine runner to stop scheduling this task.) Then the
coroutine is resumed by the Trio scheduler at the next opportunity.
Args:
task (Task): The Trio task object that the current coroutine was
detached from.
yield_value (object): The object to yield to the current coroutine
runner.
"""
# This is a kind of crude check – in particular, it can fail if the
# passed-in task is where the coroutine *runner* is running. But this is
# an experts-only interface, and there's no easy way to do a more accurate
# check, so I guess that's OK.
if not task.coro.cr_running:
raise RuntimeError("given task does not match calling coroutine")
_run.reschedule(task, outcome.Value("reattaching"))
value = await _async_yield(yield_value)
assert value == outcome.Value("reattaching")
136 changes: 136 additions & 0 deletions trio/_core/tests/test_run.py
Expand Up @@ -4,6 +4,7 @@
import sys
import threading
import time
import types
import warnings
from contextlib import contextmanager
from math import inf
Expand Down Expand Up @@ -2015,3 +2016,138 @@ async def test_Task_custom_sleep_data():
assert task.custom_sleep_data == 1
await _core.checkpoint()
assert task.custom_sleep_data is None


@types.coroutine
def async_yield(value):
yield value


async def test_permanently_detach_coroutine_object():
task = None
pdco_outcome = None

async def detachable_coroutine(task_outcome, yield_value):
await sleep(0)
nonlocal task, pdco_outcome
task = _core.current_task()
pdco_outcome = await outcome.acapture(
_core.permanently_detach_coroutine_object, task_outcome
)
await async_yield(yield_value)

async with _core.open_nursery() as nursery:
nursery.start_soon(
detachable_coroutine, outcome.Value(None), "I'm free!"
)

# If we get here then Trio thinks the task has exited... but the coroutine
# is still iterable
assert pdco_outcome is None
assert task.coro.send("be free!") == "I'm free!"
assert pdco_outcome == outcome.Value("be free!")
with pytest.raises(StopIteration):
task.coro.send(None)

# Check the exception paths too
task = None
pdco_outcome = None
with pytest.raises(KeyError):
async with _core.open_nursery() as nursery:
nursery.start_soon(
detachable_coroutine, outcome.Error(KeyError()), "uh oh"
)
throw_in = ValueError()
assert task.coro.throw(throw_in) == "uh oh"
assert pdco_outcome == outcome.Error(throw_in)
with pytest.raises(StopIteration):
task.coro.send(None)

async def bad_detach():
async with _core.open_nursery():
with pytest.raises(RuntimeError) as excinfo:
await _core.permanently_detach_coroutine_object(
outcome.Value(None)
)
assert "open nurser" in str(excinfo.value)

async with _core.open_nursery() as nursery:
nursery.start_soon(bad_detach)


async def test_detach_and_reattach_coroutine_object():
unrelated_task = None
task = None

async def unrelated_coroutine():
nonlocal unrelated_task
unrelated_task = _core.current_task()

async def reattachable_coroutine():
await sleep(0)

nonlocal task
task = _core.current_task()

def abort_fn(_): # pragma: no cover
return _core.Abort.FAILED

got = await _core.temporarily_detach_coroutine_object(abort_fn)
assert got == "not trio!"

await async_yield(1)
await async_yield(2)

with pytest.raises(RuntimeError) as excinfo:
await _core.reattach_detached_coroutine_object(
unrelated_task, None
)
assert "does not match" in str(excinfo.value)

await _core.reattach_detached_coroutine_object(task, "byebye")

await sleep(0)

async with _core.open_nursery() as nursery:
nursery.start_soon(unrelated_coroutine)
nursery.start_soon(reattachable_coroutine)
await wait_all_tasks_blocked()
assert unrelated_task is not None
assert task is not None

# Okay, it's detached. Here's our coroutine runner:
assert task.coro.send("not trio!") == 1
assert task.coro.send(None) == 2
assert task.coro.send(None) == "byebye"

# Now it's been reattached, and we can leave the nursery


async def test_detached_coroutine_cancellation():
abort_fn_called = False
task = None

async def reattachable_coroutine():
await sleep(0)

nonlocal task
task = _core.current_task()

def abort_fn(_):
nonlocal abort_fn_called
abort_fn_called = True
return _core.Abort.FAILED

await _core.temporarily_detach_coroutine_object(abort_fn)
await _core.reattach_detached_coroutine_object(task, None)
with pytest.raises(_core.Cancelled):
await sleep(0)

async with _core.open_nursery() as nursery:
nursery.start_soon(reattachable_coroutine)
await wait_all_tasks_blocked()
assert task is not None
nursery.cancel_scope.cancel()
task.coro.send(None)

assert abort_fn_called
3 changes: 3 additions & 0 deletions trio/hazmat.py
Expand Up @@ -39,6 +39,9 @@
"notify_socket_close",
"TrioToken",
"current_trio_token",
"temporarily_detach_coroutine_object",
"permanently_detach_coroutine_object",
"reattach_detached_coroutine_object",
# kqueue symbols
"current_kqueue",
"monitor_kevent",
Expand Down

0 comments on commit 6d1f968

Please sign in to comment.