Skip to content

Dependencies are not wired into custom KafkaMessageChannelBinder with non-empty properties #3148

@ilia1243

Description

@ilia1243

Describe the issue

According to documentation https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#rebalance-listener

if you provide a single KafkaBindingRebalanceListener bean in the application context, it will be wired into all Kafka consumer bindings.

However, it is not wired into custom binder with non-empty properties.

To Reproduce

application.yml:

spring:
  cloud:
    function:
      definition: testConsumer
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      binders:
        test-binder:
          type: 'kafka'
          environment:
            spring.cloud.stream.kafka.binder:
              consumer-properties:
                max.poll.records: 1            # < at least one property here 
      bindings:
        testConsumer-in-0:
          binder: test-binder
          group: test-group
          destination: test-topic

Kotlin reproducer:

@SpringBootApplication
class Application {
    @Bean
    fun testConsumer() = java.util.function.Consumer<GenericMessage<ByteArray>> {
    }
}

@Component
class RebalanceListener : KafkaBindingRebalanceListener {
    override fun onPartitionsAssigned(bindingName: String, consumer: Consumer<*, *>, partitions: Collection<TopicPartition?>?, initial: Boolean) {
        // In successful case this should be printed
        println("RebalanceListener is wired and called")
    }
}

fun main() {
    runApplication<Application>()
}
Dependencies:
plugins {
    kotlin("jvm") version "2.2.20"
    kotlin("plugin.spring") version "2.2.20"
}

repositories {
    mavenCentral()
}

dependencies {
    implementation(platform("org.springframework.boot:spring-boot-dependencies:4.0.0-M3"))
    implementation(platform("org.springframework.cloud:spring-cloud-dependencies:2025.1.0-M3"))
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
}

If replace in application.yml

consumer-properties:
  max.poll.records: 1

to

consumer-properties: {}

the problem does not reproduce, and KafkaBindingRebalanceListener is wired and called.

This may be related to the way the child application context is created https://github.com/spring-cloud/spring-cloud-stream/blob/v5.0.0-M3/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java#L477

    ConfigurableApplicationContext initializeBinderContextSimple(String configurationName, Map<String, Object> binderProperties,
            BinderType binderType, BinderConfiguration binderConfiguration, boolean refresh) {
        ...
        boolean useApplicationContextAsParent = binderProperties.isEmpty()
                && this.context != null;
        ...
        if (useApplicationContextAsParent) {
            binderProducingContext.setParent(this.context);
        }

If properties are not empty, the child context has no reference to the parent one, while KafkaBindingRebalanceListener is defined in the parent context.

Version of the framework: 2025.1.0-M3
Expected behavior: KafkaBindingRebalanceListener should be wired to all binders.

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions