Skip to content

Commit

Permalink
Merge pull request quarkusio#29897 from ozangunalp/kafka_native_conta…
Browse files Browse the repository at this point in the history
…iner_devservice

Dev Services for Kafka configuration for image providers
  • Loading branch information
ozangunalp committed Dec 16, 2022
2 parents 0125cfd + 0f3a938 commit d5f8007
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 49 deletions.
24 changes: 20 additions & 4 deletions docs/src/main/asciidoc/kafka-dev-services.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,32 @@ Note that the Kafka advertised address is automatically configured with the chos
[[configuring-the-image]]
== Configuring the image

Dev Services for Kafka supports https://redpanda.com[Redpanda] and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode).
Dev Services for Kafka supports https://redpanda.com[Redpanda], https://github/ozangunalp/kafka-native[kafka-native]
and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode) images.

Redpanda is a Kafka compatible event streaming platform.
Because it provides a faster startup time dev services defaults to `vectorized/redpanda` images.
**Redpanda** is a Kafka compatible event streaming platform.
Because it provides a fast startup times, dev services defaults to Redpanda images from `vectorized/redpanda`.
You can select any version from https://hub.docker.com/r/vectorized/redpanda.

Strimzi provides container images and Operators for running Apache Kafka on Kubernetes.
**kafka-native** provides images of standard Apache Kafka distribution compiled to native binary using Quarkus and GraalVM.
While still being _experimental_, it provides very fast startup times with small footprint.

Image type can be configured using

[source, properties]
----
quarkus.kafka.devservices.provider=kafka-native
----

**Strimzi** provides container images and Operators for running Apache Kafka on Kubernetes.
While Strimzi is optimized for Kubernetes, the images work perfectly in classic container environments.
Strimzi container images run "genuine" Kafka broker on JVM, which is slower to start.

[source, properties]
----
quarkus.kafka.devservices.provider=strimzi
----

For Strimzi, you can select any image with a Kafka version which has Kraft support (2.8.1 and higher) from https://quay.io/repository/strimzi-test-container/test-container?tab=tags

[source, properties]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,39 +225,53 @@ private RunningDevService startKafka(DockerStatusBuildItem dockerStatusBuildItem

// Starting the broker
final Supplier<RunningDevService> defaultKafkaBrokerSupplier = () -> {
if (config.imageName.contains("strimzi")) {
StrimziKafkaContainer container = new StrimziKafkaContainer(config.imageName)
.withBrokerId(1)
.withKraft()
.waitForRunning();
ConfigureUtil.configureSharedNetwork(container, "kafka");
if (config.serviceName != null) {
container.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, config.serviceName);
}
if (config.fixedExposedPort != 0) {
container.withPort(config.fixedExposedPort);
}
timeout.ifPresent(container::withStartupTimeout);

container.start();
return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
container.getContainerId(),
container::close,
KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers());
} else {
RedPandaKafkaContainer container = new RedPandaKafkaContainer(
DockerImageName.parse(config.imageName).asCompatibleSubstituteFor("vectorized/redpanda"),
config.fixedExposedPort,
launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null,
useSharedNetwork, config.redpanda);
timeout.ifPresent(container::withStartupTimeout);
container.start();

return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
container.getContainerId(),
container::close,
KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers());
switch (config.provider) {
case REDPANDA:
RedPandaKafkaContainer redpanda = new RedPandaKafkaContainer(
DockerImageName.parse(config.imageName).asCompatibleSubstituteFor("vectorized/redpanda"),
config.fixedExposedPort,
launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null,
useSharedNetwork, config.redpanda);
timeout.ifPresent(redpanda::withStartupTimeout);
redpanda.start();

return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
redpanda.getContainerId(),
redpanda::close,
KAFKA_BOOTSTRAP_SERVERS, redpanda.getBootstrapServers());
case STRIMZI:
StrimziKafkaContainer strimzi = new StrimziKafkaContainer(config.imageName)
.withBrokerId(1)
.withKraft()
.waitForRunning();
ConfigureUtil.configureSharedNetwork(strimzi, "kafka");
if (config.serviceName != null) {
strimzi.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, config.serviceName);
}
if (config.fixedExposedPort != 0) {
strimzi.withPort(config.fixedExposedPort);
}
timeout.ifPresent(strimzi::withStartupTimeout);

strimzi.start();
return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
strimzi.getContainerId(),
strimzi::close,
KAFKA_BOOTSTRAP_SERVERS, strimzi.getBootstrapServers());
case KAFKA_NATIVE:
KafkaNativeContainer kafkaNative = new KafkaNativeContainer(DockerImageName.parse(config.imageName),
config.fixedExposedPort,
launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null,
useSharedNetwork);
timeout.ifPresent(kafkaNative::withStartupTimeout);
kafkaNative.start();

return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
kafkaNative.getContainerId(),
kafkaNative::close,
KAFKA_BOOTSTRAP_SERVERS, kafkaNative.getBootstrapServers());
}
return null;
};

return maybeContainerAddress
Expand Down Expand Up @@ -300,11 +314,15 @@ private static final class KafkaDevServiceCfg {
private final String serviceName;
private final Map<String, Integer> topicPartitions;
private final Duration topicPartitionsTimeout;

private final KafkaDevServicesBuildTimeConfig.Provider provider;

private final RedPandaBuildTimeConfig redpanda;

public KafkaDevServiceCfg(KafkaDevServicesBuildTimeConfig config) {
this.devServicesEnabled = config.enabled.orElse(true);
this.imageName = config.imageName;
this.provider = config.provider;
this.imageName = config.imageName.orElseGet(provider::getDefaultImageName);
this.fixedExposedPort = config.port.orElse(0);
this.shared = config.shared;
this.serviceName = config.serviceName;
Expand All @@ -323,13 +341,15 @@ public boolean equals(Object o) {
return false;
}
KafkaDevServiceCfg that = (KafkaDevServiceCfg) o;
return devServicesEnabled == that.devServicesEnabled && Objects.equals(imageName, that.imageName)
return devServicesEnabled == that.devServicesEnabled
&& Objects.equals(provider, that.provider)
&& Objects.equals(imageName, that.imageName)
&& Objects.equals(fixedExposedPort, that.fixedExposedPort);
}

@Override
public int hashCode() {
return Objects.hash(devServicesEnabled, imageName, fixedExposedPort);
return Objects.hash(devServicesEnabled, provider, imageName, fixedExposedPort);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,47 @@ public class KafkaDevServicesBuildTimeConfig {
public Optional<Integer> port;

/**
* The Kafka container image to use.
* <p>
* Only Redpanda and Strimzi images are supported.
* Default image is Redpanda.
* Kafka dev service container type.
* <p>
* Note that Strimzi images are launched in Kraft mode.
* In order to use a Strimzi image you need to set a compatible image name such as
* {@code quay.io/strimzi-test-container/test-container:0.100.0-kafka-3.1.0} or
* {@code quay.io/strimzi/kafka:0.27.1-kafka-3.0.0}
* Redpanda, Strimzi and kafka-native container providers are supported. Default is redpanda.
* <p>
* For Redpanda:
* See https://vectorized.io/docs/quick-start-docker/ and https://hub.docker.com/r/vectorized/redpanda
* <p>
* For Strimzi:
* See https://github.com/strimzi/test-container and https://quay.io/repository/strimzi-test-container/test-container
* <p>
* For Kafka Native:
* See https://github.com/ozangunalp/kafka-native and https://quay.io/repository/ogunalp/kafka-native
* <p>
* Note that Strimzi and Kafka Native images are launched in Kraft mode.
*/
@ConfigItem(defaultValue = "docker.io/vectorized/redpanda:v22.3.4")
public String imageName;
@ConfigItem(defaultValue = "redpanda")
public Provider provider = Provider.REDPANDA;

public enum Provider {
REDPANDA("docker.io/vectorized/redpanda:v22.3.4"),
STRIMZI("quay.io/strimzi-test-container/test-container:latest-kafka-3.2.1"),
KAFKA_NATIVE("quay.io/ogunalp/kafka-native:latest");

private final String defaultImageName;

Provider(String imageName) {
this.defaultImageName = imageName;
}

public String getDefaultImageName() {
return defaultImageName;
}
}

/**
* The Kafka container image to use.
* <p>
* Dependent on the provider.
*/
@ConfigItem
public Optional<String> imageName;

/**
* Indicates if the Kafka broker managed by Quarkus Dev Services is shared.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.quarkus.kafka.client.deployment;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

import com.github.dockerjava.api.command.InspectContainerResponse;

import io.quarkus.devservices.common.ConfigureUtil;

public class KafkaNativeContainer extends GenericContainer<KafkaNativeContainer> {

private static final String STARTER_SCRIPT = "/work/run.sh";

private final Integer fixedExposedPort;
private final boolean useSharedNetwork;

private String additionalArgs = null;
private int exposedPort = -1;

private String hostName = null;

public KafkaNativeContainer(DockerImageName dockerImageName, int fixedExposedPort, String serviceName,
boolean useSharedNetwork) {
super(dockerImageName);
this.fixedExposedPort = fixedExposedPort;
this.useSharedNetwork = useSharedNetwork;
if (serviceName != null) {
withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, serviceName);
}
String cmd = String.format("while [ ! -f %s ]; do sleep 0.1; done; sleep 0.1; %s", STARTER_SCRIPT, STARTER_SCRIPT);
withCommand("sh", "-c", cmd);
waitingFor(Wait.forLogMessage(".*Kafka broker started.*", 1));
}

@Override
protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
super.containerIsStarting(containerInfo, reused);
// Set exposed port
this.exposedPort = getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT);
// follow output
// Start and configure the advertised address
String cmd = "#!/bin/bash\n";
cmd += "/work/kafka";
cmd += " -Dkafka.advertised.listeners=" + getBootstrapServers();
if (useSharedNetwork) {
cmd += " -Dkafka.listeners=BROKER://:9093,PLAINTEXT://:9092,CONTROLLER://:9094";
cmd += " -Dkafka.interbroker.listener.name=BROKER";
cmd += " -Dkafka.controller.listener.names=CONTROLLER";
cmd += " -Dkafka.listener.security.protocol.map=BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT";
cmd += " -Dkafka.early.start.listeners=BROKER,CONTROLLER,PLAINTEXT";
}
if (additionalArgs != null) {
cmd += " " + additionalArgs;
}

//noinspection OctalInteger
copyFileToContainer(
Transferable.of(cmd.getBytes(StandardCharsets.UTF_8), 0777),
STARTER_SCRIPT);
}

private String getKafkaAdvertisedListeners() {
List<String> addresses = new ArrayList<>();
if (useSharedNetwork) {
addresses.add(String.format("BROKER://%s:9093", hostName));
}
// See https://github.com/quarkusio/quarkus/issues/21819
// Kafka is always exposed to the Docker host network
addresses.add(String.format("PLAINTEXT://%s:%d", getHost(), getExposedKafkaPort()));
return String.join(",", addresses);
}

public int getExposedKafkaPort() {
return exposedPort;
}

@Override
protected void configure() {
super.configure();

addExposedPort(DevServicesKafkaProcessor.KAFKA_PORT);
hostName = ConfigureUtil.configureSharedNetwork(this, "kafka");

if (fixedExposedPort != null) {
addFixedExposedPort(fixedExposedPort, DevServicesKafkaProcessor.KAFKA_PORT);
}
}

public String getBootstrapServers() {
return getKafkaAdvertisedListeners();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN
# enable health check
quarkus.kafka.health.enabled=true

quarkus.kafka.devservices.provider=kafka-native
quarkus.kafka.devservices.topic-partitions.test=2
quarkus.kafka.devservices.topic-partitions.test-consumer=3
quarkus.kafka.devservices.topic-partitions-timeout=4S

0 comments on commit d5f8007

Please sign in to comment.