From 1b53c4b7a96888c15863f7a2157e6c5c28d11151 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 13 Apr 2018 14:32:45 -0400 Subject: [PATCH] GH-646: Transaction Improvements Resolves https://github.com/spring-projects/spring-kafka/issues/646 - add protection to the transactional producer cache to avoid multiple cache inserts - add try/catch around `abortTransaction()` - detect (and reject) an illegal call to `template.send()` when there is no existing transaction - from the send callback remove the `ThreadLocal` check (will always be null) and only close the producer if it's not transactional - remove autoFlush from template tests --- .../core/DefaultKafkaProducerFactory.java | 6 ++-- .../kafka/core/KafkaTemplate.java | 30 ++++++++++++++----- .../kafka/core/KafkaAdminBadContextTests.java | 4 +-- .../core/KafkaTemplateTransactionTests.java | 19 ++++++++++-- 4 files changed, 44 insertions(+), 15 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index ee62df87bb..7b514a9a8b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2017 the original author or authors. + * Copyright 2016-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -302,8 +302,8 @@ public void abortTransaction() throws ProducerFencedException { } @Override - public void close() { - if (this.cache != null) { + public synchronized void close() { + if (this.cache != null && !this.cache.contains(this)) { this.cache.offer(this); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index eaa977eb8e..e07e34071d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -265,10 +265,14 @@ public T executeInTransaction(OperationsCallback callback) { result = callback.doInOperations(this); } catch (Exception e) { - producer.abortTransaction(); - this.producers.remove(); - closeProducer(producer, false); - producer = null; + try { + producer.abortTransaction(); + } + finally { + this.producers.remove(); + closeProducer(producer, false); + producer = null; + } } if (producer != null) { try { @@ -328,12 +332,19 @@ protected void closeProducer(Producer producer, boolean inLocalTx) { * @return a Future for the {@link RecordMetadata}. */ protected ListenableFuture> doSend(final ProducerRecord producerRecord) { + if (this.transactional) { + Assert.state(inTransaction(), + "No transaction is in process; " + + "possible solutions: run the template operation within the scope of a " + + "template.executeInTransaction() operation, start a transaction with @Transactional " + + "before invoking the template method, " + + "run in a transaction started by a listener container when consuming a record"); + } final Producer producer = getTheProducer(); if (this.logger.isTraceEnabled()) { this.logger.trace("Sending: " + producerRecord); } final SettableListenableFuture> future = new SettableListenableFuture<>(); - final boolean inLocalTx = inTransaction(); producer.send(producerRecord, new Callback() { @Override @@ -365,8 +376,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { } } finally { - if (KafkaTemplate.this.producers.get() == null) { - closeProducer(producer, inLocalTx); + if (!KafkaTemplate.this.transactional) { + closeProducer(producer, false); } } } @@ -381,8 +392,11 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { return future; } + protected boolean inTransaction() { - return this.producers.get() != null || TransactionSynchronizationManager.isActualTransactionActive(); + return this.transactional && (this.producers.get() != null + || TransactionSynchronizationManager.getResource(this.producerFactory) != null + || TransactionSynchronizationManager.isActualTransactionActive()); } private Producer getTheProducer() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java index 12ebd0be82..ec3195b8ec 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2017-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ public class KafkaAdminBadContextTests { @Test public void testContextNotLoaded() { try { - new AnnotationConfigApplicationContext(BadConfig.class); + new AnnotationConfigApplicationContext(BadConfig.class).close(); fail("Expected Exception"); } catch (IllegalStateException e) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 3c38a6876b..094eea4333 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -17,6 +17,7 @@ package org.springframework.kafka.core; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; @@ -83,7 +84,7 @@ public void testLocalTransaction() throws Exception { DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); pf.setKeySerializer(new StringSerializer()); pf.setTransactionIdPrefix("my.transaction."); - KafkaTemplate template = new KafkaTemplate<>(pf, true); + KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(STRING_KEY_TOPIC); Map consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -119,7 +120,7 @@ public void testGlobalTransaction() throws Exception { DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); pf.setKeySerializer(new StringSerializer()); pf.setTransactionIdPrefix("my.transaction."); - KafkaTemplate template = new KafkaTemplate<>(pf, true); + KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(STRING_KEY_TOPIC); Map consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -194,6 +195,20 @@ public void testOverrideProducerIdempotentConfig() throws Exception { .get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)).isEqualTo(false); } + @Test + public void testNoTx() { + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + senderProps.put(ProducerConfig.RETRIES_CONFIG, 1); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + pf.setKeySerializer(new StringSerializer()); + pf.setTransactionIdPrefix("my.transaction."); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + assertThatThrownBy(() -> template.send("foo", "bar")) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No transaction is in process;"); + } + @Configuration @EnableTransactionManagement public static class DeclarativeConfig {