Skip to content
Browse files

Merge pull request #1030 from minrk/pyzmq

Use pyzmq tools when available instead of duplicating functionality.

ZMQStream is the right object to use for event-driven handling of messages, but instead we had a duplication of half of it in KernelManager.

Also use the pyzmq install() function for using the pyzmq eventloop with tornado, instead of copying its contents into notebookapp.
  • Loading branch information...
2 parents 59c4f31 + aaf8939 commit aa2337bb8b1988c7e89b619f36908d1a6bd7a10b @fperez fperez committed Nov 22, 2011
Showing with 39 additions and 133 deletions.
  1. +7 −2 IPython/frontend/html/notebook/notebookapp.py
  2. +32 −131 IPython/zmq/kernelmanager.py
View
9 IPython/frontend/html/notebook/notebookapp.py
@@ -32,8 +32,13 @@
# Install the pyzmq ioloop. This has to be done before anything else from
# tornado is imported.
from zmq.eventloop import ioloop
-import tornado.ioloop
-tornado.ioloop.IOLoop = ioloop.IOLoop
+# FIXME: ioloop.install is new in pyzmq-2.1.7, so remove this conditional
+# when pyzmq dependency is updated beyond that.
+if hasattr(ioloop, 'install'):
+ ioloop.install()
+else:
+ import tornado.ioloop
+ tornado.ioloop.IOLoop = ioloop.IOLoop
from tornado import httpserver
from tornado import web
View
163 IPython/zmq/kernelmanager.py
@@ -18,7 +18,6 @@
# Standard library imports.
import errno
import json
-from Queue import Queue, Empty
from subprocess import Popen
import os
import signal
@@ -28,8 +27,7 @@
# System library imports.
import zmq
-from zmq import POLLIN, POLLOUT, POLLERR
-from zmq.eventloop import ioloop
+from zmq.eventloop import ioloop, zmqstream
# Local imports.
from IPython.config.loader import Config
@@ -88,7 +86,7 @@ class ZMQSocketChannel(Thread):
session = None
socket = None
ioloop = None
- iostate = None
+ stream = None
_address = None
def __init__(self, context, session, address):
@@ -144,37 +142,28 @@ def address(self):
"""
return self._address
- def add_io_state(self, state):
- """Add IO state to the eventloop.
-
+ def _queue_send(self, msg):
+ """Queue a message to be sent from the IOLoop's thread.
+
Parameters
----------
- state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
- The IO state flag to set.
-
- This is thread safe as it uses the thread safe IOLoop.add_callback.
+ msg : message to send
+
+ This is threadsafe, as it uses IOLoop.add_callback to give the loop's
+ thread control of the action.
"""
- def add_io_state_callback():
- if not self.iostate & state:
- self.iostate = self.iostate | state
- self.ioloop.update_handler(self.socket, self.iostate)
- self.ioloop.add_callback(add_io_state_callback)
-
- def drop_io_state(self, state):
- """Drop IO state from the eventloop.
-
- Parameters
- ----------
- state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
- The IO state flag to set.
+ def thread_send():
+ self.session.send(self.stream, msg)
+ self.ioloop.add_callback(thread_send)
- This is thread safe as it uses the thread safe IOLoop.add_callback.
+ def _handle_recv(self, msg):
+ """callback for stream.on_recv
+
+ unpacks message, and calls handlers with it.
"""
- def drop_io_state_callback():
- if self.iostate & state:
- self.iostate = self.iostate & (~state)
- self.ioloop.update_handler(self.socket, self.iostate)
- self.ioloop.add_callback(drop_io_state_callback)
+ ident,smsg = self.session.feed_identities(msg)
+ self.call_handlers(self.session.unserialize(smsg))
+
class ShellSocketChannel(ZMQSocketChannel):
@@ -187,17 +176,15 @@ class ShellSocketChannel(ZMQSocketChannel):
def __init__(self, context, session, address):
super(ShellSocketChannel, self).__init__(context, session, address)
- self.command_queue = Queue()
self.ioloop = ioloop.IOLoop()
def run(self):
"""The thread's main activity. Call start() instead."""
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.connect('tcp://%s:%i' % self.address)
- self.iostate = POLLERR|POLLIN
- self.ioloop.add_handler(self.socket, self._handle_events,
- self.iostate)
+ self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
+ self.stream.on_recv(self._handle_recv)
self._run_loop()
def stop(self):
@@ -268,7 +255,7 @@ def execute(self, code, silent=False,
allow_stdin=allow_stdin,
)
msg = self.session.msg('execute_request', content)
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
def complete(self, text, line, cursor_pos, block=None):
@@ -293,7 +280,7 @@ def complete(self, text, line, cursor_pos, block=None):
"""
content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
msg = self.session.msg('complete_request', content)
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
def object_info(self, oname):
@@ -310,7 +297,7 @@ def object_info(self, oname):
"""
content = dict(oname=oname)
msg = self.session.msg('object_info_request', content)
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
@@ -348,7 +335,7 @@ def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
**kwargs)
msg = self.session.msg('history_request', content)
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
def shutdown(self, restart=False):
@@ -365,38 +352,9 @@ def shutdown(self, restart=False):
# Send quit message to kernel. Once we implement kernel-side setattr,
# this should probably be done that way, but for now this will do.
msg = self.session.msg('shutdown_request', {'restart':restart})
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
- def _handle_events(self, socket, events):
- if events & POLLERR:
- self._handle_err()
- if events & POLLOUT:
- self._handle_send()
- if events & POLLIN:
- self._handle_recv()
-
- def _handle_recv(self):
- ident,msg = self.session.recv(self.socket, 0)
- self.call_handlers(msg)
-
- def _handle_send(self):
- try:
- msg = self.command_queue.get(False)
- except Empty:
- pass
- else:
- self.session.send(self.socket,msg)
- if self.command_queue.empty():
- self.drop_io_state(POLLOUT)
-
- def _handle_err(self):
- # We don't want to let this go silently, so eventually we should log.
- raise zmq.ZMQError()
-
- def _queue_request(self, msg):
- self.command_queue.put(msg)
- self.add_io_state(POLLOUT)
class SubSocketChannel(ZMQSocketChannel):
@@ -413,9 +371,8 @@ def run(self):
self.socket.setsockopt(zmq.SUBSCRIBE,b'')
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.connect('tcp://%s:%i' % self.address)
- self.iostate = POLLIN|POLLERR
- self.ioloop.add_handler(self.socket, self._handle_events,
- self.iostate)
+ self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
+ self.stream.on_recv(self._handle_recv)
self._run_loop()
def stop(self):
@@ -456,33 +413,9 @@ def flush(self, timeout=1.0):
while not self._flushed and time.time() < stop_time:
time.sleep(0.01)
- def _handle_events(self, socket, events):
- # Turn on and off POLLOUT depending on if we have made a request
- if events & POLLERR:
- self._handle_err()
- if events & POLLIN:
- self._handle_recv()
-
- def _handle_err(self):
- # We don't want to let this go silently, so eventually we should log.
- raise zmq.ZMQError()
-
- def _handle_recv(self):
- # Get all of the messages we can
- while True:
- try:
- ident,msg = self.session.recv(self.socket)
- except zmq.ZMQError:
- # Check the errno?
- # Will this trigger POLLERR?
- break
- else:
- if msg is None:
- break
- self.call_handlers(msg)
-
def _flush(self):
"""Callback for :method:`self.flush`."""
+ self.stream.flush()
self._flushed = True
@@ -494,16 +427,14 @@ class StdInSocketChannel(ZMQSocketChannel):
def __init__(self, context, session, address):
super(StdInSocketChannel, self).__init__(context, session, address)
self.ioloop = ioloop.IOLoop()
- self.msg_queue = Queue()
def run(self):
"""The thread's main activity. Call start() instead."""
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.connect('tcp://%s:%i' % self.address)
- self.iostate = POLLERR|POLLIN
- self.ioloop.add_handler(self.socket, self._handle_events,
- self.iostate)
+ self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
+ self.stream.on_recv(self._handle_recv)
self._run_loop()
def stop(self):
@@ -524,37 +455,7 @@ def input(self, string):
"""Send a string of raw input to the kernel."""
content = dict(value=string)
msg = self.session.msg('input_reply', content)
- self._queue_reply(msg)
-
- def _handle_events(self, socket, events):
- if events & POLLERR:
- self._handle_err()
- if events & POLLOUT:
- self._handle_send()
- if events & POLLIN:
- self._handle_recv()
-
- def _handle_recv(self):
- ident,msg = self.session.recv(self.socket, 0)
- self.call_handlers(msg)
-
- def _handle_send(self):
- try:
- msg = self.msg_queue.get(False)
- except Empty:
- pass
- else:
- self.session.send(self.socket,msg)
- if self.msg_queue.empty():
- self.drop_io_state(POLLOUT)
-
- def _handle_err(self):
- # We don't want to let this go silently, so eventually we should log.
- raise zmq.ZMQError()
-
- def _queue_reply(self, msg):
- self.msg_queue.put(msg)
- self.add_io_state(POLLOUT)
+ self._queue_send(msg)
class HBSocketChannel(ZMQSocketChannel):

0 comments on commit aa2337b

Please sign in to comment.
Something went wrong with that request. Please try again.