diff --git a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java index 8941561898..3e08fa238c 100644 --- a/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java +++ b/src/test/java/io/streamnative/kop/KafkaIntegrationTest.java @@ -26,10 +26,13 @@ */ package io.streamnative.kop; +import static io.streamnative.kop.KafkaProtocolHandler.PLAINTEXT_PREFIX; +import static io.streamnative.kop.KafkaProtocolHandler.SSL_PREFIX; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; import com.google.common.collect.Sets; +import java.net.InetAddress; import java.util.Optional; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -112,8 +115,18 @@ private static void checkForErrorsInLogs(final String logs) { protected void setup() throws Exception { super.resetConfig(); + // in order to access PulsarBroker when using Docker for Mac, we need to adjust things: + // - set pulsar advertized address to host IP + // - use the `host.testcontainers.internal` address exposed by testcontainers + String ip = InetAddress.getLocalHost().getHostAddress(); + System.out.println("exposing Pulsar broker on " + ip); + super.conf.setAdvertisedAddress(ip); + this.conf.setListeners( + PLAINTEXT_PREFIX + ip + ":" + kafkaBrokerPort + "," + + SSL_PREFIX + ip + ":" + kafkaBrokerPortTls); super.internalSetup(); + if (!this.admin.clusters().getClusters().contains(this.configClusterName)) { // so that clients can test short names this.admin.clusters().createCluster(this.configClusterName, @@ -156,7 +169,7 @@ void simpleProduceAndConsume(final String integration, final Optional to System.out.println("topic created"); final GenericContainer producer = new GenericContainer<>("streamnative/kop-test-" + integration) - .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) + .withEnv("KOP_BROKER", "host.testcontainers.internal:" + super.kafkaBrokerPort) .withEnv("KOP_PRODUCE", "true") .withEnv("KOP_TOPIC", topic.orElse(integration)) .withEnv("KOP_LIMIT", "10") @@ -165,7 +178,7 @@ void simpleProduceAndConsume(final String integration, final Optional to .withNetworkMode("host"); final GenericContainer consumer = new GenericContainer<>("streamnative/kop-test-" + integration) - .withEnv("KOP_BROKER", "localhost:" + super.kafkaBrokerPort) + .withEnv("KOP_BROKER", "host.testcontainers.internal:" + super.kafkaBrokerPort) .withEnv("KOP_TOPIC", topic.orElse(integration)) .withEnv("KOP_CONSUME", "true") .withEnv("KOP_LIMIT", "10")