Skip to content

Commit

Permalink
Major refactor of kernel connection management in the notebook.
Browse files Browse the repository at this point in the history
* Full kernel heartbeating is working.
* Connections between the notebook and server and now created
  a new each time there is a WebSocket connection. Each channel is
  also handled separately. This dramatically simplifies the
  server code and makes for a more scalable system.
  • Loading branch information
ellisonbg committed Aug 15, 2011
1 parent 31e9da5 commit 5bad195
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 386 deletions.
146 changes: 120 additions & 26 deletions IPython/frontend/html/notebook/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@
# Imports
#-----------------------------------------------------------------------------

import json
import logging
import os
import urllib

from tornado import web
from tornado import websocket

from zmq.eventloop import ioloop
from zmq.utils import jsonapi

from IPython.zmq.session import Session

try:
from docutils.core import publish_string
except ImportError:
publish_string = None



#-----------------------------------------------------------------------------
# Top-level handlers
#-----------------------------------------------------------------------------
Expand Down Expand Up @@ -52,56 +54,149 @@ def get(self, notebook_id):
class MainKernelHandler(web.RequestHandler):

def get(self):
rkm = self.application.routing_kernel_manager
self.finish(json.dumps(rkm.kernel_ids))
km = self.application.kernel_manager
self.finish(jsonapi.dumps(km.kernel_ids))

def post(self):
rkm = self.application.routing_kernel_manager
km = self.application.kernel_manager
notebook_id = self.get_argument('notebook', default=None)
kernel_id = rkm.start_kernel(notebook_id)
kernel_id = km.start_kernel(notebook_id)
self.set_header('Location', '/'+kernel_id)
self.finish(json.dumps(kernel_id))
self.finish(jsonapi.dumps(kernel_id))


class KernelHandler(web.RequestHandler):

SUPPORTED_METHODS = ('DELETE')

def delete(self, kernel_id):
rkm = self.application.routing_kernel_manager
rkm.kill_kernel(kernel_id)
km = self.application.kernel_manager
km.kill_kernel(kernel_id)
self.set_status(204)
self.finish()


class KernelActionHandler(web.RequestHandler):

def post(self, kernel_id, action):
rkm = self.application.routing_kernel_manager
km = self.application.kernel_manager
if action == 'interrupt':
rkm.interrupt_kernel(kernel_id)
km.interrupt_kernel(kernel_id)
self.set_status(204)
if action == 'restart':
new_kernel_id = rkm.restart_kernel(kernel_id)
self.write(json.dumps(new_kernel_id))
new_kernel_id = km.restart_kernel(kernel_id)
self.write(jsonapi.dumps(new_kernel_id))
self.finish()


class ZMQStreamHandler(websocket.WebSocketHandler):

def initialize(self, stream_name):
self.stream_name = stream_name
def _reserialize_reply(self, msg_list):
"""Reserialize a reply message using JSON.
This takes the msg list from the ZMQ socket, unserializes it using
self.session and then serializes the result using JSON. This method
should be used by self._on_zmq_reply to build messages that can
be sent back to the browser.
"""
idents, msg_list = self.session.feed_identities(msg_list)
msg = self.session.unserialize(msg_list)
msg['header'].pop('date')
msg.pop('buffers')
return jsonapi.dumps(msg)


class IOPubHandler(ZMQStreamHandler):

def initialize(self, *args, **kwargs):
self._kernel_alive = True
self._beating = False

def open(self, kernel_id):
km = self.application.kernel_manager
self.kernel_id = kernel_id
self.session = Session()
self.time_to_dead = km.time_to_dead
self.iopub_stream = km.create_iopub_stream(kernel_id)
self.hb_stream = km.create_hb_stream(kernel_id)
self.iopub_stream.on_recv(self._on_zmq_reply)
self.start_hb(self.kernel_died)

def _on_zmq_reply(self, msg_list):
msg = self._reserialize_reply(msg_list)
self.write_message(msg)

def on_close(self):
self.stop_hb()
self.iopub_stream.close()
self.hb_stream.close()

def start_hb(self, callback):
"""Start the heartbeating and call the callback if the kernel dies."""
if not self._beating:
self._kernel_alive = True

def ping_or_dead():
if self._kernel_alive:
self._kernel_alive = False
self.hb_stream.send(b'ping')
else:
try:
callback()
except:
pass
finally:
self._hb_periodic_callback.stop()

def beat_received(msg):
self._kernel_alive = True

self.hb_stream.on_recv(beat_received)
self._hb_periodic_callback = ioloop.PeriodicCallback(ping_or_dead, self.time_to_dead*1000)
self._hb_periodic_callback.start()
self._beating= True

def stop_hb(self):
"""Stop the heartbeating and cancel all related callbacks."""
if self._beating:
self._hb_periodic_callback.stop()
if not self.hb_stream.closed():
self.hb_stream.on_recv(None)

def kernel_died(self):
self.write_message(
{'header': {'msg_type': 'status'},
'parent_header': {},
'content': {'execution_state':'dead'}
}
)
self.on_close()


class ShellHandler(ZMQStreamHandler):

def initialize(self, *args, **kwargs):
pass

def open(self, kernel_id):
rkm = self.application.routing_kernel_manager
self.router = rkm.get_router(kernel_id, self.stream_name)
self.client_id = self.router.register_client(self)
km = self.application.kernel_manager
self.max_msg_size = km.max_msg_size
self.kernel_id = kernel_id
self.session = Session()
self.shell_stream = self.application.kernel_manager.create_shell_stream(kernel_id)
self.shell_stream.on_recv(self._on_zmq_reply)

def _on_zmq_reply(self, msg_list):
msg = self._reserialize_reply(msg_list)
self.write_message(msg)

def on_message(self, msg):
self.router.forward_msg(self.client_id, msg)
if len(msg) < self.max_msg_size:
msg = jsonapi.loads(msg)
self.session.send(self.shell_stream, msg)

def on_close(self):
self.router.unregister_client(self.client_id)
self.shell_stream.close()


#-----------------------------------------------------------------------------
Expand All @@ -113,7 +208,7 @@ class NotebookRootHandler(web.RequestHandler):
def get(self):
nbm = self.application.notebook_manager
files = nbm.list_notebooks()
self.finish(json.dumps(files))
self.finish(jsonapi.dumps(files))

def post(self):
nbm = self.application.notebook_manager
Expand All @@ -125,7 +220,7 @@ def post(self):
else:
notebook_id = nbm.new_notebook()
self.set_header('Location', '/'+notebook_id)
self.finish(json.dumps(notebook_id))
self.finish(jsonapi.dumps(notebook_id))


class NotebookHandler(web.RequestHandler):
Expand Down Expand Up @@ -175,11 +270,10 @@ def post(self):
body = self.request.body.strip()
source = body
# template_path=os.path.join(os.path.dirname(__file__), u'templates', u'rst_template.html')
print template_path
defaults = {'file_insertion_enabled': 0,
'raw_enabled': 0,
'_disable_config': 1,
'stylesheet_path': 0,
'stylesheet_path': 0
# 'template': template_path
}
try:
Expand Down
Loading

0 comments on commit 5bad195

Please sign in to comment.