diff --git a/notebook/gateway/handlers.py b/notebook/gateway/handlers.py
index 400c8e086c..64996dad00 100644
--- a/notebook/gateway/handlers.py
+++ b/notebook/gateway/handlers.py
@@ -130,10 +130,12 @@ def __init__(self, **kwargs):
         self.kernel_id = None
         self.ws = None
         self.ws_future = Future()
-        self.ws_future_cancelled = False
+        self.disconnected = False
 
     @gen.coroutine
     def _connect(self, kernel_id):
+        # websocket is initialized before connection
+        self.ws = None
         self.kernel_id = kernel_id
         ws_url = url_path_join(
             GatewayClient.instance().ws_url,
@@ -148,40 +150,48 @@ def _connect(self, kernel_id):
         self.ws_future.add_done_callback(self._connection_done)
 
     def _connection_done(self, fut):
-        if not self.ws_future_cancelled: # prevent concurrent.futures._base.CancelledError
+        if not self.disconnected and fut.exception() is None: # prevent concurrent.futures._base.CancelledError
             self.ws = fut.result()
             self.log.debug("Connection is ready: ws: {}".format(self.ws))
         else:
-            self.log.warning("Websocket connection has been cancelled via client disconnect before its establishment. "
+            self.log.warning("Websocket connection has been closed via client disconnect or due to error. "
                              "Kernel with ID '{}' may not be terminated on GatewayClient: {}".
                              format(self.kernel_id, GatewayClient.instance().url))
 
     def _disconnect(self):
+        self.disconnected = True
         if self.ws is not None:
             # Close connection
             self.ws.close()
         elif not self.ws_future.done():
             # Cancel pending connection. Since future.cancel() is a noop on tornado, we'll track cancellation locally
             self.ws_future.cancel()
-            self.ws_future_cancelled = True
-            self.log.debug("_disconnect: ws_future_cancelled: {}".format(self.ws_future_cancelled))
+            self.log.debug("_disconnect: future cancelled, disconnected: {}".format(self.disconnected))
 
     @gen.coroutine
     def _read_messages(self, callback):
         """Read messages from gateway server."""
-        while True:
+        while self.ws is not None:
             message = None
-            if not self.ws_future_cancelled:
+            if not self.disconnected:
                 try:
                     message = yield self.ws.read_message()
                 except Exception as e:
                     self.log.error("Exception reading message from websocket: {}".format(e)) # , exc_info=True)
                 if message is None:
+                    if not self.disconnected:
+                        self.log.warning("Lost connection to Gateway: {}".format(self.kernel_id))
                     break
                 callback(message) # pass back to notebook client (see self.on_open and WebSocketChannelsHandler.open)
             else: # ws cancelled - stop reading
                 break
 
+        if not self.disconnected: # if websocket is not disconnected by client, attempt to reconnect to Gateway
+            self.log.info("Attempting to re-establish the connection to Gateway: {}".format(self.kernel_id))
+            self._connect(self.kernel_id)
+            loop = IOLoop.current()
+            loop.add_future(self.ws_future, lambda future: self._read_messages(callback))
+
     def on_open(self, kernel_id, message_callback, **kwargs):
         """Web socket connection open against gateway server."""
         self._connect(kernel_id)
@@ -205,7 +215,7 @@ def on_message(self, message):
     def _write_message(self, message):
         """Send message to gateway server."""
         try:
-            if not self.ws_future_cancelled:
+            if not self.disconnected and self.ws is not None:
                 self.ws.write_message(message)
         except Exception as e:
             self.log.error("Exception writing message to websocket: {}".format(e)) # , exc_info=True)