Skip to content

Commit

Permalink
Merge pull request spring-projects#40189 from onobc
Browse files Browse the repository at this point in the history
* pr/40189:
  Add Spring Pulsar transaction support

Closes spring-projectsgh-40189
  • Loading branch information
philwebb committed Apr 18, 2024
2 parents 07f8274 + 08ad7aa commit d55eb5b
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.pulsar.transaction.PulsarTransactionManager;

/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar.
Expand Down Expand Up @@ -126,8 +128,11 @@ private void applyProducerBuilderCustomizers(List<ProducerBuilderCustomizer<?>>
PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
ObjectProvider<ProducerInterceptor> producerInterceptors, SchemaResolver schemaResolver,
TopicResolver topicResolver) {
return new PulsarTemplate<>(pulsarProducerFactory, producerInterceptors.orderedStream().toList(),
schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled());
PulsarTemplate<?> template = new PulsarTemplate<>(pulsarProducerFactory,
producerInterceptors.orderedStream().toList(), schemaResolver, topicResolver,
this.properties.getTemplate().isObservationsEnabled());
this.propertiesMapper.customizeTemplate(template);
return template;
}

@Bean
Expand All @@ -142,6 +147,13 @@ DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
}

@Bean
@ConditionalOnMissingBean(PulsarAwareTransactionManager.class)
@ConditionalOnProperty(prefix = "spring.pulsar.transaction", name = "enabled")
public PulsarTransactionManager pulsarTransactionManager(PulsarClient pulsarClient) {
return new PulsarTransactionManager(pulsarClient);
}

@SuppressWarnings("unchecked")
private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>> customizers,
ConsumerBuilder<?> builder) {
Expand All @@ -153,13 +165,15 @@ private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>>
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver, Environment environment) {
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
Environment environment) {
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
}
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class PulsarProperties {

private final Template template = new Template();

private final Transaction transaction = new Transaction();

public Client getClient() {
return this.client;
}
Expand Down Expand Up @@ -103,6 +105,10 @@ public Template getTemplate() {
return this.template;
}

public Transaction getTransaction() {
return this.transaction;
}

public static class Client {

/**
Expand Down Expand Up @@ -868,6 +874,23 @@ public void setObservationsEnabled(boolean observationsEnabled) {

}

public static class Transaction {

/**
* Whether transaction support is enabled.
*/
private boolean enabled;

public boolean isEnabled() {
return this.enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

}

public static class Authentication {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;

import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import org.springframework.util.StringUtils;
Expand All @@ -64,6 +65,7 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
map.from(this.properties.getTransaction()::isEnabled).whenTrue().to(clientBuilder::enableTransaction);
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,
connectionDetails);
Expand Down Expand Up @@ -157,6 +159,10 @@ <T> void customizeProducerBuilder(ProducerBuilder<T> producerBuilder) {
map.from(properties::getAccessMode).to(producerBuilder::accessMode);
}

<T> void customizeTemplate(PulsarTemplate<T> template) {
template.transactions().setEnabled(this.properties.getTransaction().isEnabled());
}

<T> void customizeConsumerBuilder(ConsumerBuilder<T> consumerBuilder) {
PulsarProperties.Consumer properties = this.properties.getConsumer();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Expand All @@ -183,6 +189,7 @@ private void customizeConsumerBuilderSubscription(ConsumerBuilder<?> consumerBui
void customizeContainerProperties(PulsarContainerProperties containerProperties) {
customizePulsarContainerConsumerSubscriptionProperties(containerProperties);
customizePulsarContainerListenerProperties(containerProperties);
containerProperties.transactions().setEnabled(this.properties.getTransaction().isEnabled());
}

private void customizePulsarContainerConsumerSubscriptionProperties(PulsarContainerProperties containerProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.test.util.ReflectionTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -330,6 +332,13 @@ void whenObservationsDisabledDoesNotEnableObservation() {
.hasFieldOrPropertyWithValue("observationEnabled", false));
}

@Test
void whenTransactionEnabledTrueEnablesTransactions() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true")
.run((context) -> assertThat(context.getBean(PulsarTemplate.class).transactions().isEnabled())
.isTrue());
}

@Configuration(proxyBeanMethods = false)
static class InterceptorTestConfiguration {

Expand Down Expand Up @@ -525,6 +534,28 @@ void whenVirtualThreadsAreEnabledOnJava20AndEarlierListenerContainerShouldNotUse
});
}

@Test
void whenTransactionEnabledTrueListenerContainerShouldUseTransactions() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true").run((context) -> {
ConcurrentPulsarListenerContainerFactory<?> factory = context
.getBean(ConcurrentPulsarListenerContainerFactory.class);
TransactionSettings transactions = factory.getContainerProperties().transactions();
assertThat(transactions.isEnabled()).isTrue();
assertThat(transactions.getTransactionManager()).isNotNull();
});
}

@Test
void whenTransactionEnabledFalseListenerContainerShouldNotUseTransactions() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=false").run((context) -> {
ConcurrentPulsarListenerContainerFactory<?> factory = context
.getBean(ConcurrentPulsarListenerContainerFactory.class);
TransactionSettings transactions = factory.getContainerProperties().transactions();
assertThat(transactions.isEnabled()).isFalse();
assertThat(transactions.getTransactionManager()).isNull();
});
}

}

@Nested
Expand Down Expand Up @@ -603,4 +634,37 @@ ReaderBuilderCustomizer<?> customizerBar() {

}

@Nested
class TransactionManagerTests {

private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;

@Test
@SuppressWarnings("unchecked")
void whenUserHasDefinedATransactionManagerTheAutoConfigurationBacksOff() {
PulsarAwareTransactionManager txnMgr = mock(PulsarAwareTransactionManager.class);
this.contextRunner.withBean("customTransactionManager", PulsarAwareTransactionManager.class, () -> txnMgr)
.run((context) -> assertThat(context).getBean(PulsarAwareTransactionManager.class).isSameAs(txnMgr));
}

@Test
void whenNoPropertiesAreSetTransactionManagerShouldNotBeDefined() {
this.contextRunner
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
}

@Test
void whenTransactionEnabledFalseTransactionManagerIsNotAutoConfigured() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=false")
.run((context) -> assertThat(context).doesNotHaveBean(PulsarAwareTransactionManager.class));
}

@Test
void whenTransactionEnabledTrueTransactionManagerIsAutoConfigured() {
this.contextRunner.withPropertyValues("spring.pulsar.transaction.enabled=true")
.run((context) -> assertThat(context).hasSingleBean(PulsarAwareTransactionManager.class));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,20 @@
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Failover.BackupCluster;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;

/**
* Tests for {@link PulsarPropertiesMapper}.
Expand Down Expand Up @@ -87,6 +91,26 @@ void customizeClientBuilderWhenHasAuthentication() throws UnsupportedAuthenticat
then(builder).should().authentication("myclass", authParamString);
}

@Test
void customizeClientBuilderWhenTransactionEnabled() {
PulsarProperties properties = new PulsarProperties();
properties.getTransaction().setEnabled(true);
ClientBuilder builder = mock(ClientBuilder.class);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should().enableTransaction(true);
}

@Test
void customizeClientBuilderWhenTransactionDisabled() {
PulsarProperties properties = new PulsarProperties();
properties.getTransaction().setEnabled(false);
ClientBuilder builder = mock(ClientBuilder.class);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should(never()).enableTransaction(anyBoolean());
}

@Test
void customizeClientBuilderWhenHasConnectionDetails() {
PulsarProperties properties = new PulsarProperties();
Expand Down Expand Up @@ -120,7 +144,7 @@ void customizeClientBuilderWhenHasFailover() {
ClientBuilder builder = mock(ClientBuilder.class);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should().serviceUrlProvider(Mockito.any(AutoClusterFailover.class));
then(builder).should().serviceUrlProvider(any(AutoClusterFailover.class));
}

@Test
Expand Down Expand Up @@ -189,6 +213,16 @@ void customizeProducerBuilder() {
then(builder).should().accessMode(ProducerAccessMode.Exclusive);
}

@Test
@SuppressWarnings("unchecked")
void customizeTemplate() {
PulsarProperties properties = new PulsarProperties();
properties.getTransaction().setEnabled(true);
PulsarTemplate<Object> template = new PulsarTemplate<>(mock(PulsarProducerFactory.class));
new PulsarPropertiesMapper(properties).customizeTemplate(template);
assertThat(template.transactions().isEnabled()).isTrue();
}

@Test
@SuppressWarnings("unchecked")
void customizeConsumerBuilder() {
Expand Down Expand Up @@ -220,11 +254,13 @@ void customizeContainerProperties() {
properties.getConsumer().getSubscription().setType(SubscriptionType.Shared);
properties.getListener().setSchemaType(SchemaType.AVRO);
properties.getListener().setObservationEnabled(true);
properties.getTransaction().setEnabled(true);
PulsarContainerProperties containerProperties = new PulsarContainerProperties("my-topic-pattern");
new PulsarPropertiesMapper(properties).customizeContainerProperties(containerProperties);
assertThat(containerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
assertThat(containerProperties.getSchemaType()).isEqualTo(SchemaType.AVRO);
assertThat(containerProperties.isObservationEnabled()).isTrue();
assertThat(containerProperties.transactions().isEnabled()).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,17 @@ void bind() {

}

@Nested
class TransactionProperties {

@Test
void bind() {
Map<String, String> map = new HashMap<>();
map.put("spring.pulsar.transaction.enabled", "true");
PulsarProperties.Transaction properties = bindPropeties(map).getTransaction();
assertThat(properties.isEnabled()).isTrue();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,26 @@ TIP: For more details on any of the above components and to discover other avail



[[messaging.pulsar.transactions]]
== Transaction Support

Spring for Apache Pulsar supports transactions when using `PulsarTemplate` and `@PulsarListener`.

NOTE: Transactions are not currently supported when using the reactive variants.

Setting the configprop:spring.pulsar.transaction.enabled[] property to `true` will:

* Configure a `PulsarTransactionManager` bean
* Enable transaction support for `PulsarTemplate`
* Enable transaction support for `@PulsarListener` methods

The `transactional` attribute of `@PulsarListener` can be used to fine-tune when transactions should be used with listeners.

For more control of the Spring for Apache Pulsar transaction features you should define your own `PulsarTemplate` and/or `ConcurrentPulsarListenerContainerFactory` beans.
You can also define a `PulsarAwareTransactionManager` bean if the default auto-configured `PulsarTransactionManager` is not suitable.



[[messaging.pulsar.additional-properties]]
== Additional Pulsar Properties

Expand Down

0 comments on commit d55eb5b

Please sign in to comment.