Skip to content

feat: Run exec_request inside a task #1384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
fb7b960
feat: enhance kernel message handling with memory streams to run exec…
fleming79 Mar 29, 2025
17ebe6b
Allow infinite buffer size for memory object streams in kernel
fleming79 Mar 29, 2025
5f71eaa
fix: handle kernel abort requests gracefully in message processing
fleming79 Mar 30, 2025
62300ed
Simplify abort handling and make send_stream a required parameter for…
fleming79 Mar 30, 2025
1828a88
Fix publish status for execute_request
fleming79 Mar 30, 2025
211094b
Ensure one send_stream per socket/thread
fleming79 Mar 31, 2025
f13959b
Disable timeouts for debugging tests with debugpy
fleming79 Mar 31, 2025
bee82d1
Minor tweaks to tests to make them work with kernel changes.
fleming79 Mar 31, 2025
aaa29fe
Add anyio Event for _main_shell_ready to aid with start and test reli…
fleming79 Mar 31, 2025
5bc1985
Update timing assertions in concurrent test to improve reliability
fleming79 Mar 31, 2025
533ca75
Restore test timeouts to original values
fleming79 Mar 31, 2025
f0e5116
Add small delay in process_control_message to aid with test_sequenti…
fleming79 Mar 31, 2025
8b4b086
Remove unused _eventloop_set
fleming79 Apr 1, 2025
fa09c5f
Rename _main_shell_ready to _main_subshell_ready
fleming79 Apr 1, 2025
6a68f02
Try to make test_run_concurrently_timing more reliable
fleming79 Apr 1, 2025
104ef76
Improve test_tk_loop to run localy.
fleming79 Apr 1, 2025
7b5d3e6
Add asyncio_event_loop (currently only set for asyncio backend) to th…
fleming79 Apr 1, 2025
5cb0f60
Merge branch 'ipython:main' into run-execute-request-in-task
fleming79 Apr 1, 2025
ee8b768
Fix typos in CHANGELOG.md
fleming79 Apr 1, 2025
555f4b2
Close receive_stream after task group has exited.
fleming79 Apr 2, 2025
2b3d6ec
Pass parent directly to publish_status calls to avoid changing parent…
fleming79 Apr 2, 2025
4586e34
Pass subshell_id to _execute_request_handler
fleming79 Apr 2, 2025
62d46b4
Provide a TaskGroup in the kernel with a shielded CancelScope
fleming79 Apr 2, 2025
8fc455c
Add BlockingPortal and enhance task management in Kernel class
fleming79 Apr 4, 2025
02c59e8
Catch errors on main subshell stop and remove redundant cancel scope.
fleming79 Apr 4, 2025
da2d27f
Add optional name parameter to start_soon for improved task identific…
fleming79 Apr 4, 2025
7a85d84
Refactor _execute_request_loop to ensure status is published in final…
fleming79 Apr 5, 2025
19ee3e2
Remove deprecated metadata handling and unused log module
fleming79 Apr 5, 2025
5798b4c
Only set parent and ident from _execute_request_loop.
fleming79 Apr 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@
import psutil
import zmq
import zmq_anyio
from anyio import TASK_STATUS_IGNORED, create_task_group, sleep, to_thread
from anyio import (
TASK_STATUS_IGNORED,
create_memory_object_stream,
create_task_group,
sleep,
to_thread,
)
from anyio.abc import TaskStatus
from IPython.core.error import StdinNotImplementedError
from jupyter_client.session import Session
Expand Down Expand Up @@ -418,27 +424,39 @@ async def shell_main(self, subshell_id: str | None):
assert subshell_id is None
assert threading.current_thread() == threading.main_thread()
socket = None

send_stream, receive_stream = create_memory_object_stream()
async with create_task_group() as tg:
if not socket.started.is_set():
await tg.start(socket.start)
tg.start_soon(self.process_shell, socket)
tg.start_soon(self.process_shell, socket, send_stream)
tg.start_soon(self._execute_request_handler, receive_stream)
if subshell_id is None:
# Main subshell.
await to_thread.run_sync(self.shell_stop.wait)
tg.cancel_scope.cancel()

async def process_shell(self, socket=None):
async def _execute_request_handler(self, receive_stream):
async with receive_stream:
async for handler, (socket, idents, msg) in receive_stream:
try:
result = handler(socket, idents, msg)
self.set_parent(idents, msg, channel="shell")
if inspect.isawaitable(result):
await result
except Exception as e:
self.log.exception("Execute request", exc_info=e)

async def process_shell(self, socket, send_stream):
# socket=None is valid if kernel subshells are not supported.
try:
while True:
await self.process_shell_message(socket=socket)
await self.process_shell_message(socket=socket, send_stream=send_stream)
except BaseException:
if self.shell_stop.is_set():
return
raise

async def process_shell_message(self, msg=None, socket=None):
async def process_shell_message(self, msg=None, socket=None, send_stream=None):
# If socket is None kernel subshells are not supported so use socket=shell_socket.
# If msg is set, process that message.
# If msg is None, await the next message to arrive on the socket.
Expand Down Expand Up @@ -507,9 +525,12 @@ async def process_shell_message(self, msg=None, socket=None):
except Exception:
self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True)
try:
result = handler(socket, idents, msg)
if inspect.isawaitable(result):
await result
if msg_type == "execute_request" and send_stream:
await send_stream.send((handler, (socket, idents, msg)))
else:
result = handler(socket, idents, msg)
if inspect.isawaitable(result):
await result
except Exception:
self.log.error("Exception in message handler:", exc_info=True) # noqa: G201
except KeyboardInterrupt:
Expand Down
Loading