Merge pull request #678 from dolfinus/watch_multiple_namespaces
Allow to watch multiple namespaces at the same time
consideRatio committed Apr 10, 2023
2 parents 77ab30e + c2097ee commit bc03c79
Showing 4 changed files with 445 additions and 177 deletions.
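This commit keys the shared pod and event reflectors by (kind, namespace) instead of keeping a single hard-coded pair, so spawners that target different namespaces (for example via kubespawner_override, as the new comment in _start() notes) each get their own watch, while enable_user_namespaces keeps one cluster-wide reflector per kind. As a rough configuration sketch only, using standard KubeSpawner options (the namespace names and profile slugs below are placeholders, not taken from this commit), a jupyterhub_config.py could route profiles to separate namespaces:

    # jupyterhub_config.py -- illustrative sketch, all names are placeholders
    c.KubeSpawner.namespace = "jupyter"
    c.KubeSpawner.profile_list = [
        {
            "display_name": "Default environment",
            "slug": "default",
            "kubespawner_override": {},
        },
        {
            "display_name": "GPU environment",
            "slug": "gpu",
            # Overriding the namespace per profile means a dedicated
            # ("pods", "jupyter-gpu") reflector is started for such pods.
            "kubespawner_override": {"namespace": "jupyter-gpu"},
        },
    ]

Setting c.KubeSpawner.enable_user_namespaces = True instead uses a single reflector per kind that watches all namespaces, which (per the comments added in _start_reflector below) requires binding the ServiceAccount to a ClusterRole.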
190 changes: 107 additions & 83 deletions kubespawner/spawner.py
@@ -10,7 +10,8 @@
import string
import sys
import warnings
from functools import partial, wraps
from functools import partial
from typing import Optional, Tuple, Type
from urllib.parse import urlparse

import escapism
@@ -117,30 +118,37 @@ class KubeSpawner(Spawner):
# Reflectors keeping track of the k8s api-server's state for various k8s
# resources are singletons as that state can be tracked and shared by all
# KubeSpawner objects.
reflectors = {
"pods": None,
"events": None,
}
reflectors = {}

# Characters as defined by safe for DNS
# Note: '-' is not in safe_chars, as it is being used as escape character
safe_chars = set(string.ascii_lowercase + string.digits)

def _get_reflector_key(self, kind: str) -> Tuple[str, Optional[str]]:
if self.enable_user_namespaces:
# one reflector for all namespaces
return (kind, None)

return (kind, self.namespace)
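
# Illustration (the "jupyter" value is a placeholder namespace): the keys
# returned above look like
#   enable_user_namespaces = True   -> ("pods", None)       one shared reflector watching all namespaces
#   enable_user_namespaces = False  -> ("pods", "jupyter")  one reflector per (kind, namespace) pair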

@property
def pod_reflector(self):
"""
A convenience alias to the class variable reflectors['pods'].
Returns instance of ResourceReflector for pods.
"""
return self.__class__.reflectors['pods']
key = self._get_reflector_key('pods')
return self.__class__.reflectors.get(key, None)

@property
def event_reflector(self):
"""
A convenience alias to the class variable reflectors['events'] if the
Returns instance of ResourceReflector for events, if the
spawner instance has events_enabled.
"""
if self.events_enabled:
return self.__class__.reflectors['events']
key = self._get_reflector_key('events')
return self.__class__.reflectors.get(key, None)
return None

def __init__(self, *args, **kwargs):
_mock = kwargs.pop('_mock', False)
@@ -194,43 +202,6 @@ def __init__(self, *args, **kwargs):
load_config(host=self.k8s_api_host, ssl_ca_cert=self.k8s_api_ssl_ca_cert)
self.api = shared_client("CoreV1Api")

self._start_watching_pods()
if self.events_enabled:
self._start_watching_events()

def _await_pod_reflector(method):
"""Decorator to wait for pod reflector to load
Apply to methods which require the pod reflector
to have completed its first load of pods.
"""

@wraps(method)
async def async_method(self, *args, **kwargs):
if not self.pod_reflector.first_load_future.done():
await self.pod_reflector.first_load_future
return await method(self, *args, **kwargs)

return async_method

def _await_event_reflector(method):
"""Decorator to wait for event reflector to load
Apply to methods which require the event reflector
to have completed its first load of events.
"""

@wraps(method)
async def async_method(self, *args, **kwargs):
if (
self.events_enabled
and not self.event_reflector.first_load_future.done()
):
await self.event_reflector.first_load_future
return await method(self, *args, **kwargs)

return async_method

k8s_api_ssl_ca_cert = Unicode(
"",
config=True,
@@ -2242,7 +2213,6 @@ def load_state(self, state):
if 'dns_name' in state:
self.dns_name = state['dns_name']

@_await_pod_reflector
async def poll(self):
"""
Check if the pod is still running.
@@ -2256,6 +2226,9 @@ async def poll(self):
necessary to check that the returned value is None, rather than
just Falsy, to determine that the pod is still running.
"""

await self._start_watching_pods()

ref_key = f"{self.namespace}/{self.pod_name}"
pod = self.pod_reflector.pods.get(ref_key, None)
if pod is not None:
@@ -2345,9 +2318,12 @@ async def progress(self):
and here is the specification of events that is relevant to understand:
ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#event-v1-core
"""

if not self.events_enabled:
return

await self._start_watching_events()

self.log.debug('progress generator: %s', self.pod_name)
start_future = self._start_future
progress = 0
@@ -2397,16 +2373,16 @@ async def progress(self):
break
await asyncio.sleep(1)

def _start_reflector(
async def _start_reflector(
self,
kind,
reflector_class,
replace=False,
kind: str,
reflector_class: Type[ResourceReflector],
replace: bool = False,
**kwargs,
):
"""Start a shared reflector on the KubeSpawner class
kind: key for the reflector (e.g. 'pod' or 'events')
kind: used to generate the key under which the shared reflector instance is stored (e.g. 'pods' or 'events')
reflector_class: Reflector class to be instantiated
kwargs: extra keyword-args to be relayed to ReflectorClass
@@ -2416,44 +2392,64 @@ def _start_reflector(
If replace=True, a running pod reflector will be stopped
and a new one started (for recovering from possible errors).
"""
key = kind
ReflectorClass = reflector_class

def on_reflector_failure():
self.log.critical(
"%s reflector failed, halting Hub.",
key.title(),
)
sys.exit(1)
key = self._get_reflector_key(kind)
previous_reflector = self.__class__.reflectors.get(key, None)

previous_reflector = self.__class__.reflectors.get(key)
if previous_reflector and not replace:
# fast path
return previous_reflector

if replace or not previous_reflector:
self.__class__.reflectors[key] = ReflectorClass(
parent=self,
namespace=self.namespace,
on_failure=on_reflector_failure,
**kwargs,
)
f = asyncio.ensure_future(self.__class__.reflectors[key].start())
if self.enable_user_namespaces:
# Create one reflector for all namespaces.
# This requires binding ServiceAccount to ClusterRole.

def on_reflector_failure():
# If reflector cannot be started, halt the JH application.
self.log.critical(
"Reflector with key %r failed, halting Hub.",
key,
)
sys.exit(1)

async def catch_reflector_start():
async def catch_reflector_start(func):
try:
await f
await func
except Exception:
self.log.exception(f"Reflector for {kind} failed to start.")
self.log.exception(f"Reflector with key {key} failed to start.")
sys.exit(1)

asyncio.create_task(catch_reflector_start())
else:
# Create a dedicated reflector for each namespace.
# This allows JH to run pods in multiple namespaces without binding ServiceAccount to ClusterRole.

on_reflector_failure = None

async def catch_reflector_start(func):
# If reflector cannot be started (e.g. insufficient access rights, namespace cannot be found),
# just raise an exception instead of halting the entire JH application.
try:
await func
except Exception:
self.log.exception(f"Reflector with key {key} failed to start.")
raise

self.__class__.reflectors[key] = current_reflector = reflector_class(
parent=self,
namespace=self.namespace,
on_failure=on_reflector_failure,
**kwargs,
)
await catch_reflector_start(current_reflector.start())

if replace and previous_reflector:
if previous_reflector:
# we replaced the reflector, stop the old one
asyncio.ensure_future(previous_reflector.stop())

# return the current reflector
return self.__class__.reflectors[key]
return current_reflector

def _start_watching_events(self, replace=False):
async def _start_watching_events(self, replace=False):
"""Start the events reflector
If replace=False and the event reflector is already running,
@@ -2462,31 +2458,46 @@ def _start_watching_events(self, replace=False):
If replace=True, a running pod reflector will be stopped
and a new one started (for recovering from possible errors).
"""
return self._start_reflector(
return await self._start_reflector(
kind="events",
reflector_class=EventReflector,
fields={"involvedObject.kind": "Pod"},
omit_namespace=self.enable_user_namespaces,
replace=replace,
)

def _start_watching_pods(self, replace=False):
"""Start the pod reflector
async def _start_watching_pods(self, replace=False):
"""Start the pods reflector
If replace=False and the pod reflector is already running,
do nothing.
If replace=True, a running pod reflector will be stopped
and a new one started (for recovering from possible errors).
"""
return self._start_reflector(
return await self._start_reflector(
kind="pods",
reflector_class=PodReflector,
labels={"component": self.component_label},
omit_namespace=self.enable_user_namespaces,
replace=replace,
)

@classmethod
async def _stop_all_reflectors(cls):
"""Stop reflectors for all instances, a function used when running tests."""
tasks = []
for key in list(cls.reflectors.keys()):
reflector = cls.reflectors.pop(key)
tasks.append(reflector.stop())

try:
await asyncio.gather(*tasks)
except Exception:
for task in tasks:
task.cancel()
raise

def start(self):
"""Thin wrapper around self._start
@@ -2670,6 +2681,18 @@ async def _start(self):
if self.enable_user_namespaces:
await self._ensure_namespace()

# namespace can be changed via kubespawner_override, so start watching pods only after
# load_user_options() has been called
start_futures = [self._start_watching_pods()]
if self.events_enabled:
start_futures.append(self._start_watching_events())
try:
await asyncio.gather(*start_futures)
except Exception:
for future in start_futures:
future.cancel()
raise

# record latest event so we don't include old
# events from previous pods in self.events
# track by order and name instead of uid
@@ -2792,7 +2815,7 @@ async def _start(self):
ref_key,
)
self.log.error(f"Pods: {self.pod_reflector.pods}")
self._start_watching_pods(replace=True)
asyncio.ensure_future(self._start_watching_pods(replace=True))
raise

pod = self.pod_reflector.pods[ref_key]
@@ -2881,8 +2904,9 @@ async def _make_delete_pvc_request(self, pvc_name, request_timeout):
else:
raise

@_await_pod_reflector
async def stop(self, now=False):
await self._start_watching_pods()

delete_options = client.V1DeleteOptions()

if now:
@@ -2916,7 +2940,7 @@ async def stop(self, now=False):
self.log.error(
"Pod %s did not disappear, restarting pod reflector", ref_key
)
self._start_watching_pods(replace=True)
asyncio.ensure_future(self._start_watching_pods(replace=True))
raise

@default('env_keep')
16 changes: 5 additions & 11 deletions tests/conftest.py
@@ -638,19 +638,13 @@ async def exec_python(kube_client, kube_ns):


@pytest.fixture(scope="function")
def reset_pod_reflector():
async def reset_pod_reflectors():
"""
Resets the class state KubeSpawner.reflectors["pods"] before and after the
Resets the class state KubeSpawner.reflectors before and after the
test function executes. This enables us to start fresh if a test needs to
test configuration influencing our singleton pod reflector.
test configuration influencing the pod reflector options.
"""

def _reset_pod_reflector():
pods_reflector = KubeSpawner.reflectors["pods"]
KubeSpawner.reflectors["pods"] = None
if pods_reflector:
asyncio.ensure_future(pods_reflector.stop())

_reset_pod_reflector()
await KubeSpawner._stop_all_reflectors()
yield
_reset_pod_reflector()
await KubeSpawner._stop_all_reflectors()
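
A hedged sketch of how a test might opt into this fixture (the test name and assertion are illustrative, and pytest-asyncio support for async tests is assumed, as for the async fixtures above): requesting reset_pod_reflectors guarantees the class-level cache starts empty and any reflectors started during the test are stopped afterwards.

    from kubespawner import KubeSpawner

    async def test_reflector_cache_starts_empty(reset_pod_reflectors):
        # The fixture has already stopped and popped every entry, so the
        # shared KubeSpawner.reflectors dict begins empty.
        assert KubeSpawner.reflectors == {}
        # Spawning a pod here would add a ("pods", <namespace>) entry, which
        # the fixture stops again once the test finishes.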
