diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 7fba33174c..f171a23ea1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -141,6 +141,8 @@ public class KafkaMessageListenerContainer // NOSONAR line count private static final boolean MICROMETER_PRESENT = ClassUtils.isPresent( "io.micrometer.core.instrument.MeterRegistry", KafkaMessageListenerContainer.class.getClassLoader()); + private static final Map CONSUMER_CONFIG_DEFAULTS = ConsumerConfig.configDef().defaultValues(); + private final AbstractMessageListenerContainer thisOrParentContainer; private final TopicPartitionOffset[] topicPartitions; @@ -468,8 +470,6 @@ public String toString() { private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback { - private static final int SIXTY = 60; - private static final String UNCHECKED = "unchecked"; private static final String RAWTYPES = "rawtypes"; @@ -615,7 +615,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume @SuppressWarnings(UNCHECKED) ListenerConsumer(GenericMessageListener listener, ListenerType listenerType) { - Properties consumerProperties = new Properties(this.containerProperties.getKafkaConsumerProperties()); + Properties consumerProperties = propertiesFromProperties(); checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory); this.autoCommit = determineAutoCommit(consumerProperties); this.consumer = @@ -696,6 +696,20 @@ else if (listener instanceof MessageListener) { this.micrometerHolder = obtainMicrometerHolder(); } + private Properties propertiesFromProperties() { + Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties(); + Properties props = new Properties(); + props.putAll(propertyOverrides); + Set stringPropertyNames = propertyOverrides.stringPropertyNames(); + // User might have provided properties as defaults + stringPropertyNames.forEach((name) -> { + if (!props.contains(name)) { + props.setProperty(name, propertyOverrides.getProperty(name)); + } + }); + return props; + } + private void checkGroupInstance(Properties properties, ConsumerFactory consumerFactory) { String groupInstance = properties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG); if (!StringUtils.hasText(groupInstance)) { @@ -755,9 +769,9 @@ else if (timeout instanceof String) { this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName() + " in property '" + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG - + "'; defaulting to 30 seconds."); + + "'; using Kafka default."); } - return Duration.ofSeconds(SIXTY / 2).toMillis(); // Default 'max.poll.interval.ms' is 30 seconds + return (int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); } } @@ -824,12 +838,12 @@ else if (timeout instanceof String) { this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName() + " in property '" + ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG - + "'; defaulting to 60 seconds for sync commit timeouts"); + + "'; defaulting to Kafka default for sync commit timeouts"); } - return Duration.ofSeconds(SIXTY); + return Duration + .ofMillis((int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)); } } - } private Object findDeserializerClass(Map props, boolean isValue) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 9d85f92178..8c8d733c7a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -91,6 +91,7 @@ import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; @@ -225,6 +226,10 @@ public void testAnonymous() { List containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class); assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId")) .isEqualTo(DEFAULT_TEST_GROUP_ID); + assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.maxPollInterval")) + .isEqualTo(300000L); + assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.syncCommitTimeout")) + .isEqualTo(Duration.ofSeconds(60)); container.stop(); } @@ -351,13 +356,15 @@ public void testAutoStartup() { assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout()).isNull(); this.registry.start(); assertThat(listenerContainer.isRunning()).isTrue(); - assertThat(((ConcurrentMessageListenerContainer) listenerContainer) + KafkaMessageListenerContainer kafkaMessageListenerContainer = + ((ConcurrentMessageListenerContainer) listenerContainer) .getContainers() - .get(0) + .get(0); + assertThat(kafkaMessageListenerContainer .getContainerProperties().getSyncCommitTimeout()) - .isEqualTo(Duration.ofSeconds(60)); + .isEqualTo(Duration.ofSeconds(59)); assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout()) - .isEqualTo(Duration.ofSeconds(60)); + .isEqualTo(Duration.ofSeconds(59)); listenerContainer.stop(); assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.syncCommits", Boolean.class)) .isFalse(); @@ -365,6 +372,8 @@ public void testAutoStartup() { .isNotNull(); assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.consumerRebalanceListener")) .isNotNull(); + assertThat(KafkaTestUtils.getPropertyValue(kafkaMessageListenerContainer, "listenerConsumer.maxPollInterval")) + .isEqualTo(301000L); } @Test @@ -1618,7 +1627,9 @@ static class Listener implements ConsumerSeekAware { volatile CustomMethodArgument customMethodArgument; @KafkaListener(id = "manualStart", topics = "manualStart", - containerFactory = "kafkaAutoStartFalseListenerContainerFactory") + containerFactory = "kafkaAutoStartFalseListenerContainerFactory", + properties = { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ":301000", + ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + ":59000" }) public void manualStart(String foo) { }