Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -491,18 +491,16 @@ default Set<Class<? extends HasMetadata>> withPreviousAnnotationForDependentReso

/**
* If the event logic should parse the resourceVersion to determine the ordering of dependent
* resource events. This is typically not needed.
* resource events.
*
* <p>Disabled by default as Kubernetes does not support, and discourages, this interpretation of
* resourceVersions. Enable only if your api server event processing seems to lag the operator
* logic, and you want to further minimize the amount of work done / updates issued by the
* operator.
* <p>Enabled by default as Kubernetes does support this interpretation of resourceVersions.
* Disable only if your api server provides non comparable resource versions..
*
* @return if resource version should be parsed (as integer)
* @since 4.5.0
*/
default boolean parseResourceVersionsForEventFilteringAndCaching() {
return false;
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ public static <P extends HasMetadata> P addFinalizerWithSSA(
}
}

public static int compareResourceVersions(HasMetadata h1, HasMetadata h2) {
return compareResourceVersions(
h1.getMetadata().getResourceVersion(), h2.getMetadata().getResourceVersion());
}

public static int compareResourceVersions(String v1, String v2) {
var v1Length = v1.length();
if (v1Length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class ControllerEventSource<T extends HasMetadata>

@SuppressWarnings({"unchecked", "rawtypes"})
public ControllerEventSource(Controller<T> controller) {
super(NAME, controller.getCRClient(), controller.getConfiguration(), false);
super(NAME, controller.getCRClient(), controller.getConfiguration(), true);
this.controller = controller;

final var config = controller.getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public InformerEventSource(
}

InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
this(configuration, client, false);
this(configuration, client, true);
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -207,21 +207,8 @@ private synchronized void onAddOrUpdate(
}

private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
var res = temporaryResourceCache.getResourceFromCache(resourceID);
if (res.isEmpty()) {
return isEventKnownFromAnnotation(newObject, oldObject);
}
boolean resVersionsEqual =
newObject
.getMetadata()
.getResourceVersion()
.equals(res.get().getMetadata().getResourceVersion());
log.debug(
"Resource found in temporal cache for id: {} resource versions equal: {}",
resourceID,
resVersionsEqual);
return resVersionsEqual
|| temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject);
return temporaryResourceCache.canSkipEvent(resourceID, newObject)
|| isEventKnownFromAnnotation(newObject, oldObject);
}

private boolean isEventKnownFromAnnotation(R newObject, R oldObject) {
Expand Down Expand Up @@ -301,11 +288,7 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res

private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
primaryToSecondaryIndex.onAddOrUpdate(newResource);
temporaryResourceCache.putResource(
newResource,
Optional.ofNullable(oldResource)
.map(r -> r.getMetadata().getResourceVersion())
.orElse(null));
temporaryResourceCache.putResource(newResource);
}

private boolean useSecondaryToPrimaryIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ public Optional<R> get(ResourceID resourceID) {
: r);
}

public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
return getSource(namespace.orElse(WATCH_ALL_NAMESPACES))
.map(source -> source.getLastSyncResourceVersion());
}

@Override
public Stream<ResourceID> keys() {
return sources.values().stream().flatMap(Cache::keys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public Optional<T> get(ResourceID resourceID) {
return Optional.ofNullable(cache.getByKey(getKey(resourceID)));
}

public String getLastSyncResourceVersion() {
return this.informer.lastSyncResourceVersion();
}

private String getKey(ResourceID resourceID) {
return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.Informable;
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
Expand Down Expand Up @@ -122,30 +123,38 @@ public synchronized void stop() {
@Override
public void handleRecentResourceUpdate(
ResourceID resourceID, R resource, R previousVersionOfResource) {
temporaryResourceCache.putResource(
resource, previousVersionOfResource.getMetadata().getResourceVersion());
temporaryResourceCache.putResource(resource);
}

@Override
public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
temporaryResourceCache.putAddedResource(resource);
temporaryResourceCache.putResource(resource);
}

@Override
public Optional<R> get(ResourceID resourceID) {
var res = cache.get(resourceID);
Optional<R> resource = temporaryResourceCache.getResourceFromCache(resourceID);
if (resource.isPresent()) {
log.debug("Resource found in temporary cache for Resource ID: {}", resourceID);
if (parseResourceVersions
&& resource.isPresent()
&& res.filter(
r ->
PrimaryUpdateAndCacheUtils.compareResourceVersions(r, resource.orElseThrow())
> 0)
.isEmpty()) {
log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID);
return resource;
} else {
log.debug(
"Resource not found in temporary cache reading it from informer cache,"
+ " for Resource ID: {}",
resourceID);
var res = cache.get(resourceID);
log.debug("Resource found in cache: {} for id: {}", res.isPresent(), resourceID);
return res;
}
log.debug(
"Resource not found, or older, in temporary cache. Found in informer cache {}, for"
+ " Resource ID: {}",
res.isPresent(),
resourceID);
return res;
}

public Optional<String> getLastSyncResourceVersion(Optional<String> namespace) {
return cache.getLastSyncResourceVersion(namespace);
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -24,7 +23,7 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

Expand All @@ -46,53 +45,10 @@
*/
public class TemporaryResourceCache<T extends HasMetadata> {

static class ExpirationCache<K> {
private final LinkedHashMap<K, Long> cache;
private final int ttlMs;

public ExpirationCache(int maxEntries, int ttlMs) {
this.ttlMs = ttlMs;
this.cache =
new LinkedHashMap<>() {
@Override
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
return size() > maxEntries;
}
};
}

public void add(K key) {
clean();
cache.putIfAbsent(key, System.currentTimeMillis());
}

public boolean contains(K key) {
clean();
return cache.get(key) != null;
}

void clean() {
if (!cache.isEmpty()) {
long currentTimeMillis = System.currentTimeMillis();
var iter = cache.entrySet().iterator();
// the order will already be from oldest to newest, clean a fixed number of entries to
// amortize the cost amongst multiple calls
for (int i = 0; i < 10 && iter.hasNext(); i++) {
var entry = iter.next();
if (currentTimeMillis - entry.getValue() > ttlMs) {
iter.remove();
}
}
}
}
}

private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);

private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();

// keep up to the last million deletions for up to 10 minutes
private final ExpirationCache<String> tombstones = new ExpirationCache<>(1000000, 1200000);
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
private final boolean parseResourceVersions;

Expand All @@ -104,7 +60,6 @@ public TemporaryResourceCache(
}

public synchronized void onDeleteEvent(T resource, boolean unknownState) {
tombstones.add(resource.getMetadata().getUid());
onEvent(resource, unknownState);
}

Expand All @@ -116,74 +71,68 @@ synchronized void onEvent(T resource, boolean unknownState) {
cache.computeIfPresent(
ResourceID.fromResource(resource),
(id, cached) ->
(unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached);
}

public synchronized void putAddedResource(T newResource) {
putResource(newResource, null);
(unknownState
|| PrimaryUpdateAndCacheUtils.compareResourceVersions(resource, cached) > 0)
Copy link
Collaborator

@csviri csviri Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouln't be >= , so we remove the cached also when it is the same?

? null
: cached);
}

/**
* put the item into the cache if the previousResourceVersion matches the current state. If not
* the currently cached item is removed.
*
* @param previousResourceVersion null indicates an add
*/
public synchronized void putResource(T newResource, String previousResourceVersion) {
public synchronized void putResource(T newResource) {
if (!parseResourceVersions) {
return;
}

var resourceId = ResourceID.fromResource(newResource);
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);

boolean moveAhead = false;
if (previousResourceVersion == null && cachedResource == null) {
if (tombstones.contains(newResource.getMetadata().getUid())) {
log.debug(
"Won't resurrect uid {} for resource id: {}",
newResource.getMetadata().getUid(),
resourceId);
return;
}
// we can skip further checks as this is a simple add and there's no previous entry to
// consider
moveAhead = true;
if (newResource.getMetadata().getResourceVersion() == null) {
log.warn(
"Resource {}: with no resourceVersion put in temporary cache. This is not the expected"
+ " usage pattern, only resources returned from the api server should be put in the"
+ " cache.",
resourceId);
return;
}

if (moveAhead
|| (cachedResource != null
&& (cachedResource
.getMetadata()
.getResourceVersion()
.equals(previousResourceVersion))
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
// first check against the source in general - this also prevents resurrecting resources when
// we've already seen the deletion event
String latest =
managedInformerEventSource
.getLastSyncResourceVersion(resourceId.getNamespace())
.orElse(null);
if (latest != null
&& PrimaryUpdateAndCacheUtils.compareResourceVersions(
latest, newResource.getMetadata().getResourceVersion())
> 0) {
log.debug(
"Temporarily moving ahead to target version {} for resource id: {}",
"Resource {}: resourceVersion {} is not later than latest {}",
resourceId,
newResource.getMetadata().getResourceVersion(),
resourceId);
cache.put(resourceId, newResource);
} else if (cache.remove(resourceId) != null) {
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
latest);
return;
}
}

/**
* @return true if {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()}
* is enabled and the resourceVersion of newResource is numerically greater than
* cachedResource, otherwise false
*/
public boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
try {
if (parseResourceVersions
&& Long.parseLong(newResource.getMetadata().getResourceVersion())
> Long.parseLong(cachedResource.getMetadata().getResourceVersion())) {
return true;
}
} catch (NumberFormatException e) {
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);

if (cachedResource == null
|| PrimaryUpdateAndCacheUtils.compareResourceVersions(newResource, cachedResource) >= 0) {
log.debug(
"Could not compare resourceVersions {} and {} for {}",
"Temporarily moving ahead to target version {} for resource id: {}",
newResource.getMetadata().getResourceVersion(),
cachedResource.getMetadata().getResourceVersion(),
resourceId);
cache.put(resourceId, newResource);
}
return false;
}

public boolean canSkipEvent(ResourceID resourceID, T resource) {
return parseResourceVersions
&& getResourceFromCache(resourceID)
.filter(
cached -> PrimaryUpdateAndCacheUtils.compareResourceVersions(cached, resource) >= 0)
.isPresent();
}

public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ public synchronized void start() {}
}

@Test
void skipsEventPropagationIfResourceWithSameVersionInResourceCache() {
void skipsEventPropagation() {
when(temporaryResourceCacheMock.getResourceFromCache(any()))
.thenReturn(Optional.of(testDeployment()));

when(temporaryResourceCacheMock.canSkipEvent(any(), any())).thenReturn(true);

informerEventSource.onAdd(testDeployment());
informerEventSource.onUpdate(testDeployment(), testDeployment());

Expand Down
Loading