Skip to content

Commit

Permalink
[release-1.14] Introduce an ENABLE_VIRTUAL_THREADS env variable to e…
Browse files Browse the repository at this point in the history
…nable virtual threads, default to real threads for now (#3887)

* Fix profiler job

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Use async logger

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Use real threads for profiling job

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Info level logging for profiler job

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Format java

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Artifact log per event

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Enable virtual threads flag

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Use and handle consumer wakeup

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Allow closing consumer only once

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
knative-prow-robot and pierDipi committed May 8, 2024
1 parent 1495831 commit b90b525
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 16 deletions.
12 changes: 10 additions & 2 deletions .github/workflows/knative-profile-data-plane.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,18 @@ jobs:
./data-plane/profiler/run.sh || exit 1
ls -al
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v4
with:
name: profile-${{ matrix.event }}-receiver.html
path: profile-${{ matrix.event }}-receiver.html

- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v4
with:
name: profile-${{ matrix.event }}-dispatcher.html
path: profile-${{ matrix.event }}-dispatcher.html

- uses: actions/upload-artifact@v4
if: always()
with:
name: logs-${{ matrix.event }}
path: /tmp/eventing-kafka-broker-logs/profiler/
62 changes: 62 additions & 0 deletions data-plane/dispatcher-loom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,68 @@
</container>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven.shade.plugin.version}</version>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!-- This merges all the META-INF/services -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>dev.knative.eventing.kafka.broker.dispatcherloom.Main</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludeDefaults>false</excludeDefaults>
</filter>
<filter>
<artifact>net.logstash.logback:logstash-logback-encoder</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>ch.qos.logback:logback-core</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>ch.qos.logback:logback-classic</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.kafka:kafka-clients</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>io.fabric8:kubernetes-client</artifact>
<includes>
<include>**</include>
</includes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ public class LoomKafkaConsumer<K, V> implements ReactiveKafkaConsumer<K, V> {
private final BlockingQueue<Runnable> taskQueue;
private final AtomicBoolean isClosed;
private final Thread taskRunnerThread;
private final Promise<Void> closePromise = Promise.promise();

/**
 * Creates a consumer wrapper that confines all access to the underlying Kafka
 * {@link Consumer} to a single background worker thread, which drains tasks
 * from {@code taskQueue}.
 *
 * <p>The worker is a virtual thread only when the {@code ENABLE_VIRTUAL_THREADS}
 * environment variable is set to {@code "true"}; otherwise a platform thread is
 * started. {@code Boolean.parseBoolean(null)} is {@code false}, so platform
 * (real) threads are the default when the variable is unset.
 *
 * @param vertx    Vert.x instance (not read in this constructor as shown here — TODO confirm usage elsewhere)
 * @param consumer the underlying Kafka consumer; must only be used through this wrapper
 */
public LoomKafkaConsumer(Vertx vertx, Consumer<K, V> consumer) {
    this.consumer = consumer;
    this.taskQueue = new LinkedBlockingQueue<>();
    this.isClosed = new AtomicBoolean(false);

    // Virtual threads are opt-in via env var; default to a plain platform thread.
    if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) {
        this.taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue);
    } else {
        this.taskRunnerThread = new Thread(this::processTaskQueue);
        this.taskRunnerThread.start();
    }
}

private void addTask(Runnable task, Promise<?> promise) {
Expand Down Expand Up @@ -92,19 +98,21 @@ public Future<Map<TopicPartition, OffsetAndMetadata>> commit(Map<TopicPartition,

@Override
public Future<Void> close() {
if (!this.isClosed.compareAndSet(false, true)) {
return closePromise.future();
}

final Promise<Void> promise = Promise.promise();
taskQueue.add(() -> {
try {
logger.debug("Closing underlying Kafka consumer client");
consumer.wakeup();
consumer.close();
} catch (Exception e) {
promise.tryFail(e);
closePromise.tryFail(e);
}
});

logger.debug("Closing consumer {}", keyValue("size", taskQueue.size()));
isClosed.set(true);

Thread.ofVirtual().start(() -> {
try {
Expand All @@ -116,7 +124,7 @@ public Future<Void> close() {

taskRunnerThread.interrupt();
taskRunnerThread.join();
promise.tryComplete();
closePromise.tryComplete();

logger.debug("Background thread completed");

Expand All @@ -126,11 +134,11 @@ public Future<Void> close() {
"Interrupted while waiting for taskRunnerThread to finish {}",
keyValue("taskQueueSize", size),
e);
promise.tryFail(new InterruptedException("taskQueue.size = " + size + ". " + e.getMessage()));
closePromise.tryFail(new InterruptedException("taskQueue.size = " + size + ". " + e.getMessage()));
}
});

return promise.future();
return closePromise.future();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -143,7 +144,7 @@ private void poll() {
.poll(POLLING_TIMEOUT)
.onSuccess(records -> vertx.runOnContext(v -> this.recordsHandler(records)))
.onFailure(t -> {
if (this.closed.get()) {
if (this.closed.get() || t instanceof WakeupException) {
// The failure might have been caused by stopping the consumer, so we just ignore it
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,7 +87,7 @@ private synchronized void poll() {
return;
}
if (inFlightRecords.get() >= getConsumerVerticleContext().getMaxPollRecords()) {
logger.info(
logger.debug(
"In flight records exceeds " + ConsumerConfig.MAX_POLL_RECORDS_CONFIG
+ " waiting for response from subscriber before polling for new records {} {} {}",
keyValue(
Expand All @@ -101,6 +102,10 @@ private synchronized void poll() {
.poll(POLL_TIMEOUT)
.onSuccess(records -> vertx.runOnContext(v -> this.handleRecords(records)))
.onFailure(cause -> {
if (cause instanceof WakeupException) {
return; // Do nothing we're shutting down
}

isPollInFlight.set(false);
logger.error(
"Failed to poll messages {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
public class Main {

static {
    // Default to the container-mounted logback configuration, but honor an
    // explicitly supplied -Dlogback.configurationFile (e.g. the profiler's
    // run.sh passes its own config) instead of unconditionally overwriting it.
    if (System.getProperty("logback.configurationFile") == null
            || System.getProperty("logback.configurationFile").isEmpty()) {
        System.setProperty("logback.configurationFile", "/etc/logging/config.xml");
    }
}

private static final Logger logger = LoggerFactory.getLogger(Main.class);
Expand Down
7 changes: 6 additions & 1 deletion data-plane/profiler/resources/config-logging.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<!-- Route log events through an AsyncAppender so application threads do not
     block on console I/O. neverBlock=true discards events rather than blocking
     when the queue is full; maxFlushTime caps shutdown flushing at 1000 ms.
     NOTE(review): dropped events are acceptable here since this config is only
     used by the profiler run — confirm before reusing elsewhere. -->
<appender name="async" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="jsonConsoleAppender" />
<neverBlock>true</neverBlock>
<maxFlushTime>1000</maxFlushTime>
</appender>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
<appender-ref ref="async"/>
</root>
</configuration>
6 changes: 6 additions & 0 deletions data-plane/profiler/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ export METRICS_PUBLISH_QUANTILES="false"
export EGRESSES_INITIAL_CAPACITY="1"
export HTTP2_DISABLE="true"
export WAIT_STARTUP_SECONDS="8"
export CONFIG_FEATURES_PATH=""
export ENABLE_VIRTUAL_THREADS="true"

# Define receiver specific env variables.
export SERVICE_NAME="kafka-broker-receiver"
Expand All @@ -109,6 +111,8 @@ export INSTANCE_ID="receiver"
java \
-XX:+UnlockDiagnosticVMOptions \
-XX:+DebugNonSafepoints \
-XX:+EnableDynamicAgentLoading \
-Djdk.tracePinnedThreads=full \
-Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \
-jar "${PROJECT_ROOT_DIR}"/receiver-vertx/target/receiver-vertx-1.0-SNAPSHOT.jar >"${LOG_DIR}/receiver.log" &
receiver_pid=$!
Expand All @@ -125,6 +129,8 @@ export INSTANCE_ID="dispatcher"
java \
-XX:+UnlockDiagnosticVMOptions \
-XX:+DebugNonSafepoints \
-XX:+EnableDynamicAgentLoading \
-Djdk.tracePinnedThreads=full \
-Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \
-jar "${PROJECT_ROOT_DIR}"/dispatcher-vertx/target/dispatcher-vertx-1.0-SNAPSHOT.jar >"${LOG_DIR}/dispatcher.log" &
dispatcher_pid=$!
Expand Down
63 changes: 63 additions & 0 deletions data-plane/receiver-loom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,69 @@
</container>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven.shade.plugin.version}</version>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!-- This merges all the META-INF/services -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>dev.knative.eventing.kafka.broker.receiverloom.Main</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludeDefaults>false</excludeDefaults>
</filter>
<filter>
<artifact>net.logstash.logback:logstash-logback-encoder</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>ch.qos.logback:logback-core</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>ch.qos.logback:logback-classic</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.kafka:kafka-clients</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>io.fabric8:kubernetes-client</artifact>
<includes>
<include>**</include>
</includes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
this.tracer = null;
}

sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue);
if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) {
this.sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue);
} else {
this.sendFromQueueThread = new Thread(this::sendFromQueue);
this.sendFromQueueThread.start();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@
public class Main {

static {
    // Default to the container-mounted logback configuration, but honor an
    // explicitly supplied -Dlogback.configurationFile (e.g. the profiler's
    // run.sh passes its own config) instead of unconditionally overwriting it.
    if (System.getProperty("logback.configurationFile") == null
            || System.getProperty("logback.configurationFile").isEmpty()) {
        System.setProperty("logback.configurationFile", "/etc/logging/config.xml");
    }
}

private static final Logger logger = LoggerFactory.getLogger(Main.class);
Expand Down Expand Up @@ -117,7 +120,7 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk
.toCompletionStage()
.toCompletableFuture()
.get();
} catch (ExecutionException ex) {
} catch (Exception ex) {
if (featuresConfig.isAuthenticationOIDC()) {
logger.error("Could not load OIDC config while OIDC authentication feature is enabled.");
throw ex;
Expand Down

0 comments on commit b90b525

Please sign in to comment.