Skip to content

Commit

Permalink
Fix commits on partition revocation
Browse files Browse the repository at this point in the history
Complete the remainder of `onPartitionsRevoked()` if the commits fail
fatally.

It is possible that commits might fail when using manual acks; we must
complete the remainder of the method if the commits fail.

**I will backport as needed**
  • Loading branch information
garyrussell committed Apr 17, 2020
1 parent 514f1f2 commit 5fa11d3
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1750,8 +1750,14 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
else {
this.userListener.onPartitionsRevoked(partitions);
}
// Wait until now to commit, in case the user listener added acks
commitPendingAcks();
try {
// Wait until now to commit, in case the user listener added acks
commitPendingAcks();
}
catch (Exception e) {
ListenerConsumer.this.logger.error(e, () -> "Fatal commit error after revocation "
+ partitions);
}
if (this.consumerAwareListener != null) {
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -53,6 +54,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
Expand Down Expand Up @@ -2256,6 +2258,98 @@ public void testCommitErrorHandlerCalled() throws Exception {
container.stop();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void testCommitFailsOnRevoke() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
Map<String, Object> cfProps = new LinkedHashMap<>();
given(cf.getConfigurationProperties()).willReturn(cfProps);
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
TopicPartition topicPartition0 = new TopicPartition("foo", 0);
records.put(topicPartition0, Arrays.asList(
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
records.put(new TopicPartition("foo", 1), Arrays.asList(
new ConsumerRecord<>("foo", 1, 0L, 1, "foo"),
new ConsumerRecord<>("foo", 1, 1L, 1, "bar")));
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
AtomicBoolean first = new AtomicBoolean(true);
AtomicInteger rebalance = new AtomicInteger();
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(2);
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(50);
int call = rebalance.getAndIncrement();
if (call == 0) {
rebal.get().onPartitionsRevoked(Collections.emptyList());
rebal.get().onPartitionsAssigned(records.keySet());
}
else if (call == 1) {
rebal.get().onPartitionsRevoked(Collections.singletonList(topicPartition0));
rebal.get().onPartitionsAssigned(Collections.emptyList());
}
latch.countDown();
return first.getAndSet(false) ? consumerRecords : emptyRecords;
});
willAnswer(invoc -> {
rebal.set(invoc.getArgument(1));
return null;
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
List<Map<TopicPartition, OffsetAndMetadata>> commits = new ArrayList<>();
AtomicBoolean firstCommit = new AtomicBoolean(true);
AtomicInteger commitCount = new AtomicInteger();
willAnswer(invoc -> {
commits.add(invoc.getArgument(0, Map.class));
if (!firstCommit.getAndSet(false)) {
throw new CommitFailedException();
}
return null;
}).given(consumer).commitSync(any(), any());
ContainerProperties containerProps = new ContainerProperties("foo");
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.MANUAL);
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(100L);
AtomicReference<Acknowledgment> acknowledgment = new AtomicReference<>();
class AckListener implements AcknowledgingMessageListener {
// not a lambda https://bugs.openjdk.java.net/browse/JDK-8074381

@Override
public void onMessage(ConsumerRecord data, Acknowledgment ack) {
acknowledgment.set(ack);
}

@Override
public void onMessage(Object data) {
}

}
containerProps.setMessageListener(new AckListener());
containerProps.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {

if (acknowledgment.get() != null) {
acknowledgment.get().acknowledge();
}
}

});
Properties consumerProps = new Properties();
containerProps.setKafkaConsumerProperties(consumerProps);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(container.getAssignedPartitions()).hasSize(1);
container.stop();
}

private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
Consumer<?, ?> consumer = spy(
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));
Expand Down

0 comments on commit 5fa11d3

Please sign in to comment.