Fix manual acks with transactions
- send the offset to the transaction instead of committing via the consumer.
garyrussell committed Mar 17, 2020
1 parent 3aad45a · commit 021a29d
Showing 2 changed files with 54 additions and 14 deletions.
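The scenario this targets is a listener using AckMode.MANUAL_IMMEDIATE together with a KafkaTransactionManager: when the listener calls Acknowledgment.acknowledge(), the offset should be routed into the ongoing Kafka transaction rather than committed through the consumer. A minimal sketch of such a setup follows; the bean names, topics, and group id are illustrative assumptions, not taken from this commit.

```java
// A minimal sketch, not the project's test fixture: bean layout, topic names
// ("in"/"out"), and the group id are illustrative assumptions.
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
@EnableKafka
class ManualAckTxConfig {

	@Bean
	KafkaTransactionManager<String, String> tm(ProducerFactory<String, String> pf) {
		return new KafkaTransactionManager<>(pf); // pf must be configured with a transactionIdPrefix
	}

	@Bean
	ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
			ConsumerFactory<String, String> cf, KafkaTransactionManager<String, String> tm) {

		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(cf);
		factory.getContainerProperties().setTransactionManager(tm);
		// Manual immediate acks: with this fix, acknowledge() sends the offset
		// to the transaction instead of committing via the consumer.
		factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
		return factory;
	}

	@Bean
	RelayListener relayListener(KafkaTemplate<String, String> template) {
		return new RelayListener(template);
	}
}

class RelayListener {

	private final KafkaTemplate<String, String> template;

	RelayListener(KafkaTemplate<String, String> template) {
		this.template = template;
	}

	@KafkaListener(topics = "in", groupId = "group")
	void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
		this.template.send("out", record.value()); // runs in the listener's transaction
		ack.acknowledge(); // offset now becomes part of that transaction
	}
}
```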
@@ -598,6 +598,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
 		private ConsumerRecords<K, V> lastBatch;
 
+		private Producer<?, ?> producer;
+
 		private volatile boolean consumerPaused;
 
 		private volatile Collection<TopicPartition> assignedPartitions;
@@ -988,7 +990,7 @@ protected void pollAndInvoke() {
 			this.lastPoll = System.currentTimeMillis();
 			this.polling.set(true);
 			ConsumerRecords<K, V> records = doPoll();
-			if (!this.polling.compareAndSet(true, false)) {
+			if (!this.polling.compareAndSet(true, false) && records != null) {
 				/*
 				 * There is a small race condition where wakeIfNecessary was called between
 				 * exiting the poll and before we reset the boolean.
@@ -1242,7 +1244,10 @@ private void ackImmediate(ConsumerRecord<K, V> record) {
 					new TopicPartition(record.topic(), record.partition()),
 					new OffsetAndMetadata(record.offset() + 1));
 			this.commitLogger.log(() -> "Committing: " + commits);
-			if (this.syncCommits) {
+			if (this.producer != null) {
+				this.producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
+			}
+			else if (this.syncCommits) {
 				this.consumer.commitSync(commits, this.syncCommitTimeout);
 			}
 			else {
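For comparison, ackImmediate() now follows the transactional consume-transform-produce pattern of the plain Kafka client, where consumed offsets are registered with the producer's transaction via sendOffsetsToTransaction and only take effect when that transaction commits. A rough sketch against the raw kafka-clients API, with hypothetical topic and group names:

```java
// A rough sketch of the equivalent consume-transform-produce loop against the
// raw kafka-clients API; the "out" topic and "my-group" group id are hypothetical.
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

class TransactionalRelaySketch {

	// Assumes enable.auto.commit=false on the consumer and transactional.id on the producer.
	static void run(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
		producer.initTransactions();
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
			if (records.isEmpty()) {
				continue;
			}
			producer.beginTransaction();
			Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
			for (ConsumerRecord<String, String> record : records) {
				producer.send(new ProducerRecord<>("out", record.key(), record.value()));
				offsets.put(new TopicPartition(record.topic(), record.partition()),
						new OffsetAndMetadata(record.offset() + 1));
			}
			// The offsets ride along with the producer's transaction instead of
			// being committed via the consumer -- the same move ackImmediate makes.
			producer.sendOffsetsToTransaction(offsets, "my-group");
			producer.commitTransaction();
		}
	}
}
```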
@@ -1292,6 +1297,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 						producer = ((KafkaResourceHolder) TransactionSynchronizationManager
 								.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
 								.getProducer(); // NOSONAR nullable
+						ListenerConsumer.this.producer = producer;
 					}
 					RuntimeException aborted = doInvokeBatchListener(records, recordList, producer);
 					if (aborted != null) {
@@ -1542,6 +1548,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 						producer = ((KafkaResourceHolder) TransactionSynchronizationManager
 								.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
 								.getProducer(); // NOSONAR
+						ListenerConsumer.this.producer = producer;
 					}
 					RuntimeException aborted = doInvokeRecordListener(record, producer, iterator);
 					if (aborted != null) {
@@ -1715,7 +1722,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record,
 				checkDeser(record, ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER);
 			}
 			doInvokeOnMessage(record);
-			if (this.nackSleep < 0) {
+			if (this.nackSleep < 0 && !this.isManualImmediateAck) {
 				ackCurrent(record, producer);
 			}
 		}
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2019 the original author or authors.
+ * Copyright 2017-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -77,6 +77,8 @@
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.core.ProducerFactoryUtils;
 import org.springframework.kafka.event.ConsumerStoppedEvent;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
+import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.TopicPartitionOffset;
@@ -128,21 +130,28 @@ public static void setup() {
 
 	@Test
 	public void testConsumeAndProduceTransactionKTM() throws Exception {
-		testConsumeAndProduceTransactionGuts(false, false);
+		testConsumeAndProduceTransactionGuts(false, false, AckMode.RECORD);
 	}
 
 	@Test
 	public void testConsumeAndProduceTransactionKCTM() throws Exception {
-		testConsumeAndProduceTransactionGuts(true, false);
+		testConsumeAndProduceTransactionGuts(true, false, AckMode.RECORD);
 	}
 
 	@Test
 	public void testConsumeAndProduceTransactionHandleError() throws Exception {
-		testConsumeAndProduceTransactionGuts(false, true);
+		testConsumeAndProduceTransactionGuts(false, true, AckMode.RECORD);
 	}
 
+	@Test
+	public void testConsumeAndProduceTransactionKTMManual() throws Exception {
+		testConsumeAndProduceTransactionGuts(false, false, AckMode.MANUAL_IMMEDIATE);
+	}
+
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError) throws Exception {
+	private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError, AckMode ackMode)
+			throws Exception {
+
 		Consumer consumer = mock(Consumer.class);
 		final TopicPartition topicPartition = new TopicPartition("foo", 0);
 		willAnswer(i -> {
@@ -152,14 +161,15 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
 		}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
 		ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(topicPartition,
 				Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value"))));
+		ConsumerRecords empty = new ConsumerRecords(Collections.emptyMap());
 		final AtomicBoolean done = new AtomicBoolean();
 		willAnswer(i -> {
 			if (done.compareAndSet(false, true)) {
 				return records;
 			}
 			else {
 				Thread.sleep(500);
-				return null;
+				return empty;
 			}
 		}).given(consumer).poll(any(Duration.class));
 		ConsumerFactory cf = mock(ConsumerFactory.class);
@@ -183,15 +193,38 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
 			ptm = new ChainedKafkaTransactionManager(new SomeOtherTransactionManager(), tm);
 		}
 		ContainerProperties props = new ContainerProperties("foo");
+		props.setAckMode(ackMode);
 		props.setGroupId("group");
 		props.setTransactionManager(ptm);
 		final KafkaTemplate template = new KafkaTemplate(pf);
-		props.setMessageListener((MessageListener) m -> {
-			template.send("bar", "baz");
-			if (handleError) {
-				throw new RuntimeException("fail");
+		if (AckMode.MANUAL_IMMEDIATE.equals(ackMode)) {
+			class AckListener implements AcknowledgingMessageListener {
+				// not a lambda https://bugs.openjdk.java.net/browse/JDK-8074381
+
+				@Override
+				public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
+					template.send("bar", "baz");
+					if (handleError) {
+						throw new RuntimeException("fail");
+					}
+					acknowledgment.acknowledge();
+				}
+
+				@Override
+				public void onMessage(Object data) {
+				}
+
 			}
-		});
+			props.setMessageListener(new AckListener());
+		}
+		else {
+			props.setMessageListener((MessageListener) m -> {
+				template.send("bar", "baz");
+				if (handleError) {
+					throw new RuntimeException("fail");
+				}
+			});
+		}
 		KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
 		container.setBeanName("commit");
 		if (handleError) {