Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar: add flag to enable transactions and set configuration #5479

Merged
31 changes: 31 additions & 0 deletions docs/modules/pulsar.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,36 @@
# Apache Pulsar Module

Testcontainers can be used to automatically create [Apache Pulsar](https://pulsar.apache.org) containers without external services.


## Example

Create a `PulsarContainer` to use it in your tests:

<!--codeinclude-->
[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithVersion
<!--/codeinclude-->

Then you can retrieve the broker and the admin url:

<!--codeinclude-->
[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:coordinates
<!--/codeinclude-->

If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker:

<!--codeinclude-->
[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker
<!--/codeinclude-->


If you need to test Pulsar Transactions you can enable the transactions feature:

<!--codeinclude-->
[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithTransactions
<!--/codeinclude-->


## Adding this module to your project dependencies

Add the following dependency to your `pom.xml`/`build.gradle` file:
Expand Down
4 changes: 2 additions & 2 deletions modules/pulsar/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description = "Testcontainers :: Pulsar"
dependencies {
api project(':testcontainers')

testImplementation group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.7.4'
testImplementation group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.10.0'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.23.1'
testImplementation group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: '2.7.4'
testImplementation group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: '2.10.0'
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.utility.DockerImageName;

import java.util.Map;

/**
* This container wraps Apache Pulsar running in standalone mode
*/
Expand All @@ -15,13 +17,21 @@ public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final String METRICS_ENDPOINT = "/metrics";

/**
* See <a href="https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java">SystemTopicNames</a>.
*/
public static final String TRANSACTION_TOPIC_ENDPOINT =
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved
"/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions";

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apachepulsar/pulsar");

@Deprecated
private static final String DEFAULT_TAG = "2.2.0";
private static final String DEFAULT_TAG = "2.10.0";

private boolean functionsWorkerEnabled = false;

private boolean transactions = false;
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved

/**
* @deprecated use {@link PulsarContainer(DockerImageName)} instead
*/
Expand All @@ -41,36 +51,65 @@ public PulsarContainer(String pulsarVersion) {
public PulsarContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DockerImageName.parse("apachepulsar/pulsar"));

withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
withCommand("/pulsar/bin/pulsar", "standalone", "--no-functions-worker", "-nss");
waitingFor(Wait.forHttp(METRICS_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
}

@Override
protected void configure() {
super.configure();

if (functionsWorkerEnabled) {
withCommand("/pulsar/bin/pulsar", "standalone");
waitingFor(
new WaitAllStrategy()
.withStrategy(waitStrategy)
.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1))
);
}
setupCommandAndEnv();
}

public PulsarContainer withFunctionsWorker() {
functionsWorkerEnabled = true;
return this;
}

public PulsarContainer withTransactions() {
transactions = true;
return this;
}

public PulsarContainer withConfiguration(String name, String value) {
return withEnv("PULSAR_PREFIX_" + name, value);
}

public PulsarContainer withConfiguration(Map<String, String> configuration) {
configuration.forEach((name, value) -> withEnv("PULSAR_PREFIX_" + name, value));
return this;
}
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved

public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
}

public String getHttpServiceUrl() {
return String.format("http://%s:%s", getHost(), getMappedPort(BROKER_HTTP_PORT));
}

protected void setupCommandAndEnv() {
String standaloneBaseCommand =
"/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " + "&& bin/pulsar standalone";

if (!functionsWorkerEnabled) {
standaloneBaseCommand += " --no-functions-worker -nss";
}

withCommand("/bin/bash", "-c", standaloneBaseCommand);
if (transactions) {
withConfiguration("transactionCoordinatorEnabled", "true");
}

if (functionsWorkerEnabled) {
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved
waitingFor(
new WaitAllStrategy()
.withStrategy(waitStrategy)
.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1))
);
} else if (transactions) {
waitingFor(Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved
} else {
waitingFor(Wait.forHttp(METRICS_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package org.testcontainers.containers;

import com.beust.jcommander.internal.Maps;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.junit.Test;
import org.testcontainers.utility.DockerImageName;

Expand All @@ -19,36 +23,102 @@ public class PulsarContainerTest {

public static final String TEST_TOPIC = "test_topic";

private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.2.0");
private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.10.0");

@Test
public void testUsage() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)) {
try (
// do not use PULSAR_IMAGE to make the doc looks easier
// constructorWithVersion {
PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.10.0"));
// }
) {
pulsar.start();
testPulsarFunctionality(pulsar.getPulsarBrokerUrl());
// coordinates {
final String pulsarBrokerUrl = pulsar.getPulsarBrokerUrl();
final String HttpServiceUrl = pulsar.getHttpServiceUrl();
// }
testPulsarFunctionality(pulsarBrokerUrl);
}
}

@Test
public void shouldNotEnableFunctionsWorkerByDefault() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer("2.5.1")) {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)) {
pulsar.start();

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();

assertThatThrownBy(() -> pulsarAdmin.functions().getFunctions("public", "default"))
.isInstanceOf(PulsarAdminException.class);
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();) {
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved
assertThatThrownBy(() -> pulsarAdmin.functions().getFunctions("public", "default"))
.isInstanceOf(PulsarAdminException.class);
}
}
}

@Test
public void shouldWaitForFunctionsWorkerStarted() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer("2.5.1").withFunctionsWorker()) {
try (
// constructorWithFunctionsWorker {
PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withFunctionsWorker();
// }
) {
pulsar.start();

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();) {
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved
assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
}
}
}

@Test
public void testTransactions() throws Exception {
try (
// constructorWithTransactions {
PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withTransactions();
// }
) {
pulsar.start();

assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();) {
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved
assertThat(
pulsarAdmin
.topics()
.getList("pulsar/system")
.contains("persistent://pulsar/system/transaction_coordinator_assign-partition-0")
)
.isTrue();
}
testTransactionFunctionality(pulsar.getPulsarBrokerUrl());
}
}

@Test
public void testTransactionsAndFunctionsWorker() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withTransactions().withFunctionsWorker()) {
pulsar.start();

try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();) {
assertThat(
pulsarAdmin
.topics()
.getList("pulsar/system")
.contains("persistent://pulsar/system/transaction_coordinator_assign-partition-0")
)
.isTrue();
assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
}
testTransactionFunctionality(pulsar.getPulsarBrokerUrl());
}
}

@Test
public void testConfiguration() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)) {
pulsar.withConfiguration("clusterName", "mycluster");
assertThat(pulsar.getEnvMap().get("PULSAR_PREFIX_clusterName")).isEqualTo("mycluster");
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved

pulsar.withConfiguration(Maps.newHashMap("clusterName", "mycluster2", "maxTopicsPerNamespace", "10"));
assertThat(pulsar.getEnvMap().get("PULSAR_PREFIX_clusterName")).isEqualTo("mycluster2");
assertThat(pulsar.getEnvMap().get("PULSAR_PREFIX_maxTopicsPerNamespace")).isEqualTo("10");
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -65,4 +135,27 @@ protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception
assertThat(new String(message.getData())).isEqualTo("test containers");
}
}

protected void testTransactionFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).enableTransaction(true).build();
Consumer<String> consumer = client
.newConsumer(Schema.STRING)
.topic("transaction-topic")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test-transaction-sub")
.subscribe();
Producer<String> producer = client
.newProducer(Schema.STRING)
.sendTimeout(0, TimeUnit.SECONDS)
.topic("transaction-topic")
.create()
) {
final Transaction transaction = client.newTransaction().build().get();
producer.newMessage(transaction).value("first").send();
transaction.commit();
Message<String> message = consumer.receive();
assertThat(message.getValue()).isEqualTo("first");
}
}
}