Skip to content

Commit

Permalink
Attempt to re-establish websocket connection to Gateway
Browse files Browse the repository at this point in the history
When notebook (with `--gateway-url` option) lost the connection to
Gateway, notebook didn't connect to Gateway again although the
websocket connection from the client was still alive.

This change recovers the connection to Gateway to prevent this anomaly.

Signed-off-by: Eunsoo Park <esevan.park@gmail.com>
  • Loading branch information
esevan authored and lresende committed Aug 7, 2019
1 parent 5c43809 commit 20c2c66
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions notebook/gateway/handlers.py
Expand Up @@ -130,10 +130,12 @@ def __init__(self, **kwargs):
self.kernel_id = None
self.ws = None
self.ws_future = Future()
self.ws_future_cancelled = False
self.disconnected = False

@gen.coroutine
def _connect(self, kernel_id):
# websocket is initialized before connection
self.ws = None
self.kernel_id = kernel_id
ws_url = url_path_join(
GatewayClient.instance().ws_url,
Expand All @@ -148,40 +150,48 @@ def _connect(self, kernel_id):
self.ws_future.add_done_callback(self._connection_done)

def _connection_done(self, fut):
if not self.ws_future_cancelled: # prevent concurrent.futures._base.CancelledError
if not self.disconnected and fut.exception() is None: # prevent concurrent.futures._base.CancelledError
self.ws = fut.result()
self.log.debug("Connection is ready: ws: {}".format(self.ws))
else:
self.log.warning("Websocket connection has been cancelled via client disconnect before its establishment. "
self.log.warning("Websocket connection has been closed via client disconnect or due to error. "
"Kernel with ID '{}' may not be terminated on GatewayClient: {}".
format(self.kernel_id, GatewayClient.instance().url))

def _disconnect(self):
self.disconnected = True
if self.ws is not None:
# Close connection
self.ws.close()
elif not self.ws_future.done():
# Cancel pending connection. Since future.cancel() is a noop on tornado, we'll track cancellation locally
self.ws_future.cancel()
self.ws_future_cancelled = True
self.log.debug("_disconnect: ws_future_cancelled: {}".format(self.ws_future_cancelled))
self.log.debug("_disconnect: future cancelled, disconnected: {}".format(self.disconnected))

@gen.coroutine
def _read_messages(self, callback):
"""Read messages from gateway server."""
while True:
while self.ws is not None:
message = None
if not self.ws_future_cancelled:
if not self.disconnected:
try:
message = yield self.ws.read_message()
except Exception as e:
self.log.error("Exception reading message from websocket: {}".format(e)) # , exc_info=True)
if message is None:
if not self.disconnected:
self.log.warning("Lost connection to Gateway: {}".format(self.kernel_id))
break
callback(message) # pass back to notebook client (see self.on_open and WebSocketChannelsHandler.open)
else: # ws cancelled - stop reading
break

if not self.disconnected: # if websocket is not disconnected by client, attept to reconnect to Gateway
self.log.info("Attempting to re-establish the connection to Gateway: {}".format(self.kernel_id))
self._connect(self.kernel_id)
loop = IOLoop.current()
loop.add_future(self.ws_future, lambda future: self._read_messages(callback))

def on_open(self, kernel_id, message_callback, **kwargs):
"""Web socket connection open against gateway server."""
self._connect(kernel_id)
Expand All @@ -205,7 +215,7 @@ def on_message(self, message):
def _write_message(self, message):
"""Send message to gateway server."""
try:
if not self.ws_future_cancelled:
if not self.disconnected and self.ws is not None:
self.ws.write_message(message)
except Exception as e:
self.log.error("Exception writing message to websocket: {}".format(e)) # , exc_info=True)
Expand Down

0 comments on commit 20c2c66

Please sign in to comment.