Permalink
Browse files

Different clients now share a single zmq session.

Previously, each client (browser window) would open its own set
of ZMQ sockets to a kernel. Now one master set of connections
to the kernel is created and all clients share those connections.
In some ways, this simplifies the URL design.

I have also made kernel_ids server-side created.
  • Loading branch information...
1 parent 339135b commit 2ad3ee4100f2a25c7f938f53ffc6c10116d783fc @ellisonbg ellisonbg committed with ellisonbg Apr 26, 2011
@@ -1,5 +1,6 @@
import signal
import sys
+import uuid
from IPython.zmq.ipkernel import launch_kernel
from session import SessionManager
@@ -30,9 +31,8 @@ def __contains__(self, kernel_id):
else:
return False
- def start_kernel(self, kernel_id):
- if kernel_id in self._kernels:
- raise DuplicateKernelError("Kernel already exists: %s" % kernel_id)
+ def start_kernel(self):
+ kernel_id = str(uuid.uuid4())
(process, shell_port, iopub_port, stdin_port, hb_port) = launch_kernel(pylab='inline')
d = dict(
process = process,
@@ -3,6 +3,8 @@
import logging
import os
import urllib
+import uuid
+from Queue import Queue
import zmq
@@ -21,88 +23,87 @@
options.define("port", default=8888, help="run on the given port", type=int)
-_session_id_regex = r"(?P<session_id>\w+-\w+-\w+-\w+-\w+)"
-_kernel_id_regex = r"(?P<kernel_id>\w+)"
+_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
class MainHandler(web.RequestHandler):
def get(self):
self.render('notebook.html')
-class BaseKernelHandler(object):
-
- def get_kernel(self):
- return self.application.kernel_manager
-
- def get_session(self, kernel_id):
- km = self.get_kernel()
- sm = km.get_session_manager(kernel_id)
- return sm
-
-
-class KernelHandler(web.RequestHandler, BaseKernelHandler):
+class KernelHandler(web.RequestHandler):
def get(self):
- self.write(json.dumps(self.get_kernel().kernel_ids))
+ self.write(json.dumps(self.application.kernel_ids))
- def post(self, *args, **kwargs):
- kernel_id = kwargs['kernel_id']
- self.get_kernel().start_kernel(kernel_id)
- logging.info("Starting kernel: %s" % kernel_id)
+ def post(self):
+ kernel_id = self.application.start_kernel()
+ self.application.start_session(kernel_id)
self.write(json.dumps(kernel_id))
-class SessionHandler(web.RequestHandler, BaseKernelHandler):
+class ZMQStreamRouter(object):
- def get(self, *args, **kwargs):
- kernel_id = kwargs['kernel_id']
- self.write(json.dumps(self.get_session(kernel_id).session_ids))
+ def __init__(self, zmq_stream):
+ self.zmq_stream = zmq_stream
+ self._clients = {}
+ self.zmq_stream.on_recv(self._on_zmq_reply)
- def post(self, *args, **kwargs):
- kernel_id = kwargs['kernel_id']
- sm = self.get_session(kernel_id)
- session_id = sm.start_session()
- logging.info("Starting session: %s, %s" % (kernel_id, session_id))
- self.write(json.dumps(session_id))
+ def register_client(self, client):
+ client_id = uuid.uuid4()
+ self._clients[client_id] = client
+ return client_id
+ def unregister_client(self, client_id):
+ del self._clients[client_id]
-class ZMQStreamHandler(websocket.WebSocketHandler, BaseKernelHandler):
- stream_name = ''
+class IOPubStreamRouter(ZMQStreamRouter):
- def open(self, *args, **kwargs):
- kernel_id = kwargs['kernel_id']
- session_id = kwargs['session_id']
- logging.info("Connection open: %s, %s" % (kernel_id,session_id))
- sm = self.get_session(kernel_id)
- method_name = "get_%s_stream" % self.stream_name
- method = getattr(sm, method_name)
- self.zmq_stream = method(session_id)
- self.zmq_stream.on_recv(self._on_zmq_reply)
+ def _on_zmq_reply(self, msg_list):
+ for client_id, client in self._clients.items():
+ for msg in msg_list:
+ client.write_message(msg)
- def on_message(self, msg):
- logging.info("Message received: %r, %r" % (msg, self.__class__))
- logging.info(self.zmq_stream)
- self.zmq_stream.send_unicode(msg)
+ def forward_unicode(self, client_id, msg):
+ # This is a SUB stream that we should never write to.
+ pass
- def on_close(self):
- self.zmq_stream.close()
+
+class ShellStreamRouter(ZMQStreamRouter):
+
+ def __init__(self, zmq_stream):
+ ZMQStreamRouter.__init__(self, zmq_stream)
+ self._request_queue = Queue()
def _on_zmq_reply(self, msg_list):
- for msg in msg_list:
- logging.info("Message reply: %r" % msg)
- self.write_message(msg)
+ client_id = self._request_queue.get(block=False)
+ client = self._clients.get(client_id)
+ if client is not None:
+ for msg in msg_list:
+ client.write_message(msg)
+
+ def forward_unicode(self, client_id, msg):
+ self._request_queue.put(client_id)
+ self.zmq_stream.send_unicode(msg)
-class IOPubStreamHandler(ZMQStreamHandler):
+class ZMQStreamHandler(websocket.WebSocketHandler):
- stream_name = 'iopub'
+ def initialize(self, stream_name):
+ self.stream_name = stream_name
+ def open(self, kernel_id):
+ self.router = self.application.get_router(kernel_id, self.stream_name)
+ self.client_id = self.router.register_client(self)
+ logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
-class ShellStreamHandler(ZMQStreamHandler):
+ def on_message(self, msg):
+ self.router.forward_unicode(self.client_id, msg)
- stream_name = 'shell'
+ def on_close(self):
+ self.router.unregister_client(self.client_id)
+ logging.info("Connection closed: %s" % self.client_id)
class NotebookRootHandler(web.RequestHandler):
@@ -157,10 +158,9 @@ class NotebookApplication(web.Application):
def __init__(self):
handlers = [
(r"/", MainHandler),
- (r"/kernels/%s" % (_kernel_id_regex,), KernelHandler),
- (r"/kernels/%s/sessions" % (_kernel_id_regex,), SessionHandler),
- (r"/kernels/%s/sessions/%s/iopub" % (_kernel_id_regex,_session_id_regex), IOPubStreamHandler),
- (r"/kernels/%s/sessions/%s/shell" % (_kernel_id_regex,_session_id_regex), ShellStreamHandler),
+ (r"/kernels", KernelHandler),
+ (r"/kernels/%s/iopub" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='iopub')),
+ (r"/kernels/%s/shell" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='shell')),
(r"/notebooks", NotebookRootHandler),
(r"/notebooks/([^/]+)", NotebookHandler)
]
@@ -169,8 +169,46 @@ def __init__(self):
static_path=os.path.join(os.path.dirname(__file__), "static"),
)
web.Application.__init__(self, handlers, **settings)
+
self.context = zmq.Context()
self.kernel_manager = KernelManager(self.context)
+ self._session_dict = {}
+ self._routers = {}
+
+ #-------------------------------------------------------------------------
+ # Methods for managing kernels and sessions
+ #-------------------------------------------------------------------------
+
+ @property
+ def kernel_ids(self):
+ return self.kernel_manager.kernel_ids
+
+ def start_kernel(self):
+ kernel_id = self.kernel_manager.start_kernel()
+ logging.info("Kernel started: %s" % kernel_id)
+ return kernel_id
+
+ def start_session(self, kernel_id):
+ sm = self.kernel_manager.get_session_manager(kernel_id)
+ session_id = sm.start_session()
+ self._session_dict[kernel_id] = session_id
+ iopub_stream = sm.get_iopub_stream(session_id)
+ shell_stream = sm.get_shell_stream(session_id)
+ iopub_router = IOPubStreamRouter(iopub_stream)
+ shell_router = ShellStreamRouter(shell_stream)
+ self._routers[(kernel_id, session_id, 'iopub')] = iopub_router
+ self._routers[(kernel_id, session_id, 'shell')] = shell_router
+ logging.info("Session started: %s, %s" % (kernel_id, session_id))
+
+ def stop_session(self, kernel_id):
+ # TODO: finish this!
+ sm = self.kernel_manager.get_session_manager(kernel_id)
+ session_id = self._session_dict[kernel_id]
+
+ def get_router(self, kernel_id, stream_name):
+ session_id = self._session_dict[kernel_id]
+ router = self._routers[(kernel_id, session_id, stream_name)]
+ return router
def main():
@@ -92,7 +92,6 @@ var Notebook = function (selector) {
this.element.scroll();
this.element.data("notebook", this);
this.next_prompt_number = 1;
- this.next_kernel_number = 0;
this.kernel = null;
this.msg_cell_map = {};
this.bind_events();
@@ -429,20 +428,13 @@ Notebook.prototype.expand = function (index) {
// Kernel related things
Notebook.prototype.start_kernel = function () {
- this.kernel = new Kernel("kernel" + this.next_kernel_number);
- this.next_kernel_number = this.next_kernel_number + 1;
+ this.kernel = new Kernel();
this.kernel.start_kernel(this._kernel_started, this);
};
Notebook.prototype._kernel_started = function () {
console.log("Kernel started: ", this.kernel.kernel_id);
- this.kernel.start_session(this._session_started, this);
-};
-
-
-Notebook.prototype._session_started = function () {
- console.log("Session started: ", this.kernel.session_id);
var that = this;
this.kernel.shell_channel.onmessage = function (e) {
@@ -711,11 +703,10 @@ TextCell.prototype.config_mathjax = function () {
//============================================================================
-var Kernel = function (kernel_id) {
- this.kernel_id = kernel_id;
+var Kernel = function () {
+ this.kernel_id = null;
this.base_url = "/kernels";
- this.kernel_url = this.base_url + "/" + this.kernel_id
- this.session_id = null;
+ this.kernel_url = null;
};
@@ -734,32 +725,26 @@ Kernel.prototype.get_msg = function (msg_type, content) {
}
Kernel.prototype.start_kernel = function (callback, context) {
- $.post(this.kernel_url, function () {
- callback.call(context);
- });
-};
-
-
-Kernel.prototype.start_session = function (callback, context) {
var that = this;
- $.post(this.kernel_url + "/sessions",
- function (session_id) {
- that._handle_start_session(session_id, callback, context);
- },
- 'json');
-}
+ $.post(this.base_url,
+ function (kernel_id) {
+ that._handle_start_kernel(kernel_id, callback, context);
+ },
+ 'json'
+ );
+};
-Kernel.prototype._handle_start_session = function (session_id, callback, context) {
- this.session_id = session_id;
- this.session_url = this.kernel_url + "/sessions/" + this.session_id;
+Kernel.prototype._handle_start_kernel = function (kernel_id, callback, context) {
+ this.kernel_id = kernel_id;
+ this.kernel_url = this.base_url + "/" + this.kernel_id;
this._start_channels();
callback.call(context);
};
Kernel.prototype._start_channels = function () {
- var ws_url = "ws://127.0.0.1:8888" + this.session_url;
+ var ws_url = "ws://127.0.0.1:8888" + this.kernel_url;
this.shell_channel = new WebSocket(ws_url + "/shell");
this.iopub_channel = new WebSocket(ws_url + "/iopub");
}

0 comments on commit 2ad3ee4

Please sign in to comment.