diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 7bb6c6efca..7a208b3514 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -277,14 +277,9 @@ public Object postProcessAfterInitialization(final Object bean, final String bea final boolean hasClassLevelListeners = classLevelListeners.size() > 0; final List multiMethods = new ArrayList<>(); Map> annotatedMethods = MethodIntrospector.selectMethods(targetClass, - new MethodIntrospector.MetadataLookup>() { - - @Override - public Set inspect(Method method) { - Set listenerMethods = findListenerAnnotations(method); - return (!listenerMethods.isEmpty() ? listenerMethods : null); - } - + (MethodIntrospector.MetadataLookup>) method -> { + Set listenerMethods = findListenerAnnotations(method); + return (!listenerMethods.isEmpty() ? listenerMethods : null); }); if (hasClassLevelListeners) { Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass, @@ -427,8 +422,7 @@ protected void processListener(MethodKafkaListenerEndpoint endpoint, Kafka endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener)); endpoint.setTopics(resolveTopics(kafkaListener)); endpoint.setTopicPattern(resolvePattern(kafkaListener)); - endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), - "clientIdPrefix")); + endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix")); String group = kafkaListener.containerGroup(); if (StringUtils.hasText(group)) { Object resolvedGroup = resolveExpression(group); @@ -475,11 +469,14 @@ private void resolveKafkaProperties(MethodKafkaListenerEndpoint endpoint, if (propertyStrings.length > 0) { Properties properties = new Properties(); for (String property : propertyStrings) { - try { - properties.load(new StringReader(resolveExpressionAsString(property, "property"))); - } - catch (IOException e) { - this.logger.error("Failed to load property " + property + ", continuing...", e); + String value = resolveExpressionAsString(property, "property"); + if (value != null) { + try { + properties.load(new StringReader(value)); + } + catch (IOException e) { + this.logger.error("Failed to load property " + property + ", continuing...", e); + } } } endpoint.setConsumerProperties(properties); @@ -488,7 +485,7 @@ private void resolveKafkaProperties(MethodKafkaListenerEndpoint endpoint, private String getEndpointId(KafkaListener kafkaListener) { if (StringUtils.hasText(kafkaListener.id())) { - return resolve(kafkaListener.id()); + return resolveExpressionAsString(kafkaListener.id(), "id"); } else { return GENERATED_ID_PREFIX + this.counter.getAndIncrement(); @@ -514,19 +511,19 @@ private TopicPartitionInitialOffset[] resolveTopicPartitions(KafkaListener kafka result.addAll(resolveTopicPartitionsList(topicPartition)); } } - return result.toArray(new TopicPartitionInitialOffset[result.size()]); + return result.toArray(new TopicPartitionInitialOffset[0]); } private String[] resolveTopics(KafkaListener kafkaListener) { String[] topics = kafkaListener.topics(); List result = new ArrayList<>(); if (topics.length > 0) { - for (int i = 0; i < topics.length; i++) { - Object topic = resolveExpression(topics[i]); + for (String topic1 : topics) { + Object topic = resolveExpression(topic1); resolveAsString(topic, result); } } - return result.toArray(new String[result.size()]); + return result.toArray(new String[0]); } private Pattern resolvePattern(KafkaListener kafkaListener) { @@ -558,8 +555,8 @@ private List resolveTopicPartitionsList(TopicPartit Assert.state(partitions.length > 0 || partitionOffsets.length > 0, "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'"); List result = new ArrayList<>(); - for (int i = 0; i < partitions.length; i++) { - resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result); + for (String partition : partitions) { + resolvePartitionAsInteger((String) topic, resolveExpression(partition), result); } for (PartitionOffset partitionOffset : partitionOffsets) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 763f0f5b98..1bf8ba2789 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -17,7 +17,7 @@ package org.springframework.kafka.annotation; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.isNull; @@ -255,7 +255,7 @@ public void testSimple() throws Exception { "listenerConsumer.consumer")); assertThat( KafkaTestUtils.getPropertyValue(this.listener.listen4Consumer, "fetcher.maxPollRecords", Integer.class)) - .isEqualTo(100); + .isEqualTo(100); assertThat(this.quxGroup).hasSize(1); assertThat(this.quxGroup.get(0)).isSameAs(manualContainer); List containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class); @@ -309,14 +309,12 @@ public void testSimple() throws Exception { .isNotEqualTo("rebalanceListener"); String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.clientId", String.class); - assertThat( - clientId) - .startsWith("rebal-"); + assertThat(clientId).startsWith("rebal-"); assertThat(clientId.indexOf('-')).isEqualTo(clientId.lastIndexOf('-')); } @Test - public void testAutoStartup() throws Exception { + public void testAutoStartup() { MessageListenerContainer listenerContainer = registry.getListenerContainer("manualStart"); assertThat(listenerContainer).isNotNull(); assertThat(listenerContainer.isRunning()).isFalse(); @@ -399,7 +397,6 @@ public void testJson() throws Exception { } @Test - @DirtiesContext public void testJsonHeaders() throws Exception { ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) registry.getListenerContainer("jsonHeaders"); @@ -546,7 +543,7 @@ public void testValidation() throws Exception { } @Test - public void testReplyingListener() throws Exception { + public void testReplyingListener() { Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testReplying"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -560,7 +557,7 @@ public void testReplyingListener() throws Exception { } @Test - public void testReplyingBatchListener() throws Exception { + public void testReplyingBatchListener() { Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testBatchReplying"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -586,7 +583,7 @@ public void testReplyingBatchListener() throws Exception { } @Test - public void testReplyingListenerWithErrorHandler() throws Exception { + public void testReplyingListenerWithErrorHandler() { Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testErrorHandlerReplying"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -600,7 +597,7 @@ public void testReplyingListenerWithErrorHandler() throws Exception { } @Test - public void testVoidListenerWithReplyingErrorHandler() throws Exception { + public void testVoidListenerWithReplyingErrorHandler() { Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testVoidWithErrorHandlerReplying"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -614,7 +611,7 @@ public void testVoidListenerWithReplyingErrorHandler() throws Exception { } @Test - public void testReplyingBatchListenerWithErrorHandler() throws Exception { + public void testReplyingBatchListenerWithErrorHandler() { Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testErrorHandlerBatchReplying"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -640,7 +637,7 @@ public void testReplyingBatchListenerWithErrorHandler() throws Exception { } @Test - public void testMultiReplyTo() throws Exception { + public void testMultiReplyTo() { Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -709,10 +706,12 @@ public void testAddingTopics() { assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 1); embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1)); assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 2); - assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1))) - .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("exists"); - assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions2", 10, (short) 2))) - .isInstanceOf(IllegalArgumentException.class).hasMessageContaining("replication"); + assertThatIllegalArgumentException() + .isThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1))) + .withMessageContaining("exists"); + assertThatIllegalArgumentException() + .isThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions2", 10, (short) 2))) + .withMessageContaining("replication"); Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties()); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); @@ -733,7 +732,7 @@ public void testReceivePollResults() throws Exception { @Test public void testAutoConfigTm() { assertThat(this.transactionalFactory.getContainerProperties().getTransactionManager()) - .isInstanceOf(ChainedKafkaTransactionManager.class); + .isInstanceOf(ChainedKafkaTransactionManager.class); } @Test @@ -1416,8 +1415,9 @@ public void listen3(ConsumerRecord record) { this.latch3.countDown(); } - @KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory", - containerGroup = "qux#{'Group'}", properties = { + @KafkaListener(id = "#{'qux'}", topics = "annotated4", + containerFactory = "kafkaManualAckListenerContainerFactory", containerGroup = "qux#{'Group'}", + properties = { "max.poll.interval.ms:#{'${poll.interval:60000}'}", ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=#{'${poll.recs:100}'}" }) @@ -1879,6 +1879,7 @@ public void setDelegate( public Foo convert(String source) { return delegate.convert(source); } + } public static class ValidatedClass {