From edfd8e8c5e0fe12ccfee4729674fa376c2f93915 Mon Sep 17 00:00:00 2001 From: MichalDomagala Date: Wed, 8 Nov 2023 18:54:22 +0100 Subject: [PATCH] GH-2887: ConsumerAwareRebalanceListener cleanup Fixes: gh-2887 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator has logic to handle exceptions thrown by the rebalance listener. Rely on this capability instead of custom exception handling and logging in ConsumerAwareRebalanceListener. Particular exceptions like WakeupException are contracted in Javadoc. Since Kafka has policy to manage these types of issues, rely on it instead of trying to manage in the framework. --- .../ConsumerAwareRebalanceListener.java | 31 ++----------- .../ConsumerAwareRebalanceListenerTests.java | 46 +------------------ 2 files changed, 7 insertions(+), 70 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java index 129a14a558..4a1b1fffe9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,28 +18,22 @@ import java.util.Collection; -import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; -import org.springframework.core.log.LogAccessor; - /** * A rebalance listener that provides access to the consumer object. Starting with version * 2.1.5, as a convenience, default no-op implementations are provided for all methods, * allowing the user to implement just those (s)he is interested in. * * @author Gary Russell + * @author Michal Domagala * @since 2.0 * */ public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener { - /** - * {@link LogAccessor} for use in default methods. - */ - LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ConsumerAwareRebalanceListener.class)); /** * The same as {@link #onPartitionsRevoked(Collection)} with the additional consumer @@ -48,12 +42,7 @@ public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListene * @param partitions the partitions. */ default void onPartitionsRevokedBeforeCommit(Consumer consumer, Collection partitions) { - try { - onPartitionsRevoked(partitions); - } - catch (Exception e) { // NOSONAR - LOGGER.error(e, "User method threw exception"); - } + onPartitionsRevoked(partitions); } /** @@ -72,12 +61,7 @@ default void onPartitionsRevokedAfterCommit(Consumer consumer, Collection< * @since 2.4 */ default void onPartitionsLost(Consumer consumer, Collection partitions) { - try { - onPartitionsLost(partitions); - } - catch (Exception e) { // NOSONAR - LOGGER.error(e, "User method threw exception"); - } + onPartitionsLost(partitions); } /** @@ -87,12 +71,7 @@ default void onPartitionsLost(Consumer consumer, Collection consumer, Collection partitions) { - try { - onPartitionsAssigned(partitions); - } - catch (Exception e) { // NOSONAR - LOGGER.error(e, "User method threw exception"); - } + onPartitionsAssigned(partitions); } @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListenerTests.java index 62c4afb2af..605f5b3322 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListenerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListenerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ /** * @author Gary Russell + * @author Michal Domagala * @since 2.6.5 * */ @@ -45,20 +46,6 @@ public void onPartitionsAssigned(Collection partitions) { assertThat(called.get()).isTrue(); } - @Test - void nonConsumerAwareTestAssignedThrows() { - AtomicBoolean called = new AtomicBoolean(); - new ConsumerAwareRebalanceListener() { - - @Override - public void onPartitionsAssigned(Collection partitions) { - called.set(true); - throw new RuntimeException(); - } - - }.onPartitionsAssigned(null, null); - assertThat(called.get()).isTrue(); - } @Test void nonConsumerAwareTestRevoked() { @@ -74,20 +61,6 @@ public void onPartitionsRevoked(Collection partitions) { assertThat(called.get()).isTrue(); } - @Test - void nonConsumerAwareTestRevokedThrows() { - AtomicBoolean called = new AtomicBoolean(); - new ConsumerAwareRebalanceListener() { - - @Override - public void onPartitionsRevoked(Collection partitions) { - called.set(true); - throw new RuntimeException(); - } - - }.onPartitionsRevokedBeforeCommit(null, null); - assertThat(called.get()).isTrue(); - } @Test void nonConsumerAwareTestLost() { @@ -103,19 +76,4 @@ public void onPartitionsLost(Collection partitions) { assertThat(called.get()).isTrue(); } - @Test - void nonConsumerAwareTestLostThrows() { - AtomicBoolean called = new AtomicBoolean(); - new ConsumerAwareRebalanceListener() { - - @Override - public void onPartitionsLost(Collection partitions) { - called.set(true); - throw new RuntimeException(); - } - - }.onPartitionsLost(null, null); - assertThat(called.get()).isTrue(); - } - }