[release-v1.12] Backport flaky tests fixes (#1071)
* Use atomics for ReferenceCounting

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Log consumer context when sending events

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Improve LoomKafkaProducer (logging, resource usage)

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Detect audience and identity SA changes and refactor sender and token provider

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Increase timeout

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Async appender

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Migrate to async appender even in official configuration

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Migrate to use async appender by default

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Make generate release

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
pierDipi committed Apr 30, 2024
1 parent 9a337c9 commit b6ee2d1
Showing 15 changed files with 151 additions and 49 deletions.
@@ -27,7 +27,12 @@ data:
     <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
       <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
     </appender>
+    <appender name="async" class="ch.qos.logback.classic.AsyncAppender">
+      <appender-ref ref="jsonConsoleAppender" />
+      <neverBlock>true</neverBlock>
+      <maxFlushTime>1000</maxFlushTime>
+    </appender>
     <root level="INFO">
-      <appender-ref ref="jsonConsoleAppender"/>
+      <appender-ref ref="async"/>
     </root>
   </configuration>
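
The new AsyncAppender decouples callers from console I/O through an in-memory queue: neverBlock=true means a full queue drops events instead of stalling the logging thread, and maxFlushTime=1000 caps shutdown flushing at one second. A minimal, self-contained sketch of the behavior this buys, assuming slf4j and logback on the classpath and a config like the one above (class name and messages are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class AsyncLoggingSketch {
        private static final Logger logger = LoggerFactory.getLogger(AsyncLoggingSketch.class);

        public static void main(String[] args) {
            // Burst far more events than AsyncAppender's default 256-slot queue holds:
            // with neverBlock=true this loop never blocks on console I/O; events that
            // do not fit in the queue may be discarded instead.
            for (int i = 0; i < 100_000; i++) {
                logger.info("event {}", i);
            }
            // On shutdown, maxFlushTime bounds (in milliseconds) how long the
            // appender waits for its worker thread to drain the remaining queue.
        }
    }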
@@ -20,7 +20,6 @@
 import dev.knative.eventing.kafka.broker.core.NamespacedName;
 import io.fabric8.kubernetes.api.model.authentication.TokenRequest;
 import io.fabric8.kubernetes.api.model.authentication.TokenRequestBuilder;
-import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientBuilder;
@@ -44,57 +43,68 @@ public class TokenProvider {
 
     public TokenProvider(Vertx vertx) {
         this.vertx = vertx;
-        Config clientConfig = new ConfigBuilder().build();
-        kubernetesClient =
-                new KubernetesClientBuilder().withConfig(clientConfig).build();
+
+        this.kubernetesClient = new KubernetesClientBuilder()
+                .withConfig(new ConfigBuilder().build())
+                .build();
         this.tokenCache = CacheBuilder.newBuilder()
                 .expireAfterWrite(CACHE_EXPIRATION_TIME_SECONDS, TimeUnit.SECONDS)
                 .maximumSize(CACHE_MAXIMUM_SIZE)
                 .initialCapacity(2)
                 .build();
     }
 
     public Future<String> getToken(NamespacedName serviceAccount, String audience) {
-        String cacheKey = generateCacheKey(serviceAccount, audience);
-        String token = tokenCache.getIfPresent(cacheKey);
+        final var cacheKey = generateCacheKey(serviceAccount, audience);
+
+        final var token = tokenCache.getIfPresent(cacheKey);
         if (token != null) {
             return Future.succeededFuture(token);
-        } else {
-            return requestToken(serviceAccount, audience).onSuccess(t -> {
-                if (t != null) {
-                    tokenCache.put(cacheKey, t);
-                }
-            });
         }
+
+        return requestToken(serviceAccount, audience).onSuccess(t -> {
+            if (t != null) {
+                tokenCache.put(cacheKey, t);
+            }
+        });
     }
 
     private Future<String> requestToken(NamespacedName serviceAccount, String audience) {
         return this.vertx.executeBlocking(
                 promise -> {
-                    TokenRequest tokenRequest = new TokenRequestBuilder()
-                            .withNewSpec()
-                            .withAudiences(audience)
-                            .withExpirationSeconds(TOKEN_EXPIRATION_SECONDS)
-                            .endSpec()
-                            .build();
+                    try {
+                        final var builder = new TokenRequestBuilder()
+                                .withNewSpec()
+                                .withAudiences(audience)
+                                .withExpirationSeconds(TOKEN_EXPIRATION_SECONDS)
+                                .endSpec()
+                                .build();
 
-                    tokenRequest = kubernetesClient
-                            .serviceAccounts()
-                            .inNamespace(serviceAccount.namespace())
-                            .withName(serviceAccount.name())
-                            .tokenRequest(tokenRequest);
+                        final var tokenRequest = kubernetesClient
+                                .serviceAccounts()
+                                .inNamespace(serviceAccount.namespace())
+                                .withName(serviceAccount.name())
+                                .tokenRequest(builder);
 
-                    if (tokenRequest != null && tokenRequest.getStatus() != null) {
-                        promise.complete(tokenRequest.getStatus().getToken());
-                    } else {
-                        promise.fail("could not request token for " + serviceAccount.name() + "/"
-                                + serviceAccount.namespace());
+                        if (isValidTokenRequest(tokenRequest)) {
+                            promise.tryComplete(tokenRequest.getStatus().getToken());
+                        } else {
+                            promise.tryFail("could not request token for " + serviceAccount.name() + "/"
+                                    + serviceAccount.namespace());
+                        }
+                    } catch (final RuntimeException exception) {
+                        promise.tryFail(exception);
                     }
                 },
                 false);
     }
 
+    private static boolean isValidTokenRequest(final TokenRequest tokenRequest) {
+        return tokenRequest != null
+                && tokenRequest.getStatus() != null
+                && tokenRequest.getStatus().getToken() != null
+                && !tokenRequest.getStatus().getToken().isBlank();
+    }
+
     private String generateCacheKey(NamespacedName serviceAccount, String audience) {
         return serviceAccount.namespace() + "/" + serviceAccount.name() + "/" + audience;
     }
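
Callers keep the same Future-based API; what changed is that blank or missing tokens and runtime exceptions now fail the promise instead of escaping the worker thread. A hypothetical usage sketch (the namespace, service account name, audience, and the NamespacedName argument order are assumptions for illustration):

    TokenProvider tokenProvider = new TokenProvider(vertx);

    tokenProvider
            .getToken(new NamespacedName("my-namespace", "my-service-account"), "https://my-audience")
            .onSuccess(token -> logger.debug("got token from cache or a fresh TokenRequest"))
            .onFailure(cause -> logger.error("token request failed", cause));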
@@ -268,11 +268,17 @@ private static boolean egressEquals(DataPlaneContract.Egress e1, DataPlaneContra
                 && Objects.equals(e1.getKeyType(), e2.getKeyType())
                 && Objects.equals(e1.getDialectedFilterList(), e2.getDialectedFilterList())
                 && Objects.equals(e1.getDestinationCACerts(), e2.getDestinationCACerts())
+                && Objects.equals(e1.getDestinationAudience(), e2.getDestinationAudience())
                 && Objects.equals(e1.getReplyUrlCACerts(), e2.getReplyUrlCACerts())
+                && Objects.equals(e1.getReplyUrlAudience(), e2.getReplyUrlAudience())
+                && Objects.equals(e1.getOidcServiceAccountName(), e2.getOidcServiceAccountName())
                 && Objects.equals(e1.getReference(), e2.getReference())
                 && Objects.equals(
                         e1.getEgressConfig().getDeadLetterCACerts(),
-                        e2.getEgressConfig().getDeadLetterCACerts());
+                        e2.getEgressConfig().getDeadLetterCACerts())
+                && Objects.equals(
+                        e1.getEgressConfig().getDeadLetterAudience(),
+                        e2.getEgressConfig().getDeadLetterAudience());
     }
 
     private static boolean trustBundlesEquals(Set<String> oldTrustBundles, Set<String> newTrustBundles) {
@@ -16,6 +16,7 @@
 package dev.knative.eventing.kafka.broker.core.utils;
 
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Thread unsafe holder with reference counter.
@@ -25,12 +26,12 @@
 public class ReferenceCounter<T> {
 
     private final T value;
-    private int refs;
+    private final AtomicInteger refs;
 
     public ReferenceCounter(final T value) {
         Objects.requireNonNull(value);
         this.value = value;
-        this.refs = 0;
+        this.refs = new AtomicInteger(0);
     }
 
     /**
@@ -44,14 +45,13 @@ public T getValue() {
      * Increment the ref count
      */
     public void increment() {
-        this.refs++;
+        this.refs.incrementAndGet();
    }
 
     /**
      * @return true if the count is 0, hence nobody is referring anymore to this value
      */
     public boolean decrementAndCheck() {
-        this.refs--;
-        return this.refs == 0;
+        return this.refs.decrementAndGet() == 0;
     }
 }
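
With an AtomicInteger, increments and decrements may race across threads and the zero crossing is still observed exactly once, which is what the plain int could not guarantee and what made dependent tests flaky. A minimal usage sketch (the shared value and the cleanup hook are illustrative):

    // Hypothetical shared resource; any non-null value works.
    final ReferenceCounter<String> rc = new ReferenceCounter<>("shared-resource");

    rc.increment(); // acquire on thread A
    rc.increment(); // acquire on thread B

    // Releases may race: decrementAndGet() guarantees that exactly one
    // caller sees the transition to zero and runs the cleanup.
    if (rc.decrementAndCheck()) {
        releaseResource(rc.getValue()); // hypothetical cleanup hook
    }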
12 changes: 12 additions & 0 deletions data-plane/dispatcher-loom/pom.xml
@@ -134,6 +134,18 @@
                     <include>**</include>
                   </includes>
                 </filter>
+                <filter>
+                  <artifact>ch.qos.logback:logback-core</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+                <filter>
+                  <artifact>ch.qos.logback:logback-classic</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
                 <filter>
                   <artifact>org.apache.kafka:kafka-clients</artifact>
                   <includes>
@@ -448,7 +448,7 @@ private void logError(final String msg, final ConsumerRecord<Object, CloudEvent>
 
         if (logger.isDebugEnabled()) {
             logger.error(
-                    msg + " {} {} {} {} {} {} {}",
+                    msg + " {} {} {} {} {} {}",
                     consumerVerticleContext.getLoggingKeyValue(),
                     keyValue("topic", record.topic()),
                     keyValue("partition", record.partition()),
@@ -83,7 +83,7 @@ public WebClientCloudEventSender(
         Objects.requireNonNull(vertx);
         Objects.requireNonNull(client, "provide client");
         Objects.requireNonNull(additionalTags, "provide additional tags");
-        if (target == null || target.equals("")) {
+        if (target == null || target.isEmpty()) {
             throw new IllegalArgumentException("provide a target");
         }
         if (!URI.create(target).isAbsolute()) {
@@ -110,7 +110,8 @@ public Future<HttpResponse<Buffer>> send(final CloudEvent event) {
 
     private Future<HttpResponse<Buffer>> send(final CloudEvent event, final int retryCounter) {
         logger.debug(
-                "Sending event {} {} {}",
+                "Sending event {} {} {} {}",
+                consumerVerticleContext.getLoggingKeyValue(),
                 keyValue("id", event.getId()),
                 keyValue("subscriberURI", target),
                 keyValue("retry", retryCounter));
12 changes: 12 additions & 0 deletions data-plane/receiver-loom/pom.xml
@@ -152,6 +152,18 @@
                     <include>**</include>
                   </includes>
                 </filter>
+                <filter>
+                  <artifact>ch.qos.logback:logback-core</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+                <filter>
+                  <artifact>ch.qos.logback:logback-classic</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
                 <filter>
                   <artifact>org.apache.kafka:kafka-clients</artifact>
                   <includes>
@@ -45,6 +45,7 @@ public class LoomKafkaProducer<K, V> implements ReactiveKafkaProducer<K, V> {
     private final ProducerTracer<?> tracer;
     private final VertxInternal vertx;
     private final Thread sendFromQueueThread;
+    private final Promise<Void> closePromise = Promise.promise();
 
     public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
         Objects.requireNonNull(v, "Vertx cannot be null");
@@ -122,22 +123,31 @@ private void sendFromQueue() {
 
     @Override
     public Future<Void> close() {
-        final Promise<Void> promise = Promise.promise();
-        this.isClosed.set(true);
+        if (!this.isClosed.compareAndSet(false, true)) {
+            return closePromise.future();
+        }
+
+        logger.debug("Closing producer");
+
         Thread.ofVirtual().start(() -> {
             try {
                 while (!eventQueue.isEmpty()) {
+                    logger.debug("Waiting for the eventQueue to become empty");
                     Thread.sleep(2000L);
                 }
+                logger.debug("Interrupting sendFromQueueThread thread");
                 sendFromQueueThread.interrupt();
+                logger.debug("Waiting for sendFromQueueThread thread to complete");
                 sendFromQueueThread.join();
+                logger.debug("Closing the producer");
                 producer.close();
-                promise.complete();
+                closePromise.complete();
             } catch (Exception e) {
-                promise.fail(e);
+                closePromise.fail(e);
             }
         });
-        return promise.future();
+
+        return closePromise.future();
     }
 
     @Override
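
Guarding the shutdown path with compareAndSet and completing a single shared closePromise makes close() idempotent: only the first caller spawns the drain thread, and every caller observes the same outcome. A short sketch of the resulting contract (illustrative):

    Future<Void> first = producer.close();  // wins the CAS, starts the virtual drain thread
    Future<Void> second = producer.close(); // CAS fails: same future, no second drain thread

    // Both futures complete together once the event queue drains and the
    // underlying Kafka producer is closed (or fail together if the drain fails).
    first.onComplete(ar -> logger.debug("producer closed, succeeded={}", ar.succeeded()));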
@@ -29,6 +29,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -83,10 +84,14 @@ public void testConcurrentRecordSendCountAndSequence() throws InterruptedExcepti
     }
 
     @Test
-    public void testSendAfterClose(VertxTestContext testContext) {
+    public void testSendAfterClose(VertxTestContext testContext) throws ExecutionException, InterruptedException {
 
         // Close the producer before sending a record
-        producer.close().onFailure(testContext::failNow);
+        producer.close()
+                .onFailure(testContext::failNow)
+                .toCompletionStage()
+                .toCompletableFuture()
+                .get();
 
         // Attempt to send a record after the producer is closed
         ProducerRecord<String, Integer> record = new ProducerRecord<>("test", "sequence number", 123);
@@ -113,6 +118,7 @@ public void testCloseIsWaitingForEmptyQueue(VertxTestContext testContext) {
         producer.send(record)
                 .onSuccess(ar -> {
                     checkpoints.flag();
+                    assertTrue(producer.isSendFromQueueThreadAlive());
                 })
                 .onFailure(testContext::failNow);
     }
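
Blocking the test thread on the close future is what de-flakes testSendAfterClose: the subsequent send can no longer race a close that is still draining on its virtual thread. The await pattern in isolation (a sketch; a bounded get(timeout, unit) is an option if a hung close should fail fast):

    // Bridge the Vert.x Future to a CompletableFuture and wait on it, so that
    // everything after this line runs strictly after close() has completed.
    producer.close().toCompletionStage().toCompletableFuture().get();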
24 changes: 24 additions & 0 deletions openshift/patches/add_maven_shade_plugin.patch
@@ -40,6 +40,18 @@ index 318827975..53f64c956 100644
 +                  </includes>
 +                </filter>
 +                <filter>
++                  <artifact>ch.qos.logback:logback-core</artifact>
++                  <includes>
++                    <include>**</include>
++                  </includes>
++                </filter>
++                <filter>
++                  <artifact>ch.qos.logback:logback-classic</artifact>
++                  <includes>
++                    <include>**</include>
++                  </includes>
++                </filter>
++                <filter>
 +                  <artifact>org.apache.kafka:kafka-clients</artifact>
 +                  <includes>
 +                    <include>**</include>
@@ -102,6 +114,18 @@ index 10c4c94f2..c2e8a41cb 100644
 +                  </includes>
 +                </filter>
 +                <filter>
++                  <artifact>ch.qos.logback:logback-core</artifact>
++                  <includes>
++                    <include>**</include>
++                  </includes>
++                </filter>
++                <filter>
++                  <artifact>ch.qos.logback:logback-classic</artifact>
++                  <includes>
++                    <include>**</include>
++                  </includes>
++                </filter>
++                <filter>
 +                  <artifact>org.apache.kafka:kafka-clients</artifact>
 +                  <includes>
 +                    <include>**</include>
7 changes: 6 additions & 1 deletion openshift/release/artifacts/eventing-kafka-controller.yaml
@@ -1487,8 +1487,13 @@ data:
     <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
       <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
     </appender>
+    <appender name="async" class="ch.qos.logback.classic.AsyncAppender">
+      <appender-ref ref="jsonConsoleAppender" />
+      <neverBlock>true</neverBlock>
+      <maxFlushTime>1000</maxFlushTime>
+    </appender>
     <root level="INFO">
-      <appender-ref ref="jsonConsoleAppender"/>
+      <appender-ref ref="async"/>
     </root>
   </configuration>
 ---
7 changes: 6 additions & 1 deletion openshift/release/artifacts/eventing-kafka.yaml
@@ -1487,8 +1487,13 @@ data:
     <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
       <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
     </appender>
+    <appender name="async" class="ch.qos.logback.classic.AsyncAppender">
+      <appender-ref ref="jsonConsoleAppender" />
+      <neverBlock>true</neverBlock>
+      <maxFlushTime>1000</maxFlushTime>
+    </appender>
     <root level="INFO">
-      <appender-ref ref="jsonConsoleAppender"/>
+      <appender-ref ref="async"/>
     </root>
   </configuration>
 ---