Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Submitting a coroutine to an asyncio event loop #273

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c4beaeb
add submit_to_loop and connect_futures
Sep 28, 2015
67884a7
add tests for submit_to_loop
Sep 28, 2015
0ad31d8
fix submit_to_loop test case
Sep 28, 2015
c214c1e
rename connect_futures to chain_future
Sep 28, 2015
7bafb68
make Future._copy_state a static method
Sep 29, 2015
da69a9f
update docstrings
Sep 29, 2015
5fbff3f
move submit_to_loop to tasks and remove_schedule
Sep 29, 2015
1bc0d7f
move submit_to_loop test case to test_tasks
Sep 29, 2015
48ebfbf
move _copy_state outside of Future
Sep 29, 2015
d264662
move _safe_callback inside chain_future as _safe_call
Sep 29, 2015
e7f5165
add better conditions for _safe_call
Sep 29, 2015
c3c9479
remove _safe_call
Sep 29, 2015
9a4b924
update chain_future docstring
Sep 29, 2015
59a0c3c
Change loop assignments in chain_future
Sep 29, 2015
b5ca258
use is to compare loops in chain_future
Sep 30, 2015
d04b220
fix concurrency issue in _copy_state
Sep 30, 2015
6211387
rename submit_to_loop to submit_coroutine
Sep 30, 2015
4c4be4b
improve _copy_state
Sep 30, 2015
62b644f
disambiguate error message in submit_coroutine
Sep 30, 2015
7d0df19
revert assert in _copy_state
Oct 1, 2015
3445dba
remove chain_future from __all__
Oct 1, 2015
5e95a8c
fix _copy_state
Oct 1, 2015
26fd022
add test for coroutine submission when the task is cancelled
Oct 1, 2015
76a16f9
rename chain_future to _chain_future
Oct 1, 2015
a197c8a
rename submit_coroutine to run_coroutine_threadsafe
Oct 1, 2015
52cdfd3
fix typo
Oct 1, 2015
86245f9
revert Future._copy_state
Oct 2, 2015
f172f4d
add _copy_state_to_concurrent_future
Oct 2, 2015
69d4b88
revert _copy_state tests
Oct 2, 2015
8f30100
move _copy_state inside _chain_future
Oct 2, 2015
79f7ee3
rename _copy_state_to_concurrent_future to _set_concurrent_future_sta…
Oct 2, 2015
d1776b6
rename _copy_state to _set_state inside _chain_future
Oct 2, 2015
30a3876
fix typo
Oct 2, 2015
b8f4451
rename 'other' argument to 'source' in set_concurrent_future_state
Oct 2, 2015
9cbeeda
rename callbacks in _chain_future
Oct 2, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 58 additions & 16 deletions asyncio/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,22 +390,64 @@ def __iter__(self):
__await__ = __iter__ # make compatible with 'await' expression


def wrap_future(fut, *, loop=None):
"""Wrap concurrent.futures.Future object."""
if isinstance(fut, Future):
return fut
assert isinstance(fut, concurrent.futures.Future), \
'concurrent.futures.Future is expected, got {!r}'.format(fut)
if loop is None:
loop = events.get_event_loop()
new_future = Future(loop=loop)
def _set_concurrent_future_state(concurrent, source):
"""Copy state from a future to a concurrent.futures.Future."""
assert source.done()
if source.cancelled():
concurrent.cancel()
if not concurrent.set_running_or_notify_cancel():
return
exception = source.exception()
if exception is not None:
concurrent.set_exception(exception)
else:
result = source.result()
concurrent.set_result(result)


def _chain_future(source, destination):
"""Chain two futures so that when one completes, so does the other.

The result (or exception) of source will be copied to destination.
If destination is cancelled, source gets cancelled too.
Compatible with both asyncio.Future and concurrent.futures.Future.
"""
if not isinstance(source, (Future, concurrent.futures.Future)):
raise TypeError('A future is required for source argument')
if not isinstance(destination, (Future, concurrent.futures.Future)):
raise TypeError('A future is required for destination argument')
source_loop = source._loop if isinstance(source, Future) else None
dest_loop = destination._loop if isinstance(destination, Future) else None

def _set_state(future, other):
if isinstance(future, Future):
future._copy_state(other)
else:
_set_concurrent_future_state(future, other)

def _check_cancel_other(f):
if f.cancelled():
fut.cancel()
def _call_check_cancel(destination):
if destination.cancelled():
if source_loop is None or source_loop is dest_loop:
source.cancel()
else:
source_loop.call_soon_threadsafe(source.cancel)

new_future.add_done_callback(_check_cancel_other)
fut.add_done_callback(
lambda future: loop.call_soon_threadsafe(
new_future._copy_state, future))
def _call_set_state(source):
if dest_loop is None or dest_loop is source_loop:
_set_state(destination, source)
else:
dest_loop.call_soon_threadsafe(_set_state, destination, source)

destination.add_done_callback(_call_check_cancel)
source.add_done_callback(_call_set_state)


def wrap_future(future, *, loop=None):
"""Wrap concurrent.futures.Future object."""
if isinstance(future, Future):
return future
assert isinstance(future, concurrent.futures.Future), \
'concurrent.futures.Future is expected, got {!r}'.format(future)
new_future = Future(loop=loop)
_chain_future(future, new_future)
return new_future
18 changes: 17 additions & 1 deletion asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
__all__ = ['Task',
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep', 'async',
'gather', 'shield', 'ensure_future',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
]

import concurrent.futures
Expand Down Expand Up @@ -680,3 +680,19 @@ def _done_callback(inner):

inner.add_done_callback(_done_callback)
return outer


def run_coroutine_threadsafe(coro, loop):
"""Submit a coroutine object to a given event loop.

Return a concurrent.futures.Future to access the result.
"""
if not coroutines.iscoroutine(coro):
raise TypeError('A coroutine object is required')
future = concurrent.futures.Future()

def callback():
futures._chain_future(ensure_future(coro, loop=loop), future)

loop.call_soon_threadsafe(callback)
return future
2 changes: 0 additions & 2 deletions tests/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ def func_repr(func):
'<Future cancelled>')

def test_copy_state(self):
# Test the internal _copy_state method since it's being directly
# invoked in other modules.
f = asyncio.Future(loop=self.loop)
f.set_result(10)

Expand Down
67 changes: 67 additions & 0 deletions tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2082,5 +2082,72 @@ def outer():
self.assertIsInstance(f.exception(), RuntimeError)


class RunCoroutineThreadsafeTests(test_utils.TestCase):
"""Test case for futures.submit_to_loop."""

def setUp(self):
self.loop = self.new_test_loop(self.time_gen)

def time_gen(self):
"""Handle the timer."""
yield 0 # second
yield 1 # second

@asyncio.coroutine
def add(self, a, b, fail=False, cancel=False):
"""Wait 1 second and return a + b."""
yield from asyncio.sleep(1, loop=self.loop)
if fail:
raise RuntimeError("Fail!")
if cancel:
asyncio.tasks.Task.current_task(self.loop).cancel()
yield
return a + b

def target(self, fail=False, cancel=False, timeout=None):
"""Run add coroutine in the event loop."""
coro = self.add(1, 2, fail=fail, cancel=cancel)
future = asyncio.run_coroutine_threadsafe(coro, self.loop)
try:
return future.result(timeout)
finally:
future.done() or future.cancel()

def test_run_coroutine_threadsafe(self):
"""Test coroutine submission from a thread to an event loop."""
future = self.loop.run_in_executor(None, self.target)
result = self.loop.run_until_complete(future)
self.assertEqual(result, 3)

def test_run_coroutine_threadsafe_with_exception(self):
"""Test coroutine submission from a thread to an event loop
when an exception is raised."""
future = self.loop.run_in_executor(None, self.target, True)
with self.assertRaises(RuntimeError) as exc_context:
self.loop.run_until_complete(future)
self.assertIn("Fail!", exc_context.exception.args)

def test_run_coroutine_threadsafe_with_timeout(self):
"""Test coroutine submission from a thread to an event loop
when a timeout is raised."""
callback = lambda: self.target(timeout=0)
future = self.loop.run_in_executor(None, callback)
with self.assertRaises(asyncio.TimeoutError):
self.loop.run_until_complete(future)
# Clear the time generator and tasks
test_utils.run_briefly(self.loop)
# Check that there's no pending task (add has been cancelled)
for task in asyncio.Task.all_tasks(self.loop):
self.assertTrue(task.done())

def test_run_coroutine_threadsafe_task_cancelled(self):
"""Test coroutine submission from a tread to an event loop
when the task is cancelled."""
callback = lambda: self.target(cancel=True)
future = self.loop.run_in_executor(None, callback)
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(future)


if __name__ == '__main__':
unittest.main()