From 22bd412b25b6deaf731af6d92c7be3f2b0e915b0 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 20 Jul 2021 15:38:28 +0200 Subject: [PATCH 1/3] Use ContextVar for parent messages --- ipykernel/kernelbase.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 39d6d2661..bd2fe31f3 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -16,6 +16,7 @@ import time import uuid import warnings +from contextvars import ContextVar try: # jupyter_client >= 5, use tz-aware now @@ -134,7 +135,7 @@ def _default_ident(self): # track associations with current request _allow_stdin = Bool(False) - _parents = Dict({"shell": {}, "control": {}}) + _parents = Dict({"shell": ContextVar("shell_parent", default={}), "control": ContextVar("control_parent", default={})}) _parent_ident = Dict({'shell': b'', 'control': b''}) @property @@ -556,7 +557,7 @@ def set_parent(self, ident, parent, channel='shell'): on the stdin channel. """ self._parent_ident[channel] = ident - self._parents[channel] = parent + self._parents[channel].set(parent) def get_parent(self, channel="shell"): """Get the parent request associated with a channel. @@ -573,7 +574,7 @@ def get_parent(self, channel="shell"): message : dict the parent message for the most recent request on the channel. """ - return self._parents.get(channel, {}) + return self._parents.get(channel, {}).get({}) def send_response(self, stream, msg_or_type, content=None, ident=None, buffers=None, track=False, header=None, metadata=None, channel='shell'): From db498e0debfb699314263d7e5d02520812fb4742 Mon Sep 17 00:00:00 2001 From: RK Date: Mon, 1 Feb 2021 09:44:10 +0000 Subject: [PATCH 2/3] Issue #12786: Create hook for dispatching messages out of order --- ipykernel/inprocess/client.py | 4 +--- ipykernel/kernelbase.py | 39 ++++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/ipykernel/inprocess/client.py b/ipykernel/inprocess/client.py index 1784a6eaa..f424eedee 100644 --- a/ipykernel/inprocess/client.py +++ b/ipykernel/inprocess/client.py @@ -173,10 +173,8 @@ def _dispatch_to_kernel(self, msg): raise RuntimeError('Cannot send request. No kernel exists.') stream = kernel.shell_stream - self.session.send(stream, msg) - msg_parts = stream.recv_multipart() loop = asyncio.get_event_loop() - loop.run_until_complete(kernel.dispatch_shell(msg_parts)) + loop.run_until_complete(kernel.dispatch_shell(msg)) idents, reply_msg = self.session.recv(stream, copy=False) self.shell_channel.call_handlers_later(reply_msg) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index bd2fe31f3..8e8d0803b 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -302,18 +302,14 @@ def should_handle(self, stream, msg, idents): return False return True - async def dispatch_shell(self, msg): + async def dispatch_shell(self, msg, idents=None): """dispatch shell requests""" # flush control queue before handling shell requests await self._flush_control_queue() - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except Exception: - self.log.error("Invalid Message", exc_info=True) - return + if idents is None: + idents = [] # Set the parent message for side effects. self.set_parent(idents, msg, channel='shell') @@ -467,15 +463,38 @@ async def dispatch_queue(self): def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, dispatch, *args): + def should_dispatch_immediately(self, msg): + """ + This provides a hook for dispatching incoming messages + from the frontend immediately, and out of order. + + It could be used to allow asynchronous messages from + GUIs to be processed. + """ + return False + + def schedule_dispatch(self, msg, dispatch): """schedule a message for dispatch""" + + idents, msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid shell message", exc_info=True) + return + + new_args = (msg, idents) + + if self.should_dispatch_immediately(msg): + return self.io_loop.add_callback(dispatch, *new_args) + idx = next(self._message_counter) self.msg_queue.put_nowait( ( idx, dispatch, - args, + new_args, ) ) # ensure the eventloop wakes up @@ -499,7 +518,7 @@ def start(self): self.shell_stream.on_recv( partial( self.schedule_dispatch, - self.dispatch_shell, + dispatch=self.dispatch_shell, ), copy=False, ) From ca2ae36064506c1ed1511533371ae0dc1a5efc09 Mon Sep 17 00:00:00 2001 From: RK Date: Mon, 1 Feb 2021 11:15:41 +0000 Subject: [PATCH 3/3] Issue #12786: Implement hook for Comm messages --- ipykernel/ipkernel.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index bb50c04cc..4c594c3df 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -74,6 +74,8 @@ def _user_ns_changed(self, change): _sys_raw_input = Any() _sys_eval_input = Any() + comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] + def __init__(self, **kwargs): super(IPythonKernel, self).__init__(**kwargs) @@ -102,8 +104,7 @@ def __init__(self, **kwargs): self.comm_manager = CommManager(parent=self, kernel=self) self.shell.configurables.append(self.comm_manager) - comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] - for msg_type in comm_msg_types: + for msg_type in self.comm_msg_types: self.shell_handlers[msg_type] = getattr(self.comm_manager, msg_type) if _use_appnope() and self._darwin_app_nap: @@ -583,6 +584,16 @@ def do_clear(self): self.shell.reset(False) return dict(status='ok') + def should_dispatch_immediately(self, msg): + try: + msg_type = msg['header']['msg_type'] + if msg_type in self.comm_msg_types: + return True + except ValueError: + pass + + return False + # This exists only for backwards compatibility - use IPythonKernel instead