Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reconnection to Gateway #378

Merged
merged 1 commit into from
Jan 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions jupyter_server/gateway/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import os
import logging
import mimetypes
import random
import asyncio

from ..base.handlers import APIHandler, JupyterHandler
from ..utils import url_path_join
Expand Down Expand Up @@ -133,6 +135,7 @@ def __init__(self, **kwargs):
self.ws = None
self.ws_future = Future()
self.disconnected = False
self.retry = 0

async def _connect(self, kernel_id, message_callback):
# websocket is initialized before connection
Expand All @@ -159,6 +162,7 @@ async def _connect(self, kernel_id, message_callback):
def _connection_done(self, fut):
if not self.disconnected and fut.exception() is None: # prevent concurrent.futures._base.CancelledError
self.ws = fut.result()
self.retry = 0
self.log.debug("Connection is ready: ws: {}".format(self.ws))
else:
self.log.warning("Websocket connection has been closed via client disconnect or due to error. "
Expand Down Expand Up @@ -192,8 +196,15 @@ async def _read_messages(self, callback):
else: # ws cancelled - stop reading
break

if not self.disconnected: # if websocket is not disconnected by client, attept to reconnect to Gateway
self.log.info("Attempting to re-establish the connection to Gateway: {}".format(self.kernel_id))
# NOTE(esevan): if websocket is not disconnected by client, try to reconnect.
if not self.disconnected and self.retry < GatewayClient.instance().gateway_retry_max:
jitter = random.randint(10, 100) * 0.01
retry_interval = min(GatewayClient.instance().gateway_retry_interval * (2 ** self.retry),
GatewayClient.instance().gateway_retry_interval_max) + jitter
self.retry += 1
self.log.info("Attempting to re-establish the connection to Gateway in %s secs (%s/%s): %s",
retry_interval, self.retry, GatewayClient.instance().gateway_retry_max, self.kernel_id)
await asyncio.sleep(retry_interval)
loop = IOLoop.current()
loop.spawn_callback(self._connect, self.kernel_id, callback)

Expand Down
34 changes: 33 additions & 1 deletion jupyter_server/gateway/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from jupyter_client.kernelspec import KernelSpecManager
from ..utils import url_path_join

from traitlets import Instance, Unicode, Float, Bool, default, validate, TraitError
from traitlets import Instance, Unicode, Int, Float, Bool, default, validate, TraitError
from traitlets.config import SingletonConfigurable


Expand Down Expand Up @@ -220,6 +220,38 @@ def __init__(self, **kwargs):
def _env_whitelist_default(self):
return os.environ.get(self.env_whitelist_env, self.env_whitelist_default_value)

# Base delay, in seconds, before the first attempt to reconnect to the Gateway.
# The reconnect logic in the gateway handlers doubles this value for every
# consecutive retry (interval * 2 ** retry), caps the result at
# gateway_retry_interval_max and then adds a small random jitter.
# NOTE(review): the help text says "HTTP reconnection", but the retry code shown
# in handlers.py re-establishes the websocket connection - confirm the wording.
gateway_retry_interval_default_value = 1.0
# Name of the environment variable that can override the default value.
gateway_retry_interval_env = 'JUPYTER_GATEWAY_RETRY_INTERVAL'
gateway_retry_interval = Float(default_value=gateway_retry_interval_default_value, config=True,
help="""The time allowed for HTTP reconnection with the Gateway server for the first time.
Next will be JUPYTER_GATEWAY_RETRY_INTERVAL multiplied by two in factor of numbers of retries
but less than JUPYTER_GATEWAY_RETRY_INTERVAL_MAX.
(JUPYTER_GATEWAY_RETRY_INTERVAL env var)""")

@default('gateway_retry_interval')
def gateway_retry_interval_default(self):
    """Return the dynamic default for ``gateway_retry_interval``.

    The environment variable named by ``gateway_retry_interval_env`` is used
    when it is set; otherwise ``gateway_retry_interval_default_value``.

    Raises:
        ValueError: if the environment variable holds a non-numeric string.
    """
    # Resolve the variable name through the class attribute instead of
    # repeating the literal, as _env_whitelist_default does, so the name is
    # defined in exactly one place.
    return float(os.environ.get(self.gateway_retry_interval_env, self.gateway_retry_interval_default_value))

# Upper bound, in seconds, for the exponentially growing reconnect delay.
# The reconnect logic in the gateway handlers takes
# min(gateway_retry_interval * 2 ** retry, gateway_retry_interval_max) and adds
# the random jitter afterwards, so the actual sleep can slightly exceed this cap.
gateway_retry_interval_max_default_value = 30.0
# Name of the environment variable that can override the default value.
gateway_retry_interval_max_env = 'JUPYTER_GATEWAY_RETRY_INTERVAL_MAX'
gateway_retry_interval_max = Float(default_value=gateway_retry_interval_max_default_value, config=True,
help="""The maximum time allowed for HTTP reconnection retry with the Gateway server.
(JUPYTER_GATEWAY_RETRY_INTERVAL_MAX env var)""")

@default('gateway_retry_interval_max')
def gateway_retry_interval_max_default(self):
    """Return the dynamic default for ``gateway_retry_interval_max``.

    The environment variable named by ``gateway_retry_interval_max_env`` is
    used when it is set; otherwise ``gateway_retry_interval_max_default_value``.

    Raises:
        ValueError: if the environment variable holds a non-numeric string.
    """
    # Resolve the variable name through the class attribute instead of
    # repeating the literal, as _env_whitelist_default does.
    return float(os.environ.get(self.gateway_retry_interval_max_env, self.gateway_retry_interval_max_default_value))

# Maximum number of consecutive reconnect attempts. The reconnect logic in the
# gateway handlers only schedules another attempt while its retry counter is
# below this value, and resets the counter once a connection succeeds.
gateway_retry_max_default_value = 5
# Name of the environment variable that can override the default value.
gateway_retry_max_env = 'JUPYTER_GATEWAY_RETRY_MAX'
gateway_retry_max = Int(default_value=gateway_retry_max_default_value, config=True,
help="""The maximum retries allowed for HTTP reconnection with the Gateway server.
(JUPYTER_GATEWAY_RETRY_MAX env var)""")

@default('gateway_retry_max')
def gateway_retry_max_default(self):
    """Return the dynamic default for ``gateway_retry_max``.

    The environment variable named by ``gateway_retry_max_env`` is used when
    it is set; otherwise ``gateway_retry_max_default_value``.

    Raises:
        ValueError: if the environment variable holds a non-integer string.
    """
    # Resolve the variable name through the class attribute instead of
    # repeating the literal, as _env_whitelist_default does.
    return int(os.environ.get(self.gateway_retry_max_env, self.gateway_retry_max_default_value))

@property
def gateway_enabled(self):
    """Tell whether a Gateway is configured.

    Returns ``True`` only when ``url`` is set to a non-empty value.
    """
    if self.url is None:
        return False
    return len(self.url) > 0
Expand Down