Skip to content

Commit

Permalink
GH-1247: Only wake consumer while polling
Browse files Browse the repository at this point in the history
Fixes #1247

Previously, stopping the container unconditionally called `consumer.wakeUp()`.
It should only be woken while actually polling, otherwise operations such
as `commitSync()` would fail (after committing the offset but before calling
any callbacks or interceptors).

Add logic to only wake the consumer while it is in the `poll()` method.

**cherry-pick to 2.2.x, 2.1.x**

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerRecordModeTests.java
  • Loading branch information
garyrussell authored and artembilan committed Sep 26, 2019
1 parent a52f415 commit 7cfc482
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 5 deletions.
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -317,7 +318,7 @@ protected void doStop(final Runnable callback) {
if (isRunning()) {
this.listenerConsumerFuture.addCallback(new StopCallback(callback));
setRunning(false);
this.listenerConsumer.consumer.wakeup();
this.listenerConsumer.wakeIfNecessary();
}
}

Expand Down Expand Up @@ -469,6 +470,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private Map<TopicPartition, OffsetMetadata> definedPartitions;

private final AtomicBoolean polling = new AtomicBoolean();

private volatile Collection<TopicPartition> assignedPartitions;

private volatile Thread consumerThread;
Expand Down Expand Up @@ -740,8 +743,19 @@ protected void pollAndInvoke() {
}
processSeeks();
checkPaused();
ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
this.lastPoll = System.currentTimeMillis();
this.polling.set(true);
ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
if (!this.polling.compareAndSet(true, false)) {
/*
* There is a small race condition where wakeIfNecessary was called between
* exiting the poll and before we reset the boolean.
*/
if (records.count() > 0 && this.logger.isDebugEnabled()) {
this.logger.debug("Discarding polled records, container stopped: " + records.count());
}
return;
}
checkResumed();
debugRecords(records);
if (records != null && records.count() > 0) {
Expand All @@ -755,6 +769,12 @@ protected void pollAndInvoke() {
}
}

void wakeIfNecessary() {
if (this.polling.getAndSet(false)) {
this.consumer.wakeup();
}
}

private void debugRecords(ConsumerRecords<K, V> records) {
if (records != null && this.logger.isDebugEnabled()) {
this.logger.debug("Received: " + records.count() + " records");
Expand Down
Expand Up @@ -93,7 +93,6 @@ public void stopContainerAfterException() throws Exception {
InOrder inOrder = inOrder(this.consumer);
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.consumer).wakeup();
inOrder.verify(this.consumer).unsubscribe();
inOrder.verify(this.consumer).close();
inOrder.verifyNoMoreInteractions();
Expand Down
Expand Up @@ -93,7 +93,6 @@ public void stopContainerAfterException() throws Exception {
InOrder inOrder = inOrder(this.consumer);
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(this.consumer).wakeup();
inOrder.verify(this.consumer).unsubscribe();
inOrder.verify(this.consumer).close();
inOrder.verifyNoMoreInteractions();
Expand Down
Expand Up @@ -101,7 +101,6 @@ public void stopContainerAfterException() throws Exception {
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(2L)));
inOrder.verify(this.consumer).commitSync(
Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)));
inOrder.verify(this.consumer).wakeup();
inOrder.verify(this.consumer).unsubscribe();
inOrder.verify(this.consumer).close();
inOrder.verifyNoMoreInteractions();
Expand Down
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -586,6 +587,69 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
inOrder.verify(consumer).commitSync(any(Map.class));
container.stop();
verify(consumer).wakeup();
}

@SuppressWarnings("unchecked")
@Test
public void testRecordAckAfterStop() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
records.put(new TopicPartition("foo", 0), Collections.singletonList(
new ConsumerRecord<>("foo", 0, 0L, 1, "foo")));
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(50);
return consumerRecords;
});
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
new TopicPartitionInitialOffset("foo", 0) };
ContainerProperties containerProps = new ContainerProperties(topicPartition);
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.RECORD);
containerProps.setMissingTopicsFatal(false);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
MessageListener<Integer, String> messageListener = spy(
new MessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes

@Override
public void onMessage(ConsumerRecord<Integer, String> data) {
latch1.countDown();
try {
latch2.await(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

});

final CountDownLatch commitLatch = new CountDownLatch(1);
willAnswer(i -> {
commitLatch.countDown();
return null;
}
).given(consumer).commitSync(anyMap());

containerProps.setMessageListener(messageListener);
containerProps.setClientId("clientId");
containerProps.setShutdownTimeout(5L);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
latch2.countDown();
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
InOrder inOrder = inOrder(messageListener, consumer);
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
inOrder.verify(consumer).commitSync(anyMap());
verify(consumer, never()).wakeup();
}

@Test
Expand Down

0 comments on commit 7cfc482

Please sign in to comment.