# Let's build a kernel proxy with a special purpose!

We're going to make a kernel that launches another kernel underneath, while proxying all the messages to/from it.

Additionally, we'll build it on top of `AsyncKernelManager` and `AsyncKernelClient`.

In [8]:
import asyncio

In [21]:
import jupyter_client
from jupyter_client import AsyncKernelManager

In [47]:
jupyter_client.__version__

'6.1.2'

Launching a kernel can be done with a `KernelManager`. For this post, we'll be using an `AsyncKernelManager`, which comes with the latest version of `jupyter_client` at the time of this writing (6.1.2).

**NOTE**: You'll see some "pre-amble" code in upcoming cells where cleanup is performed before a variable is even defined. That keeps the cells safe to re-run.

In [136]:
# Setup a way to shutdown a kernel cleanly:
# * The channels on a kernel client
# * The kernel itself on the kernel manager
async def shutdown_kernel():
    kc.stop_channels()
    await km.shutdown_kernel()

# To facilitate re-running this cell, we shut down prior kernels at
# the start. It's a no-op when the kernel hasn't been started yet.
try:
    await shutdown_kernel()
except Exception:
    pass

km = AsyncKernelManager(kernel_name="python3")

# Start the kernel process
await km.start_kernel()

# Create our "client" for interfacing with the kernel
kc = km.client()

# Connect to the communication channels
kc.start_channels()

try:
    # Wait for the kernel itself to be ready, giving up after 100 seconds (the timeout is in seconds)
    await kc.wait_for_ready(timeout=100)
except RuntimeError:
    kc.stop_channels()
    await km.shutdown_kernel()
    raise

With this started, we can now execute code on the kernel using the `KernelClient`, `kc`.

In [137]:
msg_id = kc.execute('x = 3')
# Wait up to 100 seconds for the reply on the shell channel
reply = await kc.get_shell_msg(timeout=100)
display(reply)

{'header': {'msg_id': 'f76e38d5-d72a6605fc981ebf03293e90_6',
  'msg_type': 'execute_reply',
  'username': 'kylek',
  'session': 'f76e38d5-d72a6605fc981ebf03293e90',
  'date': datetime.datetime(2020, 4, 8, 20, 19, 51, 957126, tzinfo=tzutc()),
  'version': '5.3'},
 'msg_id': 'f76e38d5-d72a6605fc981ebf03293e90_6',
 'msg_type': 'execute_reply',
 'parent_header': {'msg_id': 'ef304761-93d40d632d1a4198fb27b864_1',
  'msg_type': 'execute_request',
  'username': 'kylek',
  'session': 'ef304761-93d40d632d1a4198fb27b864',
  'date': datetime.datetime(2020, 4, 8, 20, 19, 51, 952841, tzinfo=tzutc()),
  'version': '5.3'},
 'metadata': {'started': '2020-04-08T20:19:51.954173Z',
  'dependencies_met': True,
  'engine': 'b17287e4-644e-4225-9854-08166f4b027a',
  'status': 'ok'},
 'content': {'status': 'ok',
  'execution_count': 1,
  'user_expressions': {},
  'payload': []},
 'buffers': []}
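
The `parent_header` of the reply carries the same `msg_id` that `kc.execute` returned, which is how a reply gets tied back to its request. Here is a minimal sketch of that check, using a plain dict shaped like the message above. `is_reply_to` is a hypothetical helper for illustration, not part of `jupyter_client`:

```python
def is_reply_to(msg, request_id):
    """Return True when msg's parent_header points at request_id."""
    return msg.get("parent_header", {}).get("msg_id") == request_id


# Trimmed-down stand-in for the execute_reply shown above
reply = {
    "msg_type": "execute_reply",
    "parent_header": {"msg_id": "ef304761-93d40d632d1a4198fb27b864_1"},
    "content": {"status": "ok", "execution_count": 1},
}

matches = is_reply_to(reply, "ef304761-93d40d632d1a4198fb27b864_1")
```

A proxy will likely need a check like this to route replies back to whoever sent the original request.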

There are two primary channels that we're going to care about in this notebook: shell and iopub.

* Shell is used for sending commands
* IOPub has all the good stuff (like output!)

There can be a lot of IOPub messages, so we expect to `await` each one separately.

In [138]:
msg = await kc.get_iopub_msg()
msg

{'header': {'msg_id': 'f76e38d5-d72a6605fc981ebf03293e90_4',
  'msg_type': 'status',
  'username': 'kylek',
  'session': 'f76e38d5-d72a6605fc981ebf03293e90',
  'date': datetime.datetime(2020, 4, 8, 20, 19, 51, 953947, tzinfo=tzutc()),
  'version': '5.3'},
 'msg_id': 'f76e38d5-d72a6605fc981ebf03293e90_4',
 'msg_type': 'status',
 'parent_header': {'msg_id': 'ef304761-93d40d632d1a4198fb27b864_1',
  'msg_type': 'execute_request',
  'username': 'kylek',
  'session': 'ef304761-93d40d632d1a4198fb27b864',
  'date': datetime.datetime(2020, 4, 8, 20, 19, 51, 952841, tzinfo=tzutc()),
  'version': '5.3'},
 'metadata': {},
 'content': {'execution_state': 'busy'},
 'buffers': []}
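
Because one request usually produces several IOPub messages, awaiting them one at a time tends to end up in a loop. A rough sketch of such a loop follows, assuming (as the outputs in this notebook suggest) that the kernel brackets each request with `busy` and `idle` status messages whose `parent_header` names the request. `collect_until_idle` is a hypothetical helper, and a plain `asyncio.Queue` stands in for the live kernel so the block runs on its own:

```python
import asyncio


async def collect_until_idle(get_msg, request_id):
    """Await messages one at a time until the kernel reports idle for request_id."""
    collected = []
    while True:
        msg = await get_msg()
        # Skip messages that belong to some other request
        if msg.get("parent_header", {}).get("msg_id") != request_id:
            continue
        collected.append(msg)
        is_status = msg.get("msg_type") == "status"
        if is_status and msg.get("content", {}).get("execution_state") == "idle":
            return collected


async def demo():
    # Fake IOPub stream standing in for a live kernel
    queue = asyncio.Queue()
    for state in ("busy", "idle"):
        queue.put_nowait({
            "msg_type": "status",
            "parent_header": {"msg_id": "req-1"},
            "content": {"execution_state": state},
        })
    return await collect_until_idle(queue.get, "req-1")
```

In this notebook, `await collect_until_idle(kc.get_iopub_msg, msg_id)` should behave the same way against the real kernel, though without a timeout it waits forever if the `idle` message was already consumed.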

We don't have to do that by hand, though. We can set up a background task instead. It looks like this:

In [142]:
# Here's an example task
task = asyncio.create_task(kc.get_iopub_msg())
await task

print("done: ", task.done())
print("result: ", task.result())

done:  True
result:  {'header': {'msg_id': 'f76e38d5-d72a6605fc981ebf03293e90_7', 'msg_type': 'status', 'username': 'kylek', 'session': 'f76e38d5-d72a6605fc981ebf03293e90', 'date': datetime.datetime(2020, 4, 8, 20, 19, 51, 957888, tzinfo=tzutc()), 'version': '5.3'}, 'msg_id': 'f76e38d5-d72a6605fc981ebf03293e90_7', 'msg_type': 'status', 'parent_header': {'msg_id': 'ef304761-93d40d632d1a4198fb27b864_1', 'msg_type': 'execute_request', 'username': 'kylek', 'session': 'ef304761-93d40d632d1a4198fb27b864', 'date': datetime.datetime(2020, 4, 8, 20, 19, 51, 952841, tzinfo=tzutc()), 'version': '5.3'}, 'metadata': {}, 'content': {'execution_state': 'idle'}, 'buffers': []}


Tasks are cancellable, exceptions can be looked into, callbacks can be added. They're a pretty basic yet incredibly useful primitive.
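
As a small illustration of those three features, here is a sketch that uses only `asyncio`. The `wait_forever` coroutine is a stand-in for a long-running loop such as an IOPub reader, and the names are made up for this example:

```python
import asyncio


async def demo():
    events = []

    async def wait_forever():
        # Stands in for a long-running loop such as reading IOPub messages
        await asyncio.Event().wait()

    task = asyncio.create_task(wait_forever())

    # Done callbacks run once the task finishes, whether by result,
    # error, or cancellation
    task.add_done_callback(
        lambda t: events.append("cancelled" if t.cancelled() else "done")
    )

    await asyncio.sleep(0)  # let the task start running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Awaiting a cancelled task re-raises the cancellation here
        pass

    return task.cancelled(), events
```

In a notebook cell, `await demo()` runs it directly. In a plain script, use `asyncio.run(demo())`.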

In [147]:
# Set up some nice formatting calls

from IPython.display import HTML
from datetime import datetime
from datetime import timezone
from datetime import timedelta
import json

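# Serialize datetimes as ISO 8601 strings
def serializer(obj):
    if isinstance(obj, datetime):
        return { '_isoformat': obj.isoformat() }
    # json.dumps expects the default hook to raise TypeError for unsupported types
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")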

# Pretty print a message
def pretty(msg):
    return HTML(f"""<pre>{json.dumps(msg, indent=2, default=serializer)}</pre>""")

In [151]:
# Gather all messages in an array
dump = []

# Grab all IOPub messages
async def dump_iopub():
    while True:
        msg = await kc.get_iopub_msg()
        dump.append(msg)
        # Slow down for the sake of this demo
        await asyncio.sleep(0.1)
        handle.update(pretty(msg))

display(HTML("<h2>Last IOPUB message</h2>"))
handle = display(pretty(msg), display_id=True)

In [152]:
# Cleanup if the task is already running and we re-run this cell
try:
    iopub_task.cancel()
except NameError:
    pass

# Start handling that task in the background!
iopub_task = asyncio.create_task(dump_iopub())

In [154]:
# Run this cell while watching above
kc.execute("x = 3")

'ef304761-93d40d632d1a4198fb27b864_4'

In [155]:
iopub_task.done()

False

In [156]:
iopub_task.get_stack()

[<frame at 0x7ff55687c2d0, file '<ipython-input-151-564b4dbe5f3f>', line 7, code dump_iopub>]

In [158]:
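# If the task had finished with an exception, we could retrieve it here.
# While the task is still running, this raises InvalidStateError instead.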
iopub_task.exception()

InvalidStateError: Exception is not set.
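
`Task.exception()` raises `InvalidStateError` while the task is still running, which is what the output above shows. One way to avoid that is to check `done()` and `cancelled()` first. This is a sketch only, and `task_error` is a hypothetical helper name:

```python
import asyncio


def task_error(task):
    """Return the task's exception, or None if it is running, cancelled, or succeeded."""
    if not task.done() or task.cancelled():
        return None
    return task.exception()


async def demo():
    async def boom():
        raise ValueError("bad message")

    running = asyncio.create_task(asyncio.sleep(10))
    failed = asyncio.create_task(boom())
    await asyncio.sleep(0)  # give both tasks a chance to run

    results = (task_error(running), task_error(failed))
    running.cancel()  # clean up the task that is still sleeping
    return results
```

With a guard like this, `task_error(iopub_task)` would return `None` for the still-running reader instead of raising.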

# Junk Experiments

In [108]:
# Create a message using kc.session.msg
msg = kc.session.msg('junk', {"pooh": True})
display(msg)

{'header': {'msg_id': '98322640-77bda1aa4fb7396deba76556_8',
  'msg_type': 'junk',
  'username': 'kylek',
  'session': '98322640-77bda1aa4fb7396deba76556',
  'date': datetime.datetime(2020, 4, 8, 19, 23, 25, 872628, tzinfo=datetime.timezone.utc),
  'version': '5.3'},
 'msg_id': '98322640-77bda1aa4fb7396deba76556_8',
 'msg_type': 'junk',
 'parent_header': {},
 'content': {'pooh': True},
 'metadata': {}}

In [109]:
# send() is not awaitable; it sends immediately
kc.shell_channel.send(msg)

In [119]:
reply = await kc.get_shell_msg(100)

CancelledError: 

In [None]:
reply