Skip to content

feat: support indexers in informer related event sources #1131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ private InformerConfigurationBuilder(Class<R> resourceClass) {
this.resourceClass = resourceClass;
}

public InformerConfigurationBuilder<R, P> withPrimaryResourcesRetriever(
public InformerConfigurationBuilder<R, P> withSecondaryToPrimaryMapper(
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper) {
this.secondaryToPrimaryResourcesIdSet = secondaryToPrimaryMapper;
return this;
}

public InformerConfigurationBuilder<R, P> withAssociatedSecondaryResourceIdentifier(
public InformerConfigurationBuilder<R, P> withPrimaryToSecondaryMapper(
PrimaryToSecondaryMapper<P> associatedWith) {
this.associatedWith = associatedWith;
return this;
Expand Down Expand Up @@ -115,8 +115,8 @@ static <R extends HasMetadata, P extends HasMetadata> InformerConfigurationBuild
return new InformerConfigurationBuilder<R, P>(configuration.getResourceClass())
.withNamespaces(configuration.getNamespaces())
.withLabelSelector(configuration.getLabelSelector())
.withAssociatedSecondaryResourceIdentifier(
.withPrimaryToSecondaryMapper(
configuration.getPrimaryToSecondaryMapper())
.withPrimaryResourcesRetriever(configuration.getSecondaryToPrimaryMapper());
.withSecondaryToPrimaryMapper(configuration.getSecondaryToPrimaryMapper());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;

/**
* Contextual information made available to event sources.
Expand All @@ -13,11 +13,11 @@
*/
public class EventSourceContext<P extends HasMetadata> {

private final ResourceCache<P> primaryCache;
private final IndexerResourceCache<P> primaryCache;
private final ControllerConfiguration<P> controllerConfiguration;
private final KubernetesClient client;

public EventSourceContext(ResourceCache<P> primaryCache,
public EventSourceContext(IndexerResourceCache<P> primaryCache,
ControllerConfiguration<P> controllerConfiguration,
KubernetesClient client) {
this.primaryCache = primaryCache;
Expand All @@ -30,7 +30,7 @@ public EventSourceContext(ResourceCache<P> primaryCache,
*
* @return the primary resource cache
*/
public ResourceCache<P> getPrimaryCache() {
public IndexerResourceCache<P> getPrimaryCache() {
return primaryCache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ private void configureWith(String labelSelector, Set<String> namespaces) {
InformerConfiguration.from(resourceType())
.withLabelSelector(labelSelector)
.withNamespaces(namespaces)
.withPrimaryResourcesRetriever(primaryResourcesRetriever)
.withAssociatedSecondaryResourceIdentifier(secondaryResourceIdentifier)
.withSecondaryToPrimaryMapper(primaryResourcesRetriever)
.withPrimaryToSecondaryMapper(secondaryResourceIdentifier)
.build();
configureWith(new InformerEventSource<>(ic, client));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.javaoperatorsdk.operator.processing.event.source;

import java.util.List;
import java.util.Map;
import java.util.function.Function;

import io.fabric8.kubernetes.api.model.HasMetadata;

public interface IndexerResourceCache<T extends HasMetadata> extends ResourceCache<T> {

void addIndexers(Map<String, Function<T, List<String>>> indexers);

default void addIndexer(String name, Function<T, List<String>> indexer) {
addIndexers(Map.of(name, indexer));
}

List<T> byIndex(String indexName, String indexKey);

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;

/**
* <p>
Expand Down Expand Up @@ -64,7 +63,7 @@
*/
public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
extends ManagedInformerEventSource<R, P, InformerConfiguration<R, P>>
implements ResourceCache<R>, ResourceEventHandler<R>, RecentOperationEventFilter<R> {
implements ResourceEventHandler<R>, RecentOperationEventFilter<R> {

private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
Expand All @@ -22,11 +23,11 @@
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.Cache;
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache;

public class InformerManager<T extends HasMetadata, C extends ResourceConfiguration<T>>
implements LifecycleAware, ResourceCache<T>, UpdatableCache<T> {
implements LifecycleAware, IndexerResourceCache<T>, UpdatableCache<T> {

private static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";
private static final Logger log = LoggerFactory.getLogger(InformerManager.class);
Expand Down Expand Up @@ -89,7 +90,7 @@ public void stop() {
@Override
public Stream<T> list(Predicate<T> predicate) {
if (predicate == null) {
return sources.values().stream().flatMap(ResourceCache::list);
return sources.values().stream().flatMap(IndexerResourceCache::list);
}
return sources.values().stream().flatMap(i -> i.list(predicate));
}
Expand Down Expand Up @@ -144,4 +145,14 @@ public void put(ResourceID key, T resource) {
key, resource));
}

@Override
public void addIndexers(Map<String, Function<T, List<String>>> indexers) {
sources.values().forEach(s -> s.addIndexers(indexers));
}

@Override
public List<T> byIndex(String indexName, String indexKey) {
return sources.values().stream().map(s -> s.byIndex(indexName, indexKey))
.flatMap(List::stream).collect(Collectors.toList());
}
}

This file was deleted.

Loading