
Spring Cloud 2020.0.4 breaks Spring Cloud Stream Kafka AVRO Message Conversion when Sleuth is on the classpath #2051

Closed
davidmelia opened this issue Nov 1, 2021 · 16 comments

Comments

@davidmelia

Describe the bug
When upgrading from Spring Cloud 2020.0.3 to 2020.0.4, Avro message conversion is ignored in Spring Cloud Stream Kafka (Kafka 2.7), giving the following error:

2021-10-28 15:43:47.868 ERROR [spring-cloud-webflux-avor-source-only,14b2b7aa4a15f2e6,14b2b7aa4a15f2e6] 28075 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [25d7b7f7-1]  500 Server Error for HTTP GET "/dave"

org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map: {"type":"record","name":"AvroFxRateEvent","namespace":"uk.co.dave.consumer.fxrate.consumer.avro","fields":[{"name":"from","type":{"type":"string","avro.java.string":"String"}},{"name":"to","type":{"type":"string","avro.java.string":"String"}},{"name":"rate","type":{"type":"bytes","logicalType":"decimal","precision":7,"scale":6}}]} (through reference chain: uk.co.dave.consumer.fxrate.consumer.avro.AvroFxRateEvent["schema"]->org.apache.avro.Schema$RecordSchema["valueType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not a map: {"type":"record","name":"AvroFxRateEvent","namespace":"uk.co.dave.consumer.fxrate.consumer.avro","fields":[{"name":"from","type":{"type":"string","avro.java.string":"String"}},{"name":"to","type":{"type":"string","avro.java.string":"String"}},{"name":"rate","type":{"type":"bytes","logicalType":"decimal","precision":7,"scale":6}}]} (through reference chain: uk.co.dave.consumer.fxrate.consumer.avro.AvroFxRateEvent["schema"]->org.apache.avro.Schema$RecordSchema["valueType"])
	at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:273) ~[spring-messaging-5.3.9.jar:5.3.9]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Handler example.DaveController#dave() [DispatcherHandler]
	|_ checkpoint ⇢ org.springframework.cloud.sleuth.instrument.web.TraceWebFilter [DefaultWebFilterChain]
	|_ checkpoint ⇢ HTTP GET "/dave" [ExceptionHandlingWebHandler]
Stack trace:
		at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:273) ~[spring-messaging-5.3.9.jar:5.3.9]
		at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertToInternal(ApplicationJsonMessageMarshallingConverter.java:69) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
		at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201) ~[spring-messaging-5.3.9.jar:5.3.9]
		at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191) ~[spring-messaging-5.3.9.jar:5.3.9]
		at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.toMessage(SmartCompositeMessageConverter.java:96) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputMessageIfNecessary(SimpleFunctionRegistry.java:1245) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputIfNecessary(SimpleFunctionRegistry.java:1056) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:509) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:125) ~[spring-cloud-sleuth-instrumentation-3.0.4.jar:3.0.4]
		at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:41) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$1.doApply(SimpleFunctionRegistry.java:257) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:506) ~[spring-cloud-function-context-3.1.4.jar:3.1.4]
		at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:214) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
		at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:156) ~[spring-cloud-stream-3.1.4.jar:3.1.4]
		at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:136) ~[spring-cloud-stream-3.1.4.jar:3.1.4]

It looks like the Avro message converters are being ignored in favour of the Jackson ones.

Any of the following avoids the error:

  1. Downgrade Spring Cloud Stream to 3.1.3
  2. Remove Spring Cloud Sleuth as a dependency
  3. Downgrade to Spring Cloud 2020.0.3

Sample
https://github.com/davidmelia/spring-boot-webflux-avro-source-only

Compile and run this project (it assumes a local Confluent Schema Registry on http://localhost:8081 and Kafka on localhost:9092), then hit http://localhost:8080/dave, which sends a message to Kafka. You will see the above error.

Downgrading to Spring Cloud Stream 3.1.3 (uncomment spring-cloud-stream-dependencies in the pom.xml), removing Spring Cloud Sleuth from the pom.xml, or downgrading to Spring Cloud 2020.0.3 fixes this problem.

I previously raised this on Spring Cloud Sleuth (#2048), but after realising that downgrading Spring Cloud Stream fixes the problem, I am not sure where the problem lies.

Thanks

@sobychacko

If you set the property spring.sleuth.function.enabled to false, it disconnects Sleuth from the app. Could you try that and see if it fixes the issue? If it does, then we can delegate the issue to spring-cloud-sleuth.

@davidmelia
Author

@sobychacko I didn't think of doing that. I can confirm spring.sleuth.function.enabled=false fixes the issue.

@olegz
Contributor

olegz commented Nov 2, 2021

@davidmelia I'll talk to @marcingrzejszczak tomorrow. I know there was a lot of work on TraceFunctionAroundWrapper in Sleuth, which will be available with the RC tomorrow; given that all of it is cutting edge, that flag exists specifically for this purpose. That doesn't mean there is no issue. It just means we still need to dig in and see what's going on, but for now you have simply disabled that interceptor and helped identify that there is indeed an issue in Sleuth, so thank you!

@MarcoR83

MarcoR83 commented May 12, 2022

I just ran into the same issue. The problem is that, because the function is wrapped by the Brave/Sleuth tracing advice, the following line sets the FunctionInvocationWrapper.skipInputConversion property on the Sleuth wrapper:

https://github.com/spring-cloud/spring-cloud-stream/blob/c22651a3b0dfdae4313a4772a23f5a8cde653c60/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java#L522

However, the decision in FunctionInvocationWrapper.convertInputMessageIfNecessary is made based on the original function, which has skipInputConversion=false by default. That's why it tries to convert an already perfectly deserialized object, which fails.

https://github.com/spring-cloud/spring-cloud-function/blob/7921842943e782bf82cdcdf21627110548980537/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java#L1049

IMHO the properties of the wrapper/decorator should be delegated to the original instance rather than copied.
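The copy-versus-delegate distinction can be illustrated with a minimal, stdlib-only Java sketch. It assumes a heavily simplified model: ConvertingFunction, TargetFunction, CopyingWrapper and DelegatingWrapper are hypothetical stand-ins, not the real Spring Cloud Function or Sleuth classes. It only shows that a flag set on a wrapper holding its own copy never reaches the target that actually makes the conversion decision, while a delegating wrapper forwards it.

```java
import java.util.function.Function;

/** Hypothetical, simplified model of a function wrapper carrying a conversion flag (not the real API). */
public class WrapperFlagDemo {

    interface ConvertingFunction extends Function<Object, Object> {
        boolean isSkipInputConversion();
        void setSkipInputConversion(boolean skip);
    }

    /** Target function: decides whether to convert input based on its OWN flag. */
    static class TargetFunction implements ConvertingFunction {
        private boolean skipInputConversion = false; // default, as described in the comment above

        public boolean isSkipInputConversion() { return skipInputConversion; }
        public void setSkipInputConversion(boolean skip) { this.skipInputConversion = skip; }

        public Object apply(Object input) {
            // The conversion decision reads the target's flag, not the wrapper's.
            return skipInputConversion ? "passed-through:" + input : "converted:" + input;
        }
    }

    /** Wrapper that keeps its own copy of the flag: setting it never reaches the target. */
    static class CopyingWrapper implements ConvertingFunction {
        private final ConvertingFunction target;
        private boolean skipInputConversion;

        CopyingWrapper(ConvertingFunction target) {
            this.target = target;
            this.skipInputConversion = target.isSkipInputConversion(); // copied once at construction
        }

        public boolean isSkipInputConversion() { return skipInputConversion; }
        public void setSkipInputConversion(boolean skip) { this.skipInputConversion = skip; }
        public Object apply(Object input) { return target.apply(input); }
    }

    /** Wrapper that delegates flag access to the target, as suggested above. */
    static class DelegatingWrapper implements ConvertingFunction {
        private final ConvertingFunction target;

        DelegatingWrapper(ConvertingFunction target) { this.target = target; }

        public boolean isSkipInputConversion() { return target.isSkipInputConversion(); }
        public void setSkipInputConversion(boolean skip) { target.setSkipInputConversion(skip); }
        public Object apply(Object input) { return target.apply(input); }
    }

    public static void main(String[] args) {
        ConvertingFunction copying = new CopyingWrapper(new TargetFunction());
        copying.setSkipInputConversion(true);
        System.out.println(copying.apply("payload")); // converted:payload (flag was lost)

        ConvertingFunction delegating = new DelegatingWrapper(new TargetFunction());
        delegating.setSkipInputConversion(true);
        System.out.println(delegating.apply("payload")); // passed-through:payload
    }
}
```

In this model, delegation keeps a single source of truth for the flag, so it does not matter whether the framework configures the wrapper or the wrapped function.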

When can we expect this to get fixed?

@olegz
Contributor

olegz commented May 12, 2022

You can disable Sleuth with the spring.sleuth.function.enabled property.
If your intention is to use Sleuth, then I need a small reproducible sample. Those flags are there for a reason, since the intention of this particular wrapper is only to read/add additional headers to the message and not to process anything.

@Richardmbs12

Hi @olegz

I am experiencing exactly the same issue.

My case is a little different. We have a function that takes in an object deserialized from JSON, and its output object needs to be serialized to XML.

We also want to use Spring Cloud Sleuth with our functions, especially because we use the traceId from the propagated B3 headers in our functions.

But because the FunctionAroundWrapper calls targetFunction.setSkipOutputConversion(true), our XMLConverter is never used and the function always returns JSON.

@erohana

erohana commented Oct 24, 2022

Hi,
we are experiencing the same issue with Spring Boot 2.5.12 and Spring Cloud 2020.0.5.
We use Avro messages.

The problem is that it happens randomly, not for all messages.
I don't see in this thread whether it was fixed in later versions, and I don't want to disable Sleuth since we depend on it heavily.

@erohana
Copy link

erohana commented Oct 27, 2022

@marcingrzejszczak please suggest a version of Spring Boot and Spring Cloud in which this works, or a workaround.

@olegz
Contributor

olegz commented Oct 27, 2022

I am going to close this issue since:

  1. Sleuth is on its way out in favor of observability
  2. I just pushed a minor fix to functions to ensure that Sleuth is not applied to reactive functions, as it really doesn't make much sense there
  3. Observability is already in place for the 4.x branch
  4. spring-cloud/spring-cloud-function@62ffbc1

@olegz olegz closed this as completed Oct 27, 2022
@erohana

erohana commented Oct 27, 2022

@olegz the problem is not with reactive functions; it is with Spring Cloud Function.

@olegz
Contributor

olegz commented Oct 27, 2022

Do you have an example that reproduces it so I can have a look? The original example, with the stack trace included at the beginning of this issue, is reactive.

@erohana

erohana commented Oct 27, 2022

It keeps happening randomly in production, and we weren't able to reproduce it in the local environment, but we do have the message and the stack trace from the DLQ.

The strange thing is that when we retried the same message from the DLQ, it worked with no error.

Part of the stack trace:


	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:123)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:117)
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:41)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2334)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2315)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2237)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2150)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2032)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1705)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1276)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1268)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an array: 

And sometimes this stack trace:


 error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@7943bd60]; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Infinite recursion (StackOverflowError) (through reference chain: java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]->jdk.internal.loader.ClassLoaders$PlatformClassLoader["unnamedModule"]->java.lang.Module["classLoader"]-

@olegz
Contributor

olegz commented Oct 27, 2022

I would not call it random, or a problem similar to the original post. It's clear you have some recursion in your JSON, and that explains the randomness, since it happens per message.
Consider extracting one of those messages from the DLQ, evaluating its JSON payload, and seeing what the issue is.
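The "Infinite recursion (StackOverflowError)" message above arises when a serializer walking an object graph keeps revisiting the same objects (here, java.lang.Module to its classLoader and back). As a hypothetical, stdlib-only Java sketch of that failure mode, the following walker tracks the objects on the current path by identity and reports a cycle instead of descending forever. Node and hasCycle are illustrative names only; this is not Jackson's API or how Jackson detects recursion.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: detect a reference cycle before a naive recursive serializer overflows the stack. */
public class CycleCheckDemo {

    /** Minimal object-graph node; stands in for e.g. Module -> ClassLoader -> Module. */
    static final class Node {
        final String name;
        final List<Node> refs = new ArrayList<>();

        Node(String name) { this.name = name; }
    }

    /** Returns true if some path from root revisits a node that is already on that path. */
    static boolean hasCycle(Node root) {
        Set<Node> onPath = Collections.newSetFromMap(new IdentityHashMap<>());
        return visit(root, onPath);
    }

    private static boolean visit(Node node, Set<Node> onPath) {
        if (!onPath.add(node)) {
            return true; // already on the current path: a naive serializer would recurse forever here
        }
        for (Node ref : node.refs) {
            if (visit(ref, onPath)) {
                return true;
            }
        }
        onPath.remove(node); // backtrack so shared, non-cyclic references are not reported as cycles
        return false;
    }

    public static void main(String[] args) {
        Node module = new Node("Module");
        Node loader = new Node("ClassLoader");
        module.refs.add(loader);
        loader.refs.add(module); // classLoader -> unnamedModule -> classLoader ...
        System.out.println(hasCycle(module)); // true

        Node payload = new Node("Payload");
        payload.refs.add(new Node("Field"));
        System.out.println(hasCycle(payload)); // false
    }
}
```

Tracking only the current path (and removing nodes on the way back) is what distinguishes a genuine cycle from an object that is merely referenced from two places.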

@erohana

erohana commented Oct 27, 2022

Again, once we retry the same message it works; I would expect it to fail again if the message itself were the problem.
Once we turned off Spring Cloud Sleuth (spring.sleuth.enabled=false), everything works fine.

@olegz
Contributor

olegz commented Oct 28, 2022

There is no such thing as a bug that cannot be reproduced. It simply means there is no bug.

@olegz
Contributor

olegz commented Oct 28, 2022

I have no sample message, no meaningful stack trace, and no approximate instructions on how to even attempt to reproduce it. What would you like us to do?


8 participants