Skip to content

Commit

Permalink
Add ListenerContainerNoLongerIdleEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell authored and artembilan committed Sep 18, 2020
1 parent 145240e commit be0cc7c
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.event;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

/**
* An event that is emitted when a container is no longer idle if configured to publish
* idle events.
*
* @author Gary Russell
* @since 2.6.2
*/
public class ListenerContainerNoLongerIdleEvent extends KafkaEvent {

private static final long serialVersionUID = 1L;

private final long idleTime;

private final String listenerId;

private final List<TopicPartition> topicPartitions;

private transient Consumer<?, ?> consumer;

/**
* Construct an instance with the provided arguments.
* @param source the container instance that generated the event.
* @param container the container or the parent container if the container is a child.
* @param idleTime how long the container was idle.
* @param id the container id.
* @param topicPartitions the topics/partitions currently assigned.
* @param consumer the consumer.
*/
public ListenerContainerNoLongerIdleEvent(Object source, Object container, long idleTime, String id,
Collection<TopicPartition> topicPartitions, Consumer<?, ?> consumer) {

super(source, container);
this.idleTime = idleTime;
this.listenerId = id;
this.topicPartitions = topicPartitions == null ? null : new ArrayList<>(topicPartitions);
this.consumer = consumer;
}

/**
* The TopicPartitions the container is listening to.
* @return the TopicPartition list.
*/
public Collection<TopicPartition> getTopicPartitions() {
return this.topicPartitions == null ? null : Collections.unmodifiableList(this.topicPartitions);
}

/**
* How long the container was idle.
* @return the time in milliseconds.
*/
public long getIdleTime() {
return this.idleTime;
}

/**
* The id of the listener (if {@code @KafkaListener}) or the container bean name.
* @return the id.
*/
public String getListenerId() {
return this.listenerId;
}

/**
* Retrieve the consumer. Only populated if the listener is consumer-aware.
* Allows the listener to resume a paused consumer.
* @return the consumer.
*/
public Consumer<?, ?> getConsumer() {
return this.consumer;
}

@Override
public String toString() {
return "ListenerContainerNoLongerIdleEvent [idleTime="
+ ((float) this.idleTime / 1000) + "s, listenerId=" + this.listenerId // NOSONAR magic #
+ ", container=" + getSource()
+ ", topicPartitions=" + this.topicPartitions + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.kafka.event.ConsumerStoppingEvent;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.event.ListenerContainerNoLongerIdleEvent;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
Expand Down Expand Up @@ -358,6 +359,13 @@ private void publishIdleContainerEvent(long idleTime, Consumer<?, ?> consumer, b
}
}

private void publishNoLongerIdleContainerEvent(long idleTime, Consumer<?, ?> consumer) {
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new ListenerContainerNoLongerIdleEvent(this,
this.thisOrParentContainer, idleTime, getBeanName(), getAssignedPartitions(), consumer));
}
}

private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<?, ?> consumer) {
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(
Expand Down Expand Up @@ -591,6 +599,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private boolean commitRecovered;

private boolean wasIdle;

private volatile boolean consumerPaused;

private volatile Thread consumerThread;
Expand Down Expand Up @@ -1122,16 +1132,25 @@ protected void pollAndInvoke() {
debugRecords(records);
if (records != null && records.count() > 0) {
savePositionsIfNeeded(records);
if (this.containerProperties.getIdleEventInterval() != null) {
this.lastReceive = System.currentTimeMillis();
}
notIdle();
invokeListener(records);
}
else {
checkIdle();
}
}

private void notIdle() {
if (this.containerProperties.getIdleEventInterval() != null) {
long now = System.currentTimeMillis();
if (this.wasIdle) {
this.wasIdle = false;
publishNoLongerIdleContainerEvent(now - this.lastReceive, this.consumer);
}
this.lastReceive = now;
}
}

private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
if (this.fixTxOffsets) {
this.savedPositions.clear();
Expand Down Expand Up @@ -1274,6 +1293,7 @@ private void checkIdle() {
long now = System.currentTimeMillis();
if (now > this.lastReceive + this.containerProperties.getIdleEventInterval()
&& now > this.lastAlertAt + this.containerProperties.getIdleEventInterval()) {
this.wasIdle = true;
publishIdleContainerEvent(now - this.lastReceive, this.consumer, this.consumerPaused);
this.lastAlertAt = now;
if (this.consumerSeekAwareListener != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.springframework.kafka.core.MicrometerProducerListener;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.event.ListenerContainerNoLongerIdleEvent;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
Expand Down Expand Up @@ -247,13 +248,11 @@ public void testAnonymous() {
public void testSimple() throws Exception {
this.recordFilter.called = false;
template.send("annotated1", 0, "foo");
template.flush();
assertThat(this.listener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.globalErrorThrowable).isNotNull();
assertThat(this.listener.receivedGroupId).isEqualTo("foo");

template.send("annotated2", 0, 123, "foo");
template.flush();
assertThat(this.listener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.key).isEqualTo(123);
assertThat(this.listener.partition).isNotNull();
Expand All @@ -277,13 +276,11 @@ public void testSimple() throws Exception {
.isEqualTo(2L);

template.send("annotated3", 0, "foo");
template.flush();
assertThat(this.listener.latch3.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
assertThat(this.config.listen3Exception).isNotNull();

template.send("annotated4", 0, "foo");
template.flush();
assertThat(this.listener.latch4.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
assertThat(this.listener.ack).isNotNull();
Expand Down Expand Up @@ -317,6 +314,10 @@ public void testSimple() throws Exception {
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumer.clientId"))
.isEqualTo("clientIdViaProps3-0");

template.send("annotated4", 0, "foo");
assertThat(this.listener.noLongerIdleEventLatch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.noLongerIdleEvent.getListenerId().startsWith("qux-"));

template.send("annotated5", 0, 0, "foo");
template.send("annotated5", 1, 0, "bar");
template.send("annotated6", 0, 0, "baz");
Expand Down Expand Up @@ -1637,6 +1638,8 @@ static class Listener implements ConsumerSeekAware {

final CountDownLatch eventLatch = new CountDownLatch(1);

final CountDownLatch noLongerIdleEventLatch = new CountDownLatch(1);

final CountDownLatch keyLatch = new CountDownLatch(1);

final AtomicBoolean reposition12 = new AtomicBoolean();
Expand Down Expand Up @@ -1667,6 +1670,8 @@ static class Listener implements ConsumerSeekAware {

volatile ListenerContainerIdleEvent event;

volatile ListenerContainerNoLongerIdleEvent noLongerIdleEvent;

volatile List<Integer> keys;

volatile List<Integer> partitions;
Expand Down Expand Up @@ -1757,6 +1762,12 @@ public void eventHandler(ListenerContainerIdleEvent event) {
eventLatch.countDown();
}

@EventListener(condition = "event.listenerId.startsWith('qux')")
public void eventHandler(ListenerContainerNoLongerIdleEvent event) {
this.noLongerIdleEvent = event;
noLongerIdleEventLatch.countDown();
}

@KafkaListener(id = "fiz", topicPartitions = {
@TopicPartition(topic = "annotated5", partitions = { "#{'${foo:0,1}'.split(',')}" }),
@TopicPartition(topic = "annotated6", partitions = "0",
Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2535,6 +2535,7 @@ The following Spring application events are published by listener containers and
This event might signal that the configured task executor has insufficient threads to support the containers it is used in and their concurrency.
An error message is also logged when this condition occurs.
* `ListenerContainerIdleEvent`: published when no messages have been received in `idleInterval` (if configured).
* `ListenerContainerNoLongerIdleEvent`: published when a record is consumed after previously publishing a `ListenerContainerIdleEvent`.
* `NonResponsiveConsumerEvent`: published when the consumer appears to be blocked in the `poll` method.
* `ConsumerPausedEvent`: published by each consumer when the container is paused.
* `ConsumerResumedEvent`: published by each consumer when the container is resumed.
Expand All @@ -2558,6 +2559,8 @@ For example, if the consumer's `pause()` method was previously called, it can `r
* `paused`: Whether the container is currently paused.
See <<pause-resume>> for more information.

The `ListenerContainerNoLongerIdleEvent` has the same properties, except `idleTime` and `paused`.

The `NonResponsiveConsumerEvent` has the following properties:

* `source`: The listener container instance that published the event.
Expand Down Expand Up @@ -2637,6 +2640,8 @@ You can modify this behavior by setting the `monitorInterval` (default 30 second
The `noPollThreshold` should be greater than `1.0` to avoid getting spurious events due to a race condition.
Receiving such an event lets you stop the containers, thus waking the consumer so that it can stop.

Starting with version 2.6.2, if a container has published a `ListenerContainerIdleEvent`, it will publish a `ListenerContainerNoLongerIdleEvent` when a record is subsequently received.

====== Event Consumption

You can capture these events by implementing `ApplicationListener` -- either a general listener or one narrowed to only receive this specific event.
Expand Down
5 changes: 4 additions & 1 deletion src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ See <<seek-to-current>>, <<recovering-batch-eh>>, <<dead-letters>> and <<after-r
You can now configure an `adviceChain` in the container properties.
See <<container-props>> for more information.

==== @KafkaLisener Changes
When the container is configured to publish `ListenerContainerIdleEvent` s, it now publishes a `ListenerContainerNoLongerIdleEvent` when a record is received after publishing an idle event.
See <<events>> and <<idle-containers>> for more information.

==== @KafkaListener Changes

When using manual partition assignment, you can now specify a wildcard for determining which partitions should be reset to the initial offset.
In addition, if the listener implements `ConsumerSeekAware`, `onPartitionsAssigned()` is called after the manual assignment.
Expand Down

0 comments on commit be0cc7c

Please sign in to comment.