Skip to content

Commit

Permalink
GH-2887: ConsumerAwareRebalanceListener cleanup
Browse files Browse the repository at this point in the history
Fixes: gh-2887

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator has logic to
handle exceptions thrown by the rebalance listener. Rely on this capability
instead of custom exception handling and logging in ConsumerAwareRebalanceListener.

Particular exceptions like WakeupException are contracted in Javadoc.

Since Kafka has policy to manage these types of issues, rely on it instead of trying to
manage in the framework.
  • Loading branch information
midovlvn authored and sobychacko committed Dec 14, 2023
1 parent 878448e commit edfd8e8
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,28 +18,22 @@

import java.util.Collection;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import org.springframework.core.log.LogAccessor;

/**
* A rebalance listener that provides access to the consumer object. Starting with version
* 2.1.5, as a convenience, default no-op implementations are provided for all methods,
* allowing the user to implement just those (s)he is interested in.
*
* @author Gary Russell
* @author Michal Domagala
* @since 2.0
*
*/
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

/**
* {@link LogAccessor} for use in default methods.
*/
LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ConsumerAwareRebalanceListener.class));

/**
* The same as {@link #onPartitionsRevoked(Collection)} with the additional consumer
Expand All @@ -48,12 +42,7 @@ public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListene
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
try {
onPartitionsRevoked(partitions);
}
catch (Exception e) { // NOSONAR
LOGGER.error(e, "User method threw exception");
}
onPartitionsRevoked(partitions);
}

/**
Expand All @@ -72,12 +61,7 @@ default void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<
* @since 2.4
*/
default void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
try {
onPartitionsLost(partitions);
}
catch (Exception e) { // NOSONAR
LOGGER.error(e, "User method threw exception");
}
onPartitionsLost(partitions);
}

/**
Expand All @@ -87,12 +71,7 @@ default void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition
* @param partitions the partitions.
*/
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
try {
onPartitionsAssigned(partitions);
}
catch (Exception e) { // NOSONAR
LOGGER.error(e, "User method threw exception");
}
onPartitionsAssigned(partitions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@

/**
* @author Gary Russell
* @author Michal Domagala
* @since 2.6.5
*
*/
Expand All @@ -45,20 +46,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
assertThat(called.get()).isTrue();
}

@Test
void nonConsumerAwareTestAssignedThrows() {
AtomicBoolean called = new AtomicBoolean();
new ConsumerAwareRebalanceListener() {

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
called.set(true);
throw new RuntimeException();
}

}.onPartitionsAssigned(null, null);
assertThat(called.get()).isTrue();
}

@Test
void nonConsumerAwareTestRevoked() {
Expand All @@ -74,20 +61,6 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
assertThat(called.get()).isTrue();
}

@Test
void nonConsumerAwareTestRevokedThrows() {
AtomicBoolean called = new AtomicBoolean();
new ConsumerAwareRebalanceListener() {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
called.set(true);
throw new RuntimeException();
}

}.onPartitionsRevokedBeforeCommit(null, null);
assertThat(called.get()).isTrue();
}

@Test
void nonConsumerAwareTestLost() {
Expand All @@ -103,19 +76,4 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
assertThat(called.get()).isTrue();
}

@Test
void nonConsumerAwareTestLostThrows() {
AtomicBoolean called = new AtomicBoolean();
new ConsumerAwareRebalanceListener() {

@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
called.set(true);
throw new RuntimeException();
}

}.onPartitionsLost(null, null);
assertThat(called.get()).isTrue();
}

}

0 comments on commit edfd8e8

Please sign in to comment.