Skip to content

Commit

Permalink
More work on updating the notebook zmq forwarding.
Browse files Browse the repository at this point in the history
  • Loading branch information
ellisonbg committed Jul 21, 2011
1 parent f0ab259 commit fd1d84c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 23 deletions.
2 changes: 1 addition & 1 deletion IPython/frontend/html/notebook/handlers.py
Expand Up @@ -55,7 +55,7 @@ def open(self, kernel_id):
logging.info("Connection open: %s, %s" % (kernel_id, self.client_id)) logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))


def on_message(self, msg): def on_message(self, msg):
self.router.forward_unicode(self.client_id, msg) self.router.forward_msg(self.client_id, msg)


def on_close(self): def on_close(self):
self.router.unregister_client(self.client_id) self.router.unregister_client(self.client_id)
Expand Down
15 changes: 11 additions & 4 deletions IPython/frontend/html/notebook/notebookapp.py
Expand Up @@ -55,7 +55,7 @@


class NotebookWebApplication(web.Application): class NotebookWebApplication(web.Application):


def __init__(self, kernel_manager, log, kernel_argv): def __init__(self, kernel_manager, log, kernel_argv, config):
handlers = [ handlers = [
(r"/", MainHandler), (r"/", MainHandler),
(r"/kernels", KernelHandler), (r"/kernels", KernelHandler),
Expand All @@ -74,6 +74,7 @@ def __init__(self, kernel_manager, log, kernel_argv):
self.kernel_manager = kernel_manager self.kernel_manager = kernel_manager
self.log = log self.log = log
self.kernel_argv = kernel_argv self.kernel_argv = kernel_argv
self.config = config
self._routers = {} self._routers = {}
self._session_dict = {} self._session_dict = {}


Expand All @@ -99,8 +100,12 @@ def start_session_manager(self, kernel_id):
self._session_dict[kernel_id] = sm self._session_dict[kernel_id] = sm
iopub_stream = sm.get_iopub_stream() iopub_stream = sm.get_iopub_stream()
shell_stream = sm.get_shell_stream() shell_stream = sm.get_shell_stream()
iopub_router = IOPubStreamRouter(iopub_stream, sm.session) iopub_router = IOPubStreamRouter(
shell_router = ShellStreamRouter(shell_stream, sm.session) zmq_stream=iopub_stream, session=sm.session, config=self.config
)
shell_router = ShellStreamRouter(
zmq_stream=shell_stream, session=sm.session, config=self.config
)
self._routers[(kernel_id, 'iopub')] = iopub_router self._routers[(kernel_id, 'iopub')] = iopub_router
self._routers[(kernel_id, 'shell')] = shell_router self._routers[(kernel_id, 'shell')] = shell_router


Expand Down Expand Up @@ -230,7 +235,9 @@ def init_logging(self):
def initialize(self, argv=None): def initialize(self, argv=None):
super(IPythonNotebookApp, self).initialize(argv) super(IPythonNotebookApp, self).initialize(argv)
self.init_kernel_manager() self.init_kernel_manager()
self.web_app = NotebookWebApplication(self.kernel_manager, self.log, self.kernel_argv) self.web_app = NotebookWebApplication(
self.kernel_manager, self.log, self.kernel_argv, self.config
)
self.http_server = httpserver.HTTPServer(self.web_app) self.http_server = httpserver.HTTPServer(self.web_app)
self.http_server.listen(self.port) self.http_server.listen(self.port)


Expand Down
72 changes: 54 additions & 18 deletions IPython/frontend/html/notebook/routers.py
Expand Up @@ -2,57 +2,93 @@
from Queue import Queue from Queue import Queue
import json import json


class ZMQStreamRouter(object): from IPython.config.configurable import Configurable
from IPython.utils.traitlets import Instance, Int, Dict


def __init__(self, zmq_stream, session): class ZMQStreamRouter(Configurable):
self.zmq_stream = zmq_stream
self.session = session zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
self._clients = {} session = Instance('IPython.zmq.session.Session')
max_msg_size = Int(2048, config=True, help="""
The max raw message size accepted from the browser
over a WebSocket connection.
""")

_clients = Dict()

def __init__(self, **kwargs):
super(ZMQStreamRouter,self).__init__(**kwargs)
self.zmq_stream.on_recv(self._on_zmq_reply) self.zmq_stream.on_recv(self._on_zmq_reply)


def register_client(self, client): def register_client(self, client):
"""Register a client, returning a client uuid."""
client_id = uuid.uuid4() client_id = uuid.uuid4()
self._clients[client_id] = client self._clients[client_id] = client
return client_id return client_id


def unregister_client(self, client_id): def unregister_client(self, client_id):
"""Unregister a client by its client uuid."""
del self._clients[client_id] del self._clients[client_id]


def copy_clients(self, router): def copy_clients(self, router):
# Copy the clients of another router. """Copy the clients of another router to this one.
This is used to enable the backend zeromq stream to disconnect
and reconnect while the WebSocket connections to browsers
remain, such as when a kernel is restarted.
"""
for client_id, client in router._clients.items(): for client_id, client in router._clients.items():
client.router = self client.router = self
self._clients[client_id] = client self._clients[client_id] = client


def forward_msg(self, client_id, msg):
"""Forward a msg to a client by its id.
The default implementation of this will fail silently if a message
arrives on a socket that doesn't support it. This method should
use max_msg_size to check and silently discard message that are too
long."""
pass

def _on_zmq_reply(self, msg_list):
"""Handle a message the ZMQ stream sends to the router.
Usually, this is where the return message will be written to
clients that need it using client.write_message().
"""
pass



class IOPubStreamRouter(ZMQStreamRouter): class IOPubStreamRouter(ZMQStreamRouter):


def _on_zmq_reply(self, msg_list): def _on_zmq_reply(self, msg_list):
msg = self.session.unpack_message(msg_list)
msg = json.dumps(msg)
for client_id, client in self._clients.items(): for client_id, client in self._clients.items():
for msg in msg_list: for msg in msg_list:
print "Got message: ", msg
client.write_message(msg) client.write_message(msg)


def forward_unicode(self, client_id, msg):
# This is a SUB stream that we should never write to.
pass



class ShellStreamRouter(ZMQStreamRouter): class ShellStreamRouter(ZMQStreamRouter):


def __init__(self, zmq_stream, session): _request_queue = Instance(Queue,(),{})
ZMQStreamRouter.__init__(self, zmq_stream, session)
self._request_queue = Queue()


def _on_zmq_reply(self, msg_list): def _on_zmq_reply(self, msg_list):
msg = self.session.unpack_message(msg_list)
msg = json.dumps(msg)
print "Reply: ", msg_list
client_id = self._request_queue.get(block=False) client_id = self._request_queue.get(block=False)
client = self._clients.get(client_id) client = self._clients.get(client_id)
if client is not None: if client is not None:
for msg in msg_list: for msg in msg_list:
client.write_message(msg) client.write_message(msg)


def forward_unicode(self, client_id, msg): def forward_msg(self, client_id, msg):
print "Inbound message: ", msg if len(msg) < self.max_msg_size:
self._request_queue.put(client_id) msg = json.loads(msg)
self.session.send(self.zmq_stream, msg) print "Raw msg: ", msg
to_send = self.session.serialize(msg)
print "to_send: ", to_send, to_send[-3:]
self._request_queue.put(client_id)
self.session.send_raw(self.zmq_stream, to_send[-3:])


0 comments on commit fd1d84c

Please sign in to comment.