Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: using tombstones to account for rapid deletion #2317

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ protected ManagedInformerEventSource(

@Override
public void onAdd(R resource) {
temporaryResourceCache.onEvent(resource, false);
temporaryResourceCache.onAddOrUpdateEvent(resource);
}

@Override
public void onUpdate(R oldObj, R newObj) {
temporaryResourceCache.onEvent(newObj, false);
temporaryResourceCache.onAddOrUpdateEvent(newObj);
}

@Override
public void onDelete(R obj, boolean deletedFinalStateUnknown) {
temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown);
temporaryResourceCache.onDeleteEvent(obj, deletedFinalStateUnknown);
}

protected InformerManager<R, C> manager() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
Expand All @@ -18,8 +16,8 @@
/**
* <p>
* Temporal cache is used to solve the problem for {@link KubernetesDependentResource} that is, when
* a create or update is executed the subsequent getResource opeeration might not return the
* up-to-date resource from informer cache, since it is not received yet by webhook.
* a create or update is executed the subsequent getResource operation might not return the
* up-to-date resource from informer cache, since it is not received yet.
* </p>
* <p>
* The idea of the solution is, that since an update (for create is simpler) was done successfully,
Expand All @@ -36,31 +34,78 @@
*/
public class TemporaryResourceCache<T extends HasMetadata> {

static class ExpirationCache<K> {
private final LinkedHashMap<K, Long> cache;
private final int ttlMs;

public ExpirationCache(int maxEntries, int ttlMs) {
this.ttlMs = ttlMs;
this.cache = new LinkedHashMap<>() {
@Override
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
return size() > maxEntries;
}
};
}

public void add(K key) {
clean();
cache.putIfAbsent(key, System.currentTimeMillis());
}

public boolean contains(K key) {
clean();
return cache.get(key) != null;
}

void clean() {
if (!cache.isEmpty()) {
long currentTimeMillis = System.currentTimeMillis();
var iter = cache.entrySet().iterator();
// the order will already be from oldest to newest, clean a fixed number of entries to
// amortize the cost amongst multiple calls
for (int i = 0; i < 10 && iter.hasNext(); i++) {
var entry = iter.next();
if (currentTimeMillis - entry.getValue() > ttlMs) {
iter.remove();
}
}
}
}
}

private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
private static final int MAX_RESOURCE_VERSIONS = 256;

private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();

// keep up to the last million deletions for up to 10 minutes
private final ExpirationCache<String> tombstones = new ExpirationCache<>(1000000, 1200000);
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
private final boolean parseResourceVersions;
private final Set<String> knownResourceVersions;
private final ExpirationCache<String> knownResourceVersions;

public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource,
boolean parseResourceVersions) {
this.managedInformerEventSource = managedInformerEventSource;
this.parseResourceVersions = parseResourceVersions;
if (parseResourceVersions) {
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
@Override
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
return size() >= MAX_RESOURCE_VERSIONS;
}
});
// keep up to the 50000 add/updates for up to 5 minutes
knownResourceVersions = new ExpirationCache<>(50000, 600000);
} else {
knownResourceVersions = null;
}
}

public synchronized void onEvent(T resource, boolean unknownState) {
public synchronized void onDeleteEvent(T resource, boolean unknownState) {
tombstones.add(resource.getMetadata().getUid());
onEvent(resource, unknownState);
}

public synchronized void onAddOrUpdateEvent(T resource) {
onEvent(resource, false);
}

synchronized void onEvent(T resource, boolean unknownState) {
cache.computeIfPresent(ResourceID.fromResource(resource),
(id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null
: cached);
Expand All @@ -84,20 +129,33 @@ public synchronized void putResource(T newResource, String previousResourceVersi
var cachedResource = getResourceFromCache(resourceId)
.orElse(managedInformerEventSource.get(resourceId).orElse(null));

if ((previousResourceVersion == null && cachedResource == null)
boolean moveAhead = false;
if (previousResourceVersion == null && cachedResource == null) {
if (tombstones.contains(newResource.getMetadata().getUid())) {
log.debug(
"Won't resurrect uid {} for resource id: {}",
newResource.getMetadata().getUid(), resourceId);
return;
}
// we can skip further checks as this is a simple add and there's no previous entry to
// consider
moveAhead = true;
csviri marked this conversation as resolved.
Show resolved Hide resolved
}

if (moveAhead
|| (cachedResource != null
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
log.debug(
"Temporarily moving ahead to target version {} for resource id: {}",
newResource.getMetadata().getResourceVersion(), resourceId);
putToCache(newResource, resourceId);
cache.put(resourceId, newResource);
} else if (cache.remove(resourceId) != null) {
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
}
}

public boolean isKnownResourceVersion(T resource) {
public synchronized boolean isKnownResourceVersion(T resource) {
return knownResourceVersions != null
&& knownResourceVersions.contains(resource.getMetadata().getResourceVersion());
}
Expand All @@ -123,10 +181,6 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c
return false;
}

private void putToCache(T resource, ResourceID resourceID) {
cache.put(resourceID == null ? ResourceID.fromResource(resource) : resourceID, resource);
}

public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
informerEventSource.onUpdate(cachedDeployment, testDeployment());

verify(eventHandlerMock, times(1)).handleEvent(any());
verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false);
verify(temporaryResourceCacheMock, times(1)).onAddOrUpdateEvent(testDeployment());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.ExpirationCache;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -81,7 +84,7 @@ void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() {
void removesResourceFromCache() {
ConfigMap testResource = propagateTestResourceToCache();

temporaryResourceCache.onEvent(testResource(), false);
temporaryResourceCache.onAddOrUpdateEvent(testResource());

assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isNotPresent();
Expand All @@ -96,20 +99,59 @@ void resourceVersionParsing() {
ConfigMap testResource = propagateTestResourceToCache();

// an event with a newer version will not remove
temporaryResourceCache.onEvent(new ConfigMapBuilder(testResource).editMetadata()
.withResourceVersion("1").endMetadata().build(), false);
temporaryResourceCache.onAddOrUpdateEvent(new ConfigMapBuilder(testResource).editMetadata()
.withResourceVersion("1").endMetadata().build());

assertThat(temporaryResourceCache.isKnownResourceVersion(testResource)).isTrue();
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isPresent();

// anything else will remove
temporaryResourceCache.onEvent(testResource(), false);
temporaryResourceCache.onAddOrUpdateEvent(testResource());

assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isNotPresent();
}

@Test
void rapidDeletion() {
var testResource = testResource();

temporaryResourceCache.onAddOrUpdateEvent(testResource);
temporaryResourceCache.onDeleteEvent(new ConfigMapBuilder(testResource).editMetadata()
.withResourceVersion("3").endMetadata().build(), false);
temporaryResourceCache.putAddedResource(testResource);

assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isEmpty();
}

@Test
void expirationCacheMax() {
ExpirationCache<Integer> cache = new ExpirationCache<>(2, Integer.MAX_VALUE);

cache.add(1);
cache.add(2);
cache.add(3);

assertThat(cache.contains(1)).isFalse();
assertThat(cache.contains(2)).isTrue();
assertThat(cache.contains(3)).isTrue();
}

@Test
void expirationCacheTtl() {
ExpirationCache<Integer> cache = new ExpirationCache<>(2, 1);

cache.add(1);
cache.add(2);

Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(cache.contains(1)).isFalse();
assertThat(cache.contains(2)).isFalse();
});
}

private ConfigMap propagateTestResourceToCache() {
var testResource = testResource();
when(informerEventSource.get(any())).thenReturn(Optional.empty());
Expand All @@ -127,6 +169,7 @@ ConfigMap testResource() {
configMap.getMetadata().setName("test");
configMap.getMetadata().setNamespace("default");
configMap.getMetadata().setResourceVersion(RESOURCE_VERSION);
configMap.getMetadata().setUid("test-uid");
return configMap;
}

Expand Down