From 42940821ec71d0efe350d408100278f815487e69 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 19 Jun 2024 12:14:55 -0400 Subject: [PATCH] GH-3319: Fix NPE in the `KafkaListenerEndpointRegistry` The `KafkaListenerEndpointRegistry.getUnregisteredListenerContainer()` returns `null` when container is already present in the internal `unregisteredContainers` cache * Fix `KafkaListenerEndpointRegistry.getUnregisteredListenerContainer()` to return a container instance from the `unregisteredContainers` cache **Auto-cherry-pick to `3.2.x` & `3.1.x`** --- .../config/KafkaListenerEndpointRegistry.java | 2 +- .../KafkaListenerEndpointRegistryTests.java | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index daf046fa59..da5e2692b9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -168,7 +168,7 @@ public MessageListenerContainer getUnregisteredListenerContainer(String id) { refreshContextContainers(); return this.unregisteredContainers.get(id); } - return null; + return container; } /** diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaListenerEndpointRegistryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaListenerEndpointRegistryTests.java index 2576f38b74..c9a250ac5a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaListenerEndpointRegistryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaListenerEndpointRegistryTests.java @@ -35,11 +35,15 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer; /** * @author Gary Russell * @author Joo Hyuk Kim + * @author Artem Bilan + * * @since 2.8.9 */ public class KafkaListenerEndpointRegistryTests { @@ -139,6 +143,21 @@ void getListenerContainersMatchingBiPredicate(List names, BiPredicate listenerContainerMock = mock(ConcurrentMessageListenerContainer.class); + given(listenerContainerMock.getListenerId()).willReturn("testListenerContainer"); + applicationContext.registerBean(ConcurrentMessageListenerContainer.class, () -> listenerContainerMock); + applicationContext.refresh(); + registry.setApplicationContext(applicationContext); + // Lazy-load from application context + assertThat(registry.getUnregisteredListenerContainer("testListenerContainer")).isNotNull(); + // From internal map + assertThat(registry.getUnregisteredListenerContainer("testListenerContainer")).isNotNull(); + } + /** * Provides parameters for the getListenerContainersMatchingBiPredicate test. * Each set of parameters includes a list of names, a bi-predicate, and the expected count of matching containers.