How should I send/receive protobuf messages? #727

Closed
thinkerou opened this issue Jul 6, 2018 · 7 comments

@thinkerou

For example, I have the following protobuf:

message Person {
  uint64 number = 1;
  string name = 2;
}

message Event {
  string msg = 1;
  int32 code = 2; // assuming an int32 status code
}

And application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=mock-test
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer

And Sender.java:

@Component
public class Sender {

    @Autowired
    private KafkaTemplate<String, byte[]> kafkaTemplate;

    public void send(Person person) {
        kafkaTemplate.send("topic_test", person.toByteArray());
    }
}
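
A caller builds the message with the generated protobuf builder and passes it in, for example (the field values here are just illustrative):

Person person = Person.newBuilder()
        .setNumber(1L)
        .setName("alice")
        .build();
sender.send(person);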

After the message from Sender is received, some processing happens and an Event message is returned.

And Receiver.java:

@Component
public class Receiver {
    @KafkaListener(topics = {"topic_test"})
    public void listen(ConsumerRecord<?, Event> record) throws InterruptedException, InvalidProtocolBufferException {
        Optional<Event> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Event message = Event.parseFrom(kafkaMessage.get().toByteArray());
            handleEvent(message);
        }
    }
}

The project compiles successfully, but at runtime it fails with:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.thinkerou.test.kafka.Receiver.listen(org.apache.kafka.clients.consumer.ConsumerRecord<?, com.thinkerou.test.Event>) throws java.lang.InterruptedException,com.google.protobuf.InvalidProtocolBufferException' threw exception; nested exception is java.lang.ClassCastException: [B cannot be cast to com.thinkerou.test.Event
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267) ~[spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80) ~[spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071) [spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051) [spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998) [spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866) [spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724) [spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_102]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_102]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102]
Caused by: java.lang.ClassCastException: [B cannot be cast to com.thinkerou.test.Event
	at com.thinkerou.test.kafka.Receiver.listen(Receiver.java:42) ~[classes!/:0.0.1-SNAPSHOT]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_102]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_102]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_102]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_102]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:248) ~[spring-kafka-2.1.7.RELEASE.jar!/:2.1.7.RELEASE]
	... 10 common frames omitted
@nyilmaz
Contributor

nyilmaz commented Jul 6, 2018

You are using the byte[] deserializer, so your application cannot know how to construct your Event object with this config. Also, the values you send and receive are not the same type, i.e. you are sending a Person but trying to read its protobuf bytes as an Event.
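
For example, if you keep the ByteArrayDeserializer, the listener has to accept the raw bytes and parse them itself; a rough sketch, assuming the topic actually carries Event bytes and handleEvent is your own method:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.google.protobuf.InvalidProtocolBufferException;
import com.thinkerou.test.Event;

@Component
public class Receiver {

    @KafkaListener(topics = {"topic_test"})
    public void listen(ConsumerRecord<String, byte[]> record) throws InvalidProtocolBufferException {
        byte[] value = record.value();
        if (value != null) {
            // With ByteArrayDeserializer the value really is a byte[], so parse it here.
            Event message = Event.parseFrom(value);
            handleEvent(message);
        }
    }

    private void handleEvent(Event event) {
        // application-specific handling
    }
}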

@thinkerou
Author

@nyilmaz thank you for the reply. I want to use a deserializer that produces the Event object, but sorry, I don't know how to set up that config.

@thinkerou
Author

thinkerou commented Jul 6, 2018

Now I have defined two classes:

public class ProtoSerializer extends Adapter implements Serializer<Person> {
    private static final Logger logger = LoggerFactory.getLogger(ProtoSerializer.class);

    @Override
    public byte[] serialize(final String topic, final Person data) {
        logger.info("Person serialize ...");
        return data.toByteArray();
    }
}

public class ProtoDeserializer extends Adapter implements Deserializer<Event> {
    private static final Logger logger = LoggerFactory.getLogger(ProtoDeserializer.class);

    @Override
    public Event deserialize(final String topic, byte[] data) {
        logger.info("Event deserialize ...");
        try {
            return Event.parseFrom(data);
        } catch (final InvalidProtocolBufferException e) {
            logger.error("Received unparseable message", e);
            throw new RuntimeException("Received unparseable message " + e.getMessage(), e);
        }
    }
}
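
Adapter is a small helper base class, something like the following sketch, which just provides the no-op configure()/close() methods the Kafka 1.x Serializer/Deserializer interfaces require:

import java.util.Map;

public abstract class Adapter {

    // Called once with the producer/consumer config; nothing to configure here.
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // no-op
    }

    // Called when the client is closed; nothing to release here.
    public void close() {
        // no-op
    }
}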

And I updated application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=mock-test
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# HERE update
spring.kafka.consumer.value-deserializer=com.thinkerou.test.kafka.ProtoDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# HERE update
spring.kafka.producer.value-serializer=com.thinkerou.test.kafka.ProtoSerializer
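
With this config the listener can take the typed value directly, so the Receiver now looks roughly like this (imports as in the original Receiver; handleEvent is defined elsewhere):

@Component
public class Receiver {

    @KafkaListener(topics = {"topic_test"})
    public void listen(ConsumerRecord<?, Event> record) {
        Event message = record.value();
        if (message != null) {
            // ProtoDeserializer should already have turned the bytes into an Event.
            handleEvent(message);
        }
    }
}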

When I send a message, it prints the log line Person serialize ..., but Event deserialize ... is never printed when the message is consumed, and I don't know why.

And running it still fails:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.thinkerou.test.kafka.Receiver.listen(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.Long, com.thinkerou.test.Event>) throws java.lang.InterruptedException' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.thinkerou.test.Event

@garyrussell
Contributor

Look at the consumer config in the log to make sure the deserializer is correct:

2018-07-06 09:06:50.339  INFO 15667 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [10.0.0.6:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = foo
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ...

@thinkerou
Author

@garyrussell thank you for the reply. Here is the log:

2018-07-06 21:17:03.510  INFO 83794 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = media-cloud-mock-test
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2018-07-06 21:17:03.590  INFO 83794 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1

I see value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer instead of com.thinkerou.test.kafka.ProtoDeserializer.

@garyrussell
Contributor

Then you need to double-check your application.properties; perhaps some other version of the file is being picked up.

When I copy/paste your spring.kafka.consumer.value-deserializer=com.thinkerou.test.kafka.ProtoDeserializer, I get an error (since that class doesn't exist for me).

@thinkerou
Author

@garyrussell thanks a lot, it was a typo!
