Using messageKeyExpression=payload.field with StreamBridge #2213

Closed
vxavictor513 opened this issue Aug 21, 2021 · 22 comments

@vxavictor513
Contributor

I configured messageKeyExpression=payload.field and sent a POJO payload using StreamBridge.

Message<MyPayload> message = MessageBuilder.withPayload(new MyPayload("234")).build();
streamBridge.send("toStream-out-0", message);

@Data
@AllArgsConstructor
public class MyPayload implements Serializable {
    private String field;
}

spring.cloud.stream.source=toStream
spring.cloud.stream.bindings.toStream-out-0.content-type=application/json
spring.cloud.stream.bindings.toStream-out-0.destination=myTopic
spring.cloud.stream.kafka.bindings.toStream-out-0.producer.message-key-expression=payload.field

Got an error at KafkaExpressionEvaluatingInterceptor because the payload has already been converted to byte[].

2021-08-21 11:01:35.353 ERROR 12252 --- [nio-8090-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'toStream-out-0'; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'field' cannot be found on object of type 'byte[]' - maybe not public or not valid?, failedMessage=GenericMessage [payload=byte[15], headers={sendTimeout=10000, id=7de27c8f-365e-1d75-c237-62b93a2c2498, userId=[B@53b31111, contentType=application/json, timestamp=1629514893939}]] with root cause

org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'field' cannot be found on object of type 'byte[]' - maybe not public or not valid?
	at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:217) ~[spring-expression-5.3.9.jar:5.3.9]
	at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104) ~[spring-expression-5.3.9.jar:5.3.9]
	at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:91) ~[spring-expression-5.3.9.jar:5.3.9]
	at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:61) ~[spring-expression-5.3.9.jar:5.3.9]
	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:91) ~[spring-expression-5.3.9.jar:5.3.9]
	at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:112) ~[spring-expression-5.3.9.jar:5.3.9]
	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:337) ~[spring-expression-5.3.9.jar:5.3.9]
	at org.springframework.cloud.stream.binder.kafka.KafkaExpressionEvaluatingInterceptor.preSend(KafkaExpressionEvaluatingInterceptor.java:63) ~[spring-cloud-stream-binder-kafka-3.1.3.jar:3.1.3]
	at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:469) ~[spring-integration-core-5.5.3.jar:5.5.3]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:309) ~[spring-integration-core-5.5.3.jar:5.5.3]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.3.jar:5.5.3]
	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:215) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:156) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:136) ~[spring-cloud-stream-3.1.3.jar:3.1.3]
	at com.example.demokafkamessagekeyexpression.MyController.publishToDynamicTopic(MyController.java:44) ~[classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.9.jar:5.3.9]
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.9.jar:5.3.9]
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.9.jar:5.3.9]
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.9.jar:5.3.9]
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.9.jar:5.3.9]
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.9.jar:5.3.9]
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1064) ~[spring-webmvc-5.3.9.jar:5.3.9]
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.9.jar:5.3.9]
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.9.jar:5.3.9]
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.3.9.jar:5.3.9]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:681) ~[tomcat-embed-core-9.0.52.jar:4.0.FR]
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.9.jar:5.3.9]
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.52.jar:4.0.FR]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.9.jar:5.3.9]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.9.jar:5.3.9]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.9.jar:5.3.9]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.9.jar:5.3.9]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.9.jar:5.3.9]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.9.jar:5.3.9]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1726) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.52.jar:9.0.52]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

The conversion seems to happen here in StreamBridge, before KafkaExpressionEvaluatingInterceptor is invoked, even though the documentation says otherwise.

Tried with both Spring Cloud Hoxton.SR12 and 2020.0.3 but no luck. Is there anything misconfigured?
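
The ordering problem described above can be modeled in plain Java. This is only a sketch under stated assumptions: it uses no Spring classes, and the names ConversionOrderSketch, KEY_FROM_PAYLOAD, keyThenConvert and convertThenKey are hypothetical stand-ins, not the framework's actual code path. It shows why a payload-based key lookup works before conversion and fails once the payload is a byte[].

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Plain-Java model of the ordering problem; no Spring classes are used.
final class ConversionOrderSketch {

    // Stand-in for the MyPayload class from the report.
    static final class MyPayload {
        final String field;

        MyPayload(String field) {
            this.field = field;
        }
    }

    // Hypothetical stand-in for a key expression such as payload.field:
    // it only works while the payload is still a MyPayload instance.
    static final Function<Object, String> KEY_FROM_PAYLOAD = payload -> {
        if (!(payload instanceof MyPayload)) {
            throw new IllegalStateException(
                    "field not found on " + payload.getClass().getSimpleName());
        }
        return ((MyPayload) payload).field;
    };

    // Hand-written stand-in for JSON conversion of the payload to byte[].
    static byte[] toJsonBytes(MyPayload p) {
        return ("{\"field\":\"" + p.field + "\"}").getBytes(StandardCharsets.UTF_8);
    }

    // Key evaluated before conversion: the POJO is still available.
    static String keyThenConvert(MyPayload p) {
        String key = KEY_FROM_PAYLOAD.apply(p);
        toJsonBytes(p); // conversion happens afterwards
        return key;
    }

    // Key evaluated after conversion: the lookup sees a byte[] and throws.
    static String convertThenKey(MyPayload p) {
        Object converted = toJsonBytes(p);
        return KEY_FROM_PAYLOAD.apply(converted);
    }
}
```

In this model, convertThenKey corresponds to the reported behavior: the lookup is handed a byte[] rather than the original object, so the property cannot be found.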

@sobychacko
Contributor

Hi, Thanks for this report. We will look into this soon to find out if this is a StreamBridge related issue. In the meantime, here are some workarounds that you can do.

Option 1: Set use-native-encoding to true

spring.cloud.stream.bindings.toStream-out-0.producer.use-native-encoding=true
spring.cloud.stream.kafka.bindings.toStream-out-0.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.toStream-out-0.producer.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer

Option 2: Use headers instead of payload as the message key expression

spring.cloud.stream.kafka.bindings.toStream-out-0.producer.message-key-expression=headers['foo']
spring.cloud.stream.kafka.bindings.toStream-out-0.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer

Then,

final MyPayload payload = new MyPayload("234");
Message<MyPayload> message = MessageBuilder.withPayload(payload).setHeader("foo", payload.getField()).build();
streamBridge.send("toStream-out-0", message);

@sobychacko sobychacko transferred this issue from spring-cloud/spring-cloud-stream-binder-kafka Aug 24, 2021
@olegz
Contributor

olegz commented Aug 24, 2021

Generally, it is a very bad practice to use the payload for routing decisions, since the payload is considered privileged data: data to be read only by its final recipient.
Imagine a postman with a letter. The postman makes routing decisions based on the information on the envelope. He does not open the envelope. There should be enough information on the envelope (address, zip code, name) for the postman to make a routing decision.

In your case things are even simpler since you are fully in control of Message creation. All you need to do is:

Message<MyPayload> message = MessageBuilder.withPayload(new MyPayload("234")).setHeader("payload_identifier", "123").build();
streamBridge.send("toStream-out-0", message);

and have expression

messageKeyExpression=headers.payload_identifier
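
To illustrate why a header-based key is unaffected by payload conversion, here is a small plain-Java model. It is a sketch, not Spring code: Envelope, serializePayload and keyFromHeader are hypothetical names introduced for illustration, and the only value taken from the thread is the payload_identifier header.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a message envelope; not a Spring Message.
final class HeaderKeySketch {

    // Minimal envelope: headers stay readable, the payload may be opaque bytes.
    static final class Envelope {
        final Map<String, Object> headers;
        final Object payload;

        Envelope(Map<String, Object> headers, Object payload) {
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
            this.payload = payload;
        }
    }

    // Models converting the payload to bytes while copying the headers unchanged.
    static Envelope serializePayload(Envelope in) {
        byte[] bytes = String.valueOf(in.payload).getBytes(StandardCharsets.UTF_8);
        return new Envelope(in.headers, bytes);
    }

    // Models an expression such as headers.payload_identifier.
    static Object keyFromHeader(Envelope envelope, String headerName) {
        return envelope.headers.get(headerName);
    }
}
```

Because the key is read from the headers, it can still be resolved after the payload has been turned into bytes.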

@sobychacko
Contributor

When using Kafka, make sure to set the header as a byte[]; otherwise, you have to provide the proper key.serializer (StringSerializer in this case).
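
A minimal sketch of the byte[] variant, assuming UTF-8 is an acceptable encoding for the key. KeyBytesSketch and its methods are hypothetical helpers, not part of Spring or Kafka. In the example from this thread, the result would be passed as the header value, for instance setHeader("payload_identifier", KeyBytesSketch.toKeyBytes("123")).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for carrying a String key as a byte[] header value.
final class KeyBytesSketch {

    // Encodes the key as UTF-8 bytes (the charset is an assumption).
    static byte[] toKeyBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes the bytes back to a String, e.g. on the consuming side.
    static String fromKeyBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```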

@sabbyanandan
Contributor

Closing due to no activity.

@cicorias

This is something we have a repro for that I can share privately. It occurs in the StreamBridge layers before the message provider itself. @sabbyanandan I don't believe this should be closed; it has only been 21 days as well. At what age do stale issues normally get closed?

@vxavictor513
Contributor Author

vxavictor513 commented Sep 15, 2021

We will look into this soon to find out if this is a StreamBridge related issue.

@sabbyanandan: I thought @sobychacko and team are looking into it. Shouldn't we just keep it open? Otherwise, the team should at least state it clearly in the official docs that payload isn't supported.

Imagine a postman with a letter. The postman makes routing decisions based on the information on the envelope. He does not open the envelope. There should be enough information on the envelope (address, zip code, name) for the postman to make a routing decision.

@olegz: I don't think this is a good analogy, because extracting and preparing the message key happens within the publisher boundary. The publisher is the producer of the payload itself, so it isn't quite accurate to consider it a postman.

@vxavictor513
Contributor Author

vxavictor513 commented Sep 15, 2021

I'm currently using a workaround by setting kafka_messageKey header.

Message<MyPayload> message = MessageBuilder.withPayload(payload)
        .setHeader(KafkaHeaders.MESSAGE_KEY, payload.getField())
        .build();

It works, but it doesn't provide (1) the flexibility of a configuration-based approach or (2) the ability to view all keys at a glance in the configuration. I believe these are the same reasons why messageKeyExpression was provided in the first place, whether it accesses a header or the payload.

Would really appreciate if someone can look into the issue closer and provide a fix.

@vxavictor513
Contributor Author

vxavictor513 commented Sep 15, 2021

On a side note, spring.cloud.stream.bindings.<bindingName>.producer.partitionKeyExpression actually works with payload.

Thus I don't see any strong reason why messageKeyExpression shouldn't support payload.

@cicorias

What we are seeing with two different bindings is that, under pressure, StreamBridge.send(binding, data/message) sometimes uses the SpEL expression for the OTHER binding, and vice versa. So there seems to be a crossing of which SpEL expression is used for which binding, again only under "pressure".

So, ....partitionKeyExpression does work, just not under pressure.

And since the data format is different the payload.<property> lookup can be different -- and that's why we see the EL1008E: Property or field 'field' cannot be found error messages.

This was working with EventHubs, but again, we debugged and saw this issue occur.

@olegz
Contributor

olegz commented Sep 15, 2021

This is a dated and closed issue. Please feel free to raise a new one if you still believe there are issues that cannot be addressed with the explanations provided in the thread above.

@cicorias

Well, that was the point of my initial response: why was it closed so fast? It shouldn't be closed, as the error still exists.

A workaround isn't a correction.

@sabbyanandan
Contributor

Hi, folks. Most Spring projects typically wait for 2 weeks for a community/requester's response, and they are automatically closed when there's no activity. We don't use that Bot in Spring Cloud Stream, however, so that's why we manually closed it. Besides that, in my naive review, it appeared we had shared a few solutions, so it wasn't apparent if that was sufficient or not.

If someone can share a concise description of the issue and a reproducible use case sample, we will be happy to review it.

@sabbyanandan sabbyanandan reopened this Sep 15, 2021
@garyrussell
Contributor

@sabbyanandan The move to functional (from annotations) also (originally) broke partitionKeyExpression. @olegz "reluctantly" restored the functionality - see his comment on cce66f3

There was an earlier attempt to fix it here 74aee81

So, ....partitionKeyExpression does work, just not under pressure.

I don't see how there can be crosstalk between bindings or even concurrent sends on the same binding - the expression is evaluated in a stateless ChannelInterceptor (PartitioningInterceptor); each binding gets its own instance configured from the binding properties.

I am sure it will be much more difficult for kafka-specific expression evaluation to be implemented in the functional model, because spring-cloud-function is far away from the kafka binder and knows nothing about its internals.

The problem is the message is converted/built well before it reaches the Kafka binder.

For the partition key, he was able to handle it within Spring Cloud Stream, in the PartitionAwareFunctionWrapper; it would not be correct to pollute the core library with knowledge about the binder.

Perhaps some kind of pluggable interceptor mechanism could be considered, where the binder provides the interceptor and the core project applies it.

However, it is not going to be simple because it would have to retain existing functionality for users that are using native encoding.

In the meantime, @sobychacko 's workarounds above are available #2213 (comment)
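
The "one stateless interceptor per binding" arrangement described above can be pictured with the following plain-Java model. It is a sketch only: PerBindingInterceptorSketch, KeyInterceptor and the binding names used with it are hypothetical, and it does not reproduce the real PartitioningInterceptor. It illustrates that when each binding owns an instance holding only its own immutable expression, a send on one binding has no shared state through which it could pick up another binding's expression.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Plain-Java model of per-binding, stateless key evaluation.
final class PerBindingInterceptorSketch {

    // Stateless: the only field is the expression fixed at construction time.
    static final class KeyInterceptor {
        private final Function<Map<String, Object>, Object> expression;

        KeyInterceptor(Function<Map<String, Object>, Object> expression) {
            this.expression = expression;
        }

        Object evaluate(Map<String, Object> payload) {
            return expression.apply(payload);
        }
    }

    // One interceptor instance per binding name.
    private final Map<String, KeyInterceptor> byBinding = new ConcurrentHashMap<>();

    void register(String binding, Function<Map<String, Object>, Object> expression) {
        byBinding.put(binding, new KeyInterceptor(expression));
    }

    // Evaluates the key using only the interceptor registered for this binding.
    Object keyFor(String binding, Map<String, Object> payload) {
        KeyInterceptor interceptor = byBinding.get(binding);
        if (interceptor == null) {
            throw new IllegalArgumentException("no interceptor for binding " + binding);
        }
        return interceptor.evaluate(payload);
    }
}
```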

@cicorias

@garyrussell I can provide a repro privately if that works. While it's not a substantial chunk of code, it's proprietary.

I can even take the time with you to demonstrate the crosstalk situation on a Teams call, Google chat, Zoom, etc.

Yes I agree I couldn't see how this was possible, but putting conditional breakpoints showed the crossing, and walking up the stack to the external callers the crossing wasn't there.

@olegz
Contributor

olegz commented Sep 16, 2021

I still don't understand, especially in the context of StreamBridge where you have full control of the Message, why you can't inject a specific header and make your SpEL expression header-based (not payload-based), even if you have to duplicate some value from the payload.
The framework having any view into your payload is fundamentally wrong. Imagine your mailman opening your letters to decide where they should go. That is what metadata is for. Gary is right: I reluctantly added support back to the function-based model only because we had it before, but now I wish I hadn't.
What am I missing? Why can't it be a header?

@cicorias

@olegz I agree and that's the change we made.

However, there is an issue in the existing codebase that should either be fixed, documented as not working in some situations, or removed.

Do you want users of the platform to go down the same path, blindly thinking that SpEL will work when there are known issues under pressure? That wastes their time, and more time on GitHub issues as they post the same questions and bugs again.

I think it would be beneficial to first repro the situation like I offered to @garyrussell -- and then take either a communication plan or a bug fix so as not to drain users of the platform of their time, or perhaps introduce problems in production code that goes unnoticed, essentially lowering the trust of Spring.

@olegz
Contributor

olegz commented Sep 16, 2021

We already have it documented, but it appears that it needs to be further documented in the context of SpEL.
That said, I'll allow myself to disagree with "introduce problems in production code", since I would be very surprised if this were not discovered during development and testing.

But obviously I do agree that some more documentation is needed on our end, as users are still making this mistake.

@cicorias

There is no documentation that says you shouldn't use two different bindings that have different SpEL expressions within the same process.

I'm not sure how using the classes (StreamBridge) as documented is a mistake. Perhaps the feature as documented should be retracted if it's neither working nor intended to work in all situations.

But with many things, often people feel they've made a mistake but they were wrong.

Again, just trying to help the Spring Community.

@garyrussell let me know if there's any follow up or not.

@olegz
Contributor

olegz commented Sep 16, 2021

So, I'll label this as a documentation issue.

@olegz olegz self-assigned this Sep 16, 2021
@olegz olegz added this to the 3.2.0 milestone Sep 16, 2021
@olegz
Contributor

olegz commented Sep 16, 2021

And we should probably deprecate all support for payload-based evaluation (routing, SpEL, etc)

@cicorias

And we should probably deprecate all support for payload-based evaluation (routing, SpEL, etc)

that's something I'm 100% on board with.

@garyrussell
Contributor

Regardless of this issue regarding setting the key, if there is crosstalk possible between bindings with different partitionKeyExpressions then that needs to be fixed (even if the feature is removed in a future release).

An MCRE would be much appreciated.

We should probably open a new issue for that, though.

@olegz olegz closed this as completed in ec24224 Nov 30, 2021