Skip to content

Commit

Permalink
Issue #12786: Create hook for dispatching messages out of order
Browse files Browse the repository at this point in the history
  • Loading branch information
sonthonaxrk authored and SylvainCorlay committed Apr 2, 2021
1 parent d6c78de commit b763d54
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
5 changes: 1 addition & 4 deletions ipykernel/inprocess/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,7 @@ def _dispatch_to_kernel(self, msg):
raise RuntimeError('Cannot send request. No kernel exists.')

stream = kernel.shell_stream
self.session.send(stream, msg)
msg_parts = stream.recv_multipart()
kernel.dispatch_shell(msg_parts)

kernel.dispatch_shell(msg)
idents, reply_msg = self.session.recv(stream, copy=False)
self.shell_channel.call_handlers_later(reply_msg)

Expand Down
39 changes: 29 additions & 10 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,10 @@ def should_handle(self, stream, msg, idents):
return True

@gen.coroutine
def dispatch_shell(self, msg):
def dispatch_shell(self, msg, idents=None):
"""dispatch shell requests"""
idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return
if idents is None:
idents = []

# Set the parent message for side effects.
self.set_parent(idents, msg, channel='shell')
Expand Down Expand Up @@ -403,15 +399,38 @@ def dispatch_queue(self):
def _message_counter_default(self):
return itertools.count()

def schedule_dispatch(self, dispatch, *args):
def should_dispatch_immediately(self, msg):
"""
This provides a hook for dispatching incoming messages
from the frontend immediately, and out of order.
It could be used to allow asynchronous messages from
GUIs to be processed.
"""
return False

def schedule_dispatch(self, msg, dispatch):
"""schedule a message for dispatch"""

idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid shell message", exc_info=True)
return

new_args = (msg, idents)

if self.should_dispatch_immediately(msg):
return self.io_loop.add_callback(dispatch, *new_args)

idx = next(self._message_counter)

self.msg_queue.put_nowait(
(
idx,
dispatch,
args,
new_args,
)
)
# ensure the eventloop wakes up
Expand All @@ -428,7 +447,7 @@ def start(self):
self.shell_stream.on_recv(
partial(
self.schedule_dispatch,
self.dispatch_shell,
dispatch=self.dispatch_shell,
),
copy=False,
)
Expand Down

0 comments on commit b763d54

Please sign in to comment.