From 9c0b4c0e28a9c17d9c14123522415b2048bd9522 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 8 Apr 2021 11:31:31 +0200 Subject: [PATCH] Fix qtconsole issues --- .github/workflows/main.yml | 2 +- jupyter_client/asynchronous/client.py | 14 ++-- jupyter_client/blocking/client.py | 14 ++-- jupyter_client/channels.py | 3 +- jupyter_client/client.py | 25 ++++--- jupyter_client/ssh/tunnel.py | 6 +- jupyter_client/threaded.py | 96 +++++++++++++++++++-------- jupyter_client/utils.py | 4 +- 8 files changed, 101 insertions(+), 63 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 3af28b7a4..fa45a7cd7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -56,7 +56,7 @@ jobs: pip install --upgrade --upgrade-strategy eager --pre -e .[test] pytest-cov codecov 'coverage<5' pip freeze - name: Check types - run: mypy jupyter_client/manager.py jupyter_client/multikernelmanager.py jupyter_client/client.py jupyter_client/blocking/client.py jupyter_client/asynchronous/client.py jupyter_client/channels.py jupyter_client/session.py jupyter_client/adapter.py jupyter_client/connect.py jupyter_client/consoleapp.py jupyter_client/jsonutil.py jupyter_client/kernelapp.py jupyter_client/launcher.py + run: mypy jupyter_client/manager.py jupyter_client/multikernelmanager.py jupyter_client/client.py jupyter_client/blocking/client.py jupyter_client/asynchronous/client.py jupyter_client/channels.py jupyter_client/session.py jupyter_client/adapter.py jupyter_client/connect.py jupyter_client/consoleapp.py jupyter_client/jsonutil.py jupyter_client/kernelapp.py jupyter_client/launcher.py jupyter_client/threaded.py - name: Run the tests run: py.test --cov jupyter_client -v jupyter_client - name: Code coverage diff --git a/jupyter_client/asynchronous/client.py b/jupyter_client/asynchronous/client.py index d67dcd482..790a5f8c8 100644 --- a/jupyter_client/asynchronous/client.py +++ b/jupyter_client/asynchronous/client.py @@ -49,15 +49,15 @@ class AsyncKernelClient(KernelClient): _recv_reply = KernelClient._async_recv_reply # replies come on the shell channel - execute = reqrep(wrapped, KernelClient._execute) - history = reqrep(wrapped, KernelClient._history) - complete = reqrep(wrapped, KernelClient._complete) - inspect = reqrep(wrapped, KernelClient._inspect) - kernel_info = reqrep(wrapped, KernelClient._kernel_info) - comm_info = reqrep(wrapped, KernelClient._comm_info) + execute = reqrep(wrapped, KernelClient.execute) + history = reqrep(wrapped, KernelClient.history) + complete = reqrep(wrapped, KernelClient.complete) + inspect = reqrep(wrapped, KernelClient.inspect) + kernel_info = reqrep(wrapped, KernelClient.kernel_info) + comm_info = reqrep(wrapped, KernelClient.comm_info) is_alive = KernelClient._async_is_alive execute_interactive = KernelClient._async_execute_interactive # replies come on the control channel - shutdown = reqrep(wrapped, KernelClient._shutdown, channel="control") + shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control") diff --git a/jupyter_client/blocking/client.py b/jupyter_client/blocking/client.py index bbc80f9f1..81d4d6474 100644 --- a/jupyter_client/blocking/client.py +++ b/jupyter_client/blocking/client.py @@ -53,15 +53,15 @@ class BlockingKernelClient(KernelClient): _recv_reply = run_sync(KernelClient._async_recv_reply) # replies come on the shell channel - execute = reqrep(wrapped, KernelClient._execute) - history = reqrep(wrapped, KernelClient._history) - complete = reqrep(wrapped, KernelClient._complete) - inspect = reqrep(wrapped, KernelClient._inspect) - kernel_info = reqrep(wrapped, KernelClient._kernel_info) - comm_info = reqrep(wrapped, KernelClient._comm_info) + execute = reqrep(wrapped, KernelClient.execute) + history = reqrep(wrapped, KernelClient.history) + complete = reqrep(wrapped, KernelClient.complete) + inspect = reqrep(wrapped, KernelClient.inspect) + kernel_info = reqrep(wrapped, KernelClient.kernel_info) + comm_info = reqrep(wrapped, KernelClient.comm_info) is_alive = run_sync(KernelClient._async_is_alive) execute_interactive = run_sync(KernelClient._async_execute_interactive) # replies come on the control channel - shutdown = reqrep(wrapped, KernelClient._shutdown, channel="control") + shutdown = reqrep(wrapped, KernelClient.shutdown, channel="control") diff --git a/jupyter_client/channels.py b/jupyter_client/channels.py index abec3eaa4..09d4fd51c 100644 --- a/jupyter_client/channels.py +++ b/jupyter_client/channels.py @@ -51,7 +51,7 @@ class HBChannel(Thread): def __init__( self, - context: zmq.asyncio.Context, + context: zmq.asyncio.Context = None, session: t.Optional[Session] = None, address: t.Union[t.Tuple[str, int], str] = "", ): @@ -100,6 +100,7 @@ def _create_socket(self) -> None: # close previous socket, before opening a new one self.poller.unregister(self.socket) self.socket.close() + assert self.context is not None self.socket = self.context.socket(zmq.REQ) self.socket.linger = 1000 self.socket.connect(self.address) diff --git a/jupyter_client/client.py b/jupyter_client/client.py index 323263bae..d71eac5ab 100644 --- a/jupyter_client/client.py +++ b/jupyter_client/client.py @@ -160,7 +160,7 @@ async def _async_wait_for_ready(self, timeout: t.Optional[float] = None) -> None # Wait for kernel info reply on shell channel while True: - self._kernel_info() + self.kernel_info() try: msg = await self.shell_channel.get_msg(timeout=1) except Empty: @@ -386,10 +386,9 @@ async def _async_is_alive(self) -> bool: # We don't have access to the KernelManager, # so we use the heartbeat. return self._hb_channel.is_beating() - else: - # no heartbeat and not local, we can't tell if it's running, - # so naively return True - return True + # no heartbeat and not local, we can't tell if it's running, + # so naively return True + return True async def _async_execute_interactive( self, @@ -463,7 +462,7 @@ async def _async_execute_interactive( allow_stdin = self.allow_stdin if allow_stdin and not self.stdin_channel.is_alive(): raise RuntimeError("stdin channel must be running to allow input") - msg_id = self._execute( + msg_id = self.execute( code, silent=silent, store_history=store_history, @@ -541,7 +540,7 @@ async def _async_execute_interactive( return await self._async_recv_reply(msg_id, timeout=timeout) # Methods to send specific messages on channels - def _execute( + def execute( self, code: str, silent: bool = False, @@ -608,7 +607,7 @@ def _execute( self.shell_channel.send(msg) return msg["header"]["msg_id"] - def _complete(self, code: str, cursor_pos: t.Optional[int] = None) -> str: + def complete(self, code: str, cursor_pos: t.Optional[int] = None) -> str: """Tab complete text in the kernel's namespace. Parameters @@ -631,7 +630,7 @@ def _complete(self, code: str, cursor_pos: t.Optional[int] = None) -> str: self.shell_channel.send(msg) return msg["header"]["msg_id"] - def _inspect(self, code: str, cursor_pos: t.Optional[int] = None, detail_level: int = 0) -> str: + def inspect(self, code: str, cursor_pos: t.Optional[int] = None, detail_level: int = 0) -> str: """Get metadata information about an object in the kernel's namespace. It is up to the kernel to determine the appropriate object to inspect. @@ -662,7 +661,7 @@ def _inspect(self, code: str, cursor_pos: t.Optional[int] = None, detail_level: self.shell_channel.send(msg) return msg["header"]["msg_id"] - def _history( + def history( self, raw: bool = True, output: bool = False, @@ -708,7 +707,7 @@ def _history( self.shell_channel.send(msg) return msg["header"]["msg_id"] - def _kernel_info(self) -> str: + def kernel_info(self) -> str: """Request kernel info Returns @@ -719,7 +718,7 @@ def _kernel_info(self) -> str: self.shell_channel.send(msg) return msg["header"]["msg_id"] - def _comm_info(self, target_name: t.Optional[str] = None) -> str: + def comm_info(self, target_name: t.Optional[str] = None) -> str: """Request comm info Returns @@ -760,7 +759,7 @@ def input(self, string: str) -> None: msg = self.session.msg("input_reply", content) self.stdin_channel.send(msg) - def _shutdown(self, restart: bool = False) -> str: + def shutdown(self, restart: bool = False) -> str: """Request an immediate kernel shutdown on the control channel. Upon receipt of the (empty) reply, client code can safely assume that diff --git a/jupyter_client/ssh/tunnel.py b/jupyter_client/ssh/tunnel.py index 9f95da12c..f7fe47edb 100644 --- a/jupyter_client/ssh/tunnel.py +++ b/jupyter_client/ssh/tunnel.py @@ -23,9 +23,9 @@ SSHException = paramiko.ssh_exception.SSHException except ImportError: - paramiko = None + paramiko = None # type: ignore - class SSHException(Exception): + class SSHException(Exception): # type: ignore pass @@ -33,7 +33,7 @@ class SSHException(Exception): from .forward import forward_tunnel try: - import pexpect + import pexpect # type: ignore except ImportError: pexpect = None diff --git a/jupyter_client/threaded.py b/jupyter_client/threaded.py index 25dfc6184..54e44a3d7 100644 --- a/jupyter_client/threaded.py +++ b/jupyter_client/threaded.py @@ -1,19 +1,27 @@ """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies. """ +import asyncio import atexit import errno -import sys import time from threading import Event from threading import Thread - -from traitlets import Instance +from typing import Any +from typing import Awaitable +from typing import Dict +from typing import List +from typing import Optional +from typing import Union + +import zmq +from traitlets import Instance # type: ignore from traitlets import Type from zmq import ZMQError from zmq.eventloop import ioloop from zmq.eventloop import zmqstream +from .session import Session from jupyter_client import KernelClient from jupyter_client.channels import HBChannel @@ -22,6 +30,10 @@ # during garbage collection of threads at exit +async def get_msg(msg: Awaitable) -> Union[List[bytes], List[zmq.Message]]: + return await msg + + class ThreadedZMQSocketChannel(object): """A ZMQ socket invoking a callback in the ioloop""" @@ -31,7 +43,12 @@ class ThreadedZMQSocketChannel(object): stream = None _inspect = None - def __init__(self, socket, session, loop): + def __init__( + self, + socket: Optional[zmq.Socket], + session: Optional[Session], + loop: Optional[zmq.eventloop.ioloop.ZMQIOLoop], + ) -> None: """Create a channel. Parameters @@ -55,21 +72,22 @@ def setup_stream(): self.stream.on_recv(self._handle_recv) evt.set() + assert self.ioloop is not None self.ioloop.add_callback(setup_stream) evt.wait() _is_alive = False - def is_alive(self): + def is_alive(self) -> bool: return self._is_alive - def start(self): + def start(self) -> None: self._is_alive = True - def stop(self): + def stop(self) -> None: self._is_alive = False - def close(self): + def close(self) -> None: if self.socket is not None: try: self.socket.close(linger=0) @@ -77,7 +95,7 @@ def close(self): pass self.socket = None - def send(self, msg): + def send(self, msg: Dict[str, Any]) -> None: """Queue a message to be sent from the IOLoop's thread. Parameters @@ -91,21 +109,25 @@ def send(self, msg): def thread_send(): self.session.send(self.stream, msg) + assert self.ioloop is not None self.ioloop.add_callback(thread_send) - def _handle_recv(self, msg): + def _handle_recv(self, future_msg: Awaitable) -> None: """Callback for stream.on_recv. Unpacks message, and calls handlers with it. """ - ident, smsg = self.session.feed_identities(msg) + assert self.ioloop is not None + msg_list = self.ioloop._asyncio_event_loop.run_until_complete(get_msg(future_msg)) + assert self.session is not None + ident, smsg = self.session.feed_identities(msg_list) msg = self.session.deserialize(smsg) # let client inspect messages if self._inspect: self._inspect(msg) self.call_handlers(msg) - def call_handlers(self, msg): + def call_handlers(self, msg: Dict[str, Any]) -> None: """This method is called in the ioloop thread when a message arrives. Subclasses should override this method to handle incoming messages. @@ -115,13 +137,13 @@ def call_handlers(self, msg): """ pass - def process_events(self): + def process_events(self) -> None: """Subclasses should override this with a method processing any pending GUI events. """ pass - def flush(self, timeout=1.0): + def flush(self, timeout: float = 1.0) -> None: """Immediately processes all pending messages on this channel. This is only used for the IOPub channel. @@ -141,14 +163,16 @@ def flush(self, timeout=1.0): # We do the IOLoop callback process twice to ensure that the IOLoop # gets to perform at least one full poll. stop_time = time.time() + timeout + assert self.ioloop is not None for i in range(2): self._flushed = False self.ioloop.add_callback(self._flush) while not self._flushed and time.time() < stop_time: time.sleep(0.01) - def _flush(self): + def _flush(self) -> None: """Callback for :method:`self.flush`.""" + assert self.stream is not None self.stream.flush() self._flushed = True @@ -165,13 +189,13 @@ def __init__(self): @staticmethod @atexit.register - def _notice_exit(): + def _notice_exit() -> None: # Class definitions can be torn down during interpreter shutdown. # We only need to set _exiting flag if this hasn't happened. if IOLoopThread is not None: IOLoopThread._exiting = True - def start(self): + def start(self) -> None: """Start the IOLoop thread Don't return until self.ioloop is defined, @@ -181,15 +205,12 @@ def start(self): Thread.start(self) self._start_event.wait() - def run(self): + def run(self) -> None: """Run my loop, ignoring EINTR events in the poller""" - if "asyncio" in sys.modules: - # tornado may be using asyncio, - # ensure an eventloop exists for this thread - import asyncio - - asyncio.set_event_loop(asyncio.new_event_loop()) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) self.ioloop = ioloop.IOLoop() + self.ioloop._asyncio_event_loop = loop # signal that self.ioloop is defined self._start_event.set() while True: @@ -208,7 +229,7 @@ def run(self): else: break - def stop(self): + def stop(self) -> None: """Stop the channel's event loop and join its thread. This calls :meth:`~threading.Thread.join` and returns when the thread @@ -221,7 +242,7 @@ def stop(self): self.close() self.ioloop = None - def close(self): + def close(self) -> None: if self.ioloop is not None: try: self.ioloop.close(all_fds=True) @@ -238,7 +259,14 @@ def ioloop(self): ioloop_thread = Instance(IOLoopThread, allow_none=True) - def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=True): + def start_channels( + self, + shell: bool = True, + iopub: bool = True, + stdin: bool = True, + hb: bool = True, + control: bool = True, + ) -> None: self.ioloop_thread = IOLoopThread() self.ioloop_thread.start() @@ -247,13 +275,13 @@ def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=Tr super().start_channels(shell, iopub, stdin, hb, control) - def _check_kernel_info_reply(self, msg): + def _check_kernel_info_reply(self, msg: Dict[str, Any]) -> None: """This is run in the ioloop thread when the kernel info reply is received""" if msg["msg_type"] == "kernel_info_reply": self._handle_kernel_info_reply(msg) self.shell_channel._inspect = None - def stop_channels(self): + def stop_channels(self) -> None: super().stop_channels() if self.ioloop_thread.is_alive(): self.ioloop_thread.stop() @@ -263,3 +291,13 @@ def stop_channels(self): stdin_channel_class = Type(ThreadedZMQSocketChannel) hb_channel_class = Type(HBChannel) control_channel_class = Type(ThreadedZMQSocketChannel) + + def is_alive(self) -> bool: + """Is the kernel process still running?""" + if self._hb_channel is not None: + # We don't have access to the KernelManager, + # so we use the heartbeat. + return self._hb_channel.is_beating() + # no heartbeat and not local, we can't tell if it's running, + # so naively return True + return True diff --git a/jupyter_client/utils.py b/jupyter_client/utils.py index 7a3876146..6b0a2475c 100644 --- a/jupyter_client/utils.py +++ b/jupyter_client/utils.py @@ -8,10 +8,10 @@ import os import sys -import nest_asyncio +import nest_asyncio # type: ignore if os.name == "nt" and sys.version_info >= (3, 7): - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore def run_sync(coro):