Skip to content

Commit

Permalink
Merge pull request #153 from clkao/pending-messasges
Browse files Browse the repository at this point in the history
progress messages for jupyterhub 0.9
  • Loading branch information
minrk committed May 24, 2018
2 parents 6d5993c + de759c0 commit a7b7374
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 4 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ python:
- nightly
- 3.6
- 3.5
- 3.4

# install dependencies
install:
Expand Down
30 changes: 28 additions & 2 deletions kubespawner/reflector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ class NamespacedResourceReflector(LoggingConfigurable):
"""
)

fields = Dict(
{},
config=True,
help="""
Fields to restrict the reflected objects
"""
)

namespace = Unicode(
None,
allow_none=True,
Expand Down Expand Up @@ -88,11 +96,16 @@ def __init__(self, *args, **kwargs):

# FIXME: Protect against malicious labels?
self.label_selector = ','.join(['{}={}'.format(k, v) for k, v in self.labels.items()])
self.field_selector = ','.join(['{}={}'.format(k, v) for k, v in self.fields.items()])

self.first_load_future = Future()
self._stop_event = threading.Event()

self.start()

def __del__(self):
self.stop()

def _list_and_update(self):
"""
Update current list of resources by doing a full fetch.
Expand All @@ -101,7 +114,8 @@ def _list_and_update(self):
"""
initial_resources = getattr(self.api, self.list_method_name)(
self.namespace,
label_selector=self.label_selector
label_selector=self.label_selector,
field_selector=self.field_selector
)
# This is an atomic operation on the dictionary!
self.resources = {p.metadata.name: p for p in initial_resources.items}
Expand Down Expand Up @@ -132,8 +146,8 @@ def _watch_and_update(self):
update' cycle on them), we should be ok!
"""
cur_delay = 0.1
self.log.info("watching for %s with label selector %s / field selector %s in namespace %s", self.kind, self.label_selector, self.field_selector, self.namespace)
while True:
self.log.info("watching for %s with label selector %s in namespace %s", self.kind, self.label_selector, self.namespace)
w = watch.Watch()
try:
resource_version = self._list_and_update()
Expand All @@ -144,7 +158,9 @@ def _watch_and_update(self):
getattr(self.api, self.list_method_name),
self.namespace,
label_selector=self.label_selector,
field_selector=self.field_selector,
resource_version=resource_version,
timeout_seconds=10,
):
cur_delay = 0.1
resource = ev['object']
Expand All @@ -154,6 +170,9 @@ def _watch_and_update(self):
else:
# This is an atomic operation on the dictionary!
self.resources[resource.metadata.name] = resource
if self._stop_event.is_set():
break

except Exception:
cur_delay = cur_delay * 2
if cur_delay > 30:
Expand All @@ -166,6 +185,9 @@ def _watch_and_update(self):
continue
finally:
w.stop()
if self._stop_event.is_set():
self.log.info("%s watcher stopped", self.kind)
break

def start(self):
"""
Expand All @@ -186,4 +208,8 @@ def start(self):
self.watch_thread.daemon = True
self.watch_thread.start()

def stop(self):
self._stop_event.set()

def stopped(self):
return self._stop_event.is_set()
48 changes: 47 additions & 1 deletion kubespawner/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from kubespawner.utils import Callable
from kubespawner.objects import make_pod, make_pvc
from kubespawner.reflector import NamespacedResourceReflector

from asyncio import sleep
from async_generator import async_generator, yield_

class PodReflector(NamespacedResourceReflector):
kind = 'pods'
Expand All @@ -44,6 +45,15 @@ class PodReflector(NamespacedResourceReflector):
def pods(self):
return self.resources

class EventReflector(NamespacedResourceReflector):
kind = 'events'

list_method_name = 'list_namespaced_event'

@property
def events(self):
return sorted(self.resources.values(), key = lambda x : x.last_timestamp)

class KubeSpawner(Spawner):
"""
Implement a JupyterHub spawner to spawn pods in a Kubernetes Cluster.
Expand Down Expand Up @@ -1112,6 +1122,24 @@ def poll(self):
def asynchronize(self, method, *args, **kwargs):
return method(*args, **kwargs)

@async_generator
async def progress(self):
next_event = 0
self.log.debug('progress generator: %s', self.pod_name)

while not self.events.stopped():
len_events = len(self.events.events)
if next_event < len_events:
for i in range(next_event, len_events):
event = self.events.events[i]
await yield_({
'progress': 50,
'message': "%s [%s] %s" % (event.last_timestamp, event.type, event.message)
})
next_event = len_events
await sleep(1)


@gen.coroutine
def start(self):
if self.user_storage_pvc_ensure:
Expand All @@ -1128,6 +1156,17 @@ def start(self):
else:
raise

main_loop = IOLoop.current()
def on_reflector_failure():
self.log.critical("Events reflector failed, halting Hub.")
main_loop.stop()

# events are selected based on pod name, which will include previous launch/stop
self.events = EventReflector(
parent=self, namespace=self.namespace,
fields={'involvedObject.kind': 'Pod', 'involvedObject.name': self.pod_name},
on_failure=on_reflector_failure
)
# If we run into a 409 Conflict error, it means a pod with the
# same name already exists. We stop it, wait for it to stop, and
# try again. We try 4 times, and if it still fails we give up.
Expand Down Expand Up @@ -1167,6 +1206,13 @@ def start(self):
)

pod = self.pod_reflector.pods[self.pod_name]

self.log.debug('pod %s events before launch: %s',
self.pod_name, "\n".join(["%s [%s] %s" % (event.last_timestamp, event.type, event.message) for event in self.events.events]))

# Note: we stop the event watcher once launch is successful, but the reflector
# will only stop when the next event comes in, likely when it is stopped.
self.events.stop()
return (pod.status.pod_ip, self.port)

@gen.coroutine
Expand Down
10 changes: 10 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
from __future__ import print_function
from setuptools import setup, find_packages
import sys

v = sys.version_info
if v[:2] < (3, 5):
error = "ERROR: jupyterhub-kubespawner requires Python version 3.5 or above."
print(error, file=sys.stderr)
sys.exit(1)

setup(
name='jupyterhub-kubespawner',
Expand All @@ -9,7 +17,9 @@
'kubernetes==4.*',
'escapism',
'jinja2',
'async_generator>=1.8',
],
python_requires = ">=3.5",
setup_requires=['pytest-runner'],
tests_require=['pytest'],
description='JupyterHub Spawner targeting Kubernetes',
Expand Down

0 comments on commit a7b7374

Please sign in to comment.