GH-1168: Fix sendOffsets in local transaction
Fixes #1168

`sendOffsetsToTransaction` assumed the transaction had been started by a
`KafkaTransactionManager`, so calling it from within `executeInTransaction` (a local transaction) failed.

Look for a local transactional producer before checking for a thread-bound transaction.
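
For context, a minimal consume-process-produce sketch of the call pattern this fix enables inside a local transaction. The topic names, consumer group id, and the `template`/`consumer` setup are illustrative placeholders, not part of the commit; the real coverage is in the updated test below.

    // Assumes: a KafkaTemplate<String, String> "template" built from a producer factory with a
    // transactionIdPrefix, and a Consumer<String, String> "consumer" for the (hypothetical) "inTopic".
    // Uses java.util.Collections, org.apache.kafka.common.TopicPartition,
    // org.apache.kafka.clients.consumer.OffsetAndMetadata.
    ConsumerRecord<String, String> in = KafkaTestUtils.getSingleRecord(consumer, "inTopic");
    template.executeInTransaction(t -> {
        t.send("outTopic", in.value().toUpperCase());
        // Before this fix, the next call threw "No transaction in process" because only a
        // KafkaTransactionManager-bound transaction was considered; now the local transactional
        // producer created by executeInTransaction is used.
        t.sendOffsetsToTransaction(Collections.singletonMap(
                new TopicPartition(in.topic(), in.partition()),
                new OffsetAndMetadata(in.offset() + 1)), "myConsumerGroup");
        return null;
    });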

**cherry-pick to all supported branches**

# Conflicts:
#	spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

garyrussell authored and artembilan committed Jul 18, 2019
1 parent 15ab184 commit 01c9bba
Showing 2 changed files with 30 additions and 12 deletions.
spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java
@@ -331,13 +331,15 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs

     @Override
     public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
-        @SuppressWarnings("unchecked")
-        KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
-                .getResource(this.producerFactory);
-        Assert.isTrue(resourceHolder != null, "No transaction in process");
-        if (resourceHolder.getProducer() != null) {
-            resourceHolder.getProducer().sendOffsetsToTransaction(offsets, consumerGroupId);
+        Producer<K, V> producer = this.producers.get();
+        if (producer == null) {
+            @SuppressWarnings("unchecked")
+            KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
+                    .getResource(this.producerFactory);
+            Assert.isTrue(resourceHolder != null, "No transaction in process");
+            producer = resourceHolder.getProducer();
         }
+        producer.sendOffsetsToTransaction(offsets, consumerGroupId);
     }

     protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {
spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
@@ -38,11 +38,13 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.assertj.core.api.Assertions;
@@ -77,10 +79,13 @@ public class KafkaTemplateTransactionTests {

     private static final String STRING_KEY_TOPIC = "stringKeyTopic";

+    private static final String LOCAL_TX_IN_TOPIC = "localTxInTopic";
+
     @ClassRule
-    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, STRING_KEY_TOPIC)
-            .brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1")
-            .brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1");
+    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, STRING_KEY_TOPIC,
+            LOCAL_TX_IN_TOPIC)
+            .brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1")
+            .brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1");

     @Test
     public void testLocalTransaction() throws Exception {
@@ -93,13 +98,22 @@ public void testLocalTransaction() throws Exception {
         template.setDefaultTopic(STRING_KEY_TOPIC);
         Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
         DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
         cf.setKeyDeserializer(new StringDeserializer());
         Consumer<String, String> consumer = cf.createConsumer();
-        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, STRING_KEY_TOPIC);
+        embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
+        template.executeInTransaction(kt -> kt.send(LOCAL_TX_IN_TOPIC, "one"));
+        ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, LOCAL_TX_IN_TOPIC);
         template.executeInTransaction(t -> {
             t.sendDefault("foo", "bar");
             t.sendDefault("baz", "qux");
+            t.sendOffsetsToTransaction(Collections.singletonMap(
+                    new TopicPartition(LOCAL_TX_IN_TOPIC, singleRecord.partition()),
+                    new OffsetAndMetadata(singleRecord.offset() + 1L)), "testLocalTx");
+            assertThat(KafkaTestUtils.getPropertyValue(
+                    KafkaTestUtils.getPropertyValue(template, "producers", ThreadLocal.class).get(),
+                    "delegate.transactionManager.transactionalId")).isEqualTo("my.transaction.0");
             return null;
         });
         ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
@@ -112,6 +126,8 @@ public void testLocalTransaction() throws Exception {
         }
         record = iterator.next();
         assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("baz"), value("qux")));
+        // 2 log slots, 1 for the record, 1 for the commit
+        assertThat(consumer.position(new TopicPartition(LOCAL_TX_IN_TOPIC, singleRecord.partition()))).isEqualTo(2L);
         consumer.close();
         assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(1);
         pf.destroy();
@@ -210,8 +226,8 @@ public void testNoTx() {
         KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
         template.setDefaultTopic(STRING_KEY_TOPIC);
         assertThatThrownBy(() -> template.send("foo", "bar"))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("No transaction is in process;");
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("No transaction is in process;");
     }

     @Test
