Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tornado 5 fixes in ThreadedClient #352

Merged
merged 2 commits into from Mar 9, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 36 additions & 13 deletions jupyter_client/threaded.py
Expand Up @@ -3,7 +3,8 @@
from __future__ import absolute_import
import atexit
import errno
from threading import Thread
import sys
from threading import Thread, Event
import time

# import ZMQError in top-level namespace, to avoid ugly attribute-error messages
Expand Down Expand Up @@ -41,9 +42,15 @@ def __init__(self, socket, session, loop):
self.socket = socket
self.session = session
self.ioloop = loop
evt = Event()

self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
def setup_stream():
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv)
evt.set()

self.ioloop.add_callback(setup_stream)
evt.wait()

_is_alive = False
def is_alive(self):
Expand Down Expand Up @@ -142,11 +149,11 @@ class IOLoopThread(Thread):
"""Run a pyzmq ioloop in a thread to send and receive messages
"""
_exiting = False
ioloop = None

def __init__(self, loop):
def __init__(self):
super(IOLoopThread, self).__init__()
self.daemon = True
self.ioloop = loop or ioloop.IOLoop()

@staticmethod
@atexit.register
Expand All @@ -156,8 +163,26 @@ def _notice_exit():
if IOLoopThread is not None:
IOLoopThread._exiting = True

def start(self):
"""Start the IOLoop thread

Don't return until self.ioloop is defined,
which is created in the thread
"""
self._start_event = Event()
Thread.start(self)
self._start_event.wait()

def run(self):
"""Run my loop, ignoring EINTR events in the poller"""
if 'asyncio' in sys.modules:
# tornado may be using asyncio,
# ensure an eventloop exists for this thread
import asyncio
asyncio.set_event_loop(asyncio.new_event_loop())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems odd that we have to do this. Is there a neater way that we can use in the future when we can assume that tornado is using asyncio?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The situation: asyncio will not create an eventloop in a thread, you have to tell it to, at least in certain circumstances. Tornado is running on top of asyncio, but won't create the required asyncio eventloop if there isn't one (asyncio.get_event_loop will fail in threads that haven't initialized eventloops explicitly). I'm not sure if this is a tornado bug or not.

We can ask if the tornado configured IOLoop class is a subclass of AsyncIOLoop, which is the tornado asyncio wrapper implementation. If we assume that's going to be the case and not some other weird requires-asyncio implementation that doesn't subclass it.

Note that this is just instantiating an object, not starting it or anything, so it's a pretty minimal operation and harmless if the thread-local eventloop goes unused. I'm not quite sure what the cleanest solution is for "create the asyncio eventloop that tornado may need in this thread only if tornado is actually going to need it", because we would need more detailed try-except for importing tornado.platform.asyncio.AsyncIOLoop which may not be defined (e.g. python 2 or older tornado) in order to call issubclass().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could consider setting our own policy, so that we handle the calls to get_event_loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't set our own policy in jupyter_client because that would preclude applications setting their own policies. We could set our own in qtconsoleapp, but can't rely on that in the qtconsole kernelmanager, which can be used in other applications like Spyder.

self.ioloop = ioloop.IOLoop()
# signal that self.ioloop is defined
self._start_event.set()
while True:
try:
self.ioloop.start()
Expand All @@ -182,9 +207,10 @@ def stop(self):
:meth:`~threading.Thread.start` is called again.
"""
if self.ioloop is not None:
self.ioloop.stop()
self.ioloop.add_callback(self.ioloop.stop)
self.join()
self.close()
self.ioloop = None

def close(self):
if self.ioloop is not None:
Expand All @@ -198,22 +224,19 @@ class ThreadedKernelClient(KernelClient):
""" A KernelClient that provides thread-safe sockets with async callbacks on message replies.
"""

_ioloop = None
@property
def ioloop(self):
if self._ioloop is None:
self._ioloop = ioloop.IOLoop()
return self._ioloop
return self.ioloop_thread.ioloop

ioloop_thread = Instance(IOLoopThread, allow_none=True)

def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
self.ioloop_thread = IOLoopThread()
self.ioloop_thread.start()

if shell:
self.shell_channel._inspect = self._check_kernel_info_reply

self.ioloop_thread = IOLoopThread(self.ioloop)
self.ioloop_thread.start()

super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)

def _check_kernel_info_reply(self, msg):
Expand Down