Skip to content

Commit

Permalink
re-add stm transactions to use multiple cores on PyPy STM
Browse files Browse the repository at this point in the history
  • Loading branch information
lopuhin committed Nov 1, 2014
1 parent 06638c8 commit 04cd740
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions tornado/ioloop.py
Expand Up @@ -57,6 +57,21 @@
except ImportError:
import _thread as thread # py3

try:
import transaction as stm_transaction
except ImportError:
class FakeSTMTransaction(object):
def __init__(self):
self._callbacks = []
def add(self, fn, *args, **kwargs):
self._callbacks.append((fn, args, kwargs))
def run(self):
for fn, args, kwargs in self._callbacks:
fn(*args, **kwargs)
self._callbacks = []
stm_transaction = FakeSTMTransaction()


from tornado.platform.auto import set_close_exec, Waker


Expand Down Expand Up @@ -778,10 +793,13 @@ def start(self):
heapq.heapify(self._timeouts)

for callback in callbacks:
self._run_callback(callback)
stm_transaction.add(self._run_callback, callback)
stm_transaction.run()
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
stm_transaction.add(
self._run_callback, timeout.callback)
stm_transaction.run()
# Closures may be holding on to a lot of memory, so allow
# them to be freed before we go into our poll wait.
callbacks = callback = due_timeouts = timeout = None
Expand Down Expand Up @@ -832,17 +850,10 @@ def start(self):
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj, handler_func = self._handlers[fd]
stm_transaction.add(
self._handle_event, handler_func, fd, fd_obj, events)
stm_transaction.run()
fd_obj = handler_func = None

finally:
Expand All @@ -854,6 +865,18 @@ def start(self):
if old_wakeup_fd is not None:
signal.set_wakeup_fd(old_wakeup_fd)

def _handle_event(self, handler_func, fd, fd_obj, events):
try:
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))

def stop(self):
self._running = False
self._stopped = True
Expand Down

0 comments on commit 04cd740

Please sign in to comment.