Describe the bug
I was using spring-cloud-stream 3.0.3.RELEASE and recently upgraded to spring-cloud-stream 3.0.8.RELEASE.
I am using a custom message converter that extends AbstractMessageConverter, and the consumer function signature is Consumer<List<MyClass>>.
After upgrading, the following error occurred:
org.springframework.messaging.converter.MessageConversionException: Could Not Convert Input
Looking into the cause, my message had already been converted by my custom message converter, but Spring tried to convert it again in SimpleFunctionRegistry, and that second conversion failed.
I think the problem is the code below:
message.getPayload().getClass().isAssignableFrom(((Class<?>) rawType))
I think it should be changed as follows, because rawType is the target type that the payload ultimately needs to be assignable to:
((Class<?>) rawType).isAssignableFrom(message.getPayload().getClass())
If this isn't a problem, is there something I've misunderstood?
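To make the direction concrete, here is a minimal sketch (not taken from the Spring source; List and ArrayList simply stand in for the declared rawType and the actual payload class):

    Class<?> rawType = List.class;                               // declared input type
    Class<?> payloadClass = new ArrayList<String>().getClass();  // actual payload class

    boolean current  = payloadClass.isAssignableFrom(rawType);   // false: ArrayList is not a supertype of List
    boolean proposed = rawType.isAssignableFrom(payloadClass);   // true:  an ArrayList can be assigned to a List

If I read the check correctly, this is why a payload that is already an instance of the declared type is not recognized as matching it.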
Sample
public class MyClassListMessageConverter extends AbstractMessageConverter {

    public MyClassListMessageConverter() {
        super(new MimeType("application", "*+avro"));
    }

    @Override
    protected boolean canConvertFrom(Message<?> message, Class<?> targetClass) {
        // Only convert batches whose record keys are our Avro Key type and whose payload
        // is a List of Envelope records (or entirely KafkaNull values).
        boolean canConvert = Optional.ofNullable(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class))
                .orElse(Collections.emptyList())
                .stream()
                .anyMatch(key -> key instanceof Key);
        canConvert = canConvert && message.getPayload() instanceof List;
        if (canConvert) {
            List<Object> payload = (List<Object>) message.getPayload();
            canConvert = payload.stream().anyMatch(o -> o instanceof Envelope)
                    || payload.stream().allMatch(o -> o instanceof KafkaNull);
        }
        return canConvert && super.canConvertFrom(message, targetClass);
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return byte[].class.equals(clazz) || List.class.equals(clazz) || Object.class.equals(clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        try {
            List<Key> keys = Objects.requireNonNull(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class));
            List<Object> payloads = (List<Object>) message.getPayload();
            if (keys.size() != payloads.size()) {
                throw new IllegalStateException(String.format("keys and payloads size does not equal. keys size : %d, payloads size : %d", keys.size(), payloads.size()));
            }
            // Pair each record key with its payload; KafkaNull payloads are converted with a null envelope.
            List<MyClass> myClasses = new ArrayList<>();
            for (int i = 0; i < payloads.size(); i++) {
                Object payload = payloads.get(i);
                if (payload instanceof KafkaNull) {
                    myClasses.add(convertToMyClass(keys.get(i), null));
                } else if (payload instanceof Envelope) {
                    myClasses.add(convertToMyClass(keys.get(i), (Envelope) payload));
                } else {
                    throw new IllegalStateException("Unknown type of payload : " + payload);
                }
            }
            return myClasses;
        } catch (Exception e) {
            log.error("{}", message);
            throw new RuntimeException(e);
        }
    }
}
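For reference, the converter is registered as a plain MessageConverter bean, roughly like this (the configuration class and bean name here are illustrative, not the exact code from my project):

    @Configuration
    public class MessageConverterConfig {

        // Spring Cloud Stream adds user-provided MessageConverter beans to the
        // composite converter it uses for input conversion.
        @Bean
        public MessageConverter myClassListMessageConverter() {
            return new MyClassListMessageConverter();
        }
    }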
spring.cloud.function.definition: myConsumer
spring.cloud.stream.bindings:
  myConsumer-in-0:
    destination: mytopic
    group: mygroup
    content-type: application/*+avro
    binder: kafka
    consumer:
      batch-mode: true
spring.cloud.stream.kafka.binder:
  brokers: ${my.brokers}
  required-acks: all
  auto-create-topics: false
spring.cloud.stream.kafka.bindings:
  myConsumer-in-0:
    consumer:
      configuration:
        auto.offset.reset: earliest
        key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        schema.registry.url: ${my.schema-registry-url}
        specific.avro.reader: true
        max.poll.records: 100
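The consumer bean matching spring.cloud.function.definition, as a minimal sketch (the real handler logic is omitted; log assumes an SLF4J logger is available):

    @Bean
    public Consumer<List<MyClass>> myConsumer() {
        // batch-mode: true, so each invocation receives the whole converted batch
        return myClasses -> myClasses.forEach(myClass -> log.info("received: {}", myClass));
    }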