Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add tornado.gen module for simpler generator-based async code.

  • Loading branch information...
commit e4ead597956457aada766b09d624a3d9f7b888d9 1 parent c0142d2
Ben Darnell bdarnell authored
250 tornado/gen.py
View
@@ -0,0 +1,250 @@
+"""``tornado.gen`` is a generator-based interface to make it easier to
+work in an asynchronous environment. Code using the ``gen`` module
+is technically asynchronous, but it is written as a single generator
+instead of a collection of separate functions.
+
+For example, the following asynchronous handler::
+
+ class AsyncHandler(RequestHandler):
+ @asynchronous
+ def get(self):
+ http_client = AsyncHTTPClient()
+ http_client.fetch("http://example.com",
+ callback=self.on_fetch)
+
+ def on_fetch(self, response):
+ do_something_with_response(response)
+ self.render("template.html")
+
+could be written with ``gen`` as::
+
+ class GenAsyncHandler(RequestHandler):
+ @asynchronous
+ @gen.engine
+ def get(self):
+ http_client = AsyncHTTPClient()
+ response = yield gen.Task(http_client.fetch("http://example.com"))
+ do_something_with_response(response)
+ self.render("template.html")
+
+`Task` works with any function that takes a ``callback`` keyword argument
+(and runs that callback with zero or one arguments). For more complicated
+interfaces, `Task` can be split into two parts: `Callback` and `Wait`::
+
+ class GenAsyncHandler2(RequestHandler):
+ @asynchronous
+ @gen.engine
+ def get(self):
+ http_client = AsyncHTTPClient()
+ http_client.fetch("http://example.com",
+ callback=(yield gen.Callback("key"))
+ response = yield gen.Wait("key")
+ do_something_with_response(response)
+ self.render("template.html")
+
+The ``key`` argument to `Callback` and `Wait` allows for multiple
+asynchronous operations to proceed in parallel: yield several
+callbacks with different keys, then wait for them once all the async
+operations have started.
+"""
+
+import functools
+import types
+
+class KeyReuseError(Exception): pass
+class UnknownKeyError(Exception): pass
+class LeakedCallbackError(Exception): pass
+class BadYieldError(Exception): pass
+
+def engine(func):
+ """Decorator for asynchronous generators.
+
+ Any generator that yields objects from this module must be wrapped
+ in this decorator. The decorator only works on functions that are
+ already asynchronous. For `~tornado.web.RequestHandler`
+ ``get``/``post``/etc methods, this means that both the `tornado.gen.engine`
+ and `tornado.web.asynchronous` decorators must be used (in either order).
+ In most other cases, it means that it doesn't make sense to use
+ ``gen.engine`` on functions that don't already take a callback argument.
+ """
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ gen = func(*args, **kwargs)
+ if isinstance(gen, types.GeneratorType):
+ Runner(gen).run()
+ return
+ assert gen is None, gen
+ # no yield, so we're done
+ return wrapper
+
+class YieldPoint(object):
+ """Base class for objects that may be yielded from the generator."""
+ def start(self, runner):
+ """Called by the runner after the generator has yielded.
+
+ No other methods will be called on this object before ``start``.
+ """
+ raise NotImplementedError()
+
+ def is_ready(self):
+ """Called by the runner to determine whether to resume the generator.
+
+ May be called repeatedly until it returns True.
+ """
+ raise NotImplementedError()
+
+ def get_result(self):
+ """Returns the value to use as the result of the yield expression.
+
+ This method will only be called once, and only after `is_ready`
+ has returned true.
+ """
+ raise NotImplementedError()
+
+class Callback(YieldPoint):
+ """Returns a callable object that will allow a matching `Wait` to proceed.
+
+ The key may be any value suitable for use as a dictionary key, and is
+ used to match ``Callbacks`` to their corresponding ``Waits``. The key
+ must be unique among outstanding callbacks within a single run of the
+ generator function, but may be reused across different runs of the same
+ function (so constants generally work fine).
+
+ The callback may be called with zero or one arguments; if an argument
+ is given it will be returned by `Wait`.
+ """
+ def __init__(self, key):
+ self.key = key
+
+ def start(self, runner):
+ self.runner = runner
+ runner.register_callback(self.key)
+
+ def is_ready(self):
+ return True
+
+ def get_result(self):
+ return self.callback
+
+ def callback(self, arg=None):
+ self.runner.set_result(self.key, arg)
+
+class Wait(YieldPoint):
+ """Returns the argument passed to the result of a previous `Callback`."""
+ def __init__(self, key):
+ self.key = key
+
+ def start(self, runner):
+ self.runner = runner
+
+ def is_ready(self):
+ return self.runner.is_ready(self.key)
+
+ def get_result(self):
+ return self.runner.pop_result(self.key)
+
+class Task(YieldPoint):
+ """Runs a single asynchronous operation.
+
+ Takes a function (and optional additional arguments) and runs it with
+ those arguments plus a ``callback`` keyword argument. The argument passed
+ to the callback is returned as the result of the yield expression.
+
+ A `Task` is equivalent to a `Callback`/`Wait` pair (with a unique
+ key generated automatically)::
+
+ result = yield gen.Task(func, args)
+
+ func(args, callback=(yield gen.Callback(key)))
+ result = yield gen.Wait(key)
+ """
+ def __init__(self, func, *args, **kwargs):
+ assert "callback" not in kwargs
+ kwargs["callback"] = self.callback
+ self.func = functools.partial(func, *args, **kwargs)
+
+ def start(self, runner):
+ self.runner = runner
+ self.key = object()
+ runner.register_callback(self.key)
+ self.func()
+
+ def is_ready(self):
+ return self.runner.is_ready(self.key)
+
+ def get_result(self):
+ return self.runner.pop_result(self.key)
+
+ def callback(self, arg=None):
+ self.runner.set_result(self.key, arg)
+
+class _NullYieldPoint(YieldPoint):
+ def start(self, runner):
+ pass
+ def is_ready(self):
+ return True
+ def get_result(self):
+ return None
+
+class Runner(object):
+ """Internal implementation of `tornado.gen.engine`.
+
+ Maintains information about pending callbacks and their results.
+ """
+ def __init__(self, gen):
+ self.gen = gen
+ self.yield_point = _NullYieldPoint()
+ self.pending_callbacks = set()
+ self.results = {}
+ self.waiting = None
+ self.running = False
+
+ def register_callback(self, key):
+ """Adds ``key`` to the list of callbacks."""
+ if key in self.pending_callbacks:
+ raise KeyReuseError("key %r is already pending" % key)
+ self.pending_callbacks.add(key)
+
+ def is_ready(self, key):
+ """Returns true if a result is available for ``key``."""
+ if key not in self.pending_callbacks:
+ raise UnknownKeyError("key %r is not pending" % key)
+ return key in self.results
+
+ def set_result(self, key, result):
+ """Sets the result for ``key`` and attempts to resume the generator."""
+ self.results[key] = result
+ self.run()
+
+ def pop_result(self, key):
+ """Returns the result for ``key`` and unregisters it."""
+ self.pending_callbacks.remove(key)
+ return self.results.pop(key)
+
+ def run(self):
+ """Starts or resumes the generator, running until it reaches a
+ yield point that is not ready.
+ """
+ if self.running:
+ return
+ try:
+ self.running = True
+ while True:
+ if not self.yield_point.is_ready():
+ return
+ next = self.yield_point.get_result()
+ try:
+ yielded = self.gen.send(next)
+ except StopIteration:
+ if self.pending_callbacks:
+ raise LeakedCallbackError(
+ "finished without waiting for callbacks %r" %
+ self.pending_callbacks)
+ return
+ if not isinstance(yielded, YieldPoint):
+ raise BadYieldError("yielded unknown object %r" % yielded)
+ self.yield_point = yielded
+ self.yield_point.start(self)
+ finally:
+ self.running = False
+
158 tornado/test/gen_test.py
View
@@ -0,0 +1,158 @@
+from tornado.escape import url_escape
+from tornado.httpclient import AsyncHTTPClient
+from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, LogTrapTestCase
+from tornado.util import b
+from tornado.web import Application, RequestHandler, asynchronous
+
+from tornado import gen
+
+class GenTest(AsyncTestCase):
+ def run_gen(self, f):
+ f()
+ self.wait()
+
+ def test_no_yield(self):
+ @gen.engine
+ def f():
+ self.stop()
+ self.run_gen(f)
+
+ def test_inline_cb(self):
+ @gen.engine
+ def f():
+ (yield gen.Callback("k1"))()
+ res = yield gen.Wait("k1")
+ assert res is None
+ self.stop()
+ self.run_gen(f)
+
+ def test_ioloop_cb(self):
+ @gen.engine
+ def f():
+ self.io_loop.add_callback((yield gen.Callback("k1")))
+ yield gen.Wait("k1")
+ self.stop()
+ self.run_gen(f)
+
+ def test_exception_phase1(self):
+ @gen.engine
+ def f():
+ 1/0
+ self.assertRaises(ZeroDivisionError, self.run_gen, f)
+
+ def test_exception_phase2(self):
+ @gen.engine
+ def f():
+ self.io_loop.add_callback((yield gen.Callback("k1")))
+ yield gen.Wait("k1")
+ 1/0
+ self.assertRaises(ZeroDivisionError, self.run_gen, f)
+
+ def test_with_arg(self):
+ @gen.engine
+ def f():
+ (yield gen.Callback("k1"))(42)
+ res = yield gen.Wait("k1")
+ self.assertEqual(42, res)
+ self.stop()
+ self.run_gen(f)
+
+ def test_key_reuse(self):
+ @gen.engine
+ def f():
+ yield gen.Callback("k1")
+ yield gen.Callback("k1")
+ self.stop()
+ self.assertRaises(gen.KeyReuseError, self.run_gen, f)
+
+ def test_key_mismatch(self):
+ @gen.engine
+ def f():
+ yield gen.Callback("k1")
+ yield gen.Wait("k2")
+ self.stop()
+ self.assertRaises(gen.UnknownKeyError, self.run_gen, f)
+
+ def test_leaked_callback(self):
+ @gen.engine
+ def f():
+ yield gen.Callback("k1")
+ self.stop()
+ self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)
+
+ def test_parallel_callback(self):
+ @gen.engine
+ def f():
+ for k in range(3):
+ self.io_loop.add_callback((yield gen.Callback(k)))
+ yield gen.Wait(1)
+ self.io_loop.add_callback((yield gen.Callback(3)))
+ yield gen.Wait(0)
+ yield gen.Wait(3)
+ yield gen.Wait(2)
+ self.stop()
+ self.run_gen(f)
+
+ def test_bogus_yield(self):
+ @gen.engine
+ def f():
+ yield 42
+ self.assertRaises(gen.BadYieldError, self.run_gen, f)
+
+ def test_reuse(self):
+ @gen.engine
+ def f():
+ self.io_loop.add_callback((yield gen.Callback(0)))
+ yield gen.Wait(0)
+ self.stop()
+ self.run_gen(f)
+ self.run_gen(f)
+
+ def test_task(self):
+ @gen.engine
+ def f():
+ yield gen.Task(self.io_loop.add_callback)
+ self.stop()
+ self.run_gen(f)
+
+
+class GenSequenceHandler(RequestHandler):
+ @asynchronous
+ @gen.engine
+ def get(self):
+ self.io_loop = self.request.connection.stream.io_loop
+ self.io_loop.add_callback((yield gen.Callback("k1")))
+ yield gen.Wait("k1")
+ self.write("1")
+ self.io_loop.add_callback((yield gen.Callback("k2")))
+ yield gen.Wait("k2")
+ self.write("2")
+ # reuse an old key
+ self.io_loop.add_callback((yield gen.Callback("k1")))
+ yield gen.Wait("k1")
+ self.finish("3")
+
+class GenTaskHandler(RequestHandler):
+ @asynchronous
+ @gen.engine
+ def get(self):
+ io_loop = self.request.connection.stream.io_loop
+ client = AsyncHTTPClient(io_loop=io_loop)
+ response = yield gen.Task(client.fetch, self.get_argument('url'))
+ response.rethrow()
+ self.finish(b("got response: ") + response.body)
+
+class GenWebTest(AsyncHTTPTestCase, LogTrapTestCase):
+ def get_app(self):
+ return Application([
+ ('/sequence', GenSequenceHandler),
+ ('/task', GenTaskHandler),
+ ])
+
+ def test_sequence_handler(self):
+ response = self.fetch('/sequence')
+ self.assertEqual(response.body, b("123"))
+
+ def test_task_handler(self):
+ response = self.fetch('/task?url=%s' % url_escape(self.get_url('/sequence')))
+ self.assertEqual(response.body, b("got response: 123"))
1  tornado/test/runtests.py
View
@@ -7,6 +7,7 @@
'tornado.util.doctests',
'tornado.test.curl_httpclient_test',
'tornado.test.escape_test',
+ 'tornado.test.gen_test',
'tornado.test.httpclient_test',
'tornado.test.httpserver_test',
'tornado.test.httputil_test',
22 website/sphinx/gen.rst
View
@@ -0,0 +1,22 @@
+``tornado.gen`` --- Simplify asynchronous code
+==============================================
+
+.. automodule:: tornado.gen
+
+ Decorator
+ ---------
+
+ .. autofunction:: engine
+
+ Yield points
+ ------------
+
+ Instances of the following classes may be used in yield expressions
+ in the generator.
+
+ .. autoclass:: Task
+
+ .. autoclass:: Callback
+
+ .. autoclass:: Wait
+
1  website/sphinx/utilities.rst
View
@@ -4,6 +4,7 @@ Utilities
.. toctree::
autoreload
+ gen
httputil
options
process
Please sign in to comment.
Something went wrong with that request. Please try again.