Skip to content

Commit

Permalink
Add support for async kernel management via subclassing (#428)
Browse files Browse the repository at this point in the history
* Add support for async kernel management via subclassing

Introduced async subclasses that derive from synchronous classes,
overriding appropriate methods with async support.

* Convert to async/await model

* Apply changes per review

Also added the cache_ports logic that existed in the sync multiKernelManager.

* Add async def is_alive() back
  • Loading branch information
davidbrochart authored Mar 4, 2020
1 parent fa0c78b commit 99bb826
Show file tree
Hide file tree
Showing 9 changed files with 782 additions and 37 deletions.
4 changes: 2 additions & 2 deletions jupyter_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .connect import *
from .launcher import *
from .client import KernelClient
from .manager import KernelManager, run_kernel
from .manager import KernelManager, AsyncKernelManager, run_kernel
from .blocking import BlockingKernelClient
from .asynchronous import AsyncKernelClient
from .multikernelmanager import MultiKernelManager
from .multikernelmanager import MultiKernelManager, AsyncMultiKernelManager
20 changes: 19 additions & 1 deletion jupyter_client/asynchronous/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def wait_for_ready(self, timeout=None):
self._handle_kernel_info_reply(msg)
break

if not self.is_alive():
if not await self.is_alive():
raise RuntimeError('Kernel died before replying to kernel_info')

# Check if current time is ready check time plus timeout
Expand Down Expand Up @@ -234,6 +234,24 @@ def _output_hook_kernel(self, session, socket, parent_header, msg):
else:
self._output_hook_default(msg)

async def is_alive(self):
"""Is the kernel process still running?"""
from ..manager import KernelManager, AsyncKernelManager
if isinstance(self.parent, KernelManager):
# This KernelClient was created by a KernelManager,
# we can ask the parent KernelManager:
if isinstance(self.parent, AsyncKernelManager):
return await self.parent.is_alive()
return self.parent.is_alive()
if self._hb_channel is not None:
# We don't have access to the KernelManager,
# so we use the heartbeat.
return self._hb_channel.is_beating()
else:
# no heartbeat and not local, we can't tell if it's running,
# so naively return True
return True

async def execute_interactive(self, code, silent=False, store_history=True,
user_expressions=None, allow_stdin=None, stop_on_error=True,
timeout=None, output_hook=None, stdin_hook=None,
Expand Down
4 changes: 2 additions & 2 deletions jupyter_client/ioloop/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .manager import IOLoopKernelManager
from .restarter import IOLoopKernelRestarter
from .manager import IOLoopKernelManager, AsyncIOLoopKernelManager
from .restarter import IOLoopKernelRestarter, AsyncIOLoopKernelRestarter
46 changes: 44 additions & 2 deletions jupyter_client/ioloop/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
Type,
)

from jupyter_client.manager import KernelManager
from .restarter import IOLoopKernelRestarter
from jupyter_client.manager import KernelManager, AsyncKernelManager
from .restarter import IOLoopKernelRestarter, AsyncIOLoopKernelRestarter


def as_zmqstream(f):
Expand All @@ -21,9 +21,11 @@ def wrapped(self, *args, **kwargs):
return ZMQStream(socket, self.loop)
return wrapped


class IOLoopKernelManager(KernelManager):

loop = Instance('tornado.ioloop.IOLoop')

def _loop_default(self):
return ioloop.IOLoop.current()

Expand Down Expand Up @@ -59,3 +61,43 @@ def stop_restarter(self):
connect_iopub = as_zmqstream(KernelManager.connect_iopub)
connect_stdin = as_zmqstream(KernelManager.connect_stdin)
connect_hb = as_zmqstream(KernelManager.connect_hb)


class AsyncIOLoopKernelManager(AsyncKernelManager):

loop = Instance('tornado.ioloop.IOLoop')

def _loop_default(self):
return ioloop.IOLoop.current()

restarter_class = Type(
default_value=AsyncIOLoopKernelRestarter,
klass=AsyncIOLoopKernelRestarter,
help=(
'Type of KernelRestarter to use. '
'Must be a subclass of AsyncIOLoopKernelManager.\n'
'Override this to customize how kernel restarts are managed.'
),
config=True,
)
_restarter = Instance('jupyter_client.ioloop.AsyncIOLoopKernelRestarter', allow_none=True)

def start_restarter(self):
if self.autorestart and self.has_kernel:
if self._restarter is None:
self._restarter = self.restarter_class(
kernel_manager=self, loop=self.loop,
parent=self, log=self.log
)
self._restarter.start()

def stop_restarter(self):
if self.autorestart:
if self._restarter is not None:
self._restarter.stop()
self._restarter = None

connect_shell = as_zmqstream(AsyncKernelManager.connect_shell)
connect_iopub = as_zmqstream(AsyncKernelManager.connect_iopub)
connect_stdin = as_zmqstream(AsyncKernelManager.connect_stdin)
connect_hb = as_zmqstream(AsyncKernelManager.connect_hb)
38 changes: 38 additions & 0 deletions jupyter_client/ioloop/restarter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
Instance,
)


class IOLoopKernelRestarter(KernelRestarter):
"""Monitor and autorestart a kernel."""

loop = Instance('tornado.ioloop.IOLoop')

def _loop_default(self):
warnings.warn("IOLoopKernelRestarter.loop is deprecated in jupyter-client 5.2",
DeprecationWarning, stacklevel=4,
Expand All @@ -41,3 +43,39 @@ def stop(self):
if self._pcallback is not None:
self._pcallback.stop()
self._pcallback = None


class AsyncIOLoopKernelRestarter(IOLoopKernelRestarter):

async def poll(self):
if self.debug:
self.log.debug('Polling kernel...')
is_alive = await self.kernel_manager.is_alive()
if not is_alive:
if self._restarting:
self._restart_count += 1
else:
self._restart_count = 1

if self._restart_count >= self.restart_limit:
self.log.warning("AsyncIOLoopKernelRestarter: restart failed")
self._fire_callbacks('dead')
self._restarting = False
self._restart_count = 0
self.stop()
else:
newports = self.random_ports_until_alive and self._initial_startup
self.log.info('AsyncIOLoopKernelRestarter: restarting kernel (%i/%i), %s random ports',
self._restart_count,
self.restart_limit,
'new' if newports else 'keep'
)
self._fire_callbacks('restart')
await self.kernel_manager.restart_kernel(now=True, newports=newports)
self._restarting = True
else:
if self._initial_startup:
self._initial_startup = False
if self._restarting:
self.log.debug("AsyncIOLoopKernelRestarter: restart apparently succeeded")
self._restarting = False
Loading

0 comments on commit 99bb826

Please sign in to comment.