Skip to content

Commit

Permalink
Prevent missing IOPub on restart
Browse files Browse the repository at this point in the history
  • Loading branch information
SylvainCorlay committed Dec 10, 2020
1 parent 0c83c9d commit 2e2fc54
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
2 changes: 1 addition & 1 deletion notebook/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def _handle_kernel_info_reply(self, msg):
enabling msg spec adaptation, if necessary
"""
idents,msg = self.session.feed_identities(msg)
idents, msg = self.session.feed_identities(msg)
try:
msg = self.session.deserialize(msg)
except:
Expand Down
44 changes: 35 additions & 9 deletions notebook/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datetime import datetime, timedelta
from functools import partial
import os
import time

from tornado import web
from tornado.concurrent import Future
Expand Down Expand Up @@ -310,37 +311,62 @@ async def restart_kernel(self, kernel_id, now=False):
await maybe_future(self.pinned_superclass.restart_kernel(self, kernel_id, now=now))
kernel = self.get_kernel(kernel_id)
# return a Future that will resolve when the kernel has successfully restarted
channel = kernel.connect_shell()
shell_channel = kernel.connect_shell()
iopub_channel = kernel.connect_iopub()
future = Future()
info_future = Future()
iopub_future = Future()

def finish():
"""Common cleanup when restart finishes/fails for any reason."""
if not channel.closed():
channel.close()
loop.remove_timeout(timeout)
kernel.remove_restart_callback(on_restart_failed, 'dead')

def on_reply(msg):
def on_shell_reply(msg):
self.log.debug("Kernel info reply received: %s", kernel_id)
finish()
if not future.done():
future.set_result(msg)
shell_channel.close()
if not info_future.done():
info_future.set_result(msg)
if iopub_future.done():
finish()
future.set_result(info_future.result())

def on_iopub(msg):
self.log.debug("first IOPub received: %s", kernel_id)
iopub_channel.close()
if not iopub_future.done():
iopub_future.set_result(None)
if info_future.done():
finish()
future.set_result(info_future.result())

def on_timeout():
self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
if not shell_channel.closed():
shell_channel.close()
if not iopub_channel.closed():
iopub_channel.close()
finish()
if not future.done():
future.set_exception(TimeoutError("Timeout waiting for restart"))

def on_restart_failed():
self.log.warning("Restarting kernel failed: %s", kernel_id)
if not shell_channel.closed():
shell_channel.close()
if not iopub_channel.closed():
iopub_channel.close()
finish()
if not future.done():
future.set_exception(RuntimeError("Restart failed"))

kernel.add_restart_callback(on_restart_failed, 'dead')
kernel.session.send(channel, "kernel_info_request")
channel.on_recv(on_reply)
iopub_channel.on_recv(on_iopub)
shell_channel.on_recv(on_shell_reply)
while not future.done():
time.sleep(0.2)
# Nudge the kernel with kernel info requests until we get an IOPub message
kernel.session.send(shell_channel, "kernel_info_request")
loop = IOLoop.current()
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
return future
Expand Down

0 comments on commit 2e2fc54

Please sign in to comment.