Permalink
Browse files

Update stack_context and zmqstream with Tornado changes (3ce6d3daeeed…

…a0fc25206ef99048467307aa0cbe)
  • Loading branch information...
1 parent e2a2e52 commit 8006e25ef181d1dfd9dbd5dd37232842910d148b @spez committed Oct 13, 2010
Showing with 17 additions and 6 deletions.
  1. +2 −0 zmq/eventloop/stack_context.py
  2. +15 −6 zmq/eventloop/zmqstream.py
@@ -101,6 +101,8 @@ def wrap(fn):
different execution context (either in a different thread or
asynchronously in the same thread).
'''
+ if fn is None:
+ return None
# functools.wraps doesn't appear to work on functools.partial objects
#@functools.wraps(fn)
def wrapped(callback, contexts, *args, **kwargs):
View
@@ -21,6 +21,8 @@
from zmq.core.socket import jsonapi, pickle
import ioloop
+import stack_context
+
try:
from queue import Queue
except ImportError:
@@ -78,7 +80,9 @@ def __init__(self, socket, io_loop=None):
self._recv_copy = False
self._state = zmq.POLLERR
- self.io_loop.add_handler(self.socket, self._handle_events, self._state)
+ with stack_context.NullContext():
+ self.io_loop.add_handler(
+ self.socket, self._handle_events, self._state)
# shortcircuit some socket methods
self.bind = self.socket.bind
@@ -125,7 +129,7 @@ def on_recv(self, callback, copy=True):
"""
assert callback is None or callable(callback)
- self._recv_callback = callback
+ self._recv_callback = stack_context.wrap(callback)
self._recv_copy = copy
if callback is None:
self._drop_io_state(zmq.POLLIN)
@@ -158,7 +162,7 @@ def on_send(self, callback):
if callback is None, send callbacks are disabled.
"""
- self._send_callback = callback
+ self._send_callback = stack_context.wrap(callback)
if callback is None:
self._drop_io_state(zmq.POLLOUT)
else:
@@ -175,7 +179,7 @@ def on_err(self, callback):
callback will be passed no arguments.
"""
# self._add_io_state(zmq.POLLOUT)
- self._errback = callback
+ self._errback = stack_context.wrap(callback)
def send(self, msg, flags=0, copy=False, callback=None):
@@ -225,7 +229,7 @@ def send_pyobj(self, obj, flags=0, protocol=-1, callback=None):
def set_close_callback(self, callback):
"""Call the given callback when the stream is closed."""
- self._close_callback = callback
+ self._close_callback = stack_context.wrap(callback)
def close(self):
"""Close this stream."""
@@ -252,8 +256,13 @@ def _run_callback(self, callback, *args, **kwargs):
"""Wrap running callbacks in try/except to allow us to
close our socket."""
try:
- callback(*args, **kwargs)
+ # Use a NullContext to ensure that all StackContexts are run
+ # inside our blanket exception handler rather than outside.
+ with stack_context.NullContext():
+ callback(*args, **kwargs)
except:
+ logging.error("Uncaught exception, closing connection.",
+ exc_info=True)
# Close the socket on an uncaught exception from a user callback
# (It would eventually get closed when the socket object is
# gc'd, but we don't want to rely on gc happening before we

0 comments on commit 8006e25

Please sign in to comment.