Skip to content

Commit

Permalink
Merge pull request #174 from ausecocloud/master
Browse files Browse the repository at this point in the history
Add timeout handling to reflector
  • Loading branch information
minrk committed May 29, 2018
2 parents b0b1041 + b065bec commit 94dc895
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 36 deletions.
57 changes: 48 additions & 9 deletions kubespawner/reflector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import threading

from traitlets.config import LoggingConfigurable
from traitlets import Any, Dict, Unicode
from traitlets import Any, Dict, Int, Unicode
from kubernetes import config, watch
from tornado.ioloop import IOLoop
# This is kubernetes client implementation specific, but we need to know
# whether it was a network or watch timeout.
from urllib3.exceptions import ReadTimeoutError

from .clients import shared_client

class NamespacedResourceReflector(LoggingConfigurable):
Expand Down Expand Up @@ -82,6 +85,29 @@ class NamespacedResourceReflector(LoggingConfigurable):
"""
)

# Configurable network-level timeout for calls to the Kubernetes API.
# In the code visible here it is passed as `_request_timeout` to the list
# call unconditionally, and to the watch stream only when it is non-zero,
# so the default of 0 leaves the watch call without an explicit
# `_request_timeout` argument.
# NOTE(review): the unit is presumably seconds -- confirm against the
# kubernetes client documentation.
request_timeout = Int(
0,
config=True,
help="""
Network timeout for kubernetes watch.
Trigger watch reconnect when no traffic has been received for this time.
This can be used to restart the watch periodically.
"""
)

# Configurable timeout for the watch itself (as opposed to the network
# read timeout). When non-zero it is forwarded as `timeout_seconds` to
# `w.stream(...)`; according to the comment at that call site the stream
# then just ends without raising, and the watcher is stopped and a new one
# started. A value of 0 omits the argument from the watch call entirely.
timeout_seconds = Int(
10,
config=True,
help="""
Timeout for kubernetes watch.
Trigger watch reconnect when no watch event has been received.
This will cause a full reload of the currently existing resources
from the API server.
"""
)

on_failure = Any(help="""Function to be called when the reflector gives up.""")

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -115,7 +141,8 @@ def _list_and_update(self):
initial_resources = getattr(self.api, self.list_method_name)(
self.namespace,
label_selector=self.label_selector,
field_selector=self.field_selector
field_selector=self.field_selector,
_request_timeout=self.request_timeout,
)
# This is an atomic operation on the dictionary!
self.resources = {p.metadata.name: p for p in initial_resources.items}
Expand Down Expand Up @@ -154,13 +181,23 @@ def _watch_and_update(self):
if not self.first_load_future.done():
# signal that we've loaded our initial data
self.first_load_future.set_result(None)
watch_args = {
'namespace': self.namespace,
'label_selector': self.label_selector,
'field_selector': self.field_selector,
'resource_version': resource_version,
}
if self.request_timeout:
# set network receive timeout
watch_args['_request_timeout'] = self.request_timeout
if self.timeout_seconds:
# set watch timeout
watch_args['timeout_seconds'] = self.timeout_seconds
# in case of timeout_seconds, the w.stream just exits (no exception thrown)
# -> we stop the watcher and start a new one
for ev in w.stream(
getattr(self.api, self.list_method_name),
self.namespace,
label_selector=self.label_selector,
field_selector=self.field_selector,
resource_version=resource_version,
timeout_seconds=10,
**watch_args
):
cur_delay = 0.1
resource = ev['object']
Expand All @@ -172,7 +209,9 @@ def _watch_and_update(self):
self.resources[resource.metadata.name] = resource
if self._stop_event.is_set():
break

except ReadTimeoutError:
# network read timeout; just continue and restart the watch
continue
except Exception:
cur_delay = cur_delay * 2
if cur_delay > 30:
Expand Down
13 changes: 6 additions & 7 deletions kubespawner/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.concurrent import run_on_executor
from traitlets import Type, Unicode, List, Integer, Union, Dict, Bool, Any
from traitlets import Unicode, List, Integer, Union, Dict, Bool
from jupyterhub.spawner import Spawner
from jupyterhub.utils import exponential_backoff
from jupyterhub.traitlets import Command
Expand All @@ -25,7 +25,6 @@

from .clients import shared_client
from kubespawner.traitlets import Callable
from kubespawner.utils import Callable
from kubespawner.objects import make_pod, make_pvc
from kubespawner.reflector import NamespacedResourceReflector
from asyncio import sleep
Expand Down Expand Up @@ -1245,11 +1244,11 @@ def stop(self, now=False):
body=delete_options,
grace_period_seconds=grace_seconds
)
while True:
data = self.pod_reflector.pods.get(self.pod_name, None)
if data is None:
break
yield gen.sleep(1)
yield exponential_backoff(
lambda: self.pod_reflector.pods.get(self.pod_name, None) is None,
'pod/%s did not disappear in %s seconds!' % (self.pod_name, self.start_timeout),
timeout=self.start_timeout
)

def _env_keep_default(self):
return []
Expand Down
4 changes: 2 additions & 2 deletions kubespawner/traitlets.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Traitlets that are used in Kubespawner
"""
from traitlets import TraitType, TraitError, Dict
from traitlets import TraitType


class Callable(TraitType):
Expand All @@ -16,6 +16,6 @@ class Callable(TraitType):

def validate(self, obj, value):
if callable(value):
return value
return value
else:
self.error(obj, value)
18 changes: 0 additions & 18 deletions kubespawner/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
"""
Misc. general utility functions, not tied to Kubespawner directly
"""
import random
import hashlib

from traitlets import TraitType

def generate_hashed_slug(slug, limit=63, hash_length=6):
"""
Expand All @@ -29,19 +27,3 @@ def generate_hashed_slug(slug, limit=63, hash_length=6):
prefix=slug[:limit - hash_length - 1],
hash=slug_hash[:hash_length],
).lower()


class Callable(TraitType):
    """Trait type that accepts only callable values.

    Anything for which the builtin ``callable()`` returns true is accepted,
    which includes classes as well as instances defining ``__call__()``.
    """

    info_text = 'a callable'

    def validate(self, obj, value):
        """Return *value* unchanged when it is callable; otherwise report it via ``self.error``."""
        if not callable(value):
            # Hand the rejected value to the trait's error hook
            # (presumably raises; nothing is returned if it does not).
            self.error(obj, value)
            return None
        return value

0 comments on commit 94dc895

Please sign in to comment.