Skip to content

Commit

Permalink
Fix services_enabled with component_label
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Oct 22, 2022
1 parent b96c54f commit a5a171a
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 32 deletions.
25 changes: 12 additions & 13 deletions kubespawner/objects.py
Expand Up @@ -6,6 +6,7 @@
import json
import operator
import re
from typing import List, Optional
from urllib.parse import urlparse

from kubernetes_asyncio.client.models import (
Expand Down Expand Up @@ -904,12 +905,12 @@ def make_secret(


def make_service(
name,
port,
servername,
owner_references,
labels=None,
annotations=None,
name: str,
port: int,
selector: dict,
owner_references: List[V1OwnerReference],
labels: Optional[dict] = None,
annotations: Optional[dict] = None,
):
"""
Make a k8s service specification for using dns to communicate with the notebook.
Expand All @@ -919,8 +920,10 @@ def make_service(
name:
Name of the service. Must be unique within the namespace the object is
going to be created in.
env:
Dictionary of environment variables.
selector:
Labels of server pod to be used in spec.selector
owner_references:
Pod's owner references used to automatically remote service after pod removal
labels:
Labels to add to the service.
annotations:
Expand All @@ -941,11 +944,7 @@ def make_service(
spec=V1ServiceSpec(
type='ClusterIP',
ports=[V1ServicePort(name='http', port=port, target_port=port)],
selector={
'component': 'singleuser-server',
'hub.jupyter.org/servername': servername,
'hub.jupyter.org/username': metadata.labels['hub.jupyter.org/username'],
},
selector=selector,
),
)

Expand Down
40 changes: 23 additions & 17 deletions kubespawner/spawner.py
Expand Up @@ -55,11 +55,6 @@ class PodReflector(ResourceReflector):

kind = "pods"

# The default component label can be over-ridden by specifying the component_label property
labels = {
'component': 'singleuser-server',
}

@property
def pods(self):
"""
Expand Down Expand Up @@ -2117,12 +2112,13 @@ def get_service_manifest(self, owner_reference):
annotations = self._build_common_annotations(
self._expand_all(self.extra_annotations)
)
selector = self._build_pod_labels(self._expand_all(self.extra_labels))

# TODO: validate that the service name
return make_service(
name=self.pod_name,
port=self.port,
servername=self.name,
selector=selector,
owner_references=[owner_reference],
labels=labels,
annotations=annotations,
Expand Down Expand Up @@ -2376,9 +2372,9 @@ async def progress(self):

def _start_reflector(
self,
kind=None,
reflector_class=ResourceReflector,
replace=False,
kind: str,
reflector_class: ResourceReflector,
replace: bool = False,
**kwargs,
):
"""Start a shared reflector on the KubeSpawner class
Expand All @@ -2395,7 +2391,6 @@ def _start_reflector(
and a new one started (for recovering from possible errors).
"""
key = kind
ReflectorClass = reflector_class

def on_reflector_failure():
self.log.critical(
Expand All @@ -2407,7 +2402,7 @@ def on_reflector_failure():
previous_reflector = self.__class__.reflectors.get(key)

if replace or not previous_reflector:
self.__class__.reflectors[key] = ReflectorClass(
self.__class__.reflectors[key] = reflector_class(
parent=self,
namespace=self.namespace,
on_failure=on_reflector_failure,
Expand All @@ -2418,19 +2413,23 @@ def on_reflector_failure():
async def catch_reflector_start():
try:
await f
except Exception as e:
except Exception:
self.log.exception(f"Reflector for {kind} failed to start.")
sys.exit(1)

asyncio.create_task(catch_reflector_start())

if replace and previous_reflector:
# we replaced the reflector, stop the old one
asyncio.ensure_future(previous_reflector.stop())
self._stop_reflector(previous_reflector)

# return the current reflector
return self.__class__.reflectors[key]

@staticmethod
def _stop_reflector(reflector: ResourceReflector):
asyncio.ensure_future(reflector.stop())

def _start_watching_events(self, replace=False):
"""Start the events reflector
Expand All @@ -2457,15 +2456,21 @@ def _start_watching_pods(self, replace=False):
If replace=True, a running pod reflector will be stopped
and a new one started (for recovering from possible errors).
"""
pod_reflector_class = PodReflector
pod_reflector_class.labels.update({"component": self.component_label})
return self._start_reflector(
"pods",
PodReflector,
kind="pods",
reflector_class=PodReflector,
labels={"component": self.component_label},
omit_namespace=self.enable_user_namespaces,
replace=replace,
)

@classmethod
def _stop_watching_pods(cls):
pods_reflector = cls.reflectors.get("pods")
if pods_reflector:
cls._stop_reflector(pods_reflector)
cls.reflectors["pods"] = None

def start(self):
"""Thin wrapper around self._start
Expand Down Expand Up @@ -2638,6 +2643,7 @@ async def _make_create_resource_request(self, kind, manifest):
else:
return True

@_await_pod_reflector
async def _start(self):
"""Start the user's pod"""

Expand Down
131 changes: 129 additions & 2 deletions tests/test_spawner.py
Expand Up @@ -206,6 +206,51 @@ async def test_spawn_start(
assert isinstance(status, int)


async def test_spawn_component_label(
request,
kube_ns,
kube_client,
config,
hub,
):
spawner = KubeSpawner(
hub=hub,
user=MockUser(name="start"),
config=config,
api_token="abc123",
oauth_client_id="unused",
component_label="something",
)

# reflector is created for the entire class, not for every spawner instance
# and if it was already created by previous constructor run, it will left intact.
# recreate it to use component_label in selector
spawner._start_watching_pods(replace=True)

def finalizer():
# and then stop to avoid influencing other tests
KubeSpawner._stop_watching_pods()

request.addfinalizer(finalizer)

pod_name = spawner.pod_name

# start the spawner
await spawner.start()

# verify the pod exists
pods = (await kube_client.list_namespaced_pod(kube_ns)).items
pods = [p for p in pods if p.metadata.name == pod_name]
assert pods

# component label is same as expected
pod = pods[0]
assert pod.metadata.labels["component"] == "something"

# stop the pod
await spawner.stop()


async def test_spawn_internal_ssl(
kube_ns,
kube_client,
Expand Down Expand Up @@ -301,6 +346,87 @@ async def test_spawn_internal_ssl(
assert service_name not in service_names


async def test_spawn_services_enabled(
request,
kube_ns,
kube_client,
hub,
config,
):
spawner = KubeSpawner(
config=config,
hub=hub,
user=MockUser(name="services"),
api_token="abc123",
oauth_client_id="unused",
services_enabled=True,
component_label="something",
common_labels={
"some/label": "value1",
},
extra_labels={
"extra/label": "value2",
},
)

# reflector is created for the entire class, not for every spawner instance
# and if it was already created by previous constructor run, it will left intact.
# recreate it to use component_label in selector
spawner._start_watching_pods(replace=True)

def finalizer():
# and then stop to avoid influencing other tests
KubeSpawner._stop_watching_pods()

request.addfinalizer(finalizer)

# start the spawner
await spawner.start()
pod_name = "jupyter-%s" % spawner.user.name
# verify the pod exists
pods = (await kube_client.list_namespaced_pod(kube_ns)).items
pod_names = [p.metadata.name for p in pods]
assert pod_name in pod_names
# verify poll while running
status = await spawner.poll()
assert status is None

# verify service exist
service_name = pod_name
services = (await kube_client.list_namespaced_service(kube_ns)).items
services = [s for s in services if s.metadata.name == service_name]
assert services

# verify selector contains component_label, common_labels and extra_labels
# as well as user and server name
selector = services[0].spec.selector
assert selector["component"] == "something"
assert selector["some/label"] == "value1"
assert selector["extra/label"] == "value2"
assert selector["hub.jupyter.org/servername"] == ""
assert selector["hub.jupyter.org/username"] == "services"

# stop the pod
await spawner.stop()

# verify pod is gone
pods = (await kube_client.list_namespaced_pod(kube_ns)).items
pod_names = [p.metadata.name for p in pods]
assert "jupyter-%s" % spawner.user.name not in pod_names

# verify service is gone
# it may take a little while for them to get cleaned up
for _ in range(5):
services = (await kube_client.list_namespaced_service(kube_ns)).items
service_names = {s.metadata.name for s in services}
if service_name in service_names:
await asyncio.sleep(1)
else:
break

assert service_name not in service_names


async def test_spawn_after_pod_created_hook(
kube_ns,
kube_client,
Expand All @@ -314,11 +440,12 @@ async def after_pod_created_hook(spawner: KubeSpawner, pod: dict):
annotations = spawner._build_common_annotations(
spawner._expand_all(spawner.extra_annotations)
)
selector = spawner._build_pod_labels(spawner._expand_all(spawner.extra_labels))

service_manifest = make_service(
name=spawner.pod_name + "-hook",
port=spawner.port,
servername=spawner.name,
selector=selector,
owner_references=[owner_reference],
labels=labels,
annotations=annotations,
Expand All @@ -340,7 +467,7 @@ async def after_pod_created_hook(spawner: KubeSpawner, pod: dict):
spawner = KubeSpawner(
config=config,
hub=hub,
user=MockUser(name="ssl"),
user=MockUser(name="hook"),
api_token="abc123",
oauth_client_id="unused",
after_pod_created_hook=after_pod_created_hook,
Expand Down

0 comments on commit a5a171a

Please sign in to comment.