Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,15 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
PulsarContainerProperties properties = new PulsarContainerProperties();
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
properties.setTransactionManager(this.getContainerProperties().getTransactionManager());
properties.setTransactionDefinition(this.getContainerProperties().getTransactionDefinition());

var parentTxnProps = this.getContainerProperties().transactions();
var childTxnProps = properties.transactions();
childTxnProps.setEnabled(parentTxnProps.isEnabled());
childTxnProps.setRequired(parentTxnProps.isRequired());
childTxnProps.setTimeout(parentTxnProps.getTimeout());
childTxnProps.setTransactionDefinition(parentTxnProps.getTransactionDefinition());
childTxnProps.setTransactionManager(parentTxnProps.getTransactionManager());

if (!CollectionUtils.isEmpty(endpoint.getTopics())) {
properties.setTopics(new HashSet<>(endpoint.getTopics()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.PulsarConsumerErrorHandler;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings;
import org.springframework.pulsar.listener.PulsarMessageListenerContainer;
import org.springframework.pulsar.listener.adapter.AbstractPulsarMessageToSpringMessageAdapter;
import org.springframework.pulsar.listener.adapter.HandlerAdapter;
Expand Down Expand Up @@ -88,7 +89,7 @@ public class MethodPulsarListenerEndpoint<V> extends AbstractPulsarListenerEndpo

private ConsumerBuilderCustomizer<?> consumerBuilderCustomizer;

private boolean transactional = true;
private Boolean transactional;

public void setBean(Object bean) {
this.bean = bean;
Expand Down Expand Up @@ -175,7 +176,7 @@ protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageListener(
topicResolver.resolveTopic(null, messageType.getRawClass(), () -> null)
.ifResolved((topic) -> pulsarContainerProperties.setTopics(Set.of(topic)));
}
configureTransactions(pulsarContainerProperties);
validateAndAdjustTransactionSettings(pulsarContainerProperties.transactions());
container.setNegativeAckRedeliveryBackoff(this.negativeAckRedeliveryBackoff);
container.setAckTimeoutRedeliveryBackoff(this.ackTimeoutRedeliveryBackoff);
container.setDeadLetterPolicy(this.deadLetterPolicy);
Expand All @@ -186,17 +187,24 @@ protected AbstractPulsarMessageToSpringMessageAdapter<V> createMessageListener(
return messageListener;
}

private void configureTransactions(PulsarContainerProperties containerProps) {
if (!this.transactional) {
this.logger.debug(() -> "Listener w/ id [%s] requested no transactions - setting txn mgr to null"
.formatted(this.getId()));
containerProps.setTransactionManager(null);
private void validateAndAdjustTransactionSettings(TransactionSettings txnProps) {
// If user did not specify transactional attribute do nothing
if (this.transactional == null) {
return;
}
if (containerProps.getTransactionManager() == null) {
Assert.state(!(txnProps.isRequired() && !this.transactional),
Copy link
Collaborator Author

@onobc onobc Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These variants need tests (to be added in subsequent PR)

"Listener w/ id [%s] requested no transactions but txn are required".formatted(this.getId()));
if (!this.transactional) {
this.logger.debug(() -> "Listener w/ id [%s] requested no transactions".formatted(this.getId()));
txnProps.setEnabled(false);
}
else if (txnProps.getTransactionManager() == null) {
this.logger.warn(() -> "Listener w/ id [%s] requested transactions but no txn mgr available"
.formatted(this.getId()));
}
else {
txnProps.setEnabled(true);
}
}

private ResolvableType resolvableType(MethodParameter methodParameter) {
Expand Down Expand Up @@ -281,11 +289,11 @@ public void setConsumerBuilderCustomizer(ConsumerBuilderCustomizer<?> consumerBu
this.consumerBuilderCustomizer = consumerBuilderCustomizer;
}

public boolean isTransactional() {
public Boolean getTransactional() {
return this.transactional;
}

public void setTransactional(boolean transactional) {
public void setTransactional(Boolean transactional) {
this.transactional = transactional;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.springframework.pulsar.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -101,19 +100,9 @@ public class PulsarTemplate<T>
private String beanName = "";

/**
* Whether this template supports transactions.
* Transaction settings.
*/
private boolean transactional;

/**
* Whether this template allows non-transactional operations.
*/
private boolean allowNonTransactional = true;

/**
* The timeout to use for any transactions that originate from the template.
*/
private Duration transactionTimeout;
private final TransactionProperties transactionProps = new TransactionProperties();

/**
* Construct a template instance without interceptors that uses the default schema
Expand Down Expand Up @@ -166,31 +155,12 @@ public void setApplicationContext(ApplicationContext applicationContext) {
}

/**
* Sets whether the template supports transactional operations.
* @param transactional whether the template supports transactional operations
* @since 1.1.0
*/
public void setTransactional(boolean transactional) {
this.transactional = transactional;
}

/**
* Sets whether the template supports non-transactional operations.
* @param allowNonTransactional whether the template supports non-transactional
* operations
* @since 1.1.0
*/
public void setAllowNonTransactional(boolean allowNonTransactional) {
this.allowNonTransactional = allowNonTransactional;
}

/**
* Sets the timeout to use for any transactions that originate from the template.
* @param transactionTimeout the timeout
* Gets the transaction properties.
* @return the transaction properties
* @since 1.1.0
*/
public void setTransactionTimeout(Duration transactionTimeout) {
this.transactionTimeout = transactionTimeout;
public TransactionProperties transactions() {
return this.transactionProps;
}

/**
Expand Down Expand Up @@ -340,11 +310,12 @@ private Observation newObservation(PulsarMessageSenderContext senderContext) {

@Nullable
private Transaction getTransaction() {
if (!this.transactional) {
if (!this.transactions().isEnabled()) {
return null;
}
boolean allowNonTransactional = !this.transactions().isRequired();
boolean inTransaction = inTransaction();
Assert.state(this.allowNonTransactional || inTransaction,
Assert.state(allowNonTransactional || inTransaction,
"No transaction is in process; "
+ "possible solutions: run the template operation within the scope of a "
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
Expand All @@ -363,7 +334,7 @@ private Transaction getTransaction() {
// or there is an actual active transaction that we need to sync a Pulsar txn with
// hence the call to 'obtainResourceHolder' rather than 'getResourceHolder'
var resourceHolder = PulsarTransactionUtils.obtainResourceHolder(this.producerFactory.getPulsarClient(),
this.transactionTimeout);
this.transactions().getTimeout());
return resourceHolder.getTransaction();
}

Expand All @@ -373,7 +344,7 @@ private Transaction getTransaction() {
* @return whether the template is currently running in a transaction
*/
private boolean inTransaction() {
if (!this.transactional) {
if (!this.transactions().isEnabled()) {
return false;
}
return this.threadBoundTransactions.get(Thread.currentThread()) != null
Expand Down Expand Up @@ -405,7 +376,7 @@ private Producer<T> prepareProducerForSend(@Nullable String topic, @Nullable T m
@Nullable
public <R> R executeInTransaction(TemplateCallback<T, R> callback) {
Assert.notNull(callback, "callback must not be null");
Assert.state(this.transactional, "This template does not support transactions");
Assert.state(this.transactions().isEnabled(), "This template does not support transactions");
var currentThread = Thread.currentThread();
var txn = this.threadBoundTransactions.get(currentThread);
Assert.state(txn == null, "Nested calls to 'executeInTransaction' are not allowed");
Expand All @@ -430,8 +401,9 @@ public <R> R executeInTransaction(TemplateCallback<T, R> callback) {
private Transaction newPulsarTransaction() {
try {
var txnBuilder = this.producerFactory.getPulsarClient().newTransaction();
if (this.transactionTimeout != null) {
txnBuilder.withTransactionTimeout(this.transactionTimeout.toSeconds(), TimeUnit.SECONDS);
if (this.transactions().getTimeout() != null) {
long timeoutSecs = this.transactions().getTimeout().toSeconds();
txnBuilder.withTransactionTimeout(timeoutSecs, TimeUnit.SECONDS);
}
return txnBuilder.build().get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.core;

import java.time.Duration;

/**
* Common transaction settings for components.
*
* @author Chris Bono
* @since 1.1.0
*/
public class TransactionProperties {

/**
* Whether the component supports transactions.
*/
private boolean enabled;

/**
* Whether the component requires transactions.
*/
private boolean required;

/**
* Duration representing the transaction timeout - null to use default timeout of the
* underlying transaction system, or none if timeouts are not supported.
*/
private Duration timeout;

public boolean isEnabled() {
return this.enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public boolean isRequired() {
return this.required;
}

public void setRequired(boolean required) {
this.required = required;
}

public Duration getTimeout() {
return this.timeout;
}

public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.springframework.pulsar.event.ConsumerFailedToStartEvent;
import org.springframework.pulsar.event.ConsumerStartedEvent;
import org.springframework.pulsar.event.ConsumerStartingEvent;
import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings;
import org.springframework.pulsar.observation.DefaultPulsarListenerObservationConvention;
import org.springframework.pulsar.observation.PulsarListenerObservation;
import org.springframework.pulsar.observation.PulsarMessageReceiverContext;
Expand Down Expand Up @@ -255,18 +256,9 @@ private final class Listener implements SchedulingAwareRunnable {
this.isBatchListener = this.containerProperties.isBatchListener();
this.ackMode = this.containerProperties.getAckMode();
this.subscriptionType = this.containerProperties.getSubscriptionType();
this.transactionManager = this.containerProperties.getTransactionManager();
validateTransactionSettings(this.containerProperties.transactions());
this.transactionManager = this.containerProperties.transactions().getTransactionManager();
this.transactionTemplate = determineTransactionTemplate();

var txnRecordListenerWithBatchAckMode = (this.transactionManager != null && !this.isBatchListener
&& this.containerProperties.getAckMode() == AckMode.BATCH);
Assert.state(!(txnRecordListenerWithBatchAckMode),
"Transactional record listeners can not use batch ack mode");

var batchListenerWithRecordAckMode = (this.isBatchListener
&& this.containerProperties.getAckMode() == AckMode.RECORD);
Assert.state(!(batchListenerWithRecordAckMode), "Batch record listeners do not support AckMode.RECORD");

if (messageListener instanceof PulsarBatchMessageListener) {
this.batchMessageListener = (PulsarBatchMessageListener<T>) messageListener;
this.listener = null;
Expand Down Expand Up @@ -323,13 +315,32 @@ else if (messageListener != null) {
}
}

private void validateTransactionSettings(TransactionSettings txnProps) {
if (!txnProps.isEnabled()) {
return;
}
var missingRequiredTxnMgr = txnProps.isRequired() && txnProps.getTransactionManager() == null;
Assert.state(!missingRequiredTxnMgr, "Transactions are required but txn manager is null");

var txnRecordListenerWithBatchAckMode = (txnProps.getTransactionManager() != null && !this.isBatchListener
&& this.containerProperties.getAckMode() == AckMode.BATCH);
Assert.state(!(txnRecordListenerWithBatchAckMode),
"Transactional record listeners can not use batch ack mode");

var batchListenerWithRecordAckMode = (this.isBatchListener
&& this.containerProperties.getAckMode() == AckMode.RECORD);
Assert.state(!(batchListenerWithRecordAckMode), "Batch record listeners do not support AckMode.RECORD");

// TODO custom errorHandler w/ transactions not supported
}

@Nullable
private TransactionTemplate determineTransactionTemplate() {
if (this.transactionManager == null) {
return null;
}
TransactionTemplate template = new TransactionTemplate(this.transactionManager);
TransactionDefinition definition = this.containerProperties.getTransactionDefinition();
var template = new TransactionTemplate(this.transactionManager);
var definition = this.containerProperties.transactions().determineTransactionDefinition();
Assert.state(
definition == null
|| definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED
Expand Down
Loading