Skip to content

Commit

Permalink
Create a new network for each instance of TestingKafka
Browse files Browse the repository at this point in the history
Previously all the instance of TestingKafka were sharing the same Network which introduces random flakiness for tests involving SchemaRegistry.
  • Loading branch information
Praveen2112 committed Mar 28, 2021
1 parent d2751de commit a646c22
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
Expand Up @@ -56,7 +56,7 @@ protected QueryRunner createQueryRunner()
{
kafka = TestingKafka.create();
kafka.start();
pinot = new TestingPinotCluster();
pinot = new TestingPinotCluster(kafka.getNetwork());
pinot.start();

kafka.createTopic(TOPIC_AND_TABLE);
Expand Down
Expand Up @@ -62,33 +62,33 @@ public class TestingPinotCluster
private final GenericContainer<?> zookeeper;
private final HttpClient httpClient;

public TestingPinotCluster()
public TestingPinotCluster(Network network)
{
httpClient = new JettyHttpClient();
zookeeper = new GenericContainer<>(parse("zookeeper:3.5.6"))
.withNetwork(Network.SHARED)
.withNetwork(network)
.withNetworkAliases(ZOOKEEPER_INTERNAL_HOST)
.withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT))
.withExposedPorts(ZOOKEEPER_PORT);

controller = new GenericContainer<>(parse(BASE_IMAGE))
.withNetwork(Network.SHARED)
.withNetwork(network)
.withClasspathResourceMapping("/pinot-controller", "/var/pinot/controller/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-controller-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
.withCommand("StartController", "-configFileName", "/var/pinot/controller/config/pinot-controller.conf")
.withNetworkAliases("pinot-controller", "localhost")
.withExposedPorts(CONTROLLER_PORT);

broker = new GenericContainer<>(parse(BASE_IMAGE))
.withNetwork(Network.SHARED)
.withNetwork(network)
.withClasspathResourceMapping("/pinot-broker", "/var/pinot/broker/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-broker-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
.withCommand("StartBroker", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/broker/config/pinot-broker.conf")
.withNetworkAliases("pinot-broker", "localhost")
.withExposedPorts(BROKER_PORT);

server = new GenericContainer<>(parse(BASE_IMAGE))
.withNetwork(Network.SHARED)
.withNetwork(network)
.withClasspathResourceMapping("/pinot-server", "/var/pinot/server/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-server-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
.withCommand("StartServer", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/server/config/pinot-server.conf")
Expand Down
Expand Up @@ -60,6 +60,7 @@ public final class TestingKafka
private static final DockerImageName KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
private static final DockerImageName SCHEMA_REGISTRY_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-schema-registry");

private final Network network;
private final KafkaContainer kafka;
private final GenericContainer<?> schemaRegistry;
private final boolean withSchemaRegistry;
Expand All @@ -83,11 +84,13 @@ public static TestingKafka createWithSchemaRegistry()
private TestingKafka(String confluentPlatformVersion, boolean withSchemaRegistry)
{
this.withSchemaRegistry = withSchemaRegistry;
network = Network.newNetwork();
closer.register(network::close);
kafka = new KafkaContainer(KAFKA_IMAGE_NAME.withTag(confluentPlatformVersion))
.withNetwork(Network.SHARED)
.withNetwork(network)
.withNetworkAliases("kafka");
schemaRegistry = new GenericContainer<>(SCHEMA_REGISTRY_IMAGE_NAME.withTag(confluentPlatformVersion))
.withNetwork(Network.SHARED)
.withNetwork(network)
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "0.0.0.0")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + SCHEMA_REGISTRY_PORT)
Expand Down Expand Up @@ -238,4 +241,9 @@ public String getSchemaRegistryConnectString()
{
return "http://" + schemaRegistry.getContainerIpAddress() + ":" + schemaRegistry.getMappedPort(SCHEMA_REGISTRY_PORT);
}

public Network getNetwork()
{
return network;
}
}

0 comments on commit a646c22

Please sign in to comment.