diff --git a/ipykernel/inprocess/client.py b/ipykernel/inprocess/client.py index 1784a6eaa..f424eedee 100644 --- a/ipykernel/inprocess/client.py +++ b/ipykernel/inprocess/client.py @@ -173,10 +173,8 @@ def _dispatch_to_kernel(self, msg): raise RuntimeError('Cannot send request. No kernel exists.') stream = kernel.shell_stream - self.session.send(stream, msg) - msg_parts = stream.recv_multipart() loop = asyncio.get_event_loop() - loop.run_until_complete(kernel.dispatch_shell(msg_parts)) + loop.run_until_complete(kernel.dispatch_shell(msg)) idents, reply_msg = self.session.recv(stream, copy=False) self.shell_channel.call_handlers_later(reply_msg) diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index bb50c04cc..4c594c3df 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -74,6 +74,8 @@ def _user_ns_changed(self, change): _sys_raw_input = Any() _sys_eval_input = Any() + comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] + def __init__(self, **kwargs): super(IPythonKernel, self).__init__(**kwargs) @@ -102,8 +104,7 @@ def __init__(self, **kwargs): self.comm_manager = CommManager(parent=self, kernel=self) self.shell.configurables.append(self.comm_manager) - comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] - for msg_type in comm_msg_types: + for msg_type in self.comm_msg_types: self.shell_handlers[msg_type] = getattr(self.comm_manager, msg_type) if _use_appnope() and self._darwin_app_nap: @@ -583,6 +584,16 @@ def do_clear(self): self.shell.reset(False) return dict(status='ok') + def should_dispatch_immediately(self, msg): + try: + msg_type = msg['header']['msg_type'] + if msg_type in self.comm_msg_types: + return True + except ValueError: + pass + + return False + # This exists only for backwards compatibility - use IPythonKernel instead diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 39d6d2661..8e8d0803b 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -16,6 +16,7 @@ import time import uuid import warnings +from contextvars import ContextVar try: # jupyter_client >= 5, use tz-aware now @@ -134,7 +135,7 @@ def _default_ident(self): # track associations with current request _allow_stdin = Bool(False) - _parents = Dict({"shell": {}, "control": {}}) + _parents = Dict({"shell": ContextVar("shell_parent", default={}), "control": ContextVar("control_parent", default={})}) _parent_ident = Dict({'shell': b'', 'control': b''}) @property @@ -301,18 +302,14 @@ def should_handle(self, stream, msg, idents): return False return True - async def dispatch_shell(self, msg): + async def dispatch_shell(self, msg, idents=None): """dispatch shell requests""" # flush control queue before handling shell requests await self._flush_control_queue() - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except Exception: - self.log.error("Invalid Message", exc_info=True) - return + if idents is None: + idents = [] # Set the parent message for side effects. self.set_parent(idents, msg, channel='shell') @@ -466,15 +463,38 @@ async def dispatch_queue(self): def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, dispatch, *args): + def should_dispatch_immediately(self, msg): + """ + This provides a hook for dispatching incoming messages + from the frontend immediately, and out of order. + + It could be used to allow asynchronous messages from + GUIs to be processed. + """ + return False + + def schedule_dispatch(self, msg, dispatch): """schedule a message for dispatch""" + + idents, msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid shell message", exc_info=True) + return + + new_args = (msg, idents) + + if self.should_dispatch_immediately(msg): + return self.io_loop.add_callback(dispatch, *new_args) + idx = next(self._message_counter) self.msg_queue.put_nowait( ( idx, dispatch, - args, + new_args, ) ) # ensure the eventloop wakes up @@ -498,7 +518,7 @@ def start(self): self.shell_stream.on_recv( partial( self.schedule_dispatch, - self.dispatch_shell, + dispatch=self.dispatch_shell, ), copy=False, ) @@ -556,7 +576,7 @@ def set_parent(self, ident, parent, channel='shell'): on the stdin channel. """ self._parent_ident[channel] = ident - self._parents[channel] = parent + self._parents[channel].set(parent) def get_parent(self, channel="shell"): """Get the parent request associated with a channel. @@ -573,7 +593,7 @@ def get_parent(self, channel="shell"): message : dict the parent message for the most recent request on the channel. """ - return self._parents.get(channel, {}) + return self._parents.get(channel, {}).get({}) def send_response(self, stream, msg_or_type, content=None, ident=None, buffers=None, track=False, header=None, metadata=None, channel='shell'):