Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Optional<RetryInfo> getRetryInfo() {
public <T> Optional<T> getSecondaryResource(Class<T> expectedType, String eventSourceName) {
return controller.getEventSourceManager()
.getResourceEventSourceFor(expectedType, eventSourceName)
.flatMap(es -> es.getSecondaryResource(primaryResource));
.getSecondaryResource(primaryResource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,30 @@ public interface EventSourceInitializer<P extends HasMetadata> {
Map<String, EventSource> prepareEventSources(EventSourceContext<P> context);

/**
* Utility method to easily create map with default names of event sources.
* Utility method to easily create map with generated name for event sources. This is for the use
* case when the event sources are not access explicitly by name in the reconciler.
*
* @param eventSources to name
* @return even source with default names
*/
static Map<String, EventSource> defaultNamedEventSources(EventSource... eventSources) {
static Map<String, EventSource> nameEventSources(EventSource... eventSources) {
Map<String, EventSource> eventSourceMap = new HashMap<>(eventSources.length);
for (EventSource eventSource : eventSources) {
eventSourceMap.put(EventSource.defaultNameFor(eventSource), eventSource);
eventSourceMap.put(generateNameFor(eventSource), eventSource);
}
return eventSourceMap;
}

/**
* This is for the use case when the event sources are not access explicitly by name in the
* reconciler.
*
* @param eventSource EventSource
* @return generated name
*/
static String generateNameFor(EventSource eventSource) {
// we can have multiple event sources for the same class
return eventSource.getClass().getName() + "#" + eventSource.hashCode();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand All @@ -13,6 +12,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
Expand Down Expand Up @@ -123,7 +123,7 @@ public final void registerEventSource(String name, EventSource eventSource)
lock.lock();
try {
if (name == null || name.isBlank()) {
name = EventSource.defaultNameFor(eventSource);
name = EventSourceInitializer.generateNameFor(eventSource);
}
eventSources.add(name, eventSource);
eventSource.setEventHandler(eventProcessor);
Expand Down Expand Up @@ -171,19 +171,15 @@ public ControllerResourceEventSource<R> getControllerResourceEventSource() {
return eventSources.controllerResourceEventSource();
}

public <S> Optional<ResourceEventSource<S, R>> getResourceEventSourceFor(
<S> ResourceEventSource<S, R> getResourceEventSourceFor(
Class<S> dependentType) {
return getResourceEventSourceFor(dependentType, null);
}

public <S> Optional<ResourceEventSource<S, R>> getResourceEventSourceFor(
public <S> ResourceEventSource<S, R> getResourceEventSourceFor(
Class<S> dependentType, String qualifier) {
if (dependentType == null) {
return Optional.empty();
}
String name = qualifier == null ? "" : qualifier;
final var eventSource = eventSources.get(dependentType, name);
return Optional.ofNullable(eventSource);
Objects.requireNonNull(dependentType, "dependentType is Mandatory");
return eventSources.get(dependentType, qualifier);
}

TimerEventSource<R> retryEventSource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,48 +61,38 @@ public boolean contains(String name, EventSource source) {
public void add(String name, EventSource eventSource) {
if (contains(name, eventSource)) {
throw new IllegalArgumentException("An event source is already registered for the "
+ keyAsString(getDependentType(eventSource), name)
+ keyAsString(getResourceType(eventSource), name)
+ " class/name combination");
}
sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource);
}

@SuppressWarnings("rawtypes")
private Class<?> getDependentType(EventSource source) {
private Class<?> getResourceType(EventSource source) {
return source instanceof ResourceOwner
? ((ResourceOwner) source).resourceType()
: source.getClass();
}

private String keyFor(EventSource source) {
return keyFor(getDependentType(source));
return keyFor(getResourceType(source));
}

private String keyFor(Class<?> dependentType) {
var key = dependentType.getCanonicalName();

// make sure timer event source is started first, then controller event source
// this is needed so that these sources are set when informer sources start so that events can
// properly be processed
if (controllerResourceEventSource != null
&& key.equals(controllerResourceEventSource.resourceType().getCanonicalName())) {
key = 1 + "-" + key;
} else if (key.equals(retryAndRescheduleTimerEventSource.getClass().getCanonicalName())) {
key = 0 + "-" + key;
}
return key;
return dependentType.getCanonicalName();
}

@SuppressWarnings("unchecked")
public <S> ResourceEventSource<S, R> get(Class<S> dependentType, String name) {
final var sourcesForType = sources.get(keyFor(dependentType));
if (sourcesForType == null || sourcesForType.isEmpty()) {
return null;
throw new IllegalArgumentException(
"There is no event source found for class:" + dependentType.getName());
}

final var size = sourcesForType.size();
final EventSource source;
if (size == 1) {
if (size == 1 && name == null) {
source = sourcesForType.values().stream().findFirst().orElse(null);
} else {
if (name == null || name.isBlank()) {
Expand All @@ -114,7 +104,8 @@ public <S> ResourceEventSource<S, R> get(Class<S> dependentType, String name) {
source = sourcesForType.get(name);

if (source == null) {
return null;
throw new IllegalArgumentException("There is no event source found for class:" +
" " + dependentType.getName() + ", name:" + name);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,5 @@ public interface EventSource extends LifecycleAware {
*/
void setEventHandler(EventHandler handler);

static String defaultNameFor(EventSource source) {
return source.getClass().getCanonicalName();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.HashMap;

import org.junit.jupiter.api.Test;

import io.javaoperatorsdk.operator.processing.event.source.polling.PollingEventSource;

import static org.assertj.core.api.Assertions.assertThat;

class EventSourceInitializerTest {

@Test
void defaultNameDifferentForOtherInstance() {
var eventSource1 = new PollingEventSource(() -> new HashMap<>(), 1000, String.class);
var eventSource2 = new PollingEventSource(() -> new HashMap<>(), 1000, String.class);
var eventSourceName1 = EventSourceInitializer.generateNameFor(eventSource1);
var eventSourceName2 = EventSourceInitializer.generateNameFor(eventSource2);

assertThat(eventSourceName1).isNotEqualTo(eventSourceName2);
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.javaoperatorsdk.operator.processing.event;

import java.util.Iterator;
import java.util.Optional;
import java.util.Set;

import org.junit.jupiter.api.Test;
Expand All @@ -19,10 +17,10 @@
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -76,22 +74,24 @@ public void startCascadesToEventSources() {

@Test
void retrievingEventSourceForClassShouldWork() {
assertTrue(eventSourceManager.getResourceEventSourceFor(null).isEmpty());
assertTrue(eventSourceManager.getResourceEventSourceFor(Class.class).isEmpty());
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> eventSourceManager.getResourceEventSourceFor(Class.class));

// manager is initialized with a controller configured to handle HasMetadata
EventSourceManager manager = initManager();
Optional<EventSource> source = manager.getResourceEventSourceFor(HasMetadata.class);
assertTrue(source.isPresent());
assertTrue(source.get() instanceof ControllerResourceEventSource);
EventSource source = manager.getResourceEventSourceFor(HasMetadata.class);
assertThat(source).isInstanceOf(ControllerResourceEventSource.class);

assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> manager.getResourceEventSourceFor(HasMetadata.class, "unknown_name"));

CachingEventSource eventSource = mock(CachingEventSource.class);
when(eventSource.resourceType()).thenReturn(String.class);
manager.registerEventSource(eventSource);

source = manager.getResourceEventSourceFor(String.class);
assertTrue(source.isPresent());
assertEquals(eventSource, source.get());
assertThat(source).isNotNull();
assertEquals(eventSource, source);
}

@Test
Expand Down Expand Up @@ -133,41 +133,12 @@ void retrievingAnEventSourceWhenMultipleAreRegisteredForATypeShouldRequireAQuali
assertTrue(exception.getMessage().contains("name1"));
assertTrue(exception.getMessage().contains("name2"));

assertEquals(manager.getResourceEventSourceFor(TestCustomResource.class, "name2").get(),
assertEquals(manager.getResourceEventSourceFor(TestCustomResource.class, "name2"),
eventSource2);
assertEquals(manager.getResourceEventSourceFor(TestCustomResource.class, "name1").get(),
assertEquals(manager.getResourceEventSourceFor(TestCustomResource.class, "name1"),
eventSource);
}

@Test
void timerAndControllerEventSourcesShouldBeListedFirst() {
EventSourceManager manager = initManager();

CachingEventSource eventSource = mock(CachingEventSource.class);
when(eventSource.resourceType()).thenReturn(String.class);
manager.registerEventSource(eventSource);

final Set<EventSource> sources = manager.getRegisteredEventSources();
assertEquals(3, sources.size());
final Iterator<EventSource> iterator = sources.iterator();
for (int i = 0; i < sources.size(); i++) {
final EventSource source = iterator.next();
switch (i) {
case 0:
assertTrue(source instanceof TimerEventSource);
break;
case 1:
assertTrue(source instanceof ControllerResourceEventSource);
break;
case 2:
assertTrue(source instanceof CachingEventSource);
break;
default:
fail();
}
}
}

private EventSourceManager initManager() {
final ControllerConfiguration configuration = mock(ControllerConfiguration.class);
when(configuration.getResourceClass()).thenReturn(HasMetadata.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.javaoperatorsdk.operator.processing.event;

import org.junit.jupiter.api.Test;

import io.javaoperatorsdk.operator.processing.event.source.EventSource;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;

class EventSourcesTest {

EventSources eventSources = new EventSources();

@Test
void cannotAddTwoEventSourcesWithSameName() {
assertThrows(IllegalArgumentException.class, () -> {
eventSources.add("name", mock(EventSource.class));
eventSources.add("name", mock(EventSource.class));
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Map<String, EventSource> prepareEventSources(
.withLabelSelector("integrationtest = " + this.getClass().getSimpleName())
.build();
informerEventSource = new InformerEventSource<>(informerConfiguration, client);
return EventSourceInitializer.defaultNamedEventSources(informerEventSource);
return EventSourceInitializer.nameEventSources(informerEventSource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Map<String, EventSource> prepareEventSources(
.build();

return EventSourceInitializer
.defaultNamedEventSources(new InformerEventSource<>(config, context));
.nameEventSources(new InformerEventSource<>(config, context));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public StandaloneDependentTestReconciler() {
public Map<String, EventSource> prepareEventSources(
EventSourceContext<StandaloneDependentTestCustomResource> context) {
return EventSourceInitializer
.defaultNamedEventSources(deploymentDependent.initEventSource(context));
.nameEventSources(deploymentDependent.initEventSource(context));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public UpdateControl<MySQLSchema> reconcile(MySQLSchema schema, Context<MySQLSch
Secret secret = context.getSecondaryResource(Secret.class).orElseThrow();

return context.getSecondaryResource(Schema.class, SchemaDependentResource.NAME).map(s -> {
updateStatusPojo(schema, secret.getMetadata().getName(),
updateStatusPojo(schema, s, secret.getMetadata().getName(),
decode(secret.getData().get(MYSQL_SECRET_USERNAME)));
log.info("Schema {} created - updating CR status", s.getName());
return UpdateControl.updateStatus(schema);
Expand All @@ -60,15 +60,16 @@ public ErrorStatusUpdateControl<MySQLSchema> updateErrorStatus(MySQLSchema schem
}


private void updateStatusPojo(MySQLSchema schema, String secretName, String userName) {
private void updateStatusPojo(MySQLSchema mySQLSchema, Schema schema, String secretName,
String userName) {
SchemaStatus status = new SchemaStatus();
status.setUrl(
format(
"jdbc:mysql://%1$s/%2$s",
System.getenv("MYSQL_HOST"), schema.getMetadata().getName()));
System.getenv("MYSQL_HOST"), schema.getName()));
status.setUserName(userName);
status.setSecretName(secretName);
status.setStatus("CREATED");
schema.setStatus(status);
mySQLSchema.setStatus(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Map<String, EventSource> prepareEventSources(EventSourceContext<Webapp> c
.withAssociatedSecondaryResourceIdentifier(tomcatFromWebAppSpec)
.build();
return EventSourceInitializer
.defaultNamedEventSources(new InformerEventSource<>(configuration, context));
.nameEventSources(new InformerEventSource<>(configuration, context));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Map<String, EventSource> prepareEventSources(EventSourceContext<WebPage>
new InformerEventSource<>(InformerConfiguration.from(context, Service.class)
.withLabelSelector(LOW_LEVEL_LABEL_KEY)
.build(), context);
return EventSourceInitializer.defaultNamedEventSources(configMapEventSource,
return EventSourceInitializer.nameEventSources(configMapEventSource,
deploymentEventSource,
serviceEventSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public WebPageStandaloneDependentsReconciler(KubernetesClient kubernetesClient)

@Override
public Map<String, EventSource> prepareEventSources(EventSourceContext<WebPage> context) {
return EventSourceInitializer.defaultNamedEventSources(configMapDR.initEventSource(context),
return EventSourceInitializer.nameEventSources(configMapDR.initEventSource(context),
deploymentDR.initEventSource(context), serviceDR.initEventSource(context));
}

Expand Down