-
Notifications
You must be signed in to change notification settings - Fork 639
Description
Describe the issue
When using Spring Cloud Stream with Kafka (Spring Boot 3.5.7, Spring Cloud 2025.0.0), consumer bindings that utilize generic types fail to process messages if the bean is defined as a concrete implementation class rather than as the functional interface type (Consumer<T>, Consumer<Message<T>>, etc.). This issue affects both batch and single (default) consumption modes, though the specific errors differ between modes.
Working vs. Failing Pattern
✅ Working pattern – Bean declared as functional interface:
@Bean
public Consumer<List<ProductCompleteDTO>> productBatchListConsumer2() {
return new GenericBatchListConsumer<>("productBatchListConsumer2");
}❌ Failing pattern – Bean declared as concrete implementation:
@Bean
public GenericBatchListConsumer<ProductCompleteDTO> productBatchListConsumer() {
return new GenericBatchListConsumer<>("productBatchListConsumer");
}Detailed Observations
Considering the consumer beans defined in the ConfigurationApp class of the sample repository:
Single Mode Behavior:
productConsumerdefined asGenericConsumer<ProductCompleteDTO>: ✅ Works correctlyproductConsumer2defined asConsumer<ProductCompleteDTO>: ✅ Works correctlyproductMessageConsumerdefined asGenericMessageConsumer<ProductCompleteDTO>: ❌ Fails with class cast errorproductMessageConsumer2defined asConsumer<Message<ProductCompleteDTO>>: ✅ Works correctly
Batch Mode Behavior:
productBatchListConsumerdefined asGenericBatchListConsumer<ProductCompleteDTO>: ❌ Fails with conversion errorproductBatchListConsumer2defined asConsumer<List<ProductCompleteDTO>>: ✅ Works correctlyproductBatchListMessageConsumerdefined asGenericBatchMessageListConsumer<ProductCompleteDTO>: ❌ Fails with conversion errorproductBatchListMessageConsumer2defined asConsumer<Message<List<ProductCompleteDTO>>>: ✅ Works correctly
Note
The common factor in all failing cases is the combination of Bean declared as a concrete implementation class (not interface)
Note
In the case of the productConsumer definition, which is declared as a concrete implementation GenericConsumer<ProductCompleteDTO>, it does work correctly. This is the only case observed that works with a concrete implementation class.
Error Examples
Batch Mode Error (attempting to convert List to single DTO):
Caused by: org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.util.ArrayList<?>] to type [com.example.demoStreamKafka.dto.ProductCompleteDTO] for value [[...]]
at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:47)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertNonMessageInputIfNecessary(SimpleFunctionRegistry.java:1312)
Single Mode Error (attempting to cast DTO to Message):
Caused by: java.lang.ClassCastException: class com.example.demoStreamKafka.dto.ProductCompleteDTO cannot be cast to class org.springframework.messaging.Message
at com.example.demoStreamKafka.consumer.GenericMessageConsumer.accept(GenericMessageConsumer.java:9)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1056)
To Reproduce
- Clone the sample project: https://github.com/ferblaca/demoStreamGenerics/tree/upgrade_to_2025_0_0
- Start the project (Kafka in KRaft mode via Docker Compose will launch automatically)
- Observe the logs for conversion and class cast exceptions in the failing consumer bindings
Expected behavior
All consumer bindings using generic types should work consistently.
Thank you in advance!