Avoid running nested runloops in ThreadedZMQSocketChannel #831

Closed
wants to merge 3 commits into from

@marc-etienne marc-etienne commented Sep 9, 2022

Hi!

I'm the maintainer of a project called IPyIDA, which integrates the QtConsole inside IDA Pro.

It takes a lot of plumbing to make this all work, and I use QtKernelManager and QtKernelClient (which inherits from ThreadedKernelClient) to connect to the kernel.

Since jupyter_client version 7, IPyIDA doesn't work and raises a RuntimeError when starting the client (see eset/ipyida#44). My temporary solution was to require a working version of jupyter_client until I could find the source of the problem.

After many hours (probably too many :D) of trying to understand the problem, I think I found a proper solution. My understanding is that SocketStream.on_recv was designed to be used with the (pre-v6) Tornado runloop. Since Tornado 6, the Tornado runloop is just a wrapper around asyncio's. on_recv still works with the asyncio runloop, but the callback receives the (completed) Future object holding the result. Trying to await that Future in another runloop crashes because the callback is called from an already running runloop.
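To make the failure mode described above concrete, here is a minimal, stdlib-only sketch. It does not reproduce jupyter_client's code: `nested_handler`, `direct_handler`, and `demo` are hypothetical names, and the Future is created by hand instead of coming from pyzmq. It only illustrates that driving a Future with `run_until_complete` from inside a running loop raises `RuntimeError`, while reading `.result()` from a completed Future needs no loop at all.

```python
import asyncio


def nested_handler(fut):
    """Hypothetical callback that tries to run a loop from inside a running loop."""
    loop = asyncio.get_running_loop()
    # asyncio refuses to start a loop while one is already running in this thread.
    return loop.run_until_complete(fut)


def direct_handler(fut):
    """Hypothetical callback that reads an already-completed Future directly."""
    assert fut.done()
    return fut.result()


async def demo():
    loop = asyncio.get_running_loop()
    # Hand-made stand-in for the completed Future passed to the callback.
    fut = loop.create_future()
    fut.set_result([b"frame-1", b"frame-2"])

    try:
        nested_handler(fut)
        nested_error = None
    except RuntimeError as exc:
        nested_error = exc

    return nested_error, direct_handler(fut)


if __name__ == "__main__":
    error, frames = asyncio.run(demo())
    print(type(error).__name__, frames)
```

The second handler mirrors the idea in this PR: since the Future is already done when the callback fires, there is nothing left to await.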

The suggested commit fixes the problem with minimal changes, because I didn't want to break other things I may not fully understand.

There is no need to await the Future object because it's received on the on_recv
callback, and should already contain the result.
@marc-etienne
Author

Cc'ing @ccordoba12, who seems to be a downstream user through Spyder and may want to test the change.

This issue is related to #803 and #638.

@blink1073 blink1073 added the bug label Sep 9, 2022
@blink1073
Member

Hi @marc-etienne, can you please enable maintainer edits so I can push a fix for the linter?

@marc-etienne
Author

Apparently "Allow edits from maintainers" isn't available if the forked repo sits in an organisation 🙄. I've given you write access directly instead.

I see the flake8 and mypy errors in the CI log; I'll look into them.

@ccordoba12
Contributor

Hey @marc-etienne, thanks for the ping! Qtconsole tests run here, so that should be enough, but given that changes of this kind can be very disruptive, I'll open a PR in Spyder and set it up to run against your PR to be extra sure.

About the changes themselves, I think @davidbrochart is the best person to review them.

instead of more generic Awaitable. It's the type received from the
SocketStream.on_recv callback.
@blink1073
Member

Thanks @marc-etienne and @ccordoba12!

Member

@davidbrochart davidbrochart left a comment


Thanks @marc-etienne, I think you're right about not awaiting the message in another event loop.
I would actually go further, because I think the on_recv callback is called with the message itself, not a Future for it.
I'll push a commit with my suggestions to see if the Qtconsole tests pass, first.

@@ -114,13 +107,13 @@ def thread_send():
assert self.ioloop is not None
self.ioloop.add_callback(thread_send)

- def _handle_recv(self, future_msg: Awaitable) -> None:
+ def _handle_recv(self, future_msg: asyncio.Future) -> None:
Member


Looking at pyzmq's documentation, shouldn't the argument already be the (multipart) message, not a Future?

Suggested change
- def _handle_recv(self, future_msg: asyncio.Future) -> None:
+ def _handle_recv(self, msg_list: List[bytes]) -> None:

Comment on lines 115 to 116
assert future_msg.done()
msg_list = future_msg.result()
Member


Suggested change
- assert future_msg.done()
- msg_list = future_msg.result()

@davidbrochart
Member

Qtconsole's tests fail, suggesting that the argument to the on_recv callback is indeed an asyncio.Future, so your changes seem right @marc-etienne.
Pinging @minrk to confirm.

@marc-etienne
Author

I think the pyzmq documentation for on_recv is outdated. Or this could be a bug in pyzmq itself; maybe it should be pyzmq's responsibility to call .result(). I think the callback argument differs depending on the pyzmq context type of the stream (I think Jupyter Client uses the asyncio one?). A more general solution that doesn't depend on the context type would be to check the argument received by _handle_recv with asyncio.isfuture(msg) and change the method signature to something like def _handle_recv(self, msg: Union[List[bytes], asyncio.Future]) -> None:.
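As a rough illustration of that suggestion only, the run-time check could look like the sketch below. `unwrap_frames` is a hypothetical helper, not a function in jupyter_client or pyzmq, and the sketch assumes that a Future handed to the callback has already completed.

```python
import asyncio
from typing import List, Union


def unwrap_frames(msg: Union[List[bytes], "asyncio.Future"]) -> List[bytes]:
    """Hypothetical helper: accept either message frames or a completed Future."""
    if asyncio.isfuture(msg):
        # Assumption: the Future is already resolved when the callback runs.
        if not msg.done():
            raise RuntimeError("expected a completed Future")
        return msg.result()
    # Plain list of frames, as a synchronous socket would deliver it.
    return msg


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    fut = loop.create_future()
    fut.set_result([b"a", b"b"])
    print(unwrap_frames(fut), unwrap_frames([b"a", b"b"]))
    loop.close()
```

The cost of this approach is that the handler accepts two different argument types, so the real type is only known at run time.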

@davidbrochart
Member

Let's clarify all that, and not try to guess the type of the argument at run-time.

@minrk
Member

minrk commented Sep 12, 2022

The short answer is: ThreadedKernelClient shouldn't use zmq.asyncio.Context, it should use zmq.Context.

on_recv should indeed only get the message frames, but technically what it does is get the return value for socket.recv_multipart().

The problem is the switch here to zmq.asyncio.Context, which wasn't accounted for in ThreadedZMQSocketChannel (or ThreadedKernelClient). So an async socket is being passed to a non-async API (ZMQStream), which means the async socket's return value (a Future) is getting passed to on_recv instead of the actual received message.
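The mechanism can be pictured with a toy model. This is not pyzmq's implementation: `ToySyncSocket`, `ToyAsyncSocket`, and `ToyStream` are hypothetical stand-ins, written only to show that a stream which forwards the return value of `recv_multipart()` delivers frames for a synchronous socket and a Future for an asynchronous one.

```python
import asyncio
from typing import Any, Callable, List, Optional


class ToySyncSocket:
    """Hypothetical synchronous socket: recv_multipart returns the frames."""

    def recv_multipart(self) -> List[bytes]:
        return [b"header", b"body"]


class ToyAsyncSocket:
    """Hypothetical async socket: recv_multipart returns a Future of the frames."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop

    def recv_multipart(self) -> "asyncio.Future":
        fut = self._loop.create_future()
        fut.set_result([b"header", b"body"])
        return fut


class ToyStream:
    """Hypothetical stream: passes whatever recv_multipart returns to the callback."""

    def __init__(self, socket: Any) -> None:
        self.socket = socket
        self._callback: Optional[Callable[[Any], None]] = None

    def on_recv(self, callback: Callable[[Any], None]) -> None:
        self._callback = callback

    def handle_readable(self) -> None:
        assert self._callback is not None
        self._callback(self.socket.recv_multipart())


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    for sock in (ToySyncSocket(), ToyAsyncSocket(loop)):
        stream = ToyStream(sock)
        stream.on_recv(lambda arg: print(type(arg).__name__))
        stream.handle_readable()
    loop.close()
```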

If there's a bug in pyzmq, it's that creating ZMQStream(zmq.asyncio.Socket) doesn't fail with a TypeError.

A class that uses ThreadedZMQSocketChannel should not be creating asyncio Sockets. You can always create multiple Python Socket objects for a single zmq socket with e.g. zmq.Socket.shadow(asyncio_socket) (or the other way around: zmq.asyncio.Socket.shadow(other_socket)).
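A small sketch of the shadowing idea, assuming pyzmq is installed. The endpoint name `inproc://shadow-demo` and the function `main` are made up for illustration. The sketch passes the socket's `underlying` address to `zmq.Socket.shadow`, on the assumption that this form works across pyzmq versions; whether the socket object itself can be passed may depend on the installed version.

```python
import asyncio

import zmq
import zmq.asyncio


async def main():
    ctx = zmq.asyncio.Context()
    sender = ctx.socket(zmq.PAIR)
    receiver = ctx.socket(zmq.PAIR)
    sender.bind("inproc://shadow-demo")  # hypothetical endpoint name
    receiver.connect("inproc://shadow-demo")

    # Second Python object for the same libzmq socket, with the synchronous API.
    sync_receiver = zmq.Socket.shadow(receiver.underlying)

    await sender.send_multipart([b"hello"])

    # Blocking poll is acceptable in a demo; it stalls the event loop briefly.
    sync_receiver.poll(timeout=1000)
    # NOBLOCK so a missing message raises zmq.Again instead of hanging.
    frames = sync_receiver.recv_multipart(zmq.NOBLOCK)

    # Close the original sockets; the shadow shares their underlying handle.
    sender.close(linger=0)
    receiver.close(linger=0)
    ctx.term()
    return frames


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here the synchronous shadow returns the message frames directly, which is the kind of socket a non-async API such as ZMQStream expects.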

@davidbrochart
Member

Thanks for the explanation @minrk.
If I understand correctly, this PR should include bb2319c, and the changes you mentioned should be done in downstream projects (Qtconsole, Spyder, etc.). Is that right?

@blink1073
Member

As part of #835, we properly use a synchronous socket for ThreadedZMQSocketChannel.
