Skip to content

Commit

Permalink
Merge pull request #150 from clkao/events-reflector
Browse files Browse the repository at this point in the history
Use EventReflector to watch pod events
  • Loading branch information
minrk committed Mar 26, 2018
2 parents 86386e8 + 619190a commit 085cb30
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
25 changes: 22 additions & 3 deletions kubespawner/reflector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ class NamespacedResourceReflector(LoggingConfigurable):
"""
)

# Configurable mapping of field name -> required value used to restrict which
# objects are reflected (e.g. {'involvedObject.kind': 'Pod'}).
# __init__ joins the entries as comma-separated `key=value` pairs into
# `self.field_selector`, which is then passed as `field_selector` to both the
# list call and the watch stream.
fields = Dict(
{},
config=True,
help="""
Fields to restrict the reflected objects
"""
)

namespace = Unicode(
None,
allow_none=True,
Expand Down Expand Up @@ -88,8 +96,10 @@ def __init__(self, *args, **kwargs):

# FIXME: Protect against malicious labels?
self.label_selector = ','.join(['{}={}'.format(k, v) for k, v in self.labels.items()])
self.field_selector = ','.join(['{}={}'.format(k, v) for k, v in self.fields.items()])

self.first_load_future = Future()
self._stop_event = threading.Event()

self.start()

Expand All @@ -101,7 +111,8 @@ def _list_and_update(self):
"""
initial_resources = getattr(self.api, self.list_method_name)(
self.namespace,
label_selector=self.label_selector
label_selector=self.label_selector,
field_selector=self.field_selector
)
# This is an atomic operation on the dictionary!
self.resources = {p.metadata.name: p for p in initial_resources.items}
Expand Down Expand Up @@ -133,7 +144,7 @@ def _watch_and_update(self):
"""
cur_delay = 0.1
while True:
self.log.info("watching for %s with label selector %s in namespace %s", self.kind, self.label_selector, self.namespace)
self.log.info("watching for %s with label selector %s / field selector %s in namespace %s", self.kind, self.label_selector, self.field_selector, self.namespace)
w = watch.Watch()
try:
resource_version = self._list_and_update()
Expand All @@ -144,6 +155,7 @@ def _watch_and_update(self):
getattr(self.api, self.list_method_name),
self.namespace,
label_selector=self.label_selector,
field_selector=self.field_selector,
resource_version=resource_version,
):
cur_delay = 0.1
Expand All @@ -154,6 +166,9 @@ def _watch_and_update(self):
else:
# This is an atomic operation on the dictionary!
self.resources[resource.metadata.name] = resource
if self._stop_event.is_set():
break

except Exception:
cur_delay = cur_delay * 2
if cur_delay > 30:
Expand All @@ -166,6 +181,9 @@ def _watch_and_update(self):
continue
finally:
w.stop()
if self._stop_event.is_set():
self.log.info("%s watcher stopped", self.kind)
break

def start(self):
"""
Expand All @@ -186,4 +204,5 @@ def start(self):
self.watch_thread.daemon = True
self.watch_thread.start()


def stop(self):
    """
    Ask the watcher loop to exit.

    This only sets the internal stop flag. The watch loop checks the flag
    after it has handled an event and again when a watch ends, so the
    watcher may keep running until one of those happens.
    """
    stop_flag = self._stop_event
    stop_flag.set()
24 changes: 24 additions & 0 deletions kubespawner/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ class PodReflector(NamespacedResourceReflector):
def pods(self):
return self.resources

class EventReflector(NamespacedResourceReflector):
    """
    Reflector for event objects in a namespace.

    Resources are listed (and watched) through the API method named by
    `list_method_name`; `kind` is the label used in log messages.
    """

    list_method_name = 'list_namespaced_event'
    kind = 'events'

    @property
    def events(self):
        """Return the reflected resources of this reflector (the events)."""
        reflected = self.resources
        return reflected

class KubeSpawner(Spawner):
"""
Implement a JupyterHub spawner to spawn pods in a Kubernetes Cluster.
Expand Down Expand Up @@ -1019,6 +1028,17 @@ def start(self):
else:
raise

main_loop = IOLoop.current()
def on_reflector_failure():
self.log.critical("Events reflector failed, halting Hub.")
main_loop.stop()

# Events are selected by pod name, so events from previous launches/stops
# of a pod with the same name are included as well.
self.events = EventReflector(
parent=self, namespace=self.namespace,
fields={'involvedObject.kind': 'Pod', 'involvedObject.name': self.pod_name},
on_failure=on_reflector_failure
)
# If we run into a 409 Conflict error, it means a pod with the
# same name already exists. We stop it, wait for it to stop, and
# try again. We try 4 times, and if it still fails we give up.
Expand Down Expand Up @@ -1058,6 +1078,10 @@ def start(self):
)

pod = self.pod_reflector.pods[self.pod_name]
self.log.debug('pod %s events before launch: %s', self.pod_name, self.events.events)
# Note: we ask the event watcher to stop once launch is successful, but the
# reflector only notices the request when the next event comes in, which
# will likely be when the pod itself is stopped.
self.events.stop()
return (pod.status.pod_ip, self.port)

@gen.coroutine
Expand Down

0 comments on commit 085cb30

Please sign in to comment.