Browse files

Split IOLoop into a base class and poll-based implementation.

The base class has all the methods with docstrings; nearly all the
implementation is in a new subclass PollIOLoop.
  • Loading branch information...
1 parent 8b32830 commit 1a66183f19ccc61d344771c46b591c239513a254 @bdarnell bdarnell committed Oct 7, 2012
Showing with 231 additions and 175 deletions.
  1. +224 −168 tornado/ioloop.py
  2. +3 −3 tornado/platform/epoll.py
  3. +2 −2 tornado/platform/kqueue.py
  4. +2 −2 tornado/platform/select.py
View
392 tornado/ioloop.py
@@ -98,26 +98,6 @@ def connection_ready(sock, fd, events):
io_loop.start()
"""
- @classmethod
- def configurable_base(cls):
- return IOLoop
-
- @classmethod
- def configurable_default(cls):
- if hasattr(select, "epoll") or sys.platform.startswith('linux'):
- try:
- from tornado.platform.epoll import EPollIOLoop
- return EPollIOLoop
- except ImportError:
- gen_log.warning("unable to import EPollIOLoop, falling back to SelectIOLoop")
- pass
- if hasattr(select, "kqueue"):
- # Python 2.6+ on BSD or Mac
- from tornado.platform.kqueue import KQueueIOLoop
- return KQueueIOLoop
- from tornado.platform.select import SelectIOLoop
- return SelectIOLoop
-
# Constants from the epoll module
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
@@ -139,28 +119,6 @@ def configurable_default(cls):
_current = threading.local()
- def initialize(self, impl, time_func=None):
- self._impl = impl
- if hasattr(self._impl, 'fileno'):
- set_close_exec(self._impl.fileno())
- self.time_func = time_func or time.time
- self._handlers = {}
- self._events = {}
- self._callbacks = []
- self._callback_lock = threading.Lock()
- self._timeouts = []
- self._running = False
- self._stopped = False
- self._thread_ident = None
- self._blocking_signal_threshold = None
-
- # Create a pipe that we send bogus data to when we want to wake
- # the I/O loop when it is idle
- self._waker = Waker()
- self.add_handler(self._waker.fileno(),
- lambda fd, events: self._waker.consume(),
- self.READ)
-
@staticmethod
def instance():
"""Returns a global IOLoop instance.
@@ -213,6 +171,29 @@ def clear_current(self):
assert IOLoop._current.instance is self
IOLoop._current.instance = None
+ @classmethod
+ def configurable_base(cls):
+ return IOLoop
+
+ @classmethod
+ def configurable_default(cls):
+ if hasattr(select, "epoll") or sys.platform.startswith('linux'):
+ try:
+ from tornado.platform.epoll import EPollIOLoop
+ return EPollIOLoop
+ except ImportError:
+ gen_log.warning("unable to import EPollIOLoop, falling back to SelectIOLoop")
+ pass
+ if hasattr(select, "kqueue"):
+ # Python 2.6+ on BSD or Mac
+ from tornado.platform.kqueue import KQueueIOLoop
+ return KQueueIOLoop
+ from tornado.platform.select import SelectIOLoop
+ return SelectIOLoop
+
+ def initialize(self):
+ pass
+
def close(self, all_fds=False):
"""Closes the IOLoop, freeing any resources used.
@@ -232,33 +213,19 @@ def close(self, all_fds=False):
Therefore the call to `close` will usually appear just after
the call to `start` rather than near the call to `stop`.
"""
- self.remove_handler(self._waker.fileno())
- if all_fds:
- for fd in self._handlers.keys()[:]:
- try:
- os.close(fd)
- except Exception:
- gen_log.debug("error closing fd %s", fd, exc_info=True)
- self._waker.close()
- self._impl.close()
+ raise NotImplementedError()
def add_handler(self, fd, handler, events):
"""Registers the given handler to receive the given events for fd."""
- self._handlers[fd] = stack_context.wrap(handler)
- self._impl.register(fd, events | self.ERROR)
+ raise NotImplementedError()
def update_handler(self, fd, events):
"""Changes the events we listen for fd."""
- self._impl.modify(fd, events | self.ERROR)
+ raise NotImplementedError()
def remove_handler(self, fd):
"""Stop listening for events on fd."""
- self._handlers.pop(fd, None)
- self._events.pop(fd, None)
- try:
- self._impl.unregister(fd)
- except (OSError, IOError):
- gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
+ raise NotImplementedError()
def set_blocking_signal_threshold(self, seconds, action):
"""Sends a signal if the ioloop is blocked for more than s seconds.
@@ -271,14 +238,7 @@ def set_blocking_signal_threshold(self, seconds, action):
If action is None, the process will be killed if it is blocked for
too long.
"""
- if not hasattr(signal, "setitimer"):
- gen_log.error("set_blocking_signal_threshold requires a signal module "
- "with the setitimer method")
- return
- self._blocking_signal_threshold = seconds
- if seconds is not None:
- signal.signal(signal.SIGALRM,
- action if action is not None else signal.SIG_DFL)
+ raise NotImplementedError()
def set_blocking_log_threshold(self, seconds):
"""Logs a stack trace if the ioloop is blocked for more than s seconds.
@@ -301,6 +261,202 @@ def start(self):
The loop will run until one of the I/O handlers calls stop(), which
will make the loop stop after the current event iteration completes.
"""
+ raise NotImplementedError()
+
+ def stop(self):
+ """Stop the loop after the current event loop iteration is complete.
+ If the event loop is not currently running, the next call to start()
+ will return immediately.
+
+ To use asynchronous methods from otherwise-synchronous code (such as
+ unit tests), you can start and stop the event loop like this::
+
+ ioloop = IOLoop()
+ async_method(ioloop=ioloop, callback=ioloop.stop)
+ ioloop.start()
+
+ ioloop.start() will return after async_method has run its callback,
+ whether that callback was invoked before or after ioloop.start.
+
+ Note that even after `stop` has been called, the IOLoop is not
+ completely stopped until `IOLoop.start` has also returned.
+ """
+ raise NotImplementedError()
+
+ def running(self):
+ """Returns true if this IOLoop is currently running."""
+ raise NotImplementedError()
+
+ def time(self):
+ """Returns the current time according to the IOLoop's clock.
+
+ The return value is a floating-point number relative to an
+ unspecified time in the past.
+
+ By default, the IOLoop's time function is `time.time`. However,
+ it may be configured to use e.g. `time.monotonic` instead.
+ Calls to `add_timeout` that pass a number instead of a
+ `datetime.timedelta` should use this function to compute the
+ appropriate time, so they can work no matter what time function
+ is chosen.
+ """
+ return time.time()
+
+ def add_timeout(self, deadline, callback):
+ """Calls the given callback at the time deadline from the I/O loop.
+
+ Returns a handle that may be passed to remove_timeout to cancel.
+
+ ``deadline`` may be a number denoting a time relative to
+ `IOLoop.time`, or a ``datetime.timedelta`` object for a
+ deadline relative to the current time.
+
+ Note that it is not safe to call `add_timeout` from other threads.
+ Instead, you must use `add_callback` to transfer control to the
+ IOLoop's thread, and then call `add_timeout` from there.
+ """
+ raise NotImplementedError()
+
+ def remove_timeout(self, timeout):
+ """Cancels a pending timeout.
+
+ The argument is a handle as returned by add_timeout.
+ """
+ raise NotImplementedError()
+
+ def add_callback(self, callback):
+ """Calls the given callback on the next I/O loop iteration.
+
+ It is safe to call this method from any thread at any time,
+ except from a signal handler. Note that this is the *only*
+ method in IOLoop that makes this thread-safety guarantee; all
+ other interaction with the IOLoop must be done from that
+ IOLoop's thread. add_callback() may be used to transfer
+ control from other threads to the IOLoop's thread.
+
+ To add a callback from a signal handler, see
+ `add_callback_from_signal`.
+ """
+ raise NotImplementedError()
+
+ def add_callback_from_signal(self, callback):
+ """Calls the given callback on the next I/O loop iteration.
+
+ Safe for use from a Python signal handler; should not be used
+ otherwise.
+
+ Callbacks added with this method will be run without any
+ stack_context, to avoid picking up the context of the function
+ that was interrupted by the signal.
+ """
+ raise NotImplementedError()
+
+ if futures is not None:
+ _FUTURE_TYPES = (futures.Future, DummyFuture)
+ else:
+ _FUTURE_TYPES = DummyFuture
+ def add_future(self, future, callback):
+ """Schedules a callback on the IOLoop when the given future is finished.
+
+ The callback is invoked with one argument, the future.
+ """
+ assert isinstance(future, IOLoop._FUTURE_TYPES)
+ callback = stack_context.wrap(callback)
+ future.add_done_callback(
+ lambda future: self.add_callback(
+ functools.partial(callback, future)))
+
+ def _run_callback(self, callback):
+ """Runs a callback with error handling.
+
+ For use in subclasses.
+ """
+ try:
+ callback()
+ except Exception:
+ self.handle_callback_exception(callback)
+
+ def handle_callback_exception(self, callback):
+ """This method is called whenever a callback run by the IOLoop
+ throws an exception.
+
+ By default simply logs the exception as an error. Subclasses
+ may override this method to customize reporting of exceptions.
+
+ The exception itself is not passed explicitly, but is available
+ in sys.exc_info.
+ """
+ app_log.error("Exception in callback %r", callback, exc_info=True)
+
+
+
+class PollIOLoop(IOLoop):
+ """Base class for IOLoops built around a select-like function.
+
+ For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
+ (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
+ `tornado.platform.select.SelectIOLoop` (all platforms).
+ """
+ def initialize(self, impl, time_func=None):
+ super(PollIOLoop, self).initialize()
+ self._impl = impl
+ if hasattr(self._impl, 'fileno'):
+ set_close_exec(self._impl.fileno())
+ self.time_func = time_func or time.time
+ self._handlers = {}
+ self._events = {}
+ self._callbacks = []
+ self._callback_lock = threading.Lock()
+ self._timeouts = []
+ self._running = False
+ self._stopped = False
+ self._thread_ident = None
+ self._blocking_signal_threshold = None
+
+ # Create a pipe that we send bogus data to when we want to wake
+ # the I/O loop when it is idle
+ self._waker = Waker()
+ self.add_handler(self._waker.fileno(),
+ lambda fd, events: self._waker.consume(),
+ self.READ)
+
+ def close(self, all_fds=False):
+ self.remove_handler(self._waker.fileno())
+ if all_fds:
+ for fd in self._handlers.keys()[:]:
+ try:
+ os.close(fd)
+ except Exception:
+ gen_log.debug("error closing fd %s", fd, exc_info=True)
+ self._waker.close()
+ self._impl.close()
+
+ def add_handler(self, fd, handler, events):
+ self._handlers[fd] = stack_context.wrap(handler)
+ self._impl.register(fd, events | self.ERROR)
+
+ def update_handler(self, fd, events):
+ self._impl.modify(fd, events | self.ERROR)
+
+ def remove_handler(self, fd):
+ self._handlers.pop(fd, None)
+ self._events.pop(fd, None)
+ try:
+ self._impl.unregister(fd)
+ except (OSError, IOError):
+ gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
+
+ def set_blocking_signal_threshold(self, seconds, action):
+ if not hasattr(signal, "setitimer"):
+ gen_log.error("set_blocking_signal_threshold requires a signal module "
+ "with the setitimer method")
+ return
+ self._blocking_signal_threshold = seconds
+ if seconds is not None:
+ signal.signal(signal.SIGALRM,
+ action if action is not None else signal.SIG_DFL)
+
+ def start(self):
if not logging.getLogger().handlers:
# The IOLoop catches and logs exceptions, so it's
# important that log output be visible. However, python's
@@ -434,68 +590,22 @@ def start(self):
signal.set_wakeup_fd(old_wakeup_fd)
def stop(self):
- """Stop the loop after the current event loop iteration is complete.
- If the event loop is not currently running, the next call to start()
- will return immediately.
-
- To use asynchronous methods from otherwise-synchronous code (such as
- unit tests), you can start and stop the event loop like this::
-
- ioloop = IOLoop()
- async_method(ioloop=ioloop, callback=ioloop.stop)
- ioloop.start()
-
- ioloop.start() will return after async_method has run its callback,
- whether that callback was invoked before or after ioloop.start.
-
- Note that even after `stop` has been called, the IOLoop is not
- completely stopped until `IOLoop.start` has also returned.
- """
self._running = False
self._stopped = True
self._waker.wake()
def running(self):
- """Returns true if this IOLoop is currently running."""
return self._running
def time(self):
- """Returns the current time according to the IOLoop's clock.
-
- The return value is a floating-point number relative to an
- unspecified time in the past.
-
- By default, the IOLoop's time function is `time.time`. However,
- it may be configured to use e.g. `time.monotonic` instead.
- Calls to `add_timeout` that pass a number instead of a
- `datetime.timedelta` should use this function to compute the
- appropriate time, so they can work no matter what time function
- is chosen.
- """
return self.time_func()
def add_timeout(self, deadline, callback):
- """Calls the given callback at the time deadline from the I/O loop.
-
- Returns a handle that may be passed to remove_timeout to cancel.
-
- ``deadline`` may be a number denoting a time relative to
- `IOLoop.time`, or a ``datetime.timedelta`` object for a
- deadline relative to the current time.
-
- Note that it is not safe to call `add_timeout` from other threads.
- Instead, you must use `add_callback` to transfer control to the
- IOLoop's thread, and then call `add_timeout` from there.
- """
timeout = _Timeout(deadline, stack_context.wrap(callback), self)
heapq.heappush(self._timeouts, timeout)
return timeout
def remove_timeout(self, timeout):
- """Cancels a pending timeout.
-
- The argument is a handle as returned by add_timeout.
- """
# Removing from a heap is complicated, so just leave the defunct
# timeout object in the queue (see discussion in
# http://docs.python.org/library/heapq.html).
@@ -504,18 +614,6 @@ def remove_timeout(self, timeout):
timeout.callback = None
def add_callback(self, callback):
- """Calls the given callback on the next I/O loop iteration.
-
- It is safe to call this method from any thread at any time,
- except from a signal handler. Note that this is the *only*
- method in IOLoop that makes this thread-safety guarantee; all
- other interaction with the IOLoop must be done from that
- IOLoop's thread. add_callback() may be used to transfer
- control from other threads to the IOLoop's thread.
-
- To add a callback from a signal handler, see
- `add_callback_from_signal`.
- """
with self._callback_lock:
list_empty = not self._callbacks
self._callbacks.append(stack_context.wrap(callback))
@@ -529,15 +627,6 @@ def add_callback(self, callback):
self._waker.wake()
def add_callback_from_signal(self, callback):
- """Calls the given callback on the next I/O loop iteration.
-
- Safe for use from a Python signal handler; should not be used
- otherwise.
-
- Callbacks added with this method will be run without any
- stack_context, to avoid picking up the context of the function
- that was interrupted by the signal.
- """
with stack_context.NullContext():
if thread.get_ident() != self._thread_ident:
# if the signal is handled on another thread, we can add
@@ -554,39 +643,6 @@ def add_callback_from_signal(self, callback):
# but either way will work.
self._callbacks.append(stack_context.wrap(callback))
- if futures is not None:
- _FUTURE_TYPES = (futures.Future, DummyFuture)
- else:
- _FUTURE_TYPES = DummyFuture
- def add_future(self, future, callback):
- """Schedules a callback on the IOLoop when the given future is finished.
-
- The callback is invoked with one argument, the future.
- """
- assert isinstance(future, IOLoop._FUTURE_TYPES)
- callback = stack_context.wrap(callback)
- future.add_done_callback(
- lambda future: self.add_callback(
- functools.partial(callback, future)))
-
- def _run_callback(self, callback):
- try:
- callback()
- except Exception:
- self.handle_callback_exception(callback)
-
- def handle_callback_exception(self, callback):
- """This method is called whenever a callback run by the IOLoop
- throws an exception.
-
- By default simply logs the exception as an error. Subclasses
- may override this method to customize reporting of exceptions.
-
- The exception itself is not passed explicitly, but is available
- in sys.exc_info.
- """
- app_log.error("Exception in callback %r", callback, exc_info=True)
-
class _Timeout(object):
"""An IOLoop timeout, a UNIX timestamp and a callback"""
View
6 tornado/platform/epoll.py
@@ -23,11 +23,11 @@
import os
import select
-from tornado.ioloop import IOLoop
+from tornado.ioloop import PollIOLoop
if hasattr(select, 'epoll'):
# Python 2.6+
- class EPollIOLoop(IOLoop):
+ class EPollIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
else:
@@ -62,7 +62,7 @@ def poll(self, timeout):
return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
- class EPollIOLoop(IOLoop):
+ class EPollIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(EPollIOLoop, self).initialize(impl=_EPoll(), **kwargs)
View
4 tornado/platform/kqueue.py
@@ -18,7 +18,7 @@
import select
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PollIOLoop
assert hasattr(select, 'kqueue'), 'kqueue not supported'
@@ -86,6 +86,6 @@ def poll(self, timeout):
return events.items()
-class KQueueIOLoop(IOLoop):
+class KQueueIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(KQueueIOLoop, self).initialize(impl=_KQueue(), **kwargs)
View
4 tornado/platform/select.py
@@ -21,7 +21,7 @@
import select
-from tornado.ioloop import IOLoop
+from tornado.ioloop import IOLoop, PollIOLoop
class _Select(object):
"""A simple, select()-based IOLoop implementation for non-Linux systems"""
@@ -69,7 +69,7 @@ def poll(self, timeout):
events[fd] = events.get(fd, 0) | IOLoop.ERROR
return events.items()
-class SelectIOLoop(IOLoop):
+class SelectIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs)

0 comments on commit 1a66183

Please sign in to comment.