From af80a094534d694cc682613facfd6515d9a91cdb Mon Sep 17 00:00:00 2001 From: yue9944882 <291271447@qq.com> Date: Thu, 18 Feb 2021 17:47:21 +0800 Subject: [PATCH] fixes reflector list expiration --- .../informer/cache/ReflectorRunnable.java | 28 +++++++++++++ .../informer/cache/ReflectorRunnableTest.java | 40 +++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java index d28afdbea6..0cfdc368fd 100644 --- a/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java +++ b/util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java @@ -16,6 +16,7 @@ import io.kubernetes.client.common.KubernetesObject; import io.kubernetes.client.informer.EventType; import io.kubernetes.client.informer.ListerWatcher; +import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.models.V1ListMeta; import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.util.CallGeneratorParams; @@ -23,6 +24,7 @@ import io.kubernetes.client.util.Watchable; import java.io.IOException; import java.net.ConnectException; +import java.net.HttpURLConnection; import java.time.Duration; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,6 +39,9 @@ public class ReflectorRunnable< private static final Logger log = LoggerFactory.getLogger(ReflectorRunnable.class); private String lastSyncResourceVersion; + + private boolean isLastSyncResourceVersionUnavailable; + private Watchable watch; private ListerWatcher listerWatcher; @@ -86,6 +91,7 @@ public void run() { } this.syncWith(items, resourceVersion); this.lastSyncResourceVersion = resourceVersion; + this.isLastSyncResourceVersionUnavailable = false; if (log.isDebugEnabled()) { log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion); @@ -145,6 +151,13 @@ public void run() { closeWatch(); } } + } catch (ApiException e) { + if (e.getCode() == HttpURLConnection.HTTP_GONE) { + log.info( + "ResourceVersion {} expired, will retry w/o resourceVersion at the next time", + getRelistResourceVersion()); + isLastSyncResourceVersionUnavailable = true; + } } catch (Throwable t) { this.exceptionHandler.accept(apiTypeClass, t); } @@ -175,8 +188,23 @@ public String getLastSyncResourceVersion() { return lastSyncResourceVersion; } + public boolean isLastSyncResourceVersionUnavailable() { + return isLastSyncResourceVersionUnavailable; + } + private String getRelistResourceVersion() { + if (isLastSyncResourceVersionUnavailable) { + // Since this reflector makes paginated list requests, and all paginated list requests skip + // the watch cache + // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to + // re-establish reflector + // to the latest available ResourceVersion, using a consistent read from etcd. + return ""; + } if (Strings.isNullOrEmpty(lastSyncResourceVersion)) { + // For performance reasons, initial list performed by reflector uses "0" as resource version + // to allow it to + // be served from the watch cache if it is enabled. return "0"; } return lastSyncResourceVersion; diff --git a/util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java b/util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java index 9d3c35319a..7c5750396d 100644 --- a/util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java +++ b/util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java @@ -12,6 +12,7 @@ */ package io.kubernetes.client.informer.cache; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -28,6 +29,7 @@ import io.kubernetes.client.util.CallGeneratorParams; import io.kubernetes.client.util.Watch; import io.kubernetes.client.util.Watchable; +import java.net.HttpURLConnection; import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import org.awaitility.Awaitility; @@ -215,4 +217,42 @@ public Watchable watch(CallGeneratorParams params) throws ApiException { reflectorRunnable.stop(); } } + + @Test + public void testReflectorListShouldHandleExpiredResourceVersion() throws ApiException { + String expectedResourceVersion = "100"; + when(listerWatcher.list(any())) + .thenReturn( + new V1PodList().metadata(new V1ListMeta().resourceVersion(expectedResourceVersion))); + // constantly failing watches will make the reflector run only one time + when(listerWatcher.watch(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, "")); + ReflectorRunnable reflectorRunnable = + new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO); + try { + Thread thread = new Thread(reflectorRunnable::run); + thread.setDaemon(true); + thread.start(); + Awaitility.await() + .atMost(Duration.ofSeconds(1)) + .pollInterval(Duration.ofMillis(100)) + .until( + () -> expectedResourceVersion.equals(reflectorRunnable.getLastSyncResourceVersion())); + assertFalse(reflectorRunnable.isLastSyncResourceVersionUnavailable()); + } finally { + reflectorRunnable.stop(); + } + + try { + when(listerWatcher.list(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, "")); + Thread thread = new Thread(reflectorRunnable::run); + thread.setDaemon(true); + thread.start(); + Awaitility.await() + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(100)) + .until(() -> reflectorRunnable.isLastSyncResourceVersionUnavailable()); + } finally { + reflectorRunnable.stop(); + } + } }