From 7cfc482abd614320e03dff0d2ed4b196ac781a95 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 26 Sep 2019 10:26:30 -0400 Subject: [PATCH] GH-1247: Only wake consumer while polling Fixes https://github.com/spring-projects/spring-kafka/issues/1247 Previously, stopping the container unconditionally called `consumer.wakeUp()`. It should only be woken while actually polling, otherwise operations such as `commitSync()` would fail (after committing the offset but before calling any callbacks or interceptors). Add logic to only wake the consumer while it is in the `poll()` method. **cherry-pick to 2.2.x, 2.1.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerRecordModeTests.java --- .../KafkaMessageListenerContainer.java | 24 ++++++- ...ntainerStoppingBatchErrorHandlerTests.java | 1 - ...nerStoppingErrorHandlerBatchModeTests.java | 1 - ...erStoppingErrorHandlerRecordModeTests.java | 1 - .../KafkaMessageListenerContainerTests.java | 64 +++++++++++++++++++ 5 files changed, 86 insertions(+), 5 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 9d796353d0..8707fdf8d0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -317,7 +318,7 @@ protected void doStop(final Runnable callback) { if (isRunning()) { this.listenerConsumerFuture.addCallback(new StopCallback(callback)); setRunning(false); - this.listenerConsumer.consumer.wakeup(); + this.listenerConsumer.wakeIfNecessary(); } } @@ -469,6 +470,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private Map definedPartitions; + private final AtomicBoolean polling = new AtomicBoolean(); + private volatile Collection assignedPartitions; private volatile Thread consumerThread; @@ -740,8 +743,19 @@ protected void pollAndInvoke() { } processSeeks(); checkPaused(); - ConsumerRecords records = this.consumer.poll(this.pollTimeout); this.lastPoll = System.currentTimeMillis(); + this.polling.set(true); + ConsumerRecords records = this.consumer.poll(this.pollTimeout); + if (!this.polling.compareAndSet(true, false)) { + /* + * There is a small race condition where wakeIfNecessary was called between + * exiting the poll and before we reset the boolean. + */ + if (records.count() > 0 && this.logger.isDebugEnabled()) { + this.logger.debug("Discarding polled records, container stopped: " + records.count()); + } + return; + } checkResumed(); debugRecords(records); if (records != null && records.count() > 0) { @@ -755,6 +769,12 @@ protected void pollAndInvoke() { } } + void wakeIfNecessary() { + if (this.polling.getAndSet(false)) { + this.consumer.wakeup(); + } + } + private void debugRecords(ConsumerRecords records) { if (records != null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingBatchErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingBatchErrorHandlerTests.java index 6c34458bce..72feee3384 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingBatchErrorHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingBatchErrorHandlerTests.java @@ -93,7 +93,6 @@ public void stopContainerAfterException() throws Exception { InOrder inOrder = inOrder(this.consumer); inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); - inOrder.verify(this.consumer).wakeup(); inOrder.verify(this.consumer).unsubscribe(); inOrder.verify(this.consumer).close(); inOrder.verifyNoMoreInteractions(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerBatchModeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerBatchModeTests.java index 4dec48e86d..0ea2c5ef3c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerBatchModeTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerBatchModeTests.java @@ -93,7 +93,6 @@ public void stopContainerAfterException() throws Exception { InOrder inOrder = inOrder(this.consumer); inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); - inOrder.verify(this.consumer).wakeup(); inOrder.verify(this.consumer).unsubscribe(); inOrder.verify(this.consumer).close(); inOrder.verifyNoMoreInteractions(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerRecordModeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerRecordModeTests.java index f3358c9387..6359d59141 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerRecordModeTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerRecordModeTests.java @@ -101,7 +101,6 @@ public void stopContainerAfterException() throws Exception { Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))); inOrder.verify(this.consumer).commitSync( Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L))); - inOrder.verify(this.consumer).wakeup(); inOrder.verify(this.consumer).unsubscribe(); inOrder.verify(this.consumer).close(); inOrder.verifyNoMoreInteractions(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 24b0030b1a..6c937b8902 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -586,6 +587,69 @@ public void onMessage(ConsumerRecord data) { inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class)); inOrder.verify(consumer).commitSync(any(Map.class)); container.stop(); + verify(consumer).wakeup(); + } + + @SuppressWarnings("unchecked") + @Test + public void testRecordAckAfterStop() throws Exception { + ConsumerFactory cf = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer); + final Map>> records = new HashMap<>(); + records.put(new TopicPartition("foo", 0), Collections.singletonList( + new ConsumerRecord<>("foo", 0, 0L, 1, "foo"))); + ConsumerRecords consumerRecords = new ConsumerRecords<>(records); + given(consumer.poll(any(Duration.class))).willAnswer(i -> { + Thread.sleep(50); + return consumerRecords; + }); + TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] { + new TopicPartitionInitialOffset("foo", 0) }; + ContainerProperties containerProps = new ContainerProperties(topicPartition); + containerProps.setGroupId("grp"); + containerProps.setAckMode(AckMode.RECORD); + containerProps.setMissingTopicsFatal(false); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + MessageListener messageListener = spy( + new MessageListener() { // Cannot be lambda: Mockito doesn't mock final classes + + @Override + public void onMessage(ConsumerRecord data) { + latch1.countDown(); + try { + latch2.await(10, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + }); + + final CountDownLatch commitLatch = new CountDownLatch(1); + willAnswer(i -> { + commitLatch.countDown(); + return null; + } + ).given(consumer).commitSync(anyMap()); + + containerProps.setMessageListener(messageListener); + containerProps.setClientId("clientId"); + containerProps.setShutdownTimeout(5L); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.start(); + assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue(); + container.stop(); + latch2.countDown(); + assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); + InOrder inOrder = inOrder(messageListener, consumer); + inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class)); + inOrder.verify(consumer).commitSync(anyMap()); + verify(consumer, never()).wakeup(); } @Test