Permalink
Browse files

Fixed subtle bug in kernel restarting.

* Old routers were not being shutdown and removed.
* We were incorrectly associating the new kernel with the notebook
  (we were using the *old* kernel_id for this).
* General clean ups in the kernel manager.
  • Loading branch information...
1 parent 03eb23c commit 708c73e487b631e064bd1e0566909797aed1e315 @ellisonbg ellisonbg committed Aug 7, 2011
@@ -90,14 +90,12 @@ def open(self, kernel_id):
rkm = self.application.routing_kernel_manager
self.router = rkm.get_router(kernel_id, self.stream_name)
self.client_id = self.router.register_client(self)
- logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
def on_message(self, msg):
self.router.forward_msg(self.client_id, msg)
def on_close(self):
self.router.unregister_client(self.client_id)
- logging.info("Connection closed: %s" % self.client_id)
#-----------------------------------------------------------------------------
@@ -207,37 +207,58 @@ class RoutingKernelManager(LoggingConfigurable):
@property
def kernel_ids(self):
+ """List the kernel ids."""
return self.kernel_manager.kernel_ids
+ def kernel_for_notebook(self, notebook_id):
+ """Return the kernel_id for a notebook_id or None."""
+ return self._notebook_mapping.get(notebook_id)
+
+ def set_kernel_for_notebook(self, notebook_id, kernel_id):
+ """Associate a notebook with a kernel."""
+ if notebook_id is not None:
+ self._notebook_mapping[notebook_id] = kernel_id
+
def notebook_for_kernel(self, kernel_id):
+ """Return the notebook_id for a kernel_id or None."""
notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
if len(notebook_ids) == 1:
return notebook_ids[0]
else:
return None
def delete_mapping_for_kernel(self, kernel_id):
+ """Remove the kernel/notebook mapping for kernel_id."""
notebook_id = self.notebook_for_kernel(kernel_id)
if notebook_id is not None:
del self._notebook_mapping[notebook_id]
def start_kernel(self, notebook_id=None):
+ """Start a kernel an return its kernel_id.
+
+ Parameters
+ ----------
+ notebook_id : uuid
+ The uuid of the notebook to associate the new kernel with. If this
+ is not None, this kernel will be persistent whenever the notebook
+ requests a kernel.
+ """
self.log.info
- kernel_id = self._notebook_mapping.get(notebook_id)
+ kernel_id = self.kernel_for_notebook(notebook_id)
if kernel_id is None:
kwargs = dict()
kwargs['extra_arguments'] = self.kernel_argv
kernel_id = self.kernel_manager.start_kernel(**kwargs)
- if notebook_id is not None:
- self._notebook_mapping[notebook_id] = kernel_id
- self.log.info("Kernel started for notebook %s: %s" % (notebook_id,kernel_id))
+ self.set_kernel_for_notebook(notebook_id, kernel_id)
+ self.log.info("Kernel started: %s" % kernel_id)
self.log.debug("Kernel args: %r" % kwargs)
self.start_session_manager(kernel_id)
else:
self.log.info("Using existing kernel: %s" % kernel_id)
return kernel_id
def start_session_manager(self, kernel_id):
+ """Start the ZMQ sockets (a "session") to connect to a kernel."""
sm = self.kernel_manager.create_session_manager(kernel_id)
self._session_dict[kernel_id] = sm
iopub_stream = sm.get_iopub_stream()
@@ -248,10 +269,11 @@ def start_session_manager(self, kernel_id):
shell_router = ShellStreamRouter(
zmq_stream=shell_stream, session=sm.session, config=self.config
)
- self._routers[(kernel_id, 'iopub')] = iopub_router
- self._routers[(kernel_id, 'shell')] = shell_router
+ self.set_router(kernel_id, 'iopub', iopub_router)
+ self.set_router(kernel_id, 'shell', shell_router)
def kill_kernel(self, kernel_id):
+ """Kill a kernel and remove its notebook association."""
if kernel_id not in self.kernel_manager:
raise web.HTTPError(404)
try:
@@ -264,12 +286,14 @@ def kill_kernel(self, kernel_id):
self.log.info("Kernel killed: %s" % kernel_id)
def interrupt_kernel(self, kernel_id):
+ """Interrupt a kernel."""
if kernel_id not in self.kernel_manager:
raise web.HTTPError(404)
self.kernel_manager.interrupt_kernel(kernel_id)
self.log.debug("Kernel interrupted: %s" % kernel_id)
def restart_kernel(self, kernel_id):
+ """Restart a kernel while keeping clients connected."""
if kernel_id not in self.kernel_manager:
raise web.HTTPError(404)
@@ -286,6 +310,14 @@ def restart_kernel(self, kernel_id):
new_iopub_router.copy_clients(old_iopub_router)
new_shell_router.copy_clients(old_shell_router)
+ # Shut down the old routers
+ old_shell_router.close()
+ old_iopub_router.close()
+ self.delete_router(kernel_id, 'shell')
+ self.delete_router(kernel_id, 'iopub')
+ del old_shell_router
+ del old_iopub_router
+
# Now shutdown the old session and the kernel.
# TODO: This causes a hard crash in ZMQStream.close, which sets
# self.socket to None to hastily. We will need to fix this in PyZMQ
@@ -295,12 +327,24 @@ def restart_kernel(self, kernel_id):
# Now save the new kernel/notebook association. We have to save it
# after the old kernel is killed as that will delete the mapping.
- self._notebook_mapping[notebook_id] = kernel_id
+ self.set_kernel_for_notebook(notebook_id, new_kernel_id)
- self.log.debug("Kernel restarted: %s -> %s" % (kernel_id, new_kernel_id))
+ self.log.debug("Kernel restarted: %s" % new_kernel_id)
return new_kernel_id
def get_router(self, kernel_id, stream_name):
+ """Return the router for a given kernel_id and stream name."""
router = self._routers[(kernel_id, stream_name)]
return router
+ def set_router(self, kernel_id, stream_name, router):
+ """Set the router for a given kernel_id and stream_name."""
+ self._routers[(kernel_id, stream_name)] = router
+
+ def delete_router(self, kernel_id, stream_name):
+ """Delete a router for a kernel_id and stream_name."""
+ try:
+ del self._routers[(kernel_id, stream_name)]
+ except KeyError:
+ pass
+
@@ -37,9 +37,17 @@ def __init__(self, **kwargs):
super(ZMQStreamRouter,self).__init__(**kwargs)
self.zmq_stream.on_recv(self._on_zmq_reply)
+ def __del__(self):
+ self.close()
+
+ def close(self):
+ """Disable the routing actions of this router."""
+ self._clients = {}
+ self.zmq_stream.on_recv(None)
+
def register_client(self, client):
"""Register a client, returning a client uuid."""
- client_id = uuid.uuid4()
+ client_id = unicode(uuid.uuid4())
self._clients[client_id] = client
return client_id
@@ -112,7 +120,6 @@ def _on_zmq_reply(self, msg_list):
def forward_msg(self, client_id, msg):
if len(msg) < self.max_msg_size:
msg = json.loads(msg)
- # to_send = self.session.serialize(msg)
self._request_queue.put(client_id)
self.session.send(self.zmq_stream, msg)

0 comments on commit 708c73e

Please sign in to comment.