Skip to content

Support for setting KafkaAdmin#operationTimeout via externalised configuration #2430

@hari-mani

Description

@hari-mani

Expected Behavior

Setting the property spring.kafka.admin.operationTimeout should set the value for KafkaAdmin#operationTimeout

Current Behavior

To set the mentioned property the application is required to create a bean of type KafkaAdmin and set value for operationTimeout using the setter as show below

 @Bean
  public KafkaAdmin kafkaAdmin() {
    KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
    kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
    kafkaAdmin.setModifyTopicConfigs(this.properties.getAdmin().isModifyTopicConfigs());
    kafkaAdmin.setOperationTimeout(1000);
    return kafkaAdmin;
  }

Context

While trying to run an application using spring-kafka I got the below exception where the operation was getting timeout due to it's inability to complete within the default value set for operationTimeout.

java.util.concurrent.TimeoutException: null
        at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) ~[kafka-clients-3.3.1.jar:na]
        at org.springframework.kafka.core.KafkaAdmin.clusterId(KafkaAdmin.java:233) ~[spring-kafka-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.obtainClusterId(KafkaMessageListenerContainer.java:941) ~[spring-kafka-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:919) ~[spring-kafka-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:376) ~[spring-kafka-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:462) ~[spring-kafka-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:226) ~[spring-kafka-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:462) ~[spring-kafka-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:383) ~[spring-kafka-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:328) ~[spring-kafka-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
        at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
        at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
        at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
        at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:943) ~[spring-context-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:595) ~[spring-context-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742) ~[spring-boot-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:433) ~[spring-boot-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:313) ~[spring-boot-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1314) ~[spring-boot-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303) ~[spring-boot-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]

Also the javadoc mentions that the value for operationTimeout denotes the value in seconds but it is used with unit as TimeUnit.MILLISECONDS when passed to KafkaFuture.get().

this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.MILLISECONDS);

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions