Permalink
Browse files

use transaction module from pypy-stm to run some parts of ioloop in p…

…arallel
  • Loading branch information...
lopuhin committed Sep 1, 2013
1 parent ff9f9c3 commit 246c5e71ce8792b20c56049cf2e3eff192a01b20
Showing with 25 additions and 11 deletions.
  1. +25 −11 tornado/ioloop.py
View
@@ -40,6 +40,7 @@
import threading
import time
import traceback
import transaction
from tornado.concurrent import Future, TracebackFuture
from tornado.log import app_log, gen_log
@@ -606,8 +607,12 @@ def start(self):
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
n_callbacks = 0
for callback in callbacks:
self._run_callback(callback)
n_callbacks += 1
transaction.add(self._run_callback, callback)
transaction.run()
logging.info('run %d callbacks in parallel', n_callbacks)
# Closures may be holding on to a lot of memory, so allow
# them to be freed before we go into our poll wait.
callbacks = callback = None
@@ -673,18 +678,15 @@ def start(self):
# other file descriptors, there may be reentrant calls to
# this IOLoop that update self._events
self._events.update(event_pairs)
n_handlers = 0
while self._events:
n_handlers += 1
fd, events = self._events.popitem()
try:
self._handlers[fd](fd, events)
except (OSError, IOError) as e:
if e.args[0] == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
handler = self._handlers[fd]
transaction.add(self._handle_event, fd, handler, events)
transaction.run()
logging.info('run %d handlers in parallel', n_handlers)
# reset the stopped flag so another start/stop pair can be issued
self._stopped = False
if self._blocking_signal_threshold is not None:
@@ -693,6 +695,18 @@ def start(self):
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd)
def _handle_event(self, fd, handler, events):
try:
handler(fd, events)
except (OSError, IOError) as e:
if e.args[0] == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
def stop(self):
self._running = False
self._stopped = True

0 comments on commit 246c5e7

Please sign in to comment.