Skip to content

Commit

Permalink
Fix re-pausing consumer after a rebalance
Browse files Browse the repository at this point in the history
Fixes #1111

`pause()` the consumer in the rebalance listener so that the consumer
discards any already-fetched records that would otherwise be returned by
the initial poll after the rebalance.

**cherry-pick to 2.1.x**

# Conflicts:
#	spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
  • Loading branch information
garyrussell authored and artembilan committed Jun 3, 2019
1 parent 4ab4c8a commit a5bfe1b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1704,10 +1704,9 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
if (ListenerConsumer.this.consumerPaused) {
ListenerConsumer.this.consumerPaused = false;
ListenerConsumer.this.consumer.pause(partitions);
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
+ "the container will pause again before polling, unless the container's "
+ "'paused' property is reset by a custom rebalance listener");
+ "consumer paused again, so the initial poll() will never return any records");
}
ListenerConsumer.this.assignedPartitions = partitions;
if (!ListenerConsumer.this.autoCommit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1863,16 +1863,25 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
public void testPauseResume() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), isNull())).willReturn(consumer);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
Map<String, Object> cfProps = new HashMap<>();
cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000);
given(cf.getConfigurationProperties()).willReturn(cfProps);
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
records.put(new TopicPartition("foo", 0), Arrays.asList(
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
AtomicBoolean first = new AtomicBoolean(true);
AtomicBoolean rebalance = new AtomicBoolean(true);
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(50);
if (rebalance.getAndSet(false)) {
rebal.get().onPartitionsRevoked(Collections.emptyList());
rebal.get().onPartitionsAssigned(records.keySet());
}
return first.getAndSet(false) ? consumerRecords : emptyRecords;
});
final CountDownLatch commitLatch = new CountDownLatch(2);
Expand All @@ -1881,9 +1890,11 @@ public void testPauseResume() throws Exception {
return null;
}).given(consumer).commitSync(any(Map.class));
given(consumer.assignment()).willReturn(records.keySet());
final CountDownLatch pauseLatch = new CountDownLatch(2);
final CountDownLatch pauseLatch1 = new CountDownLatch(2); // consumer, event publisher
final CountDownLatch pauseLatch2 = new CountDownLatch(2); // consumer, consumer
willAnswer(i -> {
pauseLatch.countDown();
pauseLatch1.countDown();
pauseLatch2.countDown();
return null;
}).given(consumer).pause(records.keySet());
given(consumer.paused()).willReturn(records.keySet());
Expand All @@ -1892,20 +1903,26 @@ public void testPauseResume() throws Exception {
resumeLatch.countDown();
return null;
}).given(consumer).resume(records.keySet());
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
new TopicPartitionInitialOffset("foo", 0) };
ContainerProperties containerProps = new ContainerProperties(topicPartition);
willAnswer(invoc -> {
rebal.set(invoc.getArgument(1));
return null;
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
ContainerProperties containerProps = new ContainerProperties("foo");
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.RECORD);
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(100L);
containerProps.setMessageListener((MessageListener) r -> { });
containerProps.setMissingTopicsFatal(false);
Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "42000");
containerProps.setConsumerProperties(consumerProps);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
CountDownLatch stopLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(e -> {
if (e instanceof ConsumerPausedEvent) {
pauseLatch.countDown();
pauseLatch1.countDown();
}
else if (e instanceof ConsumerResumedEvent) {
resumeLatch.countDown();
Expand All @@ -1916,13 +1933,19 @@ else if (e instanceof ConsumerStoppedEvent) {
});
container.start();
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
verify(consumer, times(2)).commitSync(any(Map.class));
verify(consumer, times(3)).commitSync(anyMap());
assertThat(container.isContainerPaused()).isFalse();
container.pause();
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(container.isPaused()).isTrue();
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(container.isContainerPaused()).isTrue();
rebalance.set(true); // force a re-pause
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
container.resume();
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
verify(consumer, times(4)).commitSync(anyMap());
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down

0 comments on commit a5bfe1b

Please sign in to comment.