diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java index a84d07667..5a947ea72 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java @@ -74,8 +74,15 @@ protected ConcurrentPulsarMessageListenerContainer createContainerInstance(Pu PulsarContainerProperties properties = new PulsarContainerProperties(); properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver()); properties.setTopicResolver(this.getContainerProperties().getTopicResolver()); - properties.setTransactionManager(this.getContainerProperties().getTransactionManager()); - properties.setTransactionDefinition(this.getContainerProperties().getTransactionDefinition()); + + var parentTxnProps = this.getContainerProperties().transactions(); + var childTxnProps = properties.transactions(); + childTxnProps.setEnabled(parentTxnProps.isEnabled()); + childTxnProps.setRequired(parentTxnProps.isRequired()); + childTxnProps.setTimeout(parentTxnProps.getTimeout()); + childTxnProps.setTransactionDefinition(parentTxnProps.getTransactionDefinition()); + childTxnProps.setTransactionManager(parentTxnProps.getTransactionManager()); + if (!CollectionUtils.isEmpty(endpoint.getTopics())) { properties.setTopics(new HashSet<>(endpoint.getTopics())); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java index fea77f5cc..860397714 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java @@ -45,6 +45,7 @@ import org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer; import org.springframework.pulsar.listener.PulsarConsumerErrorHandler; import org.springframework.pulsar.listener.PulsarContainerProperties; +import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings; import org.springframework.pulsar.listener.PulsarMessageListenerContainer; import org.springframework.pulsar.listener.adapter.AbstractPulsarMessageToSpringMessageAdapter; import org.springframework.pulsar.listener.adapter.HandlerAdapter; @@ -88,7 +89,7 @@ public class MethodPulsarListenerEndpoint extends AbstractPulsarListenerEndpo private ConsumerBuilderCustomizer consumerBuilderCustomizer; - private boolean transactional = true; + private Boolean transactional; public void setBean(Object bean) { this.bean = bean; @@ -175,7 +176,7 @@ protected AbstractPulsarMessageToSpringMessageAdapter createMessageListener( topicResolver.resolveTopic(null, messageType.getRawClass(), () -> null) .ifResolved((topic) -> pulsarContainerProperties.setTopics(Set.of(topic))); } - configureTransactions(pulsarContainerProperties); + validateAndAdjustTransactionSettings(pulsarContainerProperties.transactions()); container.setNegativeAckRedeliveryBackoff(this.negativeAckRedeliveryBackoff); container.setAckTimeoutRedeliveryBackoff(this.ackTimeoutRedeliveryBackoff); container.setDeadLetterPolicy(this.deadLetterPolicy); @@ -186,17 +187,24 @@ protected AbstractPulsarMessageToSpringMessageAdapter createMessageListener( return messageListener; } - private void configureTransactions(PulsarContainerProperties containerProps) { - if (!this.transactional) { - this.logger.debug(() -> "Listener w/ id [%s] requested no transactions - setting txn mgr to null" - .formatted(this.getId())); - containerProps.setTransactionManager(null); + private void validateAndAdjustTransactionSettings(TransactionSettings txnProps) { + // If user did not specify transactional attribute do nothing + if (this.transactional == null) { return; } - if (containerProps.getTransactionManager() == null) { + Assert.state(!(txnProps.isRequired() && !this.transactional), + "Listener w/ id [%s] requested no transactions but txn are required".formatted(this.getId())); + if (!this.transactional) { + this.logger.debug(() -> "Listener w/ id [%s] requested no transactions".formatted(this.getId())); + txnProps.setEnabled(false); + } + else if (txnProps.getTransactionManager() == null) { this.logger.warn(() -> "Listener w/ id [%s] requested transactions but no txn mgr available" .formatted(this.getId())); } + else { + txnProps.setEnabled(true); + } } private ResolvableType resolvableType(MethodParameter methodParameter) { @@ -281,11 +289,11 @@ public void setConsumerBuilderCustomizer(ConsumerBuilderCustomizer consumerBu this.consumerBuilderCustomizer = consumerBuilderCustomizer; } - public boolean isTransactional() { + public Boolean getTransactional() { return this.transactional; } - public void setTransactional(boolean transactional) { + public void setTransactional(Boolean transactional) { this.transactional = transactional; } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java index 681757990..3af60be50 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarTemplate.java @@ -16,7 +16,6 @@ package org.springframework.pulsar.core; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -101,19 +100,9 @@ public class PulsarTemplate private String beanName = ""; /** - * Whether this template supports transactions. + * Transaction settings. */ - private boolean transactional; - - /** - * Whether this template allows non-transactional operations. - */ - private boolean allowNonTransactional = true; - - /** - * The timeout to use for any transactions that originate from the template. - */ - private Duration transactionTimeout; + private final TransactionProperties transactionProps = new TransactionProperties(); /** * Construct a template instance without interceptors that uses the default schema @@ -166,31 +155,12 @@ public void setApplicationContext(ApplicationContext applicationContext) { } /** - * Sets whether the template supports transactional operations. - * @param transactional whether the template supports transactional operations - * @since 1.1.0 - */ - public void setTransactional(boolean transactional) { - this.transactional = transactional; - } - - /** - * Sets whether the template supports non-transactional operations. - * @param allowNonTransactional whether the template supports non-transactional - * operations - * @since 1.1.0 - */ - public void setAllowNonTransactional(boolean allowNonTransactional) { - this.allowNonTransactional = allowNonTransactional; - } - - /** - * Sets the timeout to use for any transactions that originate from the template. - * @param transactionTimeout the timeout + * Gets the transaction properties. + * @return the transaction properties * @since 1.1.0 */ - public void setTransactionTimeout(Duration transactionTimeout) { - this.transactionTimeout = transactionTimeout; + public TransactionProperties transactions() { + return this.transactionProps; } /** @@ -340,11 +310,12 @@ private Observation newObservation(PulsarMessageSenderContext senderContext) { @Nullable private Transaction getTransaction() { - if (!this.transactional) { + if (!this.transactions().isEnabled()) { return null; } + boolean allowNonTransactional = !this.transactions().isRequired(); boolean inTransaction = inTransaction(); - Assert.state(this.allowNonTransactional || inTransaction, + Assert.state(allowNonTransactional || inTransaction, "No transaction is in process; " + "possible solutions: run the template operation within the scope of a " + "template.executeInTransaction() operation, start a transaction with @Transactional " @@ -363,7 +334,7 @@ private Transaction getTransaction() { // or there is an actual active transaction that we need to sync a Pulsar txn with // hence the call to 'obtainResourceHolder' rather than 'getResourceHolder' var resourceHolder = PulsarTransactionUtils.obtainResourceHolder(this.producerFactory.getPulsarClient(), - this.transactionTimeout); + this.transactions().getTimeout()); return resourceHolder.getTransaction(); } @@ -373,7 +344,7 @@ private Transaction getTransaction() { * @return whether the template is currently running in a transaction */ private boolean inTransaction() { - if (!this.transactional) { + if (!this.transactions().isEnabled()) { return false; } return this.threadBoundTransactions.get(Thread.currentThread()) != null @@ -405,7 +376,7 @@ private Producer prepareProducerForSend(@Nullable String topic, @Nullable T m @Nullable public R executeInTransaction(TemplateCallback callback) { Assert.notNull(callback, "callback must not be null"); - Assert.state(this.transactional, "This template does not support transactions"); + Assert.state(this.transactions().isEnabled(), "This template does not support transactions"); var currentThread = Thread.currentThread(); var txn = this.threadBoundTransactions.get(currentThread); Assert.state(txn == null, "Nested calls to 'executeInTransaction' are not allowed"); @@ -430,8 +401,9 @@ public R executeInTransaction(TemplateCallback callback) { private Transaction newPulsarTransaction() { try { var txnBuilder = this.producerFactory.getPulsarClient().newTransaction(); - if (this.transactionTimeout != null) { - txnBuilder.withTransactionTimeout(this.transactionTimeout.toSeconds(), TimeUnit.SECONDS); + if (this.transactions().getTimeout() != null) { + long timeoutSecs = this.transactions().getTimeout().toSeconds(); + txnBuilder.withTransactionTimeout(timeoutSecs, TimeUnit.SECONDS); } return txnBuilder.build().get(); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/TransactionProperties.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/TransactionProperties.java new file mode 100644 index 000000000..c0738366e --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/TransactionProperties.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import java.time.Duration; + +/** + * Common transaction settings for components. + * + * @author Chris Bono + * @since 1.1.0 + */ +public class TransactionProperties { + + /** + * Whether the component supports transactions. + */ + private boolean enabled; + + /** + * Whether the component requires transactions. + */ + private boolean required; + + /** + * Duration representing the transaction timeout - null to use default timeout of the + * underlying transaction system, or none if timeouts are not supported. + */ + private Duration timeout; + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public boolean isRequired() { + return this.required; + } + + public void setRequired(boolean required) { + this.required = required; + } + + public Duration getTimeout() { + return this.timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index be7d1a5a5..3d34e09a7 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -64,6 +64,7 @@ import org.springframework.pulsar.event.ConsumerFailedToStartEvent; import org.springframework.pulsar.event.ConsumerStartedEvent; import org.springframework.pulsar.event.ConsumerStartingEvent; +import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings; import org.springframework.pulsar.observation.DefaultPulsarListenerObservationConvention; import org.springframework.pulsar.observation.PulsarListenerObservation; import org.springframework.pulsar.observation.PulsarMessageReceiverContext; @@ -255,18 +256,9 @@ private final class Listener implements SchedulingAwareRunnable { this.isBatchListener = this.containerProperties.isBatchListener(); this.ackMode = this.containerProperties.getAckMode(); this.subscriptionType = this.containerProperties.getSubscriptionType(); - this.transactionManager = this.containerProperties.getTransactionManager(); + validateTransactionSettings(this.containerProperties.transactions()); + this.transactionManager = this.containerProperties.transactions().getTransactionManager(); this.transactionTemplate = determineTransactionTemplate(); - - var txnRecordListenerWithBatchAckMode = (this.transactionManager != null && !this.isBatchListener - && this.containerProperties.getAckMode() == AckMode.BATCH); - Assert.state(!(txnRecordListenerWithBatchAckMode), - "Transactional record listeners can not use batch ack mode"); - - var batchListenerWithRecordAckMode = (this.isBatchListener - && this.containerProperties.getAckMode() == AckMode.RECORD); - Assert.state(!(batchListenerWithRecordAckMode), "Batch record listeners do not support AckMode.RECORD"); - if (messageListener instanceof PulsarBatchMessageListener) { this.batchMessageListener = (PulsarBatchMessageListener) messageListener; this.listener = null; @@ -323,13 +315,32 @@ else if (messageListener != null) { } } + private void validateTransactionSettings(TransactionSettings txnProps) { + if (!txnProps.isEnabled()) { + return; + } + var missingRequiredTxnMgr = txnProps.isRequired() && txnProps.getTransactionManager() == null; + Assert.state(!missingRequiredTxnMgr, "Transactions are required but txn manager is null"); + + var txnRecordListenerWithBatchAckMode = (txnProps.getTransactionManager() != null && !this.isBatchListener + && this.containerProperties.getAckMode() == AckMode.BATCH); + Assert.state(!(txnRecordListenerWithBatchAckMode), + "Transactional record listeners can not use batch ack mode"); + + var batchListenerWithRecordAckMode = (this.isBatchListener + && this.containerProperties.getAckMode() == AckMode.RECORD); + Assert.state(!(batchListenerWithRecordAckMode), "Batch record listeners do not support AckMode.RECORD"); + + // TODO custom errorHandler w/ transactions not supported + } + @Nullable private TransactionTemplate determineTransactionTemplate() { if (this.transactionManager == null) { return null; } - TransactionTemplate template = new TransactionTemplate(this.transactionManager); - TransactionDefinition definition = this.containerProperties.getTransactionDefinition(); + var template = new TransactionTemplate(this.transactionManager); + var definition = this.containerProperties.transactions().determineTransactionDefinition(); Assert.state( definition == null || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java index 817688c2f..1699744cb 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java @@ -31,9 +31,11 @@ import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.pulsar.core.TransactionProperties; import org.springframework.pulsar.observation.PulsarListenerObservationConvention; import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.util.Assert; import io.micrometer.observation.ObservationRegistry; @@ -93,11 +95,7 @@ public class PulsarContainerProperties { private Properties pulsarConsumerProperties = new Properties(); - @Nullable - private TransactionDefinition transactionDefinition; - - @Nullable - private PulsarAwareTransactionManager transactionManager; + private final TransactionSettings transactions = new TransactionSettings(); public PulsarContainerProperties(String... topics) { this.topics = Set.of(topics); @@ -284,42 +282,12 @@ public void setPulsarConsumerProperties(Properties pulsarConsumerProperties) { } /** - * Get the transaction definition. - * @return the definition + * Gets the transaction settings. + * @return the transaction settings * @since 1.1.0 */ - @Nullable - public TransactionDefinition getTransactionDefinition() { - return this.transactionDefinition; - } - - /** - * Set a transaction definition with properties (e.g. timeout) that will be copied to - * the container's transaction template. - * @param transactionDefinition the definition - * @since 1.1.0 - */ - public void setTransactionDefinition(@Nullable TransactionDefinition transactionDefinition) { - this.transactionDefinition = transactionDefinition; - } - - /** - * Gets the transaction manager used to start transactions. - * @return the transaction manager - * @since 1.1.0 - */ - @Nullable - public PulsarAwareTransactionManager getTransactionManager() { - return this.transactionManager; - } - - /** - * Set the transaction manager to start a transaction. - * @param transactionManager the transaction manager - * @since 1.1.0 - */ - public void setTransactionManager(@Nullable PulsarAwareTransactionManager transactionManager) { - this.transactionManager = transactionManager; + public TransactionSettings transactions() { + return this.transactions; } public void updateContainerProperties() { @@ -335,4 +303,71 @@ private void applyPropIfSpecified(String key, Consumer setter) { } } + /** + * Transaction related settings. + * + * @since 1.1.0 + */ + public static class TransactionSettings extends TransactionProperties { + + @Nullable + private TransactionDefinition transactionDefinition; + + @Nullable + private PulsarAwareTransactionManager transactionManager; + + /** + * Get the transaction definition. + * @return the definition + */ + @Nullable + public TransactionDefinition getTransactionDefinition() { + return this.transactionDefinition; + } + + /** + * Set a transaction definition with properties (e.g. timeout) that will be copied + * to the container's transaction template. + * @param transactionDefinition the definition + */ + public void setTransactionDefinition(@Nullable TransactionDefinition transactionDefinition) { + this.transactionDefinition = transactionDefinition; + } + + /** + * Determines the transaction definition to use by respecting any user configured + * timeout property. + * @return the transaction definition to use including any user specified timeout + * setting + */ + public TransactionDefinition determineTransactionDefinition() { + var timeout = this.getTimeout(); + if (timeout == null) { + return this.transactionDefinition; + } + var txnDef = (this.transactionDefinition != null) + ? new DefaultTransactionDefinition(this.transactionDefinition) : new DefaultTransactionDefinition(); + txnDef.setTimeout(Math.toIntExact(timeout.toSeconds())); + return txnDef; + } + + /** + * Gets the transaction manager used to start transactions. + * @return the transaction manager + */ + @Nullable + public PulsarAwareTransactionManager getTransactionManager() { + return this.transactionManager; + } + + /** + * Set the transaction manager to start a transaction. + * @param transactionManager the transaction manager + */ + public void setTransactionManager(@Nullable PulsarAwareTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + } + } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java index 37fa734cd..0fab6435e 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTests.java @@ -404,6 +404,7 @@ void deadLetterPolicyCustom() throws Exception { void batchListenerWithRecordAckModeNotSupported() { var containerProps = new PulsarContainerProperties(); containerProps.setSchema(Schema.STRING); + containerProps.transactions().setEnabled(true); containerProps.setBatchListener(true); containerProps.setAckMode(AckMode.RECORD); containerProps.setMessageListener((PulsarBatchMessageListener) (consumer, msgs) -> { diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTxnTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTxnTests.java index d4d42387c..9114a33f4 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTxnTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTxnTests.java @@ -87,13 +87,10 @@ void tearDown() throws PulsarClientException { void recordListenerWithAutoRecordAck() throws Exception { var topicIn = topicIn("rec-lstnr-auto-rec-ack"); var topicOut = topicOut("rec-lstnr-auto-rec-ack"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); - containerProps.setAckMode(AckMode.RECORD); + var containerProps = newContainerProps(); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); pulsarTemplate.send(topicOut, msg.getValue() + "-out"); listenerLatch.countDown(); }); @@ -105,13 +102,10 @@ void recordListenerWithAutoRecordAck() throws Exception { void recordListenerWithAutoRecordAckAndRollback() throws Exception { var topicIn = topicIn("rec-lstnr-auto-rec-ack-rb"); var topicOut = topicOut("rec-lstnr-auto-rec-ack-rb"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); - containerProps.setAckMode(AckMode.RECORD); + var containerProps = newContainerProps(); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); pulsarTemplate.send(topicOut, msg.getValue() + "-out"); PulsarTransactionUtils.getResourceHolder(client).setRollbackOnly(); listenerLatch.countDown(); @@ -124,13 +118,11 @@ void recordListenerWithAutoRecordAckAndRollback() throws Exception { void recordListenerWithManualRecordAck() throws Exception { var topicIn = topicIn("rec-lstnr-manu-rec-ack"); var topicOut = topicOut("rec-lstnr-manu-rec-ack"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setAckMode(AckMode.MANUAL); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarAcknowledgingMessageListener) (consumer, msg, ack) -> { - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); pulsarTemplate.send(topicOut, msg.getValue() + "-out"); ack.acknowledge(); listenerLatch.countDown(); @@ -143,13 +135,11 @@ void recordListenerWithManualRecordAck() throws Exception { void recordListenerWithManualRecordAckAndRollback() throws Exception { var topicIn = topicIn("rec-lstnr-manu-rec-ack-rb"); var topicOut = topicOut("rec-lstnr-manu-rec-ack-rb"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setAckMode(AckMode.MANUAL); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarAcknowledgingMessageListener) (consumer, msg, ack) -> { - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); pulsarTemplate.send(topicOut, msg.getValue() + "-out"); ack.acknowledge(); PulsarTransactionUtils.getResourceHolder(client).setRollbackOnly(); @@ -163,13 +153,10 @@ void recordListenerWithManualRecordAckAndRollback() throws Exception { void recordListenerThrowsException() throws Exception { var topicIn = topicIn("rec-lstnr-throws-ex"); var topicOut = topicOut("rec-lstnr-throws-ex"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); - containerProps.setAckMode(AckMode.RECORD); + var containerProps = newContainerProps(); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); pulsarTemplate.send(topicOut, msg.getValue() + "-out"); listenerLatch.countDown(); throw new RuntimeException("BOOM"); @@ -182,13 +169,10 @@ void recordListenerThrowsException() throws Exception { void recordListenerWithNestedTxn() throws Exception { var topicIn = topicIn("rec-lstnr-nested-txn"); var topicOut = topicOut("rec-lstnr-nested-txn"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); - containerProps.setAckMode(AckMode.RECORD); + var containerProps = newContainerProps(); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); pulsarTemplate.executeInTransaction((t) -> t.send(topicOut, msg.getValue() + "-out")); listenerLatch.countDown(); }); @@ -200,13 +184,10 @@ void recordListenerWithNestedTxn() throws Exception { void recordListenerWithNestedTxnAndRollback() throws Exception { var topicIn = topicIn("rec-lstnr-nested-txn-rb"); var topicOut = topicOut("rec-lstnr-nested-txn-rb"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); - containerProps.setAckMode(AckMode.RECORD); + var containerProps = newContainerProps(); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); pulsarTemplate.executeInTransaction((t) -> t.send(topicOut, msg.getValue() + "-out")); PulsarTransactionUtils.getResourceHolder(client).setRollbackOnly(); listenerLatch.countDown(); @@ -219,14 +200,11 @@ void recordListenerWithNestedTxnAndRollback() throws Exception { void recordListenerWithMultipleMessages() throws Exception { var topicIn = topicIn("rec-lstnr-multi-msg"); var topicOut = topicOut("rec-lstnr-multi-msg"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); - containerProps.setAckMode(AckMode.RECORD); + var containerProps = newContainerProps(); var inputMsgs = List.of("msg1", "msg2", "msg3"); var listenerLatch = new CountDownLatch(inputMsgs.size()); containerProps.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); pulsarTemplate.send(topicOut, msg.getValue() + "-out"); listenerLatch.countDown(); }); @@ -239,14 +217,11 @@ void recordListenerWithMultipleMessages() throws Exception { void recordListenerWithMultipleMessagesAndRollback() throws Exception { var topicIn = topicIn("rec-lstnr-multi-msg-rb"); var topicOut = topicOut("rec-lstnr-multi-msg-rb"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); - containerProps.setAckMode(AckMode.RECORD); + var containerProps = newContainerProps(); var inputMsgs = List.of("msg1", "msg2", "msg3"); var listenerLatch = new CountDownLatch(inputMsgs.size()); containerProps.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); pulsarTemplate.send(topicOut, msg.getValue() + "-out"); listenerLatch.countDown(); if (msg.getValue().equals("msg2")) { @@ -260,9 +235,7 @@ void recordListenerWithMultipleMessagesAndRollback() throws Exception { @Test void recordListenerWithBatchAckNotSupported() { - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setAckMode(AckMode.BATCH); containerProps.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { throw new RuntimeException("should never get here"); @@ -277,9 +250,7 @@ void recordListenerWithBatchAckNotSupported() { void batchListenerUsesBatchAckWhenSharedSub() throws Exception { var topicIn = topicIn("batch-lstr-batch-ack"); var topicOut = topicOut("batch-lstr-batch-ack"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setBatchListener(true); containerProps.setAckMode(AckMode.BATCH); containerProps.setSubscriptionType(SubscriptionType.Shared); @@ -287,7 +258,7 @@ void batchListenerUsesBatchAckWhenSharedSub() throws Exception { var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarBatchMessageListener) (consumer, msgs) -> { assertThat(msgs.size()).isEqualTo(inputMsgs.size()); - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); msgs.forEach((msg) -> pulsarTemplate.send(topicOut, msg.getValue() + "-out")); listenerLatch.countDown(); }); @@ -302,9 +273,7 @@ void batchListenerUsesBatchAckWhenSharedSub() throws Exception { void batchListenerUsesCumulativeAckWhenNotSharedSub() throws Exception { var topicIn = topicIn("batch-lstr-cumltv-ack"); var topicOut = topicOut("batch-lstr-cumltv-ack"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setBatchListener(true); containerProps.setAckMode(AckMode.BATCH); containerProps.setSubscriptionType(SubscriptionType.Exclusive); @@ -312,7 +281,7 @@ void batchListenerUsesCumulativeAckWhenNotSharedSub() throws Exception { var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarBatchMessageListener) (consumer, msgs) -> { assertThat(msgs.size()).isEqualTo(inputMsgs.size()); - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); msgs.forEach((msg) -> pulsarTemplate.send(topicOut, msg.getValue() + "-out")); listenerLatch.countDown(); }); @@ -327,16 +296,14 @@ void batchListenerUsesCumulativeAckWhenNotSharedSub() throws Exception { void batchListenerThrowsException() throws Exception { var topicIn = topicIn("batch-lstr-throws-ex"); var topicOut = topicOut("batch-lstr-throws-ex"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setBatchListener(true); containerProps.setAckMode(AckMode.BATCH); var inputMsgs = List.of("msg1", "msg2", "msg3"); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarBatchMessageListener) (consumer, msgs) -> { assertThat(msgs.size()).isEqualTo(inputMsgs.size()); - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); msgs.forEach((msg) -> pulsarTemplate.send(topicOut, msg.getValue() + "-out")); listenerLatch.countDown(); throw new RuntimeException("NOPE"); @@ -349,9 +316,7 @@ void batchListenerThrowsException() throws Exception { void batchListenerWithTxnMarkedForRollback() throws Exception { var topicIn = topicIn("batch-lstr-rollback"); var topicOut = topicOut("batch-lstr-rollback"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setBatchListener(true); containerProps.setAckMode(AckMode.BATCH); containerProps.setSubscriptionType(SubscriptionType.Exclusive); @@ -359,7 +324,7 @@ void batchListenerWithTxnMarkedForRollback() throws Exception { var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarBatchMessageListener) (consumer, msgs) -> { assertThat(msgs.size()).isEqualTo(inputMsgs.size()); - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); msgs.forEach((msg) -> pulsarTemplate.send(topicOut, msg.getValue() + "-out")); PulsarTransactionUtils.getResourceHolder(client).setRollbackOnly(); listenerLatch.countDown(); @@ -372,16 +337,14 @@ void batchListenerWithTxnMarkedForRollback() throws Exception { void batchListenerWithNestedProduceTxn() throws Exception { var topicIn = topicIn("batch-lstr-nested-txn"); var topicOut = topicOut("batch-lstr-nested-txn"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setBatchListener(true); containerProps.setAckMode(AckMode.BATCH); var inputMsgs = List.of("msg1", "msg2", "msg3"); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarBatchMessageListener) (consumer, msgs) -> { assertThat(msgs.size()).isEqualTo(inputMsgs.size()); - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); msgs.forEach((msg) -> { if (msg.getValue().equals("msg2")) { pulsarTemplate.executeInTransaction((t) -> t.send(topicOut, msg.getValue() + "-out")); @@ -402,16 +365,14 @@ void batchListenerWithNestedProduceTxn() throws Exception { void batchListenerWithManualAck() throws Exception { var topicIn = topicIn("batch-lstr-man-ack"); var topicOut = topicOut("batch-lstr-man-ack"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setBatchListener(true); containerProps.setAckMode(AckMode.MANUAL); var inputMsgs = List.of("msg1", "msg2", "msg3"); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarBatchAcknowledgingMessageListener) (consumer, msgs, ack) -> { assertThat(msgs.size()).isEqualTo(inputMsgs.size()); - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); msgs.forEach((msg) -> pulsarTemplate.send(topicOut, msg.getValue() + "-out")); ack.acknowledge(msgs.stream().map(Message::getMessageId).toList()); listenerLatch.countDown(); @@ -425,16 +386,14 @@ void batchListenerWithManualAck() throws Exception { void batchListenerWithManualAckAndRollback() throws Exception { var topicIn = topicIn("batch-lstr-man-ack-rb"); var topicOut = topicOut("batch-lstr-man-ack-rb"); - var containerProps = new PulsarContainerProperties(); - containerProps.setSchema(Schema.STRING); - containerProps.setTransactionManager(transactionManager); + var containerProps = newContainerProps(); containerProps.setBatchListener(true); containerProps.setAckMode(AckMode.MANUAL); var inputMsgs = List.of("msg1", "msg2", "msg3"); var listenerLatch = new CountDownLatch(1); containerProps.setMessageListener((PulsarBatchAcknowledgingMessageListener) (consumer, msgs, ack) -> { assertThat(msgs.size()).isEqualTo(inputMsgs.size()); - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); msgs.forEach((msg) -> pulsarTemplate.send(topicOut, msg.getValue() + "-out")); ack.acknowledge(msgs.stream().map(Message::getMessageId).toList()); PulsarTransactionUtils.getResourceHolder(client).setRollbackOnly(); @@ -464,7 +423,7 @@ private void startContainerAndSendInputsThenWaitForLatch(String topicIn, PulsarC var container = new DefaultPulsarMessageListenerContainer<>(consumerFactory, containerProps); try { container.start(); - pulsarTemplate.setTransactional(false); + pulsarTemplate.transactions().setEnabled(false); if (sendInBatch) { inputMsgs.forEach((msg) -> pulsarTemplate.newMessage(msg) .withTopic(topicIn) @@ -503,6 +462,15 @@ private void assertMessagesAvailableInOutputTopic(String topicOut, List .get()).map(Message::getValue).containsExactlyInAnyOrderElementsOf(expectedMessages); } + private PulsarContainerProperties newContainerProps() { + var containerProps = new PulsarContainerProperties(); + containerProps.setSchema(Schema.STRING); + containerProps.transactions().setEnabled(true); + containerProps.transactions().setTransactionManager(transactionManager); + containerProps.setAckMode(AckMode.RECORD); + return containerProps; + } + private String topicIn(String testInfo) { return "dpmlctt-%s-in".formatted(testInfo); } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTxnTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTxnTests.java index 96f5081b2..f257147f8 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTxnTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTxnTests.java @@ -81,7 +81,7 @@ class ListenerWithExternalTransaction { @Test void producedMessageIsCommitted() throws Exception { - pulsarTemplate.setTransactional(false); + pulsarTemplate.transactions().setEnabled(false); pulsarTemplate.sendAsync(topicIn, "msg1"); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertMessagesAvailableInOutputTopic(topicOut, "msg1-out"); @@ -97,7 +97,7 @@ static class ListenerWithExternalTransactionConfig { @Transactional @PulsarListener(topics = topicIn, ackMode = AckMode.RECORD) void listen(String msg) { - template.setTransactional(true); + template.transactions().setEnabled(true); template.send(topicOut, msg + "-out"); latch.countDown(); } @@ -116,7 +116,7 @@ class ListenerWithExternalTransactionRollback { @Test void producedMessageIsNotCommitted() throws Exception { - pulsarTemplate.setTransactional(false); + pulsarTemplate.transactions().setEnabled(false); pulsarTemplate.sendAsync(topicIn, "msg1"); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertNoMessagesAvailableInOutputTopic(topicOut); @@ -132,7 +132,7 @@ static class ListenerWithExternalTransactionRollbackConfig { @Transactional @PulsarListener(topics = topicIn, ackMode = AckMode.RECORD) void listen(String msg) { - template.setTransactional(true); + template.transactions().setEnabled(true); template.send(topicOut, msg + "-out"); latch.countDown(); throw new RuntimeException("BOOM"); @@ -152,7 +152,7 @@ class RecordListenerWithCommit { @Test void producedMessageIsCommitted() throws Exception { - pulsarTemplate.setTransactional(false); + pulsarTemplate.transactions().setEnabled(false); pulsarTemplate.sendAsync(topicIn, "msg1"); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertMessagesAvailableInOutputTopic(topicOut, "msg1-out"); @@ -167,7 +167,7 @@ static class RecordListenerWithCommitConfig { @PulsarListener(topics = topicIn, ackMode = AckMode.RECORD) void listen(String msg) { - template.setTransactional(true); + template.transactions().setEnabled(true); template.send(topicOut, msg + "-out"); latch.countDown(); } @@ -186,7 +186,7 @@ class RecordListenerWithRollback { @Test void producedMessageIsNotCommitted() throws Exception { - pulsarTemplate.setTransactional(false); + pulsarTemplate.transactions().setEnabled(false); pulsarTemplate.sendAsync(topicIn, "msg1"); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertNoMessagesAvailableInOutputTopic(topicOut); @@ -201,7 +201,7 @@ static class RecordListenerWithRollbackConfig { @PulsarListener(topics = topicIn, ackMode = AckMode.RECORD) void listen(String msg) { - template.setTransactional(true); + template.transactions().setEnabled(true); template.send(topicOut, msg + "-out"); latch.countDown(); throw new RuntimeException("BOOM-record"); @@ -222,7 +222,7 @@ class BatchListenerWithCommit { @Test void producedMessagesAreCommitted() throws Exception { - pulsarTemplate.setTransactional(false); + pulsarTemplate.transactions().setEnabled(false); inputMsgs.forEach((msg) -> pulsarTemplate.newMessage(msg) .withTopic(topicIn) .withProducerCustomizer((pb) -> pb.enableBatching(true) @@ -244,7 +244,7 @@ static class BatchListenerWithCommitConfig { @PulsarListener(topics = topicIn, batch = true) void listen(List msgs) { assertThat(msgs.size()).isEqualTo(inputMsgs.size()); - template.setTransactional(true); + template.transactions().setEnabled(true); msgs.forEach((msg) -> template.send(topicOut, msg + "-out")); latch.countDown(); } @@ -264,7 +264,7 @@ class BatchListenerWithRollback { @Test void producedMessagesAreNotCommitted() throws Exception { - pulsarTemplate.setTransactional(false); + pulsarTemplate.transactions().setEnabled(false); inputMsgs.forEach((msg) -> pulsarTemplate.newMessage(msg) .withTopic(topicIn) .withProducerCustomizer((pb) -> pb.enableBatching(true) @@ -285,7 +285,7 @@ static class BatchListenerWithRollbackConfig { @PulsarListener(topics = topicIn, batch = true) void listen(List msgs) { assertThat(msgs.size()).isEqualTo(inputMsgs.size()); - template.setTransactional(true); + template.transactions().setEnabled(true); msgs.forEach((msg) -> template.send(topicOut, msg + "-out")); latch.countDown(); throw new RuntimeException("BOOM-batch"); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarTxnTestsBase.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarTxnTestsBase.java index cb499d832..77d016b71 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarTxnTestsBase.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarTxnTestsBase.java @@ -86,7 +86,7 @@ PulsarClient pulsarClient() { @Bean PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory) { var template = new PulsarTemplate<>(pulsarProducerFactory); - template.setTransactional(true); + template.transactions().setEnabled(true); return template; } @@ -102,7 +102,7 @@ PulsarListenerContainerFactory pulsarListenerContainerFactory( PulsarConsumerFactory pulsarConsumerFactory, PulsarAwareTransactionManager pulsarTransactionManager) { var containerProps = new PulsarContainerProperties(); - containerProps.setTransactionManager(pulsarTransactionManager); + containerProps.transactions().setTransactionManager(pulsarTransactionManager); return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProps); } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/transaction/PulsarTemplateLocalTransactionTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/transaction/PulsarTemplateLocalTransactionTests.java index 0f922c9b8..85b85015d 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/transaction/PulsarTemplateLocalTransactionTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/transaction/PulsarTemplateLocalTransactionTests.java @@ -84,7 +84,7 @@ void tearDown() throws PulsarClientException { private PulsarTemplate newTransactionalTemplate() { var senderFactory = new DefaultPulsarProducerFactory(client, null); var pulsarTemplate = new PulsarTemplate<>(senderFactory); - pulsarTemplate.setTransactional(true); + pulsarTemplate.transactions().setEnabled(true); return pulsarTemplate; } @@ -159,7 +159,7 @@ void nestedTransactionsNotAllowed() { @Test void transactionsNotAllowedWithNonTransactionalTemplate() { var pulsarTemplate = newTransactionalTemplate(); - pulsarTemplate.setTransactional(false); + pulsarTemplate.transactions().setEnabled(false); assertThatIllegalStateException().isThrownBy(() -> pulsarTemplate.executeInTransaction((template) -> "boom")) .withMessage("This template does not support transactions"); } @@ -176,7 +176,7 @@ private void assertMessagesCommitted(String topic, List expectedMsgs) { void sendFailsWhenNotInTxnAndAllowNonTxnFlagIsFalse() { String topic = "pttt-no-txn-topic"; var pulsarTemplate = newTransactionalTemplate(); - pulsarTemplate.setAllowNonTransactional(false); + pulsarTemplate.transactions().setRequired(true); assertThatIllegalStateException().isThrownBy(() -> pulsarTemplate.send(topic, "msg1")) .withMessageStartingWith("No transaction is in process; possible solutions: run"); } @@ -187,7 +187,7 @@ void sendSucceedsWhenNotInTxnAndAllowNonTxnFlagIsNotFalse(boolean shouldExplicit String topic = "pttt-no-txn-%s-topic".formatted(shouldExplicitlySetAllowNonTxnFlag); var pulsarTemplate = newTransactionalTemplate(); if (shouldExplicitlySetAllowNonTxnFlag) { - pulsarTemplate.setAllowNonTransactional(true); + pulsarTemplate.transactions().setRequired(false); } pulsarTemplate.send(topic, "msg1"); assertMessagesCommitted(topic, List.of("msg1"));