Permalink
Browse files

Add magic for yielding futures.

Introduce IOLoop.current() as a thread-local counterpart to IOLoop.instance().

gen.engine now recognizes Futures directly.
  • Loading branch information...
1 parent 5587a45 commit 0cc0df770b72177a6df289f065dd4cec260779b5 @bdarnell bdarnell committed Sep 3, 2012
Showing with 28 additions and 5 deletions.
  1. +5 −1 tornado/gen.py
  2. +19 −0 tornado/ioloop.py
  3. +2 −4 tornado/test/concurrent_test.py
  4. +2 −0 tornado/testing.py
View
@@ -69,6 +69,7 @@ def get(self):
import sys
import types
+from tornado.concurrent import Future
from tornado.ioloop import IOLoop
from tornado.stack_context import ExceptionStackContext
@@ -251,7 +252,7 @@ def get_result(self):
class YieldFuture(YieldPoint):
def __init__(self, future, io_loop=None):
self.future = future
- self.io_loop = io_loop or IOLoop.instance()
+ self.io_loop = io_loop or IOLoop.current()
def start(self, runner):
self.runner = runner
@@ -379,6 +380,9 @@ def run(self):
raise
if isinstance(yielded, list):
yielded = Multi(yielded)
+ if isinstance(yielded, Future):
+ # TODO: lists of futures
+ yielded = YieldFuture(yielded)
if isinstance(yielded, YieldPoint):
self.yield_point = yielded
try:
View
@@ -114,6 +114,8 @@ def connection_ready(sock, fd, events):
# Global lock for creating global IOLoop instance
_instance_lock = threading.Lock()
+ _current = threading.local()
+
def __init__(self, impl=None):
self._impl = impl or _poll()
if hasattr(self._impl, 'fileno'):
@@ -173,6 +175,20 @@ def install(self):
assert not IOLoop.initialized()
IOLoop._instance = self
+ @staticmethod
+ def current():
+ current = getattr(IOLoop._current, "instance", None)
+ if current is None:
+ raise ValueError("no current IOLoop")
+ return current
+
+ def make_current(self):
+ IOLoop._current.instance = self
+
+ def clear_current(self):
+ assert IOLoop._current.instance is self
+ IOLoop._current.instance = None
+
def close(self, all_fds=False):
"""Closes the IOLoop, freeing any resources used.
@@ -264,6 +280,8 @@ def start(self):
if self._stopped:
self._stopped = False
return
+ old_current = getattr(IOLoop._current, "instance", None)
+ IOLoop._current.instance = self
self._thread_ident = thread.get_ident()
self._running = True
while True:
@@ -346,6 +364,7 @@ def start(self):
self._stopped = False
if self._blocking_signal_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
+ IOLoop._current.instance = old_current
def stop(self):
"""Stop the loop after the current event loop iteration is complete.
@@ -161,8 +161,7 @@ def test_future_error(self):
def test_generator(self):
@gen.engine
def f():
- result = yield gen.YieldFuture(self.client.capitalize("hello"),
- io_loop=self.io_loop)
+ result = yield self.client.capitalize("hello")
self.assertEqual(result, "HELLO")
self.stop()
f()
@@ -172,8 +171,7 @@ def test_generator_error(self):
@gen.engine
def f():
with self.assertRaisesRegexp(CapError, "already capitalized"):
- yield gen.YieldFuture(self.client.capitalize("HELLO"),
- io_loop=self.io_loop)
+ yield self.client.capitalize("HELLO")
self.stop()
f()
self.wait()
View
@@ -124,8 +124,10 @@ def __init__(self, *args, **kwargs):
def setUp(self):
super(AsyncTestCase, self).setUp()
self.io_loop = self.get_new_ioloop()
+ self.io_loop.make_current()
def tearDown(self):
+ self.io_loop.clear_current()
if (not IOLoop.initialized() or
self.io_loop is not IOLoop.instance()):
# Try to clean up any file descriptors left open in the ioloop.

0 comments on commit 0cc0df7

Please sign in to comment.