Skip to content

Commit

Permalink
Merge pull request #1552 from yue9944882/cherrypick-11-reflector-list…
Browse files Browse the repository at this point in the history
…-expiration

Cherrypick for RELEASE-11: Fixes reflector list expiration
  • Loading branch information
k8s-ci-robot committed Feb 23, 2021
2 parents b8cf5e6 + af80a09 commit d597d79
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
Expand Up @@ -16,13 +16,15 @@
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.EventType;
import io.kubernetes.client.informer.ListerWatcher;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ListMeta;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.CallGeneratorParams;
import io.kubernetes.client.util.Strings;
import io.kubernetes.client.util.Watchable;
import java.io.IOException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -37,6 +39,9 @@ public class ReflectorRunnable<
private static final Logger log = LoggerFactory.getLogger(ReflectorRunnable.class);

private String lastSyncResourceVersion;

private boolean isLastSyncResourceVersionUnavailable;

private Watchable<ApiType> watch;

private ListerWatcher<ApiType, ApiListType> listerWatcher;
Expand Down Expand Up @@ -86,6 +91,7 @@ public void run() {
}
this.syncWith(items, resourceVersion);
this.lastSyncResourceVersion = resourceVersion;
this.isLastSyncResourceVersionUnavailable = false;

if (log.isDebugEnabled()) {
log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion);
Expand Down Expand Up @@ -145,6 +151,13 @@ public void run() {
closeWatch();
}
}
} catch (ApiException e) {
if (e.getCode() == HttpURLConnection.HTTP_GONE) {
log.info(
"ResourceVersion {} expired, will retry w/o resourceVersion at the next time",
getRelistResourceVersion());
isLastSyncResourceVersionUnavailable = true;
}
} catch (Throwable t) {
this.exceptionHandler.accept(apiTypeClass, t);
}
Expand Down Expand Up @@ -175,8 +188,23 @@ public String getLastSyncResourceVersion() {
return lastSyncResourceVersion;
}

public boolean isLastSyncResourceVersionUnavailable() {
return isLastSyncResourceVersionUnavailable;
}

private String getRelistResourceVersion() {
if (isLastSyncResourceVersionUnavailable) {
// Since this reflector makes paginated list requests, and all paginated list requests skip
// the watch cache
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to
// re-establish reflector
// to the latest available ResourceVersion, using a consistent read from etcd.
return "";
}
if (Strings.isNullOrEmpty(lastSyncResourceVersion)) {
// For performance reasons, initial list performed by reflector uses "0" as resource version
// to allow it to
// be served from the watch cache if it is enabled.
return "0";
}
return lastSyncResourceVersion;
Expand Down
Expand Up @@ -12,6 +12,7 @@
*/
package io.kubernetes.client.informer.cache;

import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -28,6 +29,7 @@
import io.kubernetes.client.util.CallGeneratorParams;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.Watchable;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -215,4 +217,42 @@ public Watchable<V1Pod> watch(CallGeneratorParams params) throws ApiException {
reflectorRunnable.stop();
}
}

@Test
public void testReflectorListShouldHandleExpiredResourceVersion() throws ApiException {
String expectedResourceVersion = "100";
when(listerWatcher.list(any()))
.thenReturn(
new V1PodList().metadata(new V1ListMeta().resourceVersion(expectedResourceVersion)));
// constantly failing watches will make the reflector run only one time
when(listerWatcher.watch(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, ""));
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);
try {
Thread thread = new Thread(reflectorRunnable::run);
thread.setDaemon(true);
thread.start();
Awaitility.await()
.atMost(Duration.ofSeconds(1))
.pollInterval(Duration.ofMillis(100))
.until(
() -> expectedResourceVersion.equals(reflectorRunnable.getLastSyncResourceVersion()));
assertFalse(reflectorRunnable.isLastSyncResourceVersionUnavailable());
} finally {
reflectorRunnable.stop();
}

try {
when(listerWatcher.list(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, ""));
Thread thread = new Thread(reflectorRunnable::run);
thread.setDaemon(true);
thread.start();
Awaitility.await()
.atMost(Duration.ofSeconds(5))
.pollInterval(Duration.ofMillis(100))
.until(() -> reflectorRunnable.isLastSyncResourceVersionUnavailable());
} finally {
reflectorRunnable.stop();
}
}
}

0 comments on commit d597d79

Please sign in to comment.