Better interoperability between Micronaut and Spring Kafka #605

Open
goeh opened this Issue Sep 22, 2018 · 5 comments

@goeh
Contributor

goeh commented Sep 22, 2018

The interoperability between Micronaut and the Spring Cloud Stream Kafka binder is giving me a headache. Micronaut sends Kafka header values as raw byte arrays, but Spring seems to send quoted strings. When Micronaut receives a message from Spring, header values are wrapped in "double quotes".
I don't think Micronaut is doing anything wrong; I'm just worried that interoperability issues like this will make it hard for developers to migrate.
Can we have a config option in Micronaut to quote message header values?

Steps to Reproduce

Create a Kafka client with Micronaut that sends a String header:

@Topic("spring")
void send(@KafkaKey String key,
          @Header("sender") String sender,
          @Body Book book);

On the Spring side, create a Kafka listener that reads the same String header:

@StreamListener(Processor.INPUT)
public void listener(@Header("sender") String sender, Message<Book> message) {

If you print the header in the Spring listener, you get the raw byte values rather than a String:
Spring recieved message from 77,105,99,114,111,110,97,117,116
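The numbers in that output are the UTF-8 byte values of the header, printed as a list. A minimal sketch in plain Java (the class and method names are made up for illustration, not taken from the sample application) that decodes them suggests the value itself arrives intact and only its type differs:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
final class HeaderBytesDemo {

    // Turns a comma-separated list of decimal byte values into the String they encode (UTF-8).
    static String decode(String commaSeparatedBytes) {
        String[] parts = commaSeparatedBytes.split(",");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // Integer.parseInt plus a cast also handles values above 127.
            bytes[i] = (byte) Integer.parseInt(parts[i].trim());
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The byte list printed by the Spring listener.
        System.out.println(decode("77,105,99,114,111,110,97,117,116")); // prints "Micronaut"
    }
}
```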

Expected Behaviour

The String header should be the same string as was sent by Micronaut.

Actual Behaviour

Doing the opposite is also a problem. If you send a String header from Spring, it will be received by Micronaut wrapped with double quotes.

return MessageBuilder.withPayload(book)
        .setHeader("sender", "spring")
        .build();
Micronaut recieved message from "spring"

Environment Information

  • Operating System: macOS
  • Micronaut Version: 1.0.0.M4
  • JDK Version: 1.8.0_162

Example Application

https://github.com/goeh/micronaut-kafka-spring-issue

@goeh

Contributor

goeh commented Sep 22, 2018

From the Spring Kafka docs:

The DefaultKafkaHeaderMapper maps the key to the MessageHeaders header name and, in order to support rich header types, for outbound messages, JSON conversion is performed. A "special" header, with key, spring_json_header_types contains a JSON map of <key>:<type>. This header is used on the inbound side to provide appropriate conversion of each header value to the original type.

https://docs.spring.io/spring-kafka/reference/htmlsingle/#headers

@graemerocher

Member

graemerocher commented Sep 24, 2018

To be honest, I don't think we should be providing workarounds for weird things that Spring Kafka is doing. Micronaut receives a double-quoted string because Spring Kafka encodes the String as JSON in the header value.

In the Kafka API, each header has a String name and a byte[] value. Micronaut encodes the value into a byte[] by looking up an appropriate Serde in the Serde registry:

https://github.com/micronaut-projects/micronaut-core/blob/master/configurations/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java#L159

You could override the SerdeRegistry to provide custom serialisation if that is what you are after. I don't know why Spring is not able to decode the value. Spring Kafka probably requires configuration on how to decode it, but it seems to me that Spring Kafka is imposing assumptions about the message format that make it harder for other Kafka producers to interoperate with it.
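To illustrate the double-quoting: when a String is JSON-encoded before being written to a header, the quotes become part of the bytes, so a consumer that reads the bytes as plain UTF-8 sees them. The sketch below is plain Java with hypothetical names. The unquote helper assumes a simple JSON string with no escape sequences, so anything more complex would need a real JSON parser.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
final class JsonQuotedHeaderDemo {

    // Removes one pair of surrounding double quotes, if present.
    // Assumption: the value is a plain JSON string with no escape sequences.
    static String unquote(String value) {
        if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
            return value.substring(1, value.length() - 1);
        }
        return value;
    }

    public static void main(String[] args) {
        // What a JSON-encoding producer puts in the header: the quotes are part of the bytes.
        byte[] onTheWire = "\"spring\"".getBytes(StandardCharsets.UTF_8);
        String raw = new String(onTheWire, StandardCharsets.UTF_8);
        System.out.println(raw);          // "spring" (8 characters, quotes included)
        System.out.println(unquote(raw)); // spring
    }
}
```

Stripping quotes on the consumer side is only a stopgap; it does not cover non-String header types.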

@goeh

Contributor

goeh commented Sep 24, 2018

I agree that Micronaut should not provide workarounds out of the box. But someone needs to blog about alternatives to solve this problem. And right now I'm not that person, but I'm working on it.

@goeh

Contributor

goeh commented Sep 24, 2018

I managed to create a simple custom KafkaHeaderMapper on the Spring side that maps byte[] to/from strings, and it seems to work OK for my simple use cases (string headers only). It works in all three directions: Micronaut to Spring, Spring to Micronaut, and Spring to Spring. The sample application has been updated, so it no longer fails.
Next I will try to fix it on the Micronaut side instead, by overriding SerdeRegistry as you suggested.
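The idea behind such a mapper, reduced to plain Java with no Spring types, is roughly the following. This is a sketch and not the code from the sample application: the class and method names are hypothetical, and in a real Spring application the same logic would sit inside a KafkaHeaderMapper implementation. It assumes every header value is a String.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not taken from the sample application.
final class StringHeaderMapping {

    // Outbound: write each header value as its raw UTF-8 bytes (no JSON, no quotes).
    static Map<String, byte[]> toBytes(Map<String, String> headers) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : headers.entrySet()) {
            out.put(e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8));
        }
        return out;
    }

    // Inbound: read each byte[] header value back as a UTF-8 string.
    static Map<String, String> toStrings(Map<String, byte[]> headers) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> e : headers.entrySet()) {
            out.put(e.getKey(), new String(e.getValue(), StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("sender", "spring");
        Map<String, byte[]> wire = toBytes(headers);
        System.out.println(wire.get("sender").length);     // 6: no surrounding quotes
        System.out.println(toStrings(wire).get("sender")); // spring
    }
}
```

Because both directions use the same plain UTF-8 encoding, a value written this way reads back unchanged on either framework, which matches the three directions mentioned above.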

@goeh

Contributor

goeh commented Sep 25, 2018

I looked briefly at the Micronaut side but was not able to create a working prototype. I could not work out how to pick a custom serializer just for the headers.
Unfortunately, I don't have time to work on it this week.