Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ micrometer-docs-gen = "1.0.4"
micrometer-tracing = "1.6.0-RC1"
protobuf = "3.25.8"
pulsar = "4.1.1"
spring = "7.0.0-M9"
spring = "7.0.0-RC1"
# tests
assertj = "3.27.6"
awaitility = "4.3.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@
import org.springframework.pulsar.transaction.PulsarTransactionUtils;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
Expand Down Expand Up @@ -604,14 +602,10 @@ private Observation newObservation(Message<T> message) {
private void dispatchMessageToListenerInTxn(Message<T> message, AtomicBoolean inRetryMode) {
try {
requireNonNull(this.transactionTemplate, "transactionTemplate must not be null")
.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
RuntimeException aborted = dispatchMessageToListener(message, inRetryMode,
getTransaction());
if (aborted != null) {
throw aborted;
}
.executeWithoutResult((status) -> {
RuntimeException aborted = dispatchMessageToListener(message, inRetryMode, getTransaction());
if (aborted != null) {
throw aborted;
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ protected boolean isPulsarMessageList() {
return this.isPulsarMessageList;
}

@SuppressWarnings("removal")
public void setBeanResolver(BeanResolver beanResolver) {
this.evaluationContext.setBeanResolver(beanResolver);
this.evaluationContext.setTypeConverter(new StandardTypeConverter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
Expand Down Expand Up @@ -252,11 +253,13 @@ void recordListenerWithBatchAckNotSupported() {
.withMessage("Transactional record listeners can not use batch ack mode");
}

@Disabled("Flaky test see spring-pulsar/issues/1294")
@Test
void batchListenerUsesBatchAckWhenSharedSub() throws Exception {
batchListenerUsesProperBatchAckForSubscriptionType("batch-lstr-batch-ack", SubscriptionType.Shared);
}

@Disabled("Flaky test see spring-pulsar/issues/1294")
@Test
void batchListenerUsesCumulativeAckWhenNotSharedSub() throws Exception {
batchListenerUsesProperBatchAckForSubscriptionType("batch-lstr-cumltv-ack", SubscriptionType.Exclusive);
Expand Down