Skip to content

Commit

Permalink
Merge pull request #1030 from minrk/pyzmq
Browse files Browse the repository at this point in the history
Use pyzmq tools when available instead of duplicating functionality.

ZMQStream is the right object to use for event-driven handling of messages, but instead we had a duplication of half of it in KernelManager.

Also use the pyzmq install() function for using the pyzmq eventloop with tornado, instead of copying its contents into notebookapp.
  • Loading branch information
fperez committed Nov 23, 2011
2 parents 59c4f31 + aaf8939 commit aa2337b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 133 deletions.
9 changes: 7 additions & 2 deletions IPython/frontend/html/notebook/notebookapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@
# Install the pyzmq ioloop. This has to be done before anything else from
# tornado is imported.
from zmq.eventloop import ioloop
import tornado.ioloop
tornado.ioloop.IOLoop = ioloop.IOLoop
# FIXME: ioloop.install is new in pyzmq-2.1.7, so remove this conditional
# when pyzmq dependency is updated beyond that.
if hasattr(ioloop, 'install'):
ioloop.install()
else:
import tornado.ioloop
tornado.ioloop.IOLoop = ioloop.IOLoop

from tornado import httpserver
from tornado import web
Expand Down
163 changes: 32 additions & 131 deletions IPython/zmq/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
# Standard library imports.
import errno
import json
from Queue import Queue, Empty
from subprocess import Popen
import os
import signal
Expand All @@ -28,8 +27,7 @@

# System library imports.
import zmq
from zmq import POLLIN, POLLOUT, POLLERR
from zmq.eventloop import ioloop
from zmq.eventloop import ioloop, zmqstream

# Local imports.
from IPython.config.loader import Config
Expand Down Expand Up @@ -88,7 +86,7 @@ class ZMQSocketChannel(Thread):
session = None
socket = None
ioloop = None
iostate = None
stream = None
_address = None

def __init__(self, context, session, address):
Expand Down Expand Up @@ -144,37 +142,28 @@ def address(self):
"""
return self._address

def add_io_state(self, state):
"""Add IO state to the eventloop.
def _queue_send(self, msg):
"""Queue a message to be sent from the IOLoop's thread.
Parameters
----------
state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
The IO state flag to set.
This is thread safe as it uses the thread safe IOLoop.add_callback.
msg : message to send
This is threadsafe, as it uses IOLoop.add_callback to give the loop's
thread control of the action.
"""
def add_io_state_callback():
if not self.iostate & state:
self.iostate = self.iostate | state
self.ioloop.update_handler(self.socket, self.iostate)
self.ioloop.add_callback(add_io_state_callback)

def drop_io_state(self, state):
"""Drop IO state from the eventloop.
Parameters
----------
state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
The IO state flag to set.
def thread_send():
self.session.send(self.stream, msg)
self.ioloop.add_callback(thread_send)

This is thread safe as it uses the thread safe IOLoop.add_callback.
def _handle_recv(self, msg):
"""callback for stream.on_recv
unpacks message, and calls handlers with it.
"""
def drop_io_state_callback():
if self.iostate & state:
self.iostate = self.iostate & (~state)
self.ioloop.update_handler(self.socket, self.iostate)
self.ioloop.add_callback(drop_io_state_callback)
ident,smsg = self.session.feed_identities(msg)
self.call_handlers(self.session.unserialize(smsg))



class ShellSocketChannel(ZMQSocketChannel):
Expand All @@ -187,17 +176,15 @@ class ShellSocketChannel(ZMQSocketChannel):

def __init__(self, context, session, address):
super(ShellSocketChannel, self).__init__(context, session, address)
self.command_queue = Queue()
self.ioloop = ioloop.IOLoop()

def run(self):
"""The thread's main activity. Call start() instead."""
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.connect('tcp://%s:%i' % self.address)
self.iostate = POLLERR|POLLIN
self.ioloop.add_handler(self.socket, self._handle_events,
self.iostate)
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
self._run_loop()

def stop(self):
Expand Down Expand Up @@ -268,7 +255,7 @@ def execute(self, code, silent=False,
allow_stdin=allow_stdin,
)
msg = self.session.msg('execute_request', content)
self._queue_request(msg)
self._queue_send(msg)
return msg['header']['msg_id']

def complete(self, text, line, cursor_pos, block=None):
Expand All @@ -293,7 +280,7 @@ def complete(self, text, line, cursor_pos, block=None):
"""
content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
msg = self.session.msg('complete_request', content)
self._queue_request(msg)
self._queue_send(msg)
return msg['header']['msg_id']

def object_info(self, oname):
Expand All @@ -310,7 +297,7 @@ def object_info(self, oname):
"""
content = dict(oname=oname)
msg = self.session.msg('object_info_request', content)
self._queue_request(msg)
self._queue_send(msg)
return msg['header']['msg_id']

def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
Expand Down Expand Up @@ -348,7 +335,7 @@ def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
**kwargs)
msg = self.session.msg('history_request', content)
self._queue_request(msg)
self._queue_send(msg)
return msg['header']['msg_id']

def shutdown(self, restart=False):
Expand All @@ -365,38 +352,9 @@ def shutdown(self, restart=False):
# Send quit message to kernel. Once we implement kernel-side setattr,
# this should probably be done that way, but for now this will do.
msg = self.session.msg('shutdown_request', {'restart':restart})
self._queue_request(msg)
self._queue_send(msg)
return msg['header']['msg_id']

def _handle_events(self, socket, events):
if events & POLLERR:
self._handle_err()
if events & POLLOUT:
self._handle_send()
if events & POLLIN:
self._handle_recv()

def _handle_recv(self):
ident,msg = self.session.recv(self.socket, 0)
self.call_handlers(msg)

def _handle_send(self):
try:
msg = self.command_queue.get(False)
except Empty:
pass
else:
self.session.send(self.socket,msg)
if self.command_queue.empty():
self.drop_io_state(POLLOUT)

def _handle_err(self):
# We don't want to let this go silently, so eventually we should log.
raise zmq.ZMQError()

def _queue_request(self, msg):
self.command_queue.put(msg)
self.add_io_state(POLLOUT)


class SubSocketChannel(ZMQSocketChannel):
Expand All @@ -413,9 +371,8 @@ def run(self):
self.socket.setsockopt(zmq.SUBSCRIBE,b'')
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.connect('tcp://%s:%i' % self.address)
self.iostate = POLLIN|POLLERR
self.ioloop.add_handler(self.socket, self._handle_events,
self.iostate)
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
self._run_loop()

def stop(self):
Expand Down Expand Up @@ -456,33 +413,9 @@ def flush(self, timeout=1.0):
while not self._flushed and time.time() < stop_time:
time.sleep(0.01)

def _handle_events(self, socket, events):
# Turn on and off POLLOUT depending on if we have made a request
if events & POLLERR:
self._handle_err()
if events & POLLIN:
self._handle_recv()

def _handle_err(self):
# We don't want to let this go silently, so eventually we should log.
raise zmq.ZMQError()

def _handle_recv(self):
# Get all of the messages we can
while True:
try:
ident,msg = self.session.recv(self.socket)
except zmq.ZMQError:
# Check the errno?
# Will this trigger POLLERR?
break
else:
if msg is None:
break
self.call_handlers(msg)

def _flush(self):
"""Callback for :method:`self.flush`."""
self.stream.flush()
self._flushed = True


Expand All @@ -494,16 +427,14 @@ class StdInSocketChannel(ZMQSocketChannel):
def __init__(self, context, session, address):
super(StdInSocketChannel, self).__init__(context, session, address)
self.ioloop = ioloop.IOLoop()
self.msg_queue = Queue()

def run(self):
"""The thread's main activity. Call start() instead."""
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.connect('tcp://%s:%i' % self.address)
self.iostate = POLLERR|POLLIN
self.ioloop.add_handler(self.socket, self._handle_events,
self.iostate)
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
self._run_loop()

def stop(self):
Expand All @@ -524,37 +455,7 @@ def input(self, string):
"""Send a string of raw input to the kernel."""
content = dict(value=string)
msg = self.session.msg('input_reply', content)
self._queue_reply(msg)

def _handle_events(self, socket, events):
if events & POLLERR:
self._handle_err()
if events & POLLOUT:
self._handle_send()
if events & POLLIN:
self._handle_recv()

def _handle_recv(self):
ident,msg = self.session.recv(self.socket, 0)
self.call_handlers(msg)

def _handle_send(self):
try:
msg = self.msg_queue.get(False)
except Empty:
pass
else:
self.session.send(self.socket,msg)
if self.msg_queue.empty():
self.drop_io_state(POLLOUT)

def _handle_err(self):
# We don't want to let this go silently, so eventually we should log.
raise zmq.ZMQError()

def _queue_reply(self, msg):
self.msg_queue.put(msg)
self.add_io_state(POLLOUT)
self._queue_send(msg)


class HBSocketChannel(ZMQSocketChannel):
Expand Down

0 comments on commit aa2337b

Please sign in to comment.