Description
I use the Spring Cloud Stream Kafka binder with the functional programming model to define my Kafka consumer, and I consume messages in batch mode:
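@Bean
public Consumer<Message<List<InputOrder>>> process() {
    return messages -> {
        // One map of converted Kafka headers per record; position i is expected
        // to correspond to payload element i
        List<Map<String, Object>> headers = messages.getHeaders()
                .get(KafkaHeaders.BATCH_CONVERTED_HEADERS, List.class);
        // The deserialized batch of orders
        var orders = messages.getPayload();
    };
}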
And the properties are:
cloud:
  stream:
    kafka:
      binder:
        brokers:
          - <broker>
        configuration:
          max.poll.records: 20
    bindings:
      process-in-0:
        binder: kafka
        destination: <topic>
        content-type: application/json
        group: <group>
        consumer:
          batch-mode: true
    default-binder: kafka
If one of the incoming Kafka JSON messages cannot be deserialized in
org.springframework.cloud.function.context.config.JsonMessageConverter#convertFromInternal
for any reason, the exception is silently swallowed and null is returned. SmartCompositeMessageConverter#fromMessage(org.springframework.messaging.Message<?>, java.lang.Class<?>, java.lang.Object) then simply does not add that message to the converted result.
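To make this failure mode concrete, here is a minimal pure-Java model of it. It is not Spring code: LenientBatchConversion, convertOrNull and convertBatch are hypothetical stand-ins for a per-item converter that returns null on error and for a batch step that skips null results. Integer.parseInt plays the role of JSON deserialization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of the behaviour described above; NOT Spring code.
final class LenientBatchConversion {

    // Stand-in for a converter that swallows any exception and returns null.
    static <T> T convertOrNull(String raw, Function<String, T> parser) {
        try {
            return parser.apply(raw);
        } catch (RuntimeException e) {
            return null; // the failure and its cause are lost here
        }
    }

    // Stand-in for the batch step: null results are silently left out,
    // so the output list can be shorter than the input list.
    static <T> List<T> convertBatch(List<String> raws, Function<String, T> parser) {
        List<T> converted = new ArrayList<>();
        for (String raw : raws) {
            T value = convertOrNull(raw, parser);
            if (value != null) {
                converted.add(value);
            }
        }
        return converted;
    }
}
```

For example, convertBatch(List.of("oops", "42"), Integer::parseInt) returns a one-element list containing 42. Nothing signals that "oops" was discarded.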
So if one item in message.payload hits this conversion error but the second one does not, convertedInput
in SimpleFunctionRegistry.FunctionInvocationWrapper#convertInputMessageIfNecessary
contains only the second item, with no exception or error message. At the same time, the headers still contain two items, because convertedInput
is only checked for null. Nothing checks that the number of converted messages matches the number of input messages.
As a result, my consumer function receives a message
with 1 item in the payload and 2 items in the headers. Moreover, the headers are misaligned: the first header belongs to the lost message, not to the one that is present, and there is no way to tell which is which.
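The index mismatch can be sketched as follows. The sketch is hypothetical and simplified: each header entry is reduced to the record's offset, and HeaderPairing.describe pairs payload item i with header entry i, the way a consumer reading BATCH_CONVERTED_HEADERS by position would.

```java
import java.util.List;

// Hypothetical illustration (not Spring code) of why position-based pairing
// breaks once the payload list is shorter than the per-record header list.
final class HeaderPairing {

    // Pairs payload element i with header entry i (here simplified to an offset).
    static String describe(List<String> payload, List<Long> offsets, int i) {
        return payload.get(i) + "@offset=" + offsets.get(i);
    }
}
```

With offsets 10 (the record that failed to convert) and 11 (order-B), describe(List.of("order-B"), List.of(10L, 11L), 0) returns "order-B@offset=10". The surviving record is attributed to the lost record's offset.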
At the very end, in convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build();, convertedInput holds only one item, but all of the original headers are copied:
private Object convertInputMessageIfNecessary(Message message, Type type) {
    if (type == null) {
        return null;
    }
    if (message.getPayload() instanceof Optional) {
        return message;
    }
    if (message.getPayload() instanceof Collection<?>) {
        Type itemType = FunctionTypeUtils.getImmediateGenericType(type, 0);
        if (itemType == null) {
            itemType = type;
        }
        Type collectionType = CollectionUtils.findCommonElementType((Collection<?>) message.getPayload());
        if (collectionType == itemType) {
            return message.getPayload();
        }
    }
    Object convertedInput = message.getPayload();
    Type itemType = this.extractActualValueTypeIfNecessary(type);
    Class<?> rawType = FunctionTypeUtils.isMessage(type)
            ? FunctionTypeUtils.getRawType(itemType)
            : FunctionTypeUtils.getRawType(type);
    convertedInput = this.isConversionHintRequired(type, rawType)
            ? SimpleFunctionRegistry.this.messageConverter.fromMessage(message, rawType, itemType)
            : SimpleFunctionRegistry.this.messageConverter.fromMessage(message, rawType);
    if (convertedInput != null && !rawType.isAssignableFrom(convertedInput.getClass())) {
        logger.warn("Failed to convert input to " + rawType + ". Will attempt to invoke function with raw type");
    }
    if (FunctionTypeUtils.isMessage(type)) {
        if (convertedInput == null) {
            if (logger.isDebugEnabled()) {
                /*
                 * In the event conversion was unsuccessful we simply return the original un-converted message.
                 * This will help to deal with issues like KafkaNull and others. However if this was not the intention
                 * of the developer, this would be discovered early in the development process where the
                 * additional message converter could be added to facilitate the conversion.
                 */
                logger.debug("Input type conversion of payload " + message.getPayload() + " resulted in 'null'. "
                        + "Will use the original message as input.");
            }
            convertedInput = message;
        }
        else {
            if (!(convertedInput instanceof Message)) {
                convertedInput = MessageBuilder.withPayload(convertedInput).copyHeaders(message.getHeaders()).build();
            }
        }
    }
    return convertedInput;
}
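For contrast, a count-preserving conversion could keep one slot per input record and store the failure in place, so index i of the result still matches header i. This is only a sketch of one possible design under that assumption, not what Spring Cloud Function does. AlignedConversion and its Result record are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical alternative (not Spring's implementation): one result per input
// record, so element i still lines up with header i.
final class AlignedConversion {

    // Either a converted value or the failure that prevented conversion.
    record Result<T>(T value, RuntimeException error) {
        boolean failed() {
            return error != null;
        }
    }

    static <T> List<Result<T>> convertAligned(List<String> raws, Function<String, T> parser) {
        List<Result<T>> results = new ArrayList<>(raws.size());
        for (String raw : raws) {
            try {
                results.add(new Result<>(parser.apply(raw), null));
            } catch (RuntimeException e) {
                results.add(new Result<>(null, e)); // failure kept at its original index
            }
        }
        return results;
    }
}
```

Converting List.of("oops", "42") this way yields two results. The first is marked as failed and holds its NumberFormatException, and the second holds 42.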
Is this the intended behavior, meaning that my consumer must always compare the number of headers with the number of payload items? Or am I missing some property, error-handling setting, or something else?
I am also not sure that silently dropping malformed input messages is correct.
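Until this is clarified, a consumer-side guard can at least detect the problem. The sketch below assumes it is called at the start of the batch consumer with messages.getPayload() and the list read from KafkaHeaders.BATCH_CONVERTED_HEADERS. BatchGuard and requireAligned are hypothetical names.

```java
import java.util.List;
import java.util.Map;

// Hypothetical consumer-side guard; names are illustrative only.
final class BatchGuard {

    // Fails fast when the payload and the per-record header list differ in size.
    // A missing header list (null) is treated as zero entries.
    static void requireAligned(List<?> payload, List<Map<String, Object>> headers) {
        int headerCount = headers == null ? 0 : headers.size();
        if (payload.size() != headerCount) {
            throw new IllegalStateException("Batch misaligned: " + payload.size()
                    + " payload item(s) but " + headerCount + " header entries");
        }
    }
}
```

Throwing makes the mismatch visible instead of processing records with misattributed headers. However, the guard still cannot tell which record was dropped or why.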
Thanks in advance for any feedback.
This affects at least version 3.2.12.