Skip to content

Commit

Permalink
Pulsar: add flag to enable transactions and set configuration (#5479)
Browse files Browse the repository at this point in the history
* Add `withTransactions()` method to enable transactions on Pulsar container
* Set default version to latest released (2.10.0)
* New docker command that enables the user to easily change configuration parameters
* Added new tests to cover new methods
* Improved the documentation with simple usage and new methods
  • Loading branch information
nicoloboschi committed Jun 20, 2022
1 parent 745bfc3 commit 2afe714
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 25 deletions.
46 changes: 46 additions & 0 deletions docs/modules/pulsar.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,51 @@
# Apache Pulsar Module

Testcontainers can be used to automatically create [Apache Pulsar](https://pulsar.apache.org) containers without external services.

It's based on the official Apache Pulsar docker image, it is recommended to read the [official guide](https://pulsar.apache.org/docs/next/getting-started-docker/).

## Example

Create a `PulsarContainer` to use it in your tests:

<!--codeinclude-->
[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithVersion
<!--/codeinclude-->

Then you can retrieve the broker and the admin url:

<!--codeinclude-->
[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:coordinates
<!--/codeinclude-->

## Options

### Configuration
If you need to set Pulsar configuration variables you can use the native APIs and set each variable with `PULSAR_PREFIX_` as prefix.

For example, if you want to enable `brokerDeduplicationEnabled`:

<!--codeinclude-->
[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithEnv
<!--/codeinclude-->

### Pulsar IO

If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker:

<!--codeinclude-->
[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker
<!--/codeinclude-->

### Pulsar Transactions

If you need to test Pulsar Transactions you can enable the transactions feature:

<!--codeinclude-->
[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithTransactions
<!--/codeinclude-->


## Adding this module to your project dependencies

Add the following dependency to your `pom.xml`/`build.gradle` file:
Expand Down
4 changes: 2 additions & 2 deletions modules/pulsar/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description = "Testcontainers :: Pulsar"
dependencies {
api project(':testcontainers')

testImplementation group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.7.4'
testImplementation group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.10.0'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.23.1'
testImplementation group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: '2.7.4'
testImplementation group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: '2.10.0'
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.List;

/**
* This container wraps Apache Pulsar running in standalone mode
*/
Expand All @@ -15,13 +19,21 @@ public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final String METRICS_ENDPOINT = "/metrics";

/**
* See <a href="https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java">SystemTopicNames</a>.
*/
private static final String TRANSACTION_TOPIC_ENDPOINT =
"/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions";

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apachepulsar/pulsar");

@Deprecated
private static final String DEFAULT_TAG = "2.2.0";
private static final String DEFAULT_TAG = "2.10.0";

private boolean functionsWorkerEnabled = false;

private boolean transactionsEnabled = false;

/**
* @deprecated use {@link PulsarContainer(DockerImageName)} instead
*/
Expand All @@ -41,36 +53,55 @@ public PulsarContainer(String pulsarVersion) {
public PulsarContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DockerImageName.parse("apachepulsar/pulsar"));

withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
withCommand("/pulsar/bin/pulsar", "standalone", "--no-functions-worker", "-nss");
waitingFor(Wait.forHttp(METRICS_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
}

@Override
protected void configure() {
super.configure();

if (functionsWorkerEnabled) {
withCommand("/pulsar/bin/pulsar", "standalone");
waitingFor(
new WaitAllStrategy()
.withStrategy(waitStrategy)
.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1))
);
}
setupCommandAndEnv();
}

public PulsarContainer withFunctionsWorker() {
functionsWorkerEnabled = true;
return this;
}

public PulsarContainer withTransactions() {
transactionsEnabled = true;
return this;
}

public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
}

public String getHttpServiceUrl() {
return String.format("http://%s:%s", getHost(), getMappedPort(BROKER_HTTP_PORT));
}

protected void setupCommandAndEnv() {
String standaloneBaseCommand =
"/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " + "&& bin/pulsar standalone";

if (!functionsWorkerEnabled) {
standaloneBaseCommand += " --no-functions-worker -nss";
}

withCommand("/bin/bash", "-c", standaloneBaseCommand);

List<WaitStrategy> waitStrategies = new ArrayList<>();
waitStrategies.add(Wait.defaultWaitStrategy());
waitStrategies.add(Wait.forHttp(METRICS_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
if (transactionsEnabled) {
withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
waitStrategies.add(Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
}
if (functionsWorkerEnabled) {
waitStrategies.add(Wait.forLogMessage(".*Function worker service started.*", 1));
}
final WaitAllStrategy compoundedWaitStrategy = new WaitAllStrategy();
waitStrategies.forEach(compoundedWaitStrategy::withStrategy);
waitingFor(compoundedWaitStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.junit.Test;
import org.testcontainers.utility.DockerImageName;

Expand All @@ -19,36 +22,103 @@ public class PulsarContainerTest {

public static final String TEST_TOPIC = "test_topic";

private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.2.0");
private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.10.0");

@Test
public void testUsage() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)) {
try (
// do not use PULSAR_IMAGE to make the doc looks easier
// constructorWithVersion {
PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.10.0"));
// }
) {
pulsar.start();
// coordinates {
final String pulsarBrokerUrl = pulsar.getPulsarBrokerUrl();
final String httpServiceUrl = pulsar.getHttpServiceUrl();
// }
testPulsarFunctionality(pulsarBrokerUrl);
}
}

@Test
public void envVarsUsage() throws Exception {
try (
// constructorWithEnv {
PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)
.withEnv("PULSAR_PREFIX_brokerDeduplicationEnabled", "true");
// }
) {
pulsar.start();
testPulsarFunctionality(pulsar.getPulsarBrokerUrl());
}
}

@Test
public void shouldNotEnableFunctionsWorkerByDefault() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer("2.5.1")) {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)) {
pulsar.start();

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();

assertThatThrownBy(() -> pulsarAdmin.functions().getFunctions("public", "default"))
.isInstanceOf(PulsarAdminException.class);
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build()) {
assertThatThrownBy(() -> pulsarAdmin.functions().getFunctions("public", "default"))
.isInstanceOf(PulsarAdminException.class);
}
}
}

@Test
public void shouldWaitForFunctionsWorkerStarted() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer("2.5.1").withFunctionsWorker()) {
try (
// constructorWithFunctionsWorker {
PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withFunctionsWorker();
// }
) {
pulsar.start();

try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build()) {
assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
}
}
}

@Test
public void testTransactions() throws Exception {
try (
// constructorWithTransactions {
PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withTransactions();
// }
) {
pulsar.start();

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build()) {
assertThat(
pulsarAdmin
.topics()
.getList("pulsar/system")
.contains("persistent://pulsar/system/transaction_coordinator_assign-partition-0")
)
.isTrue();
}
testTransactionFunctionality(pulsar.getPulsarBrokerUrl());
}
}

@Test
public void testTransactionsAndFunctionsWorker() throws Exception {
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE).withTransactions().withFunctionsWorker()) {
pulsar.start();

assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();) {
assertThat(
pulsarAdmin
.topics()
.getList("pulsar/system")
.contains("persistent://pulsar/system/transaction_coordinator_assign-partition-0")
)
.isTrue();
assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
}
testTransactionFunctionality(pulsar.getPulsarBrokerUrl());
}
}

Expand All @@ -65,4 +135,27 @@ protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception
assertThat(new String(message.getData())).isEqualTo("test containers");
}
}

protected void testTransactionFunctionality(String pulsarBrokerUrl) throws Exception {
try (
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).enableTransaction(true).build();
Consumer<String> consumer = client
.newConsumer(Schema.STRING)
.topic("transaction-topic")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test-transaction-sub")
.subscribe();
Producer<String> producer = client
.newProducer(Schema.STRING)
.sendTimeout(0, TimeUnit.SECONDS)
.topic("transaction-topic")
.create()
) {
final Transaction transaction = client.newTransaction().build().get();
producer.newMessage(transaction).value("first").send();
transaction.commit();
Message<String> message = consumer.receive();
assertThat(message.getValue()).isEqualTo("first");
}
}
}

0 comments on commit 2afe714

Please sign in to comment.