diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index baaf7dda7e..bae44f1e4c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -331,13 +331,15 @@ public void sendOffsetsToTransaction(Map offs @Override public void sendOffsetsToTransaction(Map offsets, String consumerGroupId) { - @SuppressWarnings("unchecked") - KafkaResourceHolder resourceHolder = (KafkaResourceHolder) TransactionSynchronizationManager - .getResource(this.producerFactory); - Assert.isTrue(resourceHolder != null, "No transaction in process"); - if (resourceHolder.getProducer() != null) { - resourceHolder.getProducer().sendOffsetsToTransaction(offsets, consumerGroupId); + Producer producer = this.producers.get(); + if (producer == null) { + @SuppressWarnings("unchecked") + KafkaResourceHolder resourceHolder = (KafkaResourceHolder) TransactionSynchronizationManager + .getResource(this.producerFactory); + Assert.isTrue(resourceHolder != null, "No transaction in process"); + producer = resourceHolder.getProducer(); } + producer.sendOffsetsToTransaction(offsets, consumerGroupId); } protected void closeProducer(Producer producer, boolean inLocalTx) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index aa96f76724..83eabf0494 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -38,11 +38,13 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.assertj.core.api.Assertions; @@ -77,10 +79,13 @@ public class KafkaTemplateTransactionTests { private static final String STRING_KEY_TOPIC = "stringKeyTopic"; + private static final String LOCAL_TX_IN_TOPIC = "localTxInTopic"; + @ClassRule - public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, STRING_KEY_TOPIC) - .brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1") - .brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1"); + public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, STRING_KEY_TOPIC, + LOCAL_TX_IN_TOPIC) + .brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1") + .brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1"); @Test public void testLocalTransaction() throws Exception { @@ -93,13 +98,22 @@ public void testLocalTransaction() throws Exception { template.setDefaultTopic(STRING_KEY_TOPIC); Map consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); cf.setKeyDeserializer(new StringDeserializer()); Consumer consumer = cf.createConsumer(); - embeddedKafka.consumeFromAnEmbeddedTopic(consumer, STRING_KEY_TOPIC); + embeddedKafka.consumeFromAllEmbeddedTopics(consumer); + template.executeInTransaction(kt -> kt.send(LOCAL_TX_IN_TOPIC, "one")); + ConsumerRecord singleRecord = KafkaTestUtils.getSingleRecord(consumer, LOCAL_TX_IN_TOPIC); template.executeInTransaction(t -> { t.sendDefault("foo", "bar"); t.sendDefault("baz", "qux"); + t.sendOffsetsToTransaction(Collections.singletonMap( + new TopicPartition(LOCAL_TX_IN_TOPIC, singleRecord.partition()), + new OffsetAndMetadata(singleRecord.offset() + 1L)), "testLocalTx"); + assertThat(KafkaTestUtils.getPropertyValue( + KafkaTestUtils.getPropertyValue(template, "producers", ThreadLocal.class).get(), + "delegate.transactionManager.transactionalId")).isEqualTo("my.transaction.0"); return null; }); ConsumerRecords records = KafkaTestUtils.getRecords(consumer); @@ -112,6 +126,8 @@ public void testLocalTransaction() throws Exception { } record = iterator.next(); assertThat(record).has(Assertions.>allOf(key("baz"), value("qux"))); + // 2 log slots, 1 for the record, 1 for the commit + assertThat(consumer.position(new TopicPartition(LOCAL_TX_IN_TOPIC, singleRecord.partition()))).isEqualTo(2L); consumer.close(); assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(1); pf.destroy(); @@ -210,8 +226,8 @@ public void testNoTx() { KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(STRING_KEY_TOPIC); assertThatThrownBy(() -> template.send("foo", "bar")) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("No transaction is in process;"); + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No transaction is in process;"); } @Test