Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-304: Add ConsumerAwareRebalanceListener #307

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ else if (containerProperties.getTopicPattern() != null) {
this.containerProperties.setAckTime(containerProperties.getAckTime());
}
if (this.containerProperties.getConsumerRebalanceListener() == null) {
this.containerProperties.setConsumerRebalanceListener(createConsumerRebalanceListener());
this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
}
}

Expand Down Expand Up @@ -234,7 +234,7 @@ public void stop(Runnable callback) {
* Return default implementation of {@link ConsumerRebalanceListener} instance.
* @return the {@link ConsumerRebalanceListener} currently assigned to this container.
*/
protected final ConsumerRebalanceListener createConsumerRebalanceListener() {
protected final ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener() {
return new ConsumerRebalanceListener() {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

/**
* A rebalance listener that provides access to the consumer object.
*
* @author Gary Russell
* @since 2.0
*
*/
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

/**
* The same as {@link #onPartitionsRevoked(Collection)} with the additional consumer
* parameter. It is invoked by the container before any pending offsets are committed.
* @param consumer the consumer.
* @param partitions the partitions.
*/
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

/**
* The same as {@link #onPartitionsRevoked(Collection)} with the additional consumer
* parameter. It is invoked by the container after any pending offsets are committed.
* @param consumer the consumer.
* @param partitions the partitions.
*/
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

/**
* The same as {@link #onPartitionsAssigned(Collection)} with the additional consumer
* parameter.
* @param consumer the consumer.
* @param partitions the partitions.
*/
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

@Override
default void onPartitionsRevoked(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException("Listener container should never call this");
}

@Override
default void onPartitionsAssigned(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException("Listener container should never call this");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,25 @@ else if (listener instanceof MessageListener) {
public ConsumerRebalanceListener createRebalanceListener(final Consumer<K, V> consumer) {
return new ConsumerRebalanceListener() {

final ConsumerRebalanceListener userListener = getContainerProperties().getConsumerRebalanceListener();

final ConsumerAwareRebalanceListener consumerAwareListener =
userListener instanceof ConsumerAwareRebalanceListener
? (ConsumerAwareRebalanceListener) userListener : null;

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(partitions);
if (this.consumerAwareListener != null) {
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(consumer, partitions);
}
else {
this.userListener.onPartitionsRevoked(partitions);
}
// Wait until now to commit, in case the user listener added acks
commitManualAcks();
commitPendingAcks();
if (this.consumerAwareListener != null) {
this.consumerAwareListener.onPartitionsRevokedAfterCommit(consumer, partitions);
}
}

@Override
Expand Down Expand Up @@ -365,7 +379,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
if (ListenerConsumer.this.listener instanceof ConsumerSeekAware) {
seekPartitions(partitions, false);
}
getContainerProperties().getConsumerRebalanceListener().onPartitionsAssigned(partitions);
if (this.consumerAwareListener != null) {
this.consumerAwareListener.onPartitionsAssigned(consumer, partitions);
}
else {
this.userListener.onPartitionsAssigned(partitions);
}
}

};
Expand Down Expand Up @@ -485,7 +504,7 @@ public void run() {
}
}
}
commitManualAcks();
commitPendingAcks();
try {
this.consumer.unsubscribe();
}
Expand All @@ -498,7 +517,7 @@ public void run() {
}
}

private void commitManualAcks() {
private void commitPendingAcks() {
processCommits();
if (this.offsets.size() > 0) {
// we always commit after stopping the invoker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
Expand Down Expand Up @@ -134,6 +136,9 @@ public class EnableKafkaIntegrationTests {
@Autowired
private DefaultKafkaConsumerFactory<Integer, String> consumerFactory;

@Autowired
private AtomicReference<Consumer<?, ?>> consumerRef;

@Test
public void testSimple() throws Exception {
template.send("annotated1", 0, "foo");
Expand Down Expand Up @@ -197,6 +202,7 @@ public void testSimple() throws Exception {
template.send("annotated11", 0, "foo");
template.flush();
assertThat(this.listener.latch7.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.consumerRef.get()).isNotNull();

assertThat(this.recordFilter.called).isTrue();
}
Expand Down Expand Up @@ -567,7 +573,7 @@ public KafkaListenerContainerFactory<?> batchManualFactory() {

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaAutoStartFalseListenerContainerFactory() {
kafkaAutoStartFalseListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
ContainerProperties props = factory.getContainerProperties();
Expand All @@ -581,12 +587,12 @@ public KafkaListenerContainerFactory<?> batchManualFactory() {

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaRebalanceListenerContainerFactory() {
kafkaRebalanceListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
ContainerProperties props = factory.getContainerProperties();
factory.setConsumerFactory(consumerFactory());
props.setConsumerRebalanceListener(consumerRebalanceListener());
props.setConsumerRebalanceListener(consumerRebalanceListener(consumerRef()));
return factory;
}

Expand Down Expand Up @@ -664,17 +670,31 @@ public KafkaTemplate<Integer, String> kafkaJsonTemplate() {
return kafkaTemplate;
}

private ConsumerRebalanceListener consumerRebalanceListener() {
return new ConsumerRebalanceListener() {
@Bean
public AtomicReference<Consumer<?, ?>> consumerRef() {
return new AtomicReference<>();
}

private ConsumerAwareRebalanceListener consumerRebalanceListener(
final AtomicReference<Consumer<?, ?>> consumerRef) {
return new ConsumerAwareRebalanceListener() {

@Override
public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {

public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
Collection<org.apache.kafka.common.TopicPartition> partitions) {
consumerRef.set(consumer);
}

@Override
public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions) {
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer,
Collection<org.apache.kafka.common.TopicPartition> partitions) {
consumerRef.set(consumer);
}

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer,
Collection<org.apache.kafka.common.TopicPartition> partitions) {
consumerRef.set(consumer);
}

};
Expand Down
48 changes: 48 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,54 @@ static class MultiListenerBean {
}
----

[[rebalance-listeners]]
===== Rebalance Listeners

`ContainerProperties` has a property `consumerRebalanceListener` which takes an implementation of the Kafka client's `consumerRebalanceListener` interface.
If this property is not provided, the container will configure a simple logging listener that logs rebalance events under the `INFO` level.
This framework adds a sub-interface `ConsumerAwareRebalanceListener`:

[source, java]
----
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}
----

Notice that there are two callbacks when partitions are revoked; the first is called immediately; the second is called after any pending offsets are committed.
This is useful if you wish to maintain offsets in some external repository; for example:

[source, java]
----
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}

@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
----

[[annotation-send-to]]
===== Forwarding Listener Results using @SendTo

Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ See <<annotation-send-to>> for more information.
Message listeners can now be aware of the `Consumer` object.
See <<message-listeners>> for more information.

===== ConsumerAwareRebalanceListener

Rebalance listeners can now access the `Consumer` object during rebalance notifications.
See <<rebalance-listeners>> for more information.

===== @EmbeddedKafka Annotation

For convenience a test class level `@EmbeddedKafka` annotation is provided with the purpose to register `KafkaEmbedded` as a bean.
Expand Down