Merge pull request #1476 from bdarnell/async-statements
Support Python 3.5 `async` statements
bdarnell committed Aug 3, 2015
2 parents 959da8f + d7130c1 commit 916cf57
Showing 9 changed files with 280 additions and 47 deletions.
5 changes: 4 additions & 1 deletion .travis.yml
@@ -7,6 +7,7 @@ python:
- 3.2
- 3.3
- 3.4
- nightly
- pypy3

env:
@@ -47,7 +48,9 @@ script:
# We use "python -m coverage" instead of the "bin/coverage" script
# so we can pass additional arguments to python. However, this doesn't
# work on 2.6, so skip coverage on that version.
- if [[ $TRAVIS_PYTHON_VERSION != 2.6 ]]; then export TARGET="-m coverage run $TARGET"; fi
# coverage needs a function that was removed in Python 3.6, so we can't
# run it with nightly CPython.
- if [[ $TRAVIS_PYTHON_VERSION != 2.6 && $TRAVIS_PYTHON_VERSION != nightly ]]; then export TARGET="-m coverage run $TARGET"; fi
- python $TARGET
- python $TARGET --ioloop=tornado.platform.select.SelectIOLoop
- python -O $TARGET
45 changes: 40 additions & 5 deletions tornado/gen.py
@@ -108,6 +108,11 @@ def get(self):
except ImportError:
def isawaitable(x): return False

try:
    import builtins  # py3
except ImportError:
    import __builtin__ as builtins


class KeyReuseError(Exception):
pass
@@ -333,7 +338,22 @@ class WaitIterator(object):
arguments were used in the construction of the `WaitIterator`,
``current_index`` will use the corresponding keyword).
On Python 3.5, `WaitIterator` implements the async iterator
protocol, so it can be used with the ``async for`` statement (note
that in this version the entire iteration is aborted if any value
raises an exception, while the previous example can continue past
individual errors)::

    wait_iterator = gen.WaitIterator(future1, future2)
    async for result in wait_iterator:
        print("Result {} received from {} at {}".format(
            result, wait_iterator.current_future,
            wait_iterator.current_index))

.. versionadded:: 4.1

.. versionchanged:: 4.3
   Added ``async for`` support in Python 3.5.
"""
def __init__(self, *args, **kwargs):
if args and kwargs:
@@ -390,6 +410,16 @@ def _return_result(self, done):
self.current_future = done
self.current_index = self._unfinished.pop(done)

@coroutine
def __aiter__(self):
    raise Return(self)

def __anext__(self):
    if self.done():
        # Lookup by name to silence pyflakes on older versions.
        raise getattr(builtins, 'StopAsyncIteration')()
    return self.next()
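
For reference, the ``async for`` loop shown in the docstring above expands to calls on these two methods. A rough sketch of the PEP 492 expansion as it behaves on early Python 3.5 (where ``__aiter__`` may return an awaitable, as the coroutine above does); this is illustration only, not part of the change:

    async def consume(future1, future2):
        wait_iterator = gen.WaitIterator(future1, future2)
        iterator = await wait_iterator.__aiter__()   # the coroutine above resolves to self
        while True:
            try:
                result = await iterator.__anext__()  # awaits the Future from WaitIterator.next()
            except StopAsyncIteration:               # raised by __anext__ once every input is done
                break
            print(result, wait_iterator.current_index)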


class YieldPoint(object):
"""Base class for objects that may be yielded from the generator.
@@ -624,11 +654,12 @@ def get_result(self):
def multi_future(children, quiet_exceptions=()):
"""Wait for multiple asynchronous futures in parallel.
Takes a list of ``Futures`` (but *not* other ``YieldPoints``) and returns
a new Future that resolves when all the other Futures are done.
If all the ``Futures`` succeeded, the returned Future's result is a list
of their results. If any failed, the returned Future raises the exception
of the first one to fail.
Takes a list of ``Futures`` or other yieldable objects (with the
exception of the legacy `.YieldPoint` interfaces) and returns a
new Future that resolves when all the other Futures are done. If
all the ``Futures`` succeeded, the returned Future's result is a
list of their results. If any failed, the returned Future raises
the exception of the first one to fail.
Instead of a list, the argument may also be a dictionary whose values are
Futures, in which case a parallel dictionary is returned mapping the same
@@ -649,12 +680,16 @@ def multi_future(children, quiet_exceptions=()):
If multiple ``Futures`` fail, any exceptions after the first (which is
raised) will be logged. Added the ``quiet_exceptions``
argument to suppress this logging for selected exception types.
.. versionchanged:: 4.3
   Added support for other yieldable objects.
"""
if isinstance(children, dict):
    keys = list(children.keys())
    children = children.values()
else:
    keys = None
children = list(map(convert_yielded, children))
assert all(is_future(i) for i in children)
unfinished_children = set(children)
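
Because each child now goes through ``convert_yielded``, a list yielded from a decorated coroutine can mix native ``async def`` coroutine objects with ordinary futures. A minimal sketch of that usage (mirroring the ``test_async_await_mixed_multi`` test added later in this diff; the function names here are made up for illustration):

    from tornado import gen
    from tornado.ioloop import IOLoop

    async def native_part():
        return 42

    @gen.coroutine
    def decorated_part():
        raise gen.Return(43)

    @gen.coroutine
    def combined():
        # The yielded list is handled by multi_future, which converts the
        # native coroutine object into a Future before waiting on both children.
        results = yield [native_part(), decorated_part()]
        raise gen.Return(results)

    # IOLoop.current().run_sync(combined) returns [42, 43]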

48 changes: 47 additions & 1 deletion tornado/locks.py
@@ -327,6 +327,20 @@ def worker(worker_id):
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
In Python 3.5, the semaphore itself can be used as an async context
manager::

    async def worker(worker_id):
        async with sem:
            print("Worker %d is working" % worker_id)
            await use_some_resource()

        # Now the semaphore has been released.
        print("Worker %d is done" % worker_id)

.. versionchanged:: 4.3
   Added ``async with`` support in Python 3.5.
"""
def __init__(self, value=1):
super(Semaphore, self).__init__()
@@ -389,6 +403,14 @@ def __enter__(self):

__exit__ = __enter__

@gen.coroutine
def __aenter__(self):
    yield self.acquire()

@gen.coroutine
def __aexit__(self, typ, value, tb):
    self.release()
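
``async with sem:`` in the docstring example above expands to calls on these two coroutines, roughly as follows (a sketch of the PEP 492 expansion; the real expansion passes the exception details to ``__aexit__`` when the body raises):

    async def worker():
        await sem.__aenter__()             # acquires the semaphore
        try:
            await use_some_resource()      # the body of the async with block
        finally:
            await sem.__aexit__(None, None, None)   # releases it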


class BoundedSemaphore(Semaphore):
"""A semaphore that prevents release() being called too many times.
@@ -418,7 +440,7 @@ class Lock(object):
Releasing an unlocked lock raises `RuntimeError`.
`acquire` supports the context manager protocol:
`acquire` supports the context manager protocol in all Python versions:
>>> from tornado import gen, locks
>>> lock = locks.Lock()
@@ -430,6 +452,22 @@ class Lock(object):
... pass
...
... # Now the lock is released.
In Python 3.5, `Lock` also supports the async context manager
protocol. Note that in this case there is no `acquire`, because
``async with`` includes both the ``yield`` and the ``acquire``
(just as it does with `threading.Lock`):

>>> async def f():  # doctest: +SKIP
...    async with lock:
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

.. versionchanged:: 4.3
   Added ``async with`` support in Python 3.5.
"""
def __init__(self):
self._block = BoundedSemaphore(value=1)
@@ -464,3 +502,11 @@ def __enter__(self):
"Use Lock like 'with (yield lock)', not like 'with lock'")

__exit__ = __enter__

@gen.coroutine
def __aenter__(self):
    yield self.acquire()

@gen.coroutine
def __aexit__(self, typ, value, tb):
    self.release()
27 changes: 27 additions & 0 deletions tornado/queues.py
@@ -44,6 +44,14 @@ def on_timeout():
lambda _: io_loop.remove_timeout(timeout_handle))


class _QueueIterator(object):
    def __init__(self, q):
        self.q = q

    def __anext__(self):
        return self.q.get()

class Queue(object):
"""Coordinate producer and consumer coroutines.
@@ -96,6 +104,21 @@ def main():
Doing work on 3
Doing work on 4
Done
In Python 3.5, `Queue` implements the async iterator protocol, so
``consumer()`` could be rewritten as::

    async def consumer():
        async for item in q:
            try:
                print('Doing work on %s' % item)
                await gen.sleep(0.01)
            finally:
                q.task_done()

.. versionchanged:: 4.3
   Added ``async for`` support in Python 3.5.
"""
def __init__(self, maxsize=0):
if maxsize is None:
@@ -220,6 +243,10 @@ def join(self, timeout=None):
"""
return self._finished.wait(timeout)

@gen.coroutine
def __aiter__(self):
    return _QueueIterator(self)
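
Note that ``_QueueIterator.__anext__`` never raises ``StopAsyncIteration``; each step simply awaits ``q.get()``, so an ``async for`` loop over a queue runs until it is cancelled or the process exits. A consumer that needs to stop must arrange for it in-band, for example with a sentinel value (a sketch; the sentinel convention is not part of the Queue API):

    STOP = object()   # hypothetical sentinel that the producer puts last

    async def consumer():
        async for item in q:
            try:
                if item is STOP:
                    return                       # ends the otherwise endless iteration
                print('Doing work on %s' % item)
            finally:
                q.task_done()                    # also marks the sentinel as done for join()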

# These three are overridable in subclasses.
def _init(self):
self._queue = collections.deque()
94 changes: 68 additions & 26 deletions tornado/test/gen_test.py
@@ -6,7 +6,6 @@
import sys
import textwrap
import time
import platform
import weakref

from tornado.concurrent import return_future, Future
@@ -16,7 +15,7 @@
from tornado.log import app_log
from tornado import stack_context
from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
from tornado.test.util import unittest, skipOnTravis
from tornado.test.util import unittest, skipOnTravis, skipBefore33, skipBefore35, skipNotCPython, exec_test
from tornado.web import Application, RequestHandler, asynchronous, HTTPError

from tornado import gen
@@ -26,11 +25,6 @@
except ImportError:
futures = None

skipBefore33 = unittest.skipIf(sys.version_info < (3, 3), 'PEP 380 (yield from) not available')
skipBefore35 = unittest.skipIf(sys.version_info < (3, 5), 'PEP 492 (async/await) not available')
skipNotCPython = unittest.skipIf(platform.python_implementation() != 'CPython',
'Not CPython implementation')


class GenEngineTest(AsyncTestCase):
def setUp(self):
@@ -694,19 +688,13 @@ def f():
@skipBefore33
@gen_test
def test_async_return(self):
# It is a compile-time error to return a value in a generator
# before Python 3.3, so we must test this with exec.
# Flatten the real global and local namespace into our fake globals:
# it's all global from the perspective of f().
global_namespace = dict(globals(), **locals())
local_namespace = {}
exec(textwrap.dedent("""
namespace = exec_test(globals(), locals(), """
@gen.coroutine
def f():
yield gen.Task(self.io_loop.add_callback)
return 42
"""), global_namespace, local_namespace)
result = yield local_namespace['f']()
""")
result = yield namespace['f']()
self.assertEqual(result, 42)
self.finished = True
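
These tests rely on the new ``exec_test`` helper, which this commit adds to ``tornado.test.util`` (that file's changes are not shown on this page). Judging from how it is called here and from the inline ``exec`` code it replaces, it plausibly looks something like this sketch:

    import textwrap

    def exec_test(caller_globals, caller_locals, snippet):
        """Execute snippet and return the namespace of names it defines.

        Flattens the caller's globals and locals into one global namespace,
        like the dict(globals(), **locals()) pattern it replaces, so the
        snippet can freely refer to names such as ``self`` and ``gen``.
        """
        global_namespace = dict(caller_globals, **caller_locals)
        local_namespace = {}
        exec(textwrap.dedent(snippet), global_namespace, local_namespace)
        return local_namespace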

@@ -716,16 +704,14 @@ def test_async_early_return(self):
# A yield statement exists but is not executed, which means
# this function "returns" via an exception. This exception
# doesn't happen before the exception handling is set up.
global_namespace = dict(globals(), **locals())
local_namespace = {}
exec(textwrap.dedent("""
namespace = exec_test(globals(), locals(), """
@gen.coroutine
def f():
if True:
return 42
yield gen.Task(self.io_loop.add_callback)
"""), global_namespace, local_namespace)
result = yield local_namespace['f']()
""")
result = yield namespace['f']()
self.assertEqual(result, 42)
self.finished = True

@@ -735,17 +721,33 @@ def test_async_await(self):
# This test verifies that an async function can await a
# yield-based gen.coroutine, and that a gen.coroutine
# (the test method itself) can yield an async function.
global_namespace = dict(globals(), **locals())
local_namespace = {}
exec(textwrap.dedent("""
namespace = exec_test(globals(), locals(), """
async def f():
await gen.Task(self.io_loop.add_callback)
return 42
"""), global_namespace, local_namespace)
result = yield local_namespace['f']()
""")
result = yield namespace['f']()
self.assertEqual(result, 42)
self.finished = True

@skipBefore35
@gen_test
def test_async_await_mixed_multi(self):
    namespace = exec_test(globals(), locals(), """
    async def f1():
        await gen.Task(self.io_loop.add_callback)
        return 42
    """)

    @gen.coroutine
    def f2():
        yield gen.Task(self.io_loop.add_callback)
        raise gen.Return(43)

    results = yield [namespace['f1'](), f2()]
    self.assertEqual(results, [42, 43])
    self.finished = True

@gen_test
def test_sync_return_no_value(self):
@gen.coroutine
@@ -1283,6 +1285,46 @@ def test_iterator(self):
self.assertEqual(g.current_index, 3, 'wrong index')
i += 1

@skipBefore35
@gen_test
def test_iterator_async_await(self):
    # Recreate the previous test with py35 syntax. It's a little clunky
    # because of the way the previous test handles an exception on
    # a single iteration.
    futures = [Future(), Future(), Future(), Future()]
    self.finish_coroutines(0, futures)
    self.finished = False

    namespace = exec_test(globals(), locals(), """
    async def f():
        i = 0
        g = gen.WaitIterator(*futures)
        try:
            async for r in g:
                if i == 0:
                    self.assertEqual(r, 24, 'iterator value incorrect')
                    self.assertEqual(g.current_index, 2, 'wrong index')
                else:
                    raise Exception("expected exception on iteration 1")
                i += 1
        except ZeroDivisionError:
            i += 1
        async for r in g:
            if i == 2:
                self.assertEqual(r, 42, 'iterator value incorrect')
                self.assertEqual(g.current_index, 1, 'wrong index')
            elif i == 3:
                self.assertEqual(r, 84, 'iterator value incorrect')
                self.assertEqual(g.current_index, 3, 'wrong index')
            else:
                raise Exception("didn't expect iteration %d" % i)
            i += 1
        self.finished = True
    """)
    yield namespace['f']()
    self.assertTrue(self.finished)


@gen_test
def test_no_ref(self):
# In this usage, there is no direct hard reference to the
