diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java index d22d0106b9..6ee404d30d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,8 @@ public class KafkaResourceHolder extends ResourceHolderSupport { private final Duration closeTimeout; + private boolean committed; + /** * Construct an instance for the producer. * @param producer the producer. @@ -55,7 +57,10 @@ public Producer getProducer() { } public void commit() { - this.producer.commitTransaction(); + if (!this.committed) { + this.producer.commitTransaction(); + this.committed = true; + } } public void close() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java index 7a5b2c196a..9a670413e5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -153,7 +153,10 @@ protected void processResourceAfterCommit(KafkaResourceHolder resourceHold @Override public void afterCompletion(int status) { try { - if (status != TransactionSynchronization.STATUS_COMMITTED) { + if (status == TransactionSynchronization.STATUS_COMMITTED) { + this.resourceHolder.commit(); + } + else { this.resourceHolder.rollback(); } } diff --git a/spring-kafka/src/test/java/transaction/TransactionSynchronizationTests.java b/spring-kafka/src/test/java/transaction/TransactionSynchronizationTests.java new file mode 100644 index 0000000000..193990ef9d --- /dev/null +++ b/spring-kafka/src/test/java/transaction/TransactionSynchronizationTests.java @@ -0,0 +1,115 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package transaction; + +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import org.apache.kafka.clients.producer.Producer; +import org.junit.jupiter.api.Test; + +import org.springframework.core.Ordered; +import org.springframework.kafka.core.KafkaResourceHolder; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.ProducerFactoryUtils; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.support.AbstractPlatformTransactionManager; +import org.springframework.transaction.support.DefaultTransactionStatus; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.transaction.support.TransactionTemplate; + +/** + * @author Gary Russell + * @since 2.9.7 + * + */ +public class TransactionSynchronizationTests { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void commitAfterAnotherSyncFails() { + Producer producer = mock(Producer.class); + ProducerFactory pf = mock(ProducerFactory.class); + given(pf.createProducer(any())).willReturn(producer); + assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> + new TransactionTemplate(new TM()).executeWithoutResult(status -> { + KafkaResourceHolder holder = ProducerFactoryUtils.getTransactionalResourceHolder(pf); + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + + @Override + public void afterCommit() { + if (true) { + throw new RuntimeException("Test"); + } + } + + @Override + public int getOrder() { + return Ordered.HIGHEST_PRECEDENCE; + } + + }); + })) + .withMessage("Test"); + verify(producer).beginTransaction(); + verify(producer).commitTransaction(); + verify(producer).close(any()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void onlyOnceCommit() { + Producer producer = mock(Producer.class); + ProducerFactory pf = mock(ProducerFactory.class); + given(pf.createProducer(any())).willReturn(producer); + new TransactionTemplate(new TM()).executeWithoutResult(status -> { + KafkaResourceHolder holder = ProducerFactoryUtils.getTransactionalResourceHolder(pf); + }); + verify(producer).beginTransaction(); + verify(producer).commitTransaction(); + verify(producer).close(any()); + } + + static class TM extends AbstractPlatformTransactionManager { + + @Override + protected Object doGetTransaction() throws TransactionException { + return new Object(); + } + + @Override + protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { + } + + @Override + protected void doCommit(DefaultTransactionStatus status) throws TransactionException { + } + + @Override + protected void doRollback(DefaultTransactionStatus status) throws TransactionException { + } + + } + +} +