Skip to content

Commit

Permalink
Merge pull request #7389 from minrk/one-websocket
Browse files Browse the repository at this point in the history
use single WebSocket connection for all channels
  • Loading branch information
ellisonbg committed Jan 9, 2015
2 parents a5b6e2c + 171de5c commit 62757cf
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 142 deletions.
11 changes: 7 additions & 4 deletions IPython/html/base/zmqhandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def clear_cookie(self, *args, **kwargs):
"""meaningless for websockets"""
pass

def _reserialize_reply(self, msg_list):
def _reserialize_reply(self, msg_list, channel=None):
"""Reserialize a reply message using JSON.
This takes the msg list from the ZMQ socket, deserializes it using
Expand All @@ -148,19 +148,22 @@ def _reserialize_reply(self, msg_list):
"""
idents, msg_list = self.session.feed_identities(msg_list)
msg = self.session.deserialize(msg_list)
if channel:
msg['channel'] = channel
if msg['buffers']:
buf = serialize_binary_message(msg)
return buf
else:
smsg = json.dumps(msg, default=date_default)
return cast_unicode(smsg)

def _on_zmq_reply(self, msg_list):
def _on_zmq_reply(self, stream, msg_list):
# Sometimes this gets triggered when the on_close method is scheduled in the
# eventloop but hasn't been called.
if self.stream.closed(): return
if stream.closed(): return
channel = getattr(stream, 'channel', None)
try:
msg = self._reserialize_reply(msg_list)
msg = self._reserialize_reply(msg_list, channel=channel)
except Exception:
self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
else:
Expand Down
94 changes: 39 additions & 55 deletions IPython/html/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def post(self, kernel_id, action):
self.finish()


class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):

@property
def kernel_info_timeout(self):
Expand All @@ -95,8 +95,13 @@ def __repr__(self):

def create_stream(self):
km = self.kernel_manager
meth = getattr(km, 'connect_%s' % self.channel)
self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
identity = self.session.bsession
for channel in ('shell', 'iopub', 'stdin'):
meth = getattr(km, 'connect_' + channel)
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
stream.channel = channel
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')

def request_kernel_info(self):
"""send a request for kernel_info"""
Expand Down Expand Up @@ -160,16 +165,17 @@ def _finish_kernel_info(self, info):
self._kernel_info_future.set_result(info)

def initialize(self):
super(ZMQChannelHandler, self).initialize()
super(ZMQChannelsHandler, self).initialize()
self.zmq_stream = None
self.channels = {}
self.kernel_id = None
self.kernel_info_channel = None
self._kernel_info_future = Future()

@gen.coroutine
def pre_get(self):
# authenticate first
super(ZMQChannelHandler, self).pre_get()
super(ZMQChannelsHandler, self).pre_get()
# then request kernel info, waiting up to a certain time before giving up.
# We don't want to wait forever, because browsers don't take it well when
# servers never respond to websocket connection requests.
Expand All @@ -189,56 +195,36 @@ def give_up():
@gen.coroutine
def get(self, kernel_id):
self.kernel_id = cast_unicode(kernel_id, 'ascii')
yield super(ZMQChannelHandler, self).get(kernel_id=kernel_id)
yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)

def open(self, kernel_id):
super(ZMQChannelHandler, self).open()
super(ZMQChannelsHandler, self).open()
try:
self.create_stream()
except web.HTTPError as e:
self.log.error("Error opening stream: %s", e)
# WebSockets don't response to traditional error codes so we
# close the connection.
if not self.stream.closed():
self.stream.close()
for channel, stream in self.channels.items():
if not stream.closed():
stream.close()
self.close()
else:
self.zmq_stream.on_recv(self._on_zmq_reply)
for channel, stream in self.channels.items():
stream.on_recv_stream(self._on_zmq_reply)

def on_message(self, msg):
if self.zmq_stream is None:
return
elif self.zmq_stream.closed():
self.log.info("%s closed, closing websocket.", self)
self.close()
return
if isinstance(msg, bytes):
msg = deserialize_binary_message(msg)
else:
msg = json.loads(msg)
self.session.send(self.zmq_stream, msg)
channel = msg.pop('channel', None)
if channel is None:
self.log.warn("No channel specified, assuming shell: %s", msg)
channel = 'shell'
stream = self.channels[channel]
self.session.send(stream, msg)

def on_close(self):
# This method can be called twice, once by self.kernel_died and once
# from the WebSocket close event. If the WebSocket connection is
# closed before the ZMQ streams are setup, they could be None.
if self.zmq_stream is not None and not self.zmq_stream.closed():
self.zmq_stream.on_recv(None)
# close the socket directly, don't wait for the stream
socket = self.zmq_stream.socket
self.zmq_stream.close()
socket.close()


class IOPubHandler(ZMQChannelHandler):
channel = 'iopub'

def create_stream(self):
super(IOPubHandler, self).create_stream()
km = self.kernel_manager
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')

def on_close(self):
km = self.kernel_manager
if self.kernel_id in km:
Expand All @@ -248,12 +234,24 @@ def on_close(self):
km.remove_restart_callback(
self.kernel_id, self.on_restart_failed, 'dead',
)
super(IOPubHandler, self).on_close()

# This method can be called twice, once by self.kernel_died and once
# from the WebSocket close event. If the WebSocket connection is
# closed before the ZMQ streams are setup, they could be None.
for channel, stream in self.channels.items():
if stream is not None and not stream.closed():
stream.on_recv(None)
# close the socket directly, don't wait for the stream
socket = stream.socket
stream.close()
socket.close()

self.channels = {}

def _send_status_message(self, status):
msg = self.session.msg("status",
{'execution_state': status}
)
msg['channel'] = 'iopub'
self.write_message(json.dumps(msg, default=date_default))

def on_kernel_restarted(self):
Expand All @@ -263,18 +261,6 @@ def on_kernel_restarted(self):
def on_restart_failed(self):
logging.error("kernel %s restarted failed!", self.kernel_id)
self._send_status_message('dead')

def on_message(self, msg):
"""IOPub messages make no sense"""
pass


class ShellHandler(ZMQChannelHandler):
channel = 'shell'


class StdinHandler(ZMQChannelHandler):
channel = 'stdin'


#-----------------------------------------------------------------------------
Expand All @@ -289,7 +275,5 @@ class StdinHandler(ZMQChannelHandler):
(r"/api/kernels", MainKernelHandler),
(r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
(r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
(r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
(r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
(r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
(r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
]

0 comments on commit 62757cf

Please sign in to comment.