This repository has been archived by the owner on Feb 23, 2023. It is now read-only.

Add support for Avro serialization in Kafka #687

Closed
pdebuitlear opened this issue Mar 30, 2021 · 13 comments · Fixed by #704

@pdebuitlear

There seem to be two issues with Avro in Spring Native. I am not seeing these issues with the same code running outside of Spring Native.
Sample code here.

  • The first issue is that the Avro-generated classes are not registered automatically; I needed to add a hint for the class in question (see the sketch at the end of this comment). This is not ideal.

  • The second issue is that when I try to invoke the serialization logic by putting a message to the broker, I get the following exception indicating that the input schema is invalid. This works outside of Spring Native.

SEVERE: Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Address","namespace":"com.sample.avro","fields":[{"name":"addressId","type":{"type":"string","avro.java.string":"String"}},{"name":"addressLine1","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"addressLine2","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"addressLine3","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"addressPostcode","type":["null",{"type":"string","avro.java.string":"String"}],"default":null}]}] with root cause
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Input schema is an invalid Avro schema; error code: 42201
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:351)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:494)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:485)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:458)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74)
	at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:59)
	at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:910)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:870)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:862)
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:563)
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:369)
	at com.sample.topology.Producer.sendMessage(Producer.java:24)
	at com.sample.controllers.KafkaController.sendMessageToKafkaTopic(KafkaController.java:25)
	at java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
	at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
	at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
	at org.apache.catalina.valves.RemoteIpValve.invoke(RemoteIpValve.java:764)
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
	at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
	at java.lang.Thread.run(Thread.java:834)
	at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:519)
	at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
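
A hint along the following lines works around the first issue (a sketch only, assuming spring-native's @TypeHint annotation is placed on the sample's @SpringBootApplication class and that the generated class is com.sample.avro.Address from the schema above; the class name SampleApplication is hypothetical):

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.nativex.hint.TypeHint;

    import com.sample.avro.Address;

    // Sketch: explicitly register the Avro-generated class for reflection so the
    // native image keeps its constructors and fields available to the serializer.
    @TypeHint(types = Address.class)
    @SpringBootApplication
    public class SampleApplication { // hypothetical name standing in for the sample's main class

        public static void main(String[] args) {
            SpringApplication.run(SampleApplication.class, args);
        }
    }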
@sdeleuze sdeleuze added the status: waiting-for-triage An issue we've not yet triaged or decided on label Mar 31, 2021
@garyrussell
Contributor

SEVERE: Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
{"type":"record","name":"Address","namespace":"com.sample.avro","fields":[{"name":"addressId","type":{"type":"string","avro.java.string":"String"}},{"name":"addressLine1","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"addressLine2","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"addressLine3","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"addressPostcode","type":["null",{"type":"string","avro.java.string":"String"}],"default":null}]}]

The 42201 error is returned by the registry server. Can you capture the JSON for the schema from the non-native app, so we can see what the difference is?

@pdebuitlear
Author

Here is the debug logging for the non-native instance. Your stack trace has an extra trailing square bracket ] at the end of the payload; otherwise they look the same.

2021-03-31 14:58:31 -DEBUG Sending POST with input {"schema":"{"type":"record","name":"Address","namespace":"com.sample.avro","fields":[{"name":"addressId","type":{"type":"string","avro.java.string":"String"}},{"name":"addressLine1","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"addressLine2","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"addressLine3","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"addressPostcode","type":["null",{"type":"string","avro.java.string":"String"}],"default":null}]}"} to http://localhost:8081/subjects/Address-value/versions

@garyrussell
Contributor

It's not MY stack trace, it's yours 😄 but that would explain why the registry flags it as invalid.

Looking at the serializer, it uses AvroSchemaUtils.getSchema(record, useSchemaReflection, removeJavaProperties) to obtain the schema.

It looks like schema.reflection is false by default, and I don't see you setting it to true in your yml.

But that Avro code seems to be the smoking gun.

@pdebuitlear
Author

It's not MY stack trace, it's yours 😄 but that would explain why the registry flags it as invalid.

I didn't scroll right when I pasted the stack trace in 😒

@garyrussell
Contributor

I got your project built and confirmed that it works as plain Java.

I finally got the image to build, but now I need Docker containers for Kafka and the registry; I see you have a Kubernetes profile, which I assume you use for this.

If you can jump-start me, it would save me some time; then I want to add some debug logic to the serializer to figure out where the junk is coming from.

@garyrussell
Contributor

Never mind; I got everything working - I reproduced it and will start diagnosing - probably tomorrow now - stay tuned.

@garyrussell
Contributor

Strange, I added this to your app...

    @Bean
    ApplicationRunner runner() {
        return args -> {
            // wait 30s so the broker and schema registry containers have time to start
            Thread.sleep(30000);
            Address address = Address.newBuilder().setAddressId(UUID.randomUUID().toString()).setAddressPostcode("12345").build();
            // print (pretty-printed) the schema the serializer would derive for this record
            System.out.println(AvroSchemaUtils.getSchema(address, false).toString(true));
        };
    }

and the container log showed:

avro_1             | {
avro_1             |   "type" : "record",
avro_1             |   "name" : "Address",
avro_1             |   "namespace" : "com.sample.avro",
avro_1             |   "fields" : [ {
avro_1             |     "name" : "addressId",
avro_1             |     "type" : {
avro_1             |       "type" : "string",
avro_1             |       "avro.java.string" : "String"
avro_1             |     }
avro_1             |   }, {
avro_1             |     "name" : "addressLine1",
avro_1             |     "type" : [ "null", {
avro_1             |       "type" : "string",
avro_1             |       "avro.java.string" : "String"
avro_1             |     } ],
avro_1             |     "default" : null
avro_1             |   }, {
avro_1             |     "name" : "addressLine2",
avro_1             |     "type" : [ "null", {
avro_1             |       "type" : "string",
avro_1             |       "avro.java.string" : "String"
avro_1             |     } ],
avro_1             |     "default" : null
avro_1             |   }, {
avro_1             |     "name" : "addressLine3",
avro_1             |     "type" : [ "null", {
avro_1             |       "type" : "string",
avro_1             |       "avro.java.string" : "String"
avro_1             |     } ],
avro_1             |     "default" : null
avro_1             |   }, {
avro_1             |     "name" : "addressPostcode",
avro_1             |     "type" : [ "null", {
avro_1             |       "type" : "string",
avro_1             |       "avro.java.string" : "String"
avro_1             |     } ],
avro_1             |     "default" : null
avro_1             |   } ]
avro_1             | }

No extra ] - so it's being added someplace else. More to follow.

@garyrussell
Contributor

garyrussell commented Apr 1, 2021

Progress.

I was getting a different problem from yours: registration failed due to an empty schema. After some debugging, I added these to the @NativeHint.types

                RegisterSchemaRequest.class,
                RegisterSchemaResponse.class,
                SchemaTypeConverter.class,

and now schema registration and publishing are successful.

I am not yet seeing anything on the consumer (Streams) side, though.

schema-registry_1  | [2021-04-01 17:51:56,860] INFO Registering new schema: subject Address-value, version null, id null, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource)
avro_1             | 2021-04-01 17:51:56.944  INFO 1 --- [nio-9999-exec-1] com.sample.topology.Producer             : Produced address -> {"addressId": "2e5024b4-14b9-4245-83bf-1eac8d1ee3f6", "addressLine1": null, "addressLine2": null, "addressLine3": null, "addressPostcode": "code"}
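
For reference, those additions might be expressed along these lines (a sketch only, not the exact change; the Confluent package names are my best guess, SchemaTypeConverter is left out because I am not certain of its package, and SampleApplication again stands in for the sample's main class):

    import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
    import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.nativex.hint.NativeHint;
    import org.springframework.nativex.hint.TypeHint;

    // Sketch: register the schema-registry request/response types for reflection so
    // the registration payload is no longer serialized as an empty schema.
    @NativeHint(types = @TypeHint(types = {
            RegisterSchemaRequest.class,
            RegisterSchemaResponse.class
    }))
    @SpringBootApplication
    public class SampleApplication {

        public static void main(String[] args) {
            SpringApplication.run(SampleApplication.class, args);
        }
    }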

@garyrussell
Contributor

Correction; it is working fine for me now...

avro_1             | 2021-04-01 19:31:49.628  INFO 1 --- [nio-9999-exec-4] com.sample.topology.Producer             : Address-0@3
avro_1             | 2021-04-01 19:31:49.630  INFO 1 --- [-StreamThread-1] com.sample.topology.Consumer             : Address: {"addressId": "30d04e94-a48b-440a-b816-2c22f7b44196", "addressLine1": null, "addressLine2": null, "addressLine3": null, "addressPostcode": "code"}
avro_1             | 2021-04-01 19:31:49.631  INFO 1 --- [nio-9999-exec-4] com.sample.topology.Producer             : Produced address -> {"addressId": "30d04e94-a48b-440a-b816-2c22f7b44196", "addressLine1": null, "addressLine2": null, "addressLine3": null, "addressPostcode": "code"}

I added code to the producer to collect the record metadata.

    public void sendMessage(Address address) {
        Future<SendResult<String, Address>> future = this.kafkaTemplate.send(this.TOPIC, address.getAddressId(), address);
        try {
            SendResult<String, Address> result = future.get(10, TimeUnit.SECONDS);
            log.info(result.getRecordMetadata().toString());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        log.info(String.format("Produced address -> %s", address));
    }
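
As an aside, since KafkaTemplate.send returns a ListenableFuture in Spring Kafka 2.x, the metadata could also be collected without blocking; a minimal sketch:

    // Non-blocking alternative (sketch): log the record metadata from a callback
    // instead of blocking on future.get().
    this.kafkaTemplate.send(this.TOPIC, address.getAddressId(), address)
            .addCallback(
                    result -> log.info(result.getRecordMetadata().toString()),
                    ex -> log.error("Send failed", ex));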

@garyrussell
Contributor

garyrussell/spring-native-kafka-avro@201180c

@pdebuitlear
Author

Is the suggestion here that a TypeHint is needed for all generated classes?

@garyrussell
Contributor

@pdebuitlear At the moment, yes; I am looking at a mechanism to detect and auto-register Avro-generated types.
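
For context, a minimal sketch of the kind of check such a mechanism could rely on (Avro-generated classes extend SpecificRecordBase; this is only an illustration, not the actual spring-native implementation):

    import org.apache.avro.specific.SpecificRecordBase;

    final class AvroTypeDetection {

        // Sketch: Avro's code generator makes classes extend SpecificRecordBase, so a
        // hint processor could detect generated types and register them for reflection.
        static boolean isAvroGeneratedType(Class<?> candidate) {
            return SpecificRecordBase.class.isAssignableFrom(candidate);
        }
    }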

garyrussell added a commit to garyrussell/spring-native that referenced this issue Apr 6, 2021
- detect Apache Avro generated types and register for reflection
garyrussell added a commit to garyrussell/spring-native that referenced this issue Apr 6, 2021
- detect Apache Avro generated types used in listeners, and register for reflection
@garyrussell
Contributor

#707

aclement added a commit that referenced this issue Apr 6, 2021
@sdeleuze sdeleuze removed the status: waiting-for-triage An issue we've not yet triaged or decided on label Apr 9, 2021
@sdeleuze sdeleuze added the type: compatibility Native image compatibility issue label Apr 9, 2021
@sdeleuze sdeleuze added this to the 0.9.2 milestone Apr 9, 2021