Update Kafka config dynamically #4316

Merged: 3 commits, Jul 30, 2021

GenericContainer.java
@@ -135,7 +135,7 @@ public class GenericContainer<SELF extends GenericContainer<SELF>>
     @NonNull
     private String networkMode;

-    @NonNull
+    @Nullable

Comment (@bsideup, Member, Author):

the annotation was wrong (we do allow resetting the network to null (no network))
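
A minimal sketch of what the relaxed annotation permits (illustrative snippet, not from the PR; the image tag is arbitrary):

```java
Network network = Network.newNetwork();

GenericContainer<?> container = new GenericContainer<>(DockerImageName.parse("alpine:3.14"));
container.withNetwork(network); // attach to a custom network
container.withNetwork(null);    // allowed: reset back to "no network" before starting
```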

     private Network network;

     @NonNull

KafkaContainer.java
@@ -2,12 +2,8 @@

 import com.github.dockerjava.api.command.InspectContainerResponse;
 import lombok.SneakyThrows;
-import org.testcontainers.images.builder.Transferable;
 import org.testcontainers.utility.DockerImageName;

-import java.nio.charset.StandardCharsets;
-import java.util.Comparator;
-
 /**
  * This container wraps Confluent Kafka and Zookeeper (optionally)
  *
@@ -17,20 +13,14 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
     private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
     private static final String DEFAULT_TAG = "5.4.3";

-    private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
-
     public static final int KAFKA_PORT = 9093;

     public static final int ZOOKEEPER_PORT = 2181;

     private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";

-    private static final int PORT_NOT_ASSIGNED = -1;
-
     protected String externalZookeeperConnect = null;

-    private int port = PORT_NOT_ASSIGNED;
-
     /**
      * @deprecated use {@link KafkaContainer(DockerImageName)} instead
      */
@@ -80,83 +70,59 @@ public KafkaContainer withExternalZookeeper(String connectString) {
     }

     public String getBootstrapServers() {
-        if (port == PORT_NOT_ASSIGNED) {
-            throw new IllegalStateException("You should start Kafka container first");
-        }
-        return String.format("PLAINTEXT://%s:%s", getHost(), port);
-    }
-
-    @Override
-    protected void doStart() {
-        withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
-
-        if (externalZookeeperConnect == null) {
-            addExposedPort(ZOOKEEPER_PORT);
-        }
-
-        super.doStart();
+        return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(KAFKA_PORT));
     }

     @Override
-    @SneakyThrows
-    protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
-        super.containerIsStarting(containerInfo, reused);
-
-        port = getMappedPort(KAFKA_PORT);
-
-        if (reused) {
-            return;
-        }
+    protected void configure() {
+        withEnv(
+            "KAFKA_ADVERTISED_LISTENERS",
+            String.format(
+                "BROKER://%s:9092",
+                getNetwork() != null
+                    ? getNetworkAliases().get(0)
+                    : "localhost"
+            )
+        );

Comment on lines +78 to +86 (@bsideup, Member, Author, Jul 30, 2021):

if set to just "localhost", clustered Kafka would fail, because There Can Be Only One Localhost ™
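
A rough illustration of the scenario this protects (hypothetical test code, not part of the PR; the image tag and the external ZooKeeper address are placeholders): with two brokers on one Docker network, each advertises its own network alias on the BROKER listener, whereas a shared "localhost" would give both the same inter-broker address:

```java
Network network = Network.newNetwork();

// Assumes some ZooKeeper reachable as "zookeeper:2181" on the same network.
KafkaContainer kafka1 = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
    .withNetwork(network)
    .withExternalZookeeper("zookeeper:2181");

KafkaContainer kafka2 = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
    .withNetwork(network)
    .withExternalZookeeper("zookeeper:2181");

kafka1.start();
kafka2.start();

// Each broker now advertises BROKER://<its own first network alias>:9092, so the brokers
// (and client containers on the same network) can address them individually.
```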


         String command = "#!/bin/bash\n";
-        final String zookeeperConnect;
         if (externalZookeeperConnect != null) {
-            zookeeperConnect = externalZookeeperConnect;
+            withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
         } else {
-            zookeeperConnect = "localhost:" + ZOOKEEPER_PORT;
+            addExposedPort(ZOOKEEPER_PORT);
+            withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
             command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n";
             command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
             command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
             command += "zookeeper-server-start zookeeper.properties &\n";
         }

-        command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n";
-
-        command += "export KAFKA_ADVERTISED_LISTENERS='" + String.join(",", getBootstrapServers(), brokerAdvertisedListener(containerInfo)) + "'\n";
-
-        command += ". /etc/confluent/docker/bash-config \n";
-        command += "/etc/confluent/docker/configure \n";
-        command += "/etc/confluent/docker/launch \n";
+        // Optimization: skip the checks
+        command += "echo '' > /etc/confluent/docker/ensure \n";
+        // Run the original command
+        command += "/etc/confluent/docker/run \n";
+        withCommand("sh", "-c", command);

Comment on lines -127 to +104 (@bsideup, Member, Author):

this change is a tiny bit unrelated but, after looking for ways of simplifying things, I figured that we could delegate to the original CMD of Confluent's Kafka image but optimize it by removing the ensure step that adds ~2s, as it needs to instantiate a ZooKeeper client (not really necessary in our case, we have our own checks)

Reply (Member):

I really like this change.
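
For reference, a sketch of the startup script that the configure() above assembles in the embedded-ZooKeeper case, written out as one literal (the container builds it with the `command +=` lines in the diff; shown here only to make the delegation to `/etc/confluent/docker/run` visible):

```java
String effectiveCommand =
    "#!/bin/bash\n" +
    // embedded ZooKeeper, only added when no external connect string was given
    "echo 'clientPort=2181' > zookeeper.properties\n" +
    "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n" +
    "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n" +
    "zookeeper-server-start zookeeper.properties &\n" +
    // blank out Confluent's `ensure` step (the ~2s ZooKeeper-client check mentioned above)
    "echo '' > /etc/confluent/docker/ensure \n" +
    // then hand over to the image's original run script
    "/etc/confluent/docker/run \n";
```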

+    }

-        copyFileToContainer(
-            Transferable.of(command.getBytes(StandardCharsets.UTF_8), 0777),
-            STARTER_SCRIPT
+    @Override
+    @SneakyThrows
+    protected void containerIsStarted(InspectContainerResponse containerInfo) {
+        String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo);
+        ExecResult result = execInContainer(
+            "kafka-configs",
+            "--alter",
+            "--bootstrap-server", brokerAdvertisedListener,
+            "--entity-type", "brokers",
+            "--entity-name", getEnvMap().get("KAFKA_BROKER_ID"),
+            "--add-config",
+            "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]"
         );
-    }
+        if (result.getExitCode() != 0) {
+            throw new IllegalStateException(result.getStderr());
+        }
     }
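
The exec above drives the `kafka-configs` CLI shipped in the image. For readers more familiar with the client API, the same per-broker dynamic update could be expressed with the Kafka AdminClient roughly as follows (illustrative only; assumes `kafka-clients` on the classpath, the usual `java.util` imports, a method that may throw, and placeholder listener values; the broker id "1" is the KAFKA_BROKER_ID the container sets by default):

```java
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32789"); // placeholder bootstrap address

try (Admin admin = Admin.create(props)) {
    ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");

    AlterConfigOp setListeners = new AlterConfigOp(
        new ConfigEntry(
            "advertised.listeners",
            "PLAINTEXT://localhost:32789,BROKER://some-hostname:9092" // placeholder listeners
        ),
        AlterConfigOp.OpType.SET
    );

    Map<ConfigResource, Collection<AlterConfigOp>> update =
        Collections.singletonMap(broker, Collections.singletonList(setListeners));

    // advertised.listeners is a dynamic, per-broker config, so no restart is needed
    admin.incrementalAlterConfigs(update).all().get();
}
```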

     protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
-        // Kafka supports only one INTER_BROKER listener, so we have to pick one.
-        // The current algorithm uses the following order of resolving the IP:
-        // 1. Custom network's IP set via `withNetwork`
-        // 2. Bridge network's IP
-        // 3. Best effort fallback to getNetworkSettings#ipAddress
-        String ipAddress = containerInfo.getNetworkSettings().getNetworks().entrySet()
-            .stream()
-            .filter(it -> it.getValue().getIpAddress() != null)
-            .max(Comparator.comparingInt(entry -> {
-                if (getNetwork() != null && getNetwork().getId().equals(entry.getValue().getNetworkID())) {
-                    return 2;
-                }
-
-                if ("bridge".equals(entry.getKey())) {
-                    return 1;
-                }
-
-                return 0;
-            }))
-            .map(it -> it.getValue().getIpAddress())
-            .orElseGet(() -> containerInfo.getNetworkSettings().getIpAddress());
-
-        return String.format("BROKER://%s:%s", ipAddress, "9092");
+        return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");

Comment on lines -138 to +126 (@bsideup, Member, Author):

this was super fun to discover, but apparently we over-engineered this, and hostName is enough to make it work reliably - it works with both "no network" (a.k.a. "bridge only") and multi network scenarios, as containers are visible by their hostname to other containers in the same network 😂

     }
 }
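
Putting it together, a hedged usage sketch (test-style code, not part of the PR; assumes the usual Testcontainers imports and a method that may throw): host-side clients use getBootstrapServers(), while the BROKER listener is addressed by the container's hostname, which, per the comment above, is also how other containers on the same network reach it:

```java
try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))) {
    kafka.start();

    // Host-side address: PLAINTEXT://<host>:<mapped 9093>, written into the broker's
    // advertised.listeners dynamically by containerIsStarted(...)
    String hostBootstrap = kafka.getBootstrapServers();

    // In-network address: BROKER://<container hostname>:9092 (see brokerAdvertisedListener above)
    String inNetworkBootstrap = kafka.getContainerInfo().getConfig().getHostName() + ":9092";

    // For example, list topics against the in-network listener from inside the container
    Container.ExecResult topics = kafka.execInContainer(
        "kafka-topics", "--bootstrap-server", inNetworkBootstrap, "--list"
    );
}
```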