Skip to content

Commit

Permalink
Improve RedpandaContainer (#7320)
Browse files Browse the repository at this point in the history
* Configure Redpanda via `.bootstrap.yml` and `redpanda.yml`
* Allow to enable authorization and authentication methods
* Allow to add additional listeners. E.g. using it with toxiproxy or redpanda console
* Enable rest proxy
* Configure rpk

Fixes #6395 

---------

Co-authored-by: Kevin Wittek <kiview@users.noreply.github.com>
  • Loading branch information
eddumelendez and kiview authored Aug 17, 2023
1 parent d48bab7 commit f5471c8
Show file tree
Hide file tree
Showing 7 changed files with 562 additions and 12 deletions.
38 changes: 38 additions & 0 deletions docs/modules/redpanda.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,44 @@ Redpanda also provides a schema registry implementation. Like the Redpanda broke
[Schema Registry](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getSchemaRegistryAddress
<!--/codeinclude-->

It is also possible to enable security capabilities of Redpanda by using:

<!--codeinclude-->
[Enable security](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:security
<!--/codeinclude-->

Superusers can be created by using:

<!--codeinclude-->
[Register Superuser](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createSuperUser
<!--/codeinclude-->

Below is an example of how to create the `AdminClient`:

<!--codeinclude-->
[Create Admin Client](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createAdminClient
<!--/codeinclude-->

There are scenarios where additional listeners are needed because the consumer/producer can be another
container in the same network or a different process where the port to connect differs from the default
exposed port `9092`. E.g [Toxiproxy](../../docs/modules/toxiproxy.md).

<!--codeinclude-->
[Register additional listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:registerListener
<!--/codeinclude-->

Container defined in the same network:

<!--codeinclude-->
[Create kcat container](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createKCatContainer
<!--/codeinclude-->

Client using the new registered listener:

<!--codeinclude-->
[Produce/Consume via new listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:produceConsumeMessage
<!--/codeinclude-->

## Adding this module to your project dependencies

Add the following dependency to your `pom.xml`/`build.gradle` file:
Expand Down
1 change: 1 addition & 0 deletions modules/redpanda/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ description = "Testcontainers :: Redpanda"

dependencies {
api project(':testcontainers')
shaded 'org.freemarker:freemarker:2.3.32'

testImplementation 'org.apache.kafka:kafka-clients:3.5.1'
testImplementation 'org.assertj:assertj-core:3.24.2'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
package org.testcontainers.redpanda;

import com.github.dockerjava.api.command.InspectContainerResponse;
import freemarker.template.Configuration;
import freemarker.template.Template;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.SneakyThrows;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Testcontainers implementation for Redpanda.
Expand All @@ -14,6 +34,7 @@
* <ul>
* <li>Broker: 9092</li>
* <li>Schema Registry: 8081</li>
* <li>Proxy: 8082</li>
* </ul>
*/
public class RedpandaContainer extends GenericContainer<RedpandaContainer> {
Expand All @@ -30,9 +51,21 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private static final int REDPANDA_PORT = 9092;

private static final int REDPANDA_ADMIN_PORT = 9644;

private static final int SCHEMA_REGISTRY_PORT = 8081;

private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
private static final int REST_PROXY_PORT = 8082;

private boolean enableAuthorization;

private String authenticationMethod = "none";

private String schemaRegistryAuthenticationMethod = "none";

private final List<String> superusers = new ArrayList<>();

private final Set<Supplier<Listener>> listenersValueSupplier = new HashSet<>();

public RedpandaContainer(String image) {
this(DockerImageName.parse(image));
Expand All @@ -47,33 +80,198 @@ public RedpandaContainer(DockerImageName imageName) {
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
}

withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT);
withExposedPorts(REDPANDA_PORT, REDPANDA_ADMIN_PORT, SCHEMA_REGISTRY_PORT, REST_PROXY_PORT);
withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
cmd.withEntrypoint();
cmd.withUser("root:root");
});
waitingFor(Wait.forLogMessage(".*Started Kafka API server.*", 1));
withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
waitingFor(Wait.forLogMessage(".*Successfully started Redpanda!.*", 1));
withCopyFileToContainer(
MountableFile.forClasspathResource("testcontainers/entrypoint-tc.sh", 0700),
"/entrypoint-tc.sh"
);
withCommand("/entrypoint-tc.sh", "redpanda", "start", "--mode=dev-container", "--smp=1", "--memory=1G");
}

@Override
protected void configure() {
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(Listener::getAddress)
.forEach(this::withNetworkAliases);
}

@SneakyThrows
@Override
protected void containerIsStarting(InspectContainerResponse containerInfo) {
super.containerIsStarting(containerInfo);

String command = "#!/bin/bash\n";

command += "/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G ";
command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
command +=
"--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
Configuration cfg = new Configuration(Configuration.DEFAULT_INCOMPATIBLE_IMPROVEMENTS);
cfg.setClassLoaderForTemplateLoading(getClass().getClassLoader(), "testcontainers");
cfg.setDefaultEncoding("UTF-8");

copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
copyFileToContainer(getBootstrapFile(cfg), "/etc/redpanda/.bootstrap.yaml");
copyFileToContainer(getRedpandaFile(cfg), "/etc/redpanda/redpanda.yaml");
}

/**
* Returns the bootstrap servers address.
* @return the bootstrap servers address
*/
public String getBootstrapServers() {
return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(REDPANDA_PORT));
}

/**
* Returns the schema registry address.
* @return the schema registry address
*/
public String getSchemaRegistryAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT));
}

/**
* Returns the admin address.
* @return the admin address
*/
public String getAdminAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(REDPANDA_ADMIN_PORT));
}

/**
* Returns the rest proxy address.
* @return the rest proxy address
*/
public String getRestProxyAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(REST_PROXY_PORT));
}

/**
* Enables authorization.
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer enableAuthorization() {
this.enableAuthorization = true;
return this;
}

/**
* Enables SASL.
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer enableSasl() {
this.authenticationMethod = "sasl";
return this;
}

/**
* Enables Http Basic Auth for Schema Registry.
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer enableSchemaRegistryHttpBasicAuth() {
this.schemaRegistryAuthenticationMethod = "http_basic";
return this;
}

/**
* Register username as a superuser.
* @param username username to register as a superuser
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer withSuperuser(String username) {
this.superusers.add(username);
return this;
}

/**
* Add a {@link Supplier} that will provide a listener with format {@code host:port}.
* Host will be added as a network alias.
* <p>
* The listener will be added to the default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* </ul>
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getHost():container.getMappedPort(9092)}</li>
* <li>127.0.0.1:9093</li>
* </ul>
* @param listenerSupplier a supplier that will provide a listener
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer withListener(Supplier<String> listenerSupplier) {
String[] parts = listenerSupplier.get().split(":");
this.listenersValueSupplier.add(() -> new Listener(parts[0], Integer.parseInt(parts[1])));
return this;
}

private Transferable getBootstrapFile(Configuration cfg) {
Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("superusers", this.superusers);

Map<String, Object> root = new HashMap<>();
root.put("kafkaApi", kafkaApi);

String file = resolveTemplate(cfg, "bootstrap.yaml.ftl", root);

return Transferable.of(file, 0700);
}

private Transferable getRedpandaFile(Configuration cfg) {
List<Map<String, Object>> listeners =
this.listenersValueSupplier.stream()
.map(Supplier::get)
.map(listener -> {
Map<String, Object> listenerMap = new HashMap<>();
listenerMap.put("address", listener.getAddress());
listenerMap.put("port", listener.getPort());
return listenerMap;
})
.collect(Collectors.toList());

Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("authenticationMethod", this.authenticationMethod);
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("advertisedHost", getHost());
kafkaApi.put("advertisedPort", getMappedPort(9092));
kafkaApi.put("listeners", listeners);

Map<String, Object> schemaRegistry = new HashMap<>();
schemaRegistry.put("authenticationMethod", this.schemaRegistryAuthenticationMethod);

Map<String, Object> root = new HashMap<>();
root.put("kafkaApi", kafkaApi);
root.put("schemaRegistry", schemaRegistry);

String file = resolveTemplate(cfg, "redpanda.yaml.ftl", root);

return Transferable.of(file, 0700);
}

@SneakyThrows
private String resolveTemplate(Configuration cfg, String template, Map<String, Object> data) {
Template temp = cfg.getTemplate(template);

@Cleanup
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@Cleanup
Writer out = new OutputStreamWriter(byteArrayOutputStream, StandardCharsets.UTF_8);
temp.process(data, out);

return new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8);
}

@Data
@AllArgsConstructor
private static class Listener {

private String address;

private int port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Injected by testcontainers
# This file contains cluster properties which will only be considered when
# starting the cluster for the first time. Afterwards, you can configure cluster
# properties via the Redpanda Admi n API.
superusers:
<#if kafkaApi.superusers?has_content >
<#list kafkaApi.superusers as superuser>
- ${superuser}
</#list>
<#else>
[]
</#if>

<#if kafkaApi.enableAuthorization >
kafka_enable_authorization: true
</#if>

auto_create_topics_enabled: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

# Wait for testcontainer's injected redpanda config with the port only known after docker start
until grep -q "# Injected by testcontainers" "/etc/redpanda/redpanda.yaml"
do
sleep 0.1
done
exec /entrypoint.sh $@
Loading

0 comments on commit f5471c8

Please sign in to comment.