Skip to content

Commit

Permalink
GH-1278: Zombie Fencing with Batch Listener
Browse files Browse the repository at this point in the history
Resolves #1278

Support transaction per partition with Batch Listeners so the `transactional.id`
is tied to the group/topic/partition.

* Fix javadoc
  • Loading branch information
garyrussell authored and artembilan committed Oct 21, 2019
1 parent 5bd49b3 commit 4855751
Show file tree
Hide file tree
Showing 7 changed files with 518 additions and 9 deletions.
Expand Up @@ -180,6 +180,8 @@ public enum AckMode {

private Duration consumerStartTimout = DEFAULT_CONSUMER_START_TIMEOUT;

private boolean subBatchPerPartition;

/**
* Create properties for a container that will subscribe to the specified topics.
* @param topics the topics.
Expand Down Expand Up @@ -590,6 +592,23 @@ public void setConsumerStartTimout(Duration consumerStartTimout) {
this.consumerStartTimout = consumerStartTimout;
}

public boolean isSubBatchPerPartition() {
return this.subBatchPerPartition;
}

/**
* When using a batch message listener whether to dispatch records by partition (with
* a transaction for each sub batch if transactions are in use) or the complete batch
* received by the {@code poll()}. Useful when using transactions to enable zombie
* fencing, by using a {code transactional.id} that is unique for each
* group/topic/partition.
* @param subBatchPerPartition true for a separate transaction for each partition.
* @since 2.3.2
*/
public void setSubBatchPerPartition(boolean subBatchPerPartition) {
this.subBatchPerPartition = subBatchPerPartition;
}

@Override
public String toString() {
return "ContainerProperties ["
Expand All @@ -611,6 +630,7 @@ public String toString() {
+ ", monitorInterval=" + this.monitorInterval
+ (this.scheduler != null ? ", scheduler=" + this.scheduler : "")
+ ", noPollThreshold=" + this.noPollThreshold
+ ", subBatchPerPartition=" + this.subBatchPerPartition
+ "]";
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -37,7 +38,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -555,13 +555,11 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final MicrometerHolder micrometerHolder;

private Map<TopicPartition, OffsetMetadata> definedPartitions;

private final AtomicBoolean polling = new AtomicBoolean();

private volatile Collection<TopicPartition> assignedPartitions;
private final boolean subBatchPerPartition = this.containerProperties.isSubBatchPerPartition();

private volatile Thread consumerThread;
private Map<TopicPartition, OffsetMetadata> definedPartitions;

private int count;

Expand All @@ -579,8 +577,16 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private int nackIndex;

private Iterator<TopicPartition> batchIterator;

private ConsumerRecords<K, V> lastBatch;

private volatile boolean consumerPaused;

private volatile Collection<TopicPartition> assignedPartitions;

private volatile Thread consumerThread;

private volatile long lastPoll = System.currentTimeMillis();

@SuppressWarnings(UNCHECKED)
Expand Down Expand Up @@ -929,7 +935,7 @@ protected void pollAndInvoke() {
checkPaused();
this.lastPoll = System.currentTimeMillis();
this.polling.set(true);
ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
ConsumerRecords<K, V> records = doPoll();
if (!this.polling.compareAndSet(true, false)) {
/*
* There is a small race condition where wakeIfNecessary was called between
Expand All @@ -953,6 +959,31 @@ protected void pollAndInvoke() {
}
}

private ConsumerRecords<K, V> doPoll() {
ConsumerRecords<K, V> records;
if (this.isBatchListener && this.subBatchPerPartition) {
if (this.batchIterator == null) {
this.lastBatch = this.consumer.poll(this.pollTimeout);
if (this.lastBatch.count() == 0) {
return this.lastBatch;
}
else {
this.batchIterator = this.lastBatch.partitions().iterator();
}
}
TopicPartition next = this.batchIterator.next();
List<ConsumerRecord<K, V>> subBatch = this.lastBatch.records(next);
records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch));
if (!this.batchIterator.hasNext()) {
this.batchIterator = null;
}
}
else {
records = this.consumer.poll(this.pollTimeout);
}
return records;
}

void wakeIfNecessary() {
if (this.polling.getAndSet(false)) {
this.consumer.wakeup();
Expand Down Expand Up @@ -1186,6 +1217,10 @@ private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records,
final List<ConsumerRecord<K, V>> recordList) {

try {
if (this.subBatchPerPartition) {
ConsumerRecord<K, V> record = recordList.get(0);
TransactionSupport.setTransactionIdSuffix(zombieFenceTxIdSuffix(record.topic(), record.partition()));
}
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

@Override
Expand Down Expand Up @@ -1221,6 +1256,11 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
}
}
finally {
if (this.subBatchPerPartition) {
TransactionSupport.clearTransactionIdSuffix();
}
}
}

private void batchAfterRollback(final ConsumerRecords<K, V> records,
Expand All @@ -1241,8 +1281,12 @@ private void batchAfterRollback(final ConsumerRecords<K, V> records,
}

private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
return StreamSupport.stream(records.spliterator(), false)
.collect(Collectors.toList());
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
List<ConsumerRecord<K, V>> list = new LinkedList<>();
while (iterator.hasNext()) {
list.add(iterator.next());
}
return list;
}

/**
Expand Down
Expand Up @@ -109,26 +109,32 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
inOrder.verify(this.producer).beginTransaction();
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
inOrder.verify(this.producer).beginTransaction();
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
inOrder.verify(this.producer).abortTransaction();
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
offsets.clear();
offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
inOrder.verify(this.producer).beginTransaction();
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
offsets.clear();
offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
inOrder.verify(this.producer).beginTransaction();
inOrder.verify(this.producer).sendOffsetsToTransaction(offsets, CONTAINER_ID);
inOrder.verify(this.producer).commitTransaction();
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
Expand Down
@@ -0,0 +1,196 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
* @since 2.3.2
*
*/
@SpringJUnitConfig
@DirtiesContext
public class SubBatchPerPartitionTests {

private static final String CONTAINER_ID = "container";

@SuppressWarnings("rawtypes")
@Autowired
private Consumer consumer;

@Autowired
private Config config;

@Autowired
private KafkaListenerEndpointRegistry registry;

/*
* Deliver 6 records from three partitions, fail on the second record second
* partition.
*/
@SuppressWarnings("unchecked")
@Test
public void discardRemainingRecordsFromPollAndSeek() throws Exception {
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
this.registry.stop();
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
InOrder inOrder = inOrder(this.consumer);
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.consumer, times(3)).commitSync(any(), eq(Duration.ofSeconds(60)));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux", "fiz", "buz");
}

@Configuration
@EnableKafka
public static class Config {

private final List<String> contents = new ArrayList<>();

private final CountDownLatch pollLatch = new CountDownLatch(2);

private final CountDownLatch deliveryLatch = new CountDownLatch(3);

private final CountDownLatch commitLatch = new CountDownLatch(3);

private final CountDownLatch closeLatch = new CountDownLatch(1);

@KafkaListener(id = CONTAINER_ID, topics = "foo")
public void foo(List<String> in) {
contents.addAll(in);
this.deliveryLatch.countDown();
}

@SuppressWarnings({ "rawtypes" })
@Bean
public ConsumerFactory consumerFactory() {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = consumer();
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
return consumerFactory;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Consumer consumer() {
final Consumer consumer = mock(Consumer.class);
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
willAnswer(i -> {
((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned(
Arrays.asList(topicPartition0, topicPartition1, topicPartition2));
return null;
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
records1.put(topicPartition0, Arrays.asList(
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "foo"),
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "bar")));
records1.put(topicPartition1, Arrays.asList(
new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "baz"),
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "qux")));
records1.put(topicPartition2, Arrays.asList(
new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "fiz"),
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "buz")));
final AtomicInteger which = new AtomicInteger();
willAnswer(i -> {
this.pollLatch.countDown();
switch (which.getAndIncrement()) {
case 0:
return new ConsumerRecords(records1);
default:
try {
Thread.sleep(100);
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
return new ConsumerRecords(Collections.emptyMap());
}
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
willAnswer(i -> {
this.commitLatch.countDown();
return null;
}).given(consumer).commitSync(anyMap(), any());
willAnswer(i -> {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
return consumer;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.setBatchListener(true);
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setSubBatchPerPartition(true);
return factory;
}

}

}

0 comments on commit 4855751

Please sign in to comment.