diff --git a/docs/src/main/asciidoc/kafka-dev-services.adoc b/docs/src/main/asciidoc/kafka-dev-services.adoc index 30588ba1e1381..e48283d851827 100644 --- a/docs/src/main/asciidoc/kafka-dev-services.adoc +++ b/docs/src/main/asciidoc/kafka-dev-services.adoc @@ -50,16 +50,32 @@ Note that the Kafka advertised address is automatically configured with the chos [[configuring-the-image]] == Configuring the image -Dev Services for Kafka supports https://redpanda.com[Redpanda] and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode). +Dev Services for Kafka supports https://redpanda.com[Redpanda], https://github/ozangunalp/kafka-native[kafka-native] +and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode) images. -Redpanda is a Kafka compatible event streaming platform. -Because it provides a faster startup time dev services defaults to `vectorized/redpanda` images. +**Redpanda** is a Kafka compatible event streaming platform. +Because it provides a fast startup times, dev services defaults to Redpanda images from `vectorized/redpanda`. You can select any version from https://hub.docker.com/r/vectorized/redpanda. -Strimzi provides container images and Operators for running Apache Kafka on Kubernetes. +**kafka-native** provides images of standard Apache Kafka distribution compiled to native binary using Quarkus and GraalVM. +While still being _experimental_, it provides very fast startup times with small footprint. + +Image type can be configured using + +[source, properties] +---- +quarkus.kafka.devservices.provider=kafka-native +---- + +**Strimzi** provides container images and Operators for running Apache Kafka on Kubernetes. While Strimzi is optimized for Kubernetes, the images work perfectly in classic container environments. Strimzi container images run "genuine" Kafka broker on JVM, which is slower to start. +[source, properties] +---- +quarkus.kafka.devservices.provider=strimzi +---- + For Strimzi, you can select any image with a Kafka version which has Kraft support (2.8.1 and higher) from https://quay.io/repository/strimzi-test-container/test-container?tab=tags [source, properties] diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java index 03b704e56fe5e..88e167dc08b5d 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java @@ -225,39 +225,53 @@ private RunningDevService startKafka(DockerStatusBuildItem dockerStatusBuildItem // Starting the broker final Supplier defaultKafkaBrokerSupplier = () -> { - if (config.imageName.contains("strimzi")) { - StrimziKafkaContainer container = new StrimziKafkaContainer(config.imageName) - .withBrokerId(1) - .withKraft() - .waitForRunning(); - ConfigureUtil.configureSharedNetwork(container, "kafka"); - if (config.serviceName != null) { - container.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, config.serviceName); - } - if (config.fixedExposedPort != 0) { - container.withPort(config.fixedExposedPort); - } - timeout.ifPresent(container::withStartupTimeout); - - container.start(); - return new RunningDevService(Feature.KAFKA_CLIENT.getName(), - container.getContainerId(), - container::close, - KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers()); - } else { - RedPandaKafkaContainer container = new RedPandaKafkaContainer( - DockerImageName.parse(config.imageName).asCompatibleSubstituteFor("vectorized/redpanda"), - config.fixedExposedPort, - launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null, - useSharedNetwork, config.redpanda); - timeout.ifPresent(container::withStartupTimeout); - container.start(); - - return new RunningDevService(Feature.KAFKA_CLIENT.getName(), - container.getContainerId(), - container::close, - KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers()); + switch (config.provider) { + case REDPANDA: + RedPandaKafkaContainer redpanda = new RedPandaKafkaContainer( + DockerImageName.parse(config.imageName).asCompatibleSubstituteFor("vectorized/redpanda"), + config.fixedExposedPort, + launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null, + useSharedNetwork, config.redpanda); + timeout.ifPresent(redpanda::withStartupTimeout); + redpanda.start(); + + return new RunningDevService(Feature.KAFKA_CLIENT.getName(), + redpanda.getContainerId(), + redpanda::close, + KAFKA_BOOTSTRAP_SERVERS, redpanda.getBootstrapServers()); + case STRIMZI: + StrimziKafkaContainer strimzi = new StrimziKafkaContainer(config.imageName) + .withBrokerId(1) + .withKraft() + .waitForRunning(); + ConfigureUtil.configureSharedNetwork(strimzi, "kafka"); + if (config.serviceName != null) { + strimzi.withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, config.serviceName); + } + if (config.fixedExposedPort != 0) { + strimzi.withPort(config.fixedExposedPort); + } + timeout.ifPresent(strimzi::withStartupTimeout); + + strimzi.start(); + return new RunningDevService(Feature.KAFKA_CLIENT.getName(), + strimzi.getContainerId(), + strimzi::close, + KAFKA_BOOTSTRAP_SERVERS, strimzi.getBootstrapServers()); + case KAFKA_NATIVE: + KafkaNativeContainer kafkaNative = new KafkaNativeContainer(DockerImageName.parse(config.imageName), + config.fixedExposedPort, + launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null, + useSharedNetwork); + timeout.ifPresent(kafkaNative::withStartupTimeout); + kafkaNative.start(); + + return new RunningDevService(Feature.KAFKA_CLIENT.getName(), + kafkaNative.getContainerId(), + kafkaNative::close, + KAFKA_BOOTSTRAP_SERVERS, kafkaNative.getBootstrapServers()); } + return null; }; return maybeContainerAddress @@ -300,11 +314,15 @@ private static final class KafkaDevServiceCfg { private final String serviceName; private final Map topicPartitions; private final Duration topicPartitionsTimeout; + + private final KafkaDevServicesBuildTimeConfig.Provider provider; + private final RedPandaBuildTimeConfig redpanda; public KafkaDevServiceCfg(KafkaDevServicesBuildTimeConfig config) { this.devServicesEnabled = config.enabled.orElse(true); - this.imageName = config.imageName; + this.provider = config.provider; + this.imageName = config.imageName.orElseGet(provider::getDefaultImageName); this.fixedExposedPort = config.port.orElse(0); this.shared = config.shared; this.serviceName = config.serviceName; @@ -323,13 +341,15 @@ public boolean equals(Object o) { return false; } KafkaDevServiceCfg that = (KafkaDevServiceCfg) o; - return devServicesEnabled == that.devServicesEnabled && Objects.equals(imageName, that.imageName) + return devServicesEnabled == that.devServicesEnabled + && Objects.equals(provider, that.provider) + && Objects.equals(imageName, that.imageName) && Objects.equals(fixedExposedPort, that.fixedExposedPort); } @Override public int hashCode() { - return Objects.hash(devServicesEnabled, imageName, fixedExposedPort); + return Objects.hash(devServicesEnabled, provider, imageName, fixedExposedPort); } } diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java index 30b9a780a1bbf..36fc407e05d26 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaDevServicesBuildTimeConfig.java @@ -28,15 +28,9 @@ public class KafkaDevServicesBuildTimeConfig { public Optional port; /** - * The Kafka container image to use. - *

- * Only Redpanda and Strimzi images are supported. - * Default image is Redpanda. + * Kafka dev service container type. *

- * Note that Strimzi images are launched in Kraft mode. - * In order to use a Strimzi image you need to set a compatible image name such as - * {@code quay.io/strimzi-test-container/test-container:0.100.0-kafka-3.1.0} or - * {@code quay.io/strimzi/kafka:0.27.1-kafka-3.0.0} + * Redpanda, Strimzi and kafka-native container providers are supported. Default is redpanda. *

* For Redpanda: * See https://vectorized.io/docs/quick-start-docker/ and https://hub.docker.com/r/vectorized/redpanda @@ -44,9 +38,37 @@ public class KafkaDevServicesBuildTimeConfig { * For Strimzi: * See https://github.com/strimzi/test-container and https://quay.io/repository/strimzi-test-container/test-container *

+ * For Kafka Native: + * See https://github.com/ozangunalp/kafka-native and https://quay.io/repository/ogunalp/kafka-native + *

+ * Note that Strimzi and Kafka Native images are launched in Kraft mode. */ - @ConfigItem(defaultValue = "docker.io/vectorized/redpanda:v22.3.4") - public String imageName; + @ConfigItem(defaultValue = "redpanda") + public Provider provider = Provider.REDPANDA; + + public enum Provider { + REDPANDA("docker.io/vectorized/redpanda:v22.3.4"), + STRIMZI("quay.io/strimzi-test-container/test-container:latest-kafka-3.2.1"), + KAFKA_NATIVE("quay.io/ogunalp/kafka-native:latest"); + + private final String defaultImageName; + + Provider(String imageName) { + this.defaultImageName = imageName; + } + + public String getDefaultImageName() { + return defaultImageName; + } + } + + /** + * The Kafka container image to use. + *

+ * Dependent on the provider. + */ + @ConfigItem + public Optional imageName; /** * Indicates if the Kafka broker managed by Quarkus Dev Services is shared. diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaNativeContainer.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaNativeContainer.java new file mode 100644 index 0000000000000..b6fff5fd04a9c --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaNativeContainer.java @@ -0,0 +1,99 @@ +package io.quarkus.kafka.client.deployment; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; + +import com.github.dockerjava.api.command.InspectContainerResponse; + +import io.quarkus.devservices.common.ConfigureUtil; + +public class KafkaNativeContainer extends GenericContainer { + + private static final String STARTER_SCRIPT = "/work/run.sh"; + + private final Integer fixedExposedPort; + private final boolean useSharedNetwork; + + private String additionalArgs = null; + private int exposedPort = -1; + + private String hostName = null; + + public KafkaNativeContainer(DockerImageName dockerImageName, int fixedExposedPort, String serviceName, + boolean useSharedNetwork) { + super(dockerImageName); + this.fixedExposedPort = fixedExposedPort; + this.useSharedNetwork = useSharedNetwork; + if (serviceName != null) { + withLabel(DevServicesKafkaProcessor.DEV_SERVICE_LABEL, serviceName); + } + String cmd = String.format("while [ ! -f %s ]; do sleep 0.1; done; sleep 0.1; %s", STARTER_SCRIPT, STARTER_SCRIPT); + withCommand("sh", "-c", cmd); + waitingFor(Wait.forLogMessage(".*Kafka broker started.*", 1)); + } + + @Override + protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { + super.containerIsStarting(containerInfo, reused); + // Set exposed port + this.exposedPort = getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT); + // follow output + // Start and configure the advertised address + String cmd = "#!/bin/bash\n"; + cmd += "/work/kafka"; + cmd += " -Dkafka.advertised.listeners=" + getBootstrapServers(); + if (useSharedNetwork) { + cmd += " -Dkafka.listeners=BROKER://:9093,PLAINTEXT://:9092,CONTROLLER://:9094"; + cmd += " -Dkafka.interbroker.listener.name=BROKER"; + cmd += " -Dkafka.controller.listener.names=CONTROLLER"; + cmd += " -Dkafka.listener.security.protocol.map=BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"; + cmd += " -Dkafka.early.start.listeners=BROKER,CONTROLLER,PLAINTEXT"; + } + if (additionalArgs != null) { + cmd += " " + additionalArgs; + } + + //noinspection OctalInteger + copyFileToContainer( + Transferable.of(cmd.getBytes(StandardCharsets.UTF_8), 0777), + STARTER_SCRIPT); + } + + private String getKafkaAdvertisedListeners() { + List addresses = new ArrayList<>(); + if (useSharedNetwork) { + addresses.add(String.format("BROKER://%s:9093", hostName)); + } + // See https://github.com/quarkusio/quarkus/issues/21819 + // Kafka is always exposed to the Docker host network + addresses.add(String.format("PLAINTEXT://%s:%d", getHost(), getExposedKafkaPort())); + return String.join(",", addresses); + } + + public int getExposedKafkaPort() { + return exposedPort; + } + + @Override + protected void configure() { + super.configure(); + + addExposedPort(DevServicesKafkaProcessor.KAFKA_PORT); + hostName = ConfigureUtil.configureSharedNetwork(this, "kafka"); + + if (fixedExposedPort != null) { + addFixedExposedPort(fixedExposedPort, DevServicesKafkaProcessor.KAFKA_PORT); + } + } + + public String getBootstrapServers() { + return getKafkaAdvertisedListeners(); + } + +} diff --git a/integration-tests/kafka-devservices/src/main/resources/application.properties b/integration-tests/kafka-devservices/src/main/resources/application.properties index 8e83b2b6d39c7..bdd341fbda057 100644 --- a/integration-tests/kafka-devservices/src/main/resources/application.properties +++ b/integration-tests/kafka-devservices/src/main/resources/application.properties @@ -5,6 +5,7 @@ quarkus.log.category.\"org.apache.zookeeper\".level=WARN # enable health check quarkus.kafka.health.enabled=true +quarkus.kafka.devservices.provider=kafka-native quarkus.kafka.devservices.topic-partitions.test=2 quarkus.kafka.devservices.topic-partitions.test-consumer=3 quarkus.kafka.devservices.topic-partitions-timeout=4S