Skip to content

Commit

Permalink
Fix idleBetweenPolls max value calculation
Browse files Browse the repository at this point in the history
- Wrong default used when `max.poll.interval.ms` is not specified - 30s Vs 300s
- Value set in `@KafkaListener.properties` ignored for this calculatio and default
  used instead
- Also use actual default for `ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG`

**I will back port - expecting conflicts**

* Remove unused constant
  • Loading branch information
garyrussell committed Aug 17, 2020
1 parent cca897b commit 7303413
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
private static final boolean MICROMETER_PRESENT = ClassUtils.isPresent(
"io.micrometer.core.instrument.MeterRegistry", KafkaMessageListenerContainer.class.getClassLoader());

private static final Map<String, Object> CONSUMER_CONFIG_DEFAULTS = ConsumerConfig.configDef().defaultValues();

private final AbstractMessageListenerContainer<K, V> thisOrParentContainer;

private final TopicPartitionOffset[] topicPartitions;
Expand Down Expand Up @@ -468,8 +470,6 @@ public String toString() {

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {

private static final int SIXTY = 60;

private static final String UNCHECKED = "unchecked";

private static final String RAWTYPES = "rawtypes";
Expand Down Expand Up @@ -615,7 +615,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

@SuppressWarnings(UNCHECKED)
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Properties consumerProperties = new Properties(this.containerProperties.getKafkaConsumerProperties());
Properties consumerProperties = propertiesFromProperties();
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
this.autoCommit = determineAutoCommit(consumerProperties);
this.consumer =
Expand Down Expand Up @@ -696,6 +696,20 @@ else if (listener instanceof MessageListener) {
this.micrometerHolder = obtainMicrometerHolder();
}

private Properties propertiesFromProperties() {
Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties();
Properties props = new Properties();
props.putAll(propertyOverrides);
Set<String> stringPropertyNames = propertyOverrides.stringPropertyNames();
// User might have provided properties as defaults
stringPropertyNames.forEach((name) -> {
if (!props.contains(name)) {
props.setProperty(name, propertyOverrides.getProperty(name));
}
});
return props;
}

private void checkGroupInstance(Properties properties, ConsumerFactory<K, V> consumerFactory) {
String groupInstance = properties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
if (!StringUtils.hasText(groupInstance)) {
Expand Down Expand Up @@ -755,9 +769,9 @@ else if (timeout instanceof String) {
this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName()
+ " in property '"
+ ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
+ "'; defaulting to 30 seconds.");
+ "'; using Kafka default.");
}
return Duration.ofSeconds(SIXTY / 2).toMillis(); // Default 'max.poll.interval.ms' is 30 seconds
return (int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
}
}

Expand Down Expand Up @@ -824,12 +838,12 @@ else if (timeout instanceof String) {
this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName()
+ " in property '"
+ ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG
+ "'; defaulting to 60 seconds for sync commit timeouts");
+ "'; defaulting to Kafka default for sync commit timeouts");
}
return Duration.ofSeconds(SIXTY);
return Duration
.ofMillis((int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
}
}

}

private Object findDeserializerClass(Map<String, Object> props, boolean isValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
Expand Down Expand Up @@ -225,6 +226,10 @@ public void testAnonymous() {
List<?> containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class);
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId"))
.isEqualTo(DEFAULT_TEST_GROUP_ID);
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.maxPollInterval"))
.isEqualTo(300000L);
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.syncCommitTimeout"))
.isEqualTo(Duration.ofSeconds(60));
container.stop();
}

Expand Down Expand Up @@ -351,20 +356,24 @@ public void testAutoStartup() {
assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout()).isNull();
this.registry.start();
assertThat(listenerContainer.isRunning()).isTrue();
assertThat(((ConcurrentMessageListenerContainer<?, ?>) listenerContainer)
KafkaMessageListenerContainer<?, ?> kafkaMessageListenerContainer =
((ConcurrentMessageListenerContainer<?, ?>) listenerContainer)
.getContainers()
.get(0)
.get(0);
assertThat(kafkaMessageListenerContainer
.getContainerProperties().getSyncCommitTimeout())
.isEqualTo(Duration.ofSeconds(60));
.isEqualTo(Duration.ofSeconds(59));
assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout())
.isEqualTo(Duration.ofSeconds(60));
.isEqualTo(Duration.ofSeconds(59));
listenerContainer.stop();
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.syncCommits", Boolean.class))
.isFalse();
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.commitCallback"))
.isNotNull();
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.consumerRebalanceListener"))
.isNotNull();
assertThat(KafkaTestUtils.getPropertyValue(kafkaMessageListenerContainer, "listenerConsumer.maxPollInterval"))
.isEqualTo(301000L);
}

@Test
Expand Down Expand Up @@ -1618,7 +1627,9 @@ static class Listener implements ConsumerSeekAware {
volatile CustomMethodArgument customMethodArgument;

@KafkaListener(id = "manualStart", topics = "manualStart",
containerFactory = "kafkaAutoStartFalseListenerContainerFactory")
containerFactory = "kafkaAutoStartFalseListenerContainerFactory",
properties = { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ":301000",
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + ":59000" })
public void manualStart(String foo) {
}

Expand Down

0 comments on commit 7303413

Please sign in to comment.