Skip to content

Commit

Permalink
feat: API to check if next reconciliation is imminent (#2272)
Browse files Browse the repository at this point in the history
Signed-off-by: Attila Mészáros <csviri@gmail.com>
  • Loading branch information
csviri committed Mar 11, 2024
1 parent 7ef7bf8 commit a9b7bf7
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,18 @@ <R> Optional<R> getSecondaryResource(Class<R> expectedType,
* @return the {@link IndexerResourceCache} associated with the associated {@link Reconciler} for
* this context
*/
@SuppressWarnings("unused")
IndexedResourceCache<P> getPrimaryCache();

/**
* Determines whether a new reconciliation will be triggered right after the current
* reconciliation is finished. This allows to optimize certain situations, helping avoid unneeded
* API calls. A reconciler might, for example, skip updating the status when it's known another
* reconciliation is already scheduled, which would in turn trigger another status update, thus
* rendering the current one moot.
*
* @return {@code true} is another reconciliation is already scheduled, {@code false} otherwise
**/
boolean isNextReconciliationImminent();

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class DefaultContext<P extends HasMetadata> implements Context<P> {

Expand Down Expand Up @@ -45,6 +46,12 @@ public IndexedResourceCache<P> getPrimaryCache() {
return controller.getEventSourceManager().getControllerResourceEventSource();
}

@Override
public boolean isNextReconciliationImminent() {
return controller.getEventProcessor()
.isNextReconciliationImminent(ResourceID.fromResource(primaryResource));
}

@Override
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return controller.getEventSourceManager().getResourceEventSourcesFor(expectedType).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ public void start() throws OperatorException {
handleAlreadyMarkedEvents();
}

public boolean isNextReconciliationImminent(ResourceID resourceID) {
return resourceStateManager.getOrCreate(resourceID).eventPresent();
}

private void handleAlreadyMarkedEvents() {
for (var state : resourceStateManager.resourcesWithEventPresent()) {
handleMarkedEventForResource(state);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.javaoperatorsdk.operator;

import java.time.Duration;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.nextreconciliationimminent.NextReconciliationImminentCustomResource;
import io.javaoperatorsdk.operator.sample.nextreconciliationimminent.NextReconciliationImminentReconciler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class NextReconciliationImminentIT {

private static final Logger log =
LoggerFactory.getLogger(NextReconciliationImminentIT.class);

public static final int WAIT_FOR_EVENT = 300;
public static final String TEST_RESOURCE_NAME = "test1";

@RegisterExtension
LocallyRunOperatorExtension extension =
LocallyRunOperatorExtension.builder()
.withReconciler(new NextReconciliationImminentReconciler())
.build();

@Test
void skippingStatusUpdateWithNextReconciliationImminent() throws InterruptedException {
var resource = extension.create(testResource());

var reconciler = extension.getReconcilerOfType(NextReconciliationImminentReconciler.class);
await().untilAsserted(() -> assertThat(reconciler.isReconciliationWaiting()).isTrue());
Thread.sleep(WAIT_FOR_EVENT);

resource.getMetadata().getAnnotations().put("trigger", "" + System.currentTimeMillis());
extension.replace(resource);
Thread.sleep(WAIT_FOR_EVENT);
log.info("Made change to trigger event");

reconciler.allowReconciliationToProceed();
Thread.sleep(WAIT_FOR_EVENT);
// second event arrived
await().untilAsserted(() -> assertThat(reconciler.isReconciliationWaiting()).isTrue());
reconciler.allowReconciliationToProceed();

await().pollDelay(Duration.ofMillis(WAIT_FOR_EVENT)).untilAsserted(() -> {
assertThat(extension.get(NextReconciliationImminentCustomResource.class, TEST_RESOURCE_NAME)
.getStatus().getUpdateNumber()).isEqualTo(1);
});
}


NextReconciliationImminentCustomResource testResource() {
var res = new NextReconciliationImminentCustomResource();
res.setMetadata(new ObjectMetaBuilder()
.withName(TEST_RESOURCE_NAME)
.build());
return res;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.javaoperatorsdk.operator.sample.nextreconciliationimminent;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("nri")
public class NextReconciliationImminentCustomResource
extends CustomResource<Void, NextReconciliationImminentStatus>
implements Namespaced {



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.javaoperatorsdk.operator.sample.nextreconciliationimminent;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;

@ControllerConfiguration(generationAwareEventProcessing = false)
public class NextReconciliationImminentReconciler
implements Reconciler<NextReconciliationImminentCustomResource> {

private static final Logger log =
LoggerFactory.getLogger(NextReconciliationImminentReconciler.class);

private final SynchronousQueue<Boolean> queue = new SynchronousQueue<>();
private volatile boolean reconciliationWaiting = false;

@Override
public UpdateControl<NextReconciliationImminentCustomResource> reconcile(
NextReconciliationImminentCustomResource resource,
Context<NextReconciliationImminentCustomResource> context) throws InterruptedException {
log.info("started reconciliation");
reconciliationWaiting = true;
// wait long enough to get manually allowed
queue.poll(120, TimeUnit.SECONDS);
log.info("Continue after wait");
reconciliationWaiting = false;

if (context.isNextReconciliationImminent()) {
return UpdateControl.noUpdate();
} else {
if (resource.getStatus() == null) {
resource.setStatus(new NextReconciliationImminentStatus());
}
resource.getStatus().setUpdateNumber(resource.getStatus().getUpdateNumber() + 1);
log.info("Patching status");
return UpdateControl.patchStatus(resource);
}
}

public void allowReconciliationToProceed() {
try {
queue.put(true);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public boolean isReconciliationWaiting() {
return reconciliationWaiting;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.sample.nextreconciliationimminent;

public class NextReconciliationImminentStatus {

private int updateNumber;

public int getUpdateNumber() {
return updateNumber;
}

public void setUpdateNumber(int updateNumber) {
this.updateNumber = updateNumber;
}
}

0 comments on commit a9b7bf7

Please sign in to comment.