Skip to content

Commit

Permalink
Merge pull request #755 from juliantaylor/reflector-improve
Browse files Browse the repository at this point in the history
improve efficiency of reflector
  • Loading branch information
yuvipanda committed Jul 25, 2023
2 parents def501f + e740189 commit 05624ab
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions kubespawner/reflector.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def __init__(self, *args, **kwargs):

self.watch_task = None

async def _list_and_update(self):
async def _list_and_update(self, resource_version=None):
"""
Update current list of resources by doing a full fetch.
Expand All @@ -221,6 +221,9 @@ async def _list_and_update(self):
_request_timeout=self.request_timeout,
_preload_content=False,
)
if resource_version is not None:
kwargs["resource_version"] = resource_version
kwargs["resource_version_match"] = "NotOlderThan"
if not self.omit_namespace:
kwargs["namespace"] = self.namespace

Expand Down Expand Up @@ -264,6 +267,11 @@ async def _watch_and_update(self):
selectors.append("field selector=%r" % self.field_selector)
log_selector = ', '.join(selectors)

# fetch Any (=api-server cached) data from apiserver on initial fetch
# see https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions
# for more information
resource_version = "0"

cur_delay = 0.1

if self.omit_namespace:
Expand All @@ -282,7 +290,7 @@ async def _watch_and_update(self):
start = time.monotonic()
w = watch.Watch()
try:
resource_version = await self._list_and_update()
resource_version = await self._list_and_update(resource_version)
watch_args = {
"label_selector": self.label_selector,
"field_selector": self.field_selector,
Expand Down Expand Up @@ -325,6 +333,7 @@ async def _watch_and_update(self):
else:
# This is an atomic operation on the dictionary!
self.resources[ref_key] = resource
resource_version = resource["metadata"]["resourceVersion"]
if self._stopping:
self.log.info("%s watcher stopped: inner", self.kind)
break
Expand All @@ -346,6 +355,9 @@ async def _watch_and_update(self):
self.log.debug("Cancelled watching %s", self.kind)
raise
except Exception:
# ensure we request a valid resource version on retry,
# needed on 410 Gone errors
resource_version = "0"
cur_delay = cur_delay * 2
if cur_delay > 30:
self.log.exception("Watching resources never recovered, giving up")
Expand Down

0 comments on commit 05624ab

Please sign in to comment.