Skip to content

Commit

Permalink
Allow Pulsar default WaitStrategy to honour startup timeout (#5674)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave Maughan committed Sep 29, 2022
1 parent f54a29a commit 405ddb7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@

import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;

/**
* This container wraps Apache Pulsar running in standalone mode
*/
Expand Down Expand Up @@ -36,6 +32,8 @@ public class PulsarContainer extends GenericContainer<PulsarContainer> {
@Deprecated
private static final String DEFAULT_TAG = "2.10.0";

private final WaitAllStrategy waitAllStrategy = new WaitAllStrategy();

private boolean functionsWorkerEnabled = false;

private boolean transactionsEnabled = false;
Expand All @@ -60,6 +58,7 @@ public PulsarContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DockerImageName.parse("apachepulsar/pulsar"));
withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
setWaitStrategy(waitAllStrategy);
}

@Override
Expand Down Expand Up @@ -98,21 +97,18 @@ protected void setupCommandAndEnv() {

final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone");
final String response = String.format("[\"%s\"]", clusterName);

List<WaitStrategy> waitStrategies = new ArrayList<>();
waitStrategies.add(Wait.defaultWaitStrategy());
waitStrategies.add(
waitAllStrategy.withStrategy(
Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals)
);

if (transactionsEnabled) {
withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
waitStrategies.add(Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
waitAllStrategy.withStrategy(
Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)
);
}
if (functionsWorkerEnabled) {
waitStrategies.add(Wait.forLogMessage(".*Function worker service started.*", 1));
waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1));
}
final WaitAllStrategy compoundedWaitStrategy = new WaitAllStrategy();
waitStrategies.forEach(compoundedWaitStrategy::withStrategy);
waitingFor(compoundedWaitStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.junit.Test;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -144,6 +145,14 @@ public void testClusterFullyInitialized() throws Exception {
}
}

@Test
public void testStartupTimeoutIsHonored() {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ZERO)) {
assertThatThrownBy(pulsar::start)
.hasRootCauseMessage("Precondition failed: timeout must be greater than zero");
}
}

protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).build();
Expand Down

0 comments on commit 405ddb7

Please sign in to comment.