From ad6bdd8cbf7caf70818fb77eac5083c5800192d5 Mon Sep 17 00:00:00 2001 From: Meysam Zarezadeh Date: Mon, 6 Sep 2021 17:31:43 +0430 Subject: [PATCH 1/4] Use a map between topics and message-names when using ProtobufFile --- .../kafka/ui/config/ClustersProperties.java | 3 +- .../kafka/ui/model/KafkaCluster.java | 2 +- .../ui/serde/ProtobufFileRecordSerDe.java | 39 +++++++--- .../src/main/resources/application-local.yml | 8 ++ .../ui/serde/ProtobufFileRecordSerDeTest.java | 73 +++++++++++++++++++ .../src/test/resources/address-book.proto | 39 ++++++++++ 6 files changed, 152 insertions(+), 12 deletions(-) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java create mode 100644 kafka-ui-api/src/test/resources/address-book.proto diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index cab60a256ea..b44393dff51 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Properties; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -25,7 +26,7 @@ public static class Cluster { String schemaNameTemplate = "%s-value"; String keySchemaNameTemplate = "%s-key"; String protobufFile; - String protobufMessageName; + Map protobufMessageName; List kafkaConnect; int jmxPort; boolean jmxSsl; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index dce53c0b9b1..3fb6b36addc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -34,7 +34,7 @@ public class KafkaCluster { private final Throwable lastKafkaException; private final Throwable lastZookeeperException; private final Path protobufFile; - private final String protobufMessageName; + private final Map protobufMessageName; private final Properties properties; private final Boolean readOnly; private final Boolean disableLogDirsCollection; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java index 69b4f1d1553..d7184772180 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.serde; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.DynamicMessage; import com.google.protobuf.util.JsonFormat; import com.provectus.kafka.ui.model.MessageSchema; @@ -14,6 +15,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -26,19 +28,28 @@ //TODO: currently we assume that keys for this serde are always string - need to discuss if it is ok public class ProtobufFileRecordSerDe implements RecordSerDe { + public static final String TOPICS_DEFAULT_MESSAGE_NAME = "_default"; private final ProtobufSchema protobufSchema; private final ObjectMapper objectMapper; private final Path protobufSchemaPath; private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter(); + private final Map messageNameMap; - public ProtobufFileRecordSerDe(Path protobufSchemaPath, String messageName, + public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map messageNameMap, ObjectMapper objectMapper) throws IOException { this.objectMapper = objectMapper; this.protobufSchemaPath = protobufSchemaPath; + this.messageNameMap = messageNameMap; try (final Stream lines = Files.lines(protobufSchemaPath)) { - this.protobufSchema = new ProtobufSchema( - lines.collect(Collectors.joining("\n")) - ).copy(messageName); + var schema = new ProtobufSchema( + lines.collect(Collectors.joining("\n")) + ); + var defaultMessageName = messageNameMap.get(TOPICS_DEFAULT_MESSAGE_NAME); + if (defaultMessageName != null) { + this.protobufSchema = schema.copy(defaultMessageName); + } else { + this.protobufSchema = schema; + } } } @@ -51,7 +62,7 @@ public DeserializedKeyValue deserialize(ConsumerRecord msg) { builder.keyFormat(MessageFormat.UNKNOWN); } if (msg.value() != null) { - builder.value(parse(msg.value().get())); + builder.value(parse(msg.value().get(), getDescriptor(msg.topic()))); builder.valueFormat(MessageFormat.PROTOBUF); } return builder.build(); @@ -60,11 +71,19 @@ public DeserializedKeyValue deserialize(ConsumerRecord msg) { } } + private Descriptor getDescriptor(String topic) { + var messageName = messageNameMap.get(topic); + if (messageName != null) { + return protobufSchema.toDescriptor(messageName); + } + return protobufSchema.toDescriptor(); + } + @SneakyThrows - private String parse(byte[] value) { + private String parse(byte[] value, Descriptor descriptor) { DynamicMessage protoMsg = DynamicMessage.parseFrom( - protobufSchema.toDescriptor(), - new ByteArrayInputStream(value) + descriptor, + new ByteArrayInputStream(value) ); byte[] jsonFromProto = ProtobufSchemaUtils.toJson(protoMsg); return new String(jsonFromProto); @@ -97,8 +116,8 @@ public ProducerRecord serialize(String topic, public TopicMessageSchema getTopicSchema(String topic) { final JsonSchema jsonSchema = schemaConverter.convert( - protobufSchemaPath.toUri(), - protobufSchema.toDescriptor() + protobufSchemaPath.toUri(), + getDescriptor(topic) ); final MessageSchema keySchema = new MessageSchema() .name(protobufSchema.fullName()) diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index 9b1458e1eda..ce31a519676 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -19,6 +19,14 @@ kafka: # address: http://localhost:8083 # jmxPort: 9998 # read-only: true + # - + # name: localUsingProtobufFile + # bootstrapServers: localhost:9092 + # protobufFile: messages.proto + # protobufMessageName: + # _default: GenericMessage + # input-topic: InputMessage + # output-topic: OutputMessage admin-client-timeout: 5000 zookeeper: connection-timeout: 1000 diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java new file mode 100644 index 00000000000..7c5afd3818b --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java @@ -0,0 +1,73 @@ +package com.provectus.kafka.ui.serde; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ProtobufFileRecordSerDeTest { + + // Two sample messages used here are generated using `address-book.proto` and contain a person with following email address + public static final String TEST_USER_EMAIL = "user@example.com"; + // Sample message of type `test.Person` + private final byte[] personMessage = Base64.getDecoder().decode("CgRVc2VyEGUaEHVzZXJAZXhhbXBsZS5jb20iCgoIMTEyMjMzNDQ="); + // Sample message of type `test.AddressBook` + private final byte[] addressBookMessage = Base64.getDecoder().decode("CAESJgoEVXNlchBlGhB1c2VyQGV4YW1wbGUuY29tIgoKCDExMjIzMzQ0"); + private static Path protobufSchema; + + @BeforeAll + static void setUp() throws URISyntaxException { + protobufSchema = Paths.get(ProtobufFileRecordSerDeTest.class.getClassLoader().getResource("address-book.proto").toURI()); + } + + @Test + void testDeserialize() throws IOException { + var messageNameMap = new HashMap() {{ + put("topic1", "test.Person"); + put("topic2", "test.AddressBook"); + }}; + var deserializer = new ProtobufFileRecordSerDe(protobufSchema, messageNameMap, new ObjectMapper()); + var msg1 = deserializer.deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()), + Bytes.wrap(personMessage))); + assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat()); + assertTrue(msg1.getValue().contains(TEST_USER_EMAIL)); + + var msg2 = deserializer.deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()), + Bytes.wrap(addressBookMessage))); + assertTrue(msg2.getValue().contains(TEST_USER_EMAIL)); + } + + @Test + void testNoDefaultMessageName() throws IOException { + // by default the first message type defined in proto definition is used + var deserializer = new ProtobufFileRecordSerDe(protobufSchema, Collections.emptyMap(), new ObjectMapper()); + var msg = deserializer.deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), + Bytes.wrap(personMessage))); + assertTrue(msg.getValue().contains(TEST_USER_EMAIL)); + } + + @Test + void testDefaultMessageName() throws IOException { + var messageNameMap = new HashMap() {{ + put("topic1", "test.Person"); + put("_default", "test.AddressBook"); + }}; + var deserializer = new ProtobufFileRecordSerDe(protobufSchema, messageNameMap, new ObjectMapper()); + var msg = deserializer.deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()), + Bytes.wrap(addressBookMessage))); + assertTrue(msg.getValue().contains(TEST_USER_EMAIL)); + } +} \ No newline at end of file diff --git a/kafka-ui-api/src/test/resources/address-book.proto b/kafka-ui-api/src/test/resources/address-book.proto new file mode 100644 index 00000000000..72eab7aab8c --- /dev/null +++ b/kafka-ui-api/src/test/resources/address-book.proto @@ -0,0 +1,39 @@ +// [START declaration] +syntax = "proto3"; +package test; + +// [END declaration] + +// [START java_declaration] +option java_multiple_files = true; +option java_package = "com.example.tutorial.protos"; +option java_outer_classname = "AddressBookProtos"; +// [END java_declaration] + +// [START messages] +message Person { + string name = 1; + int32 id = 2; // Unique ID number for this person. + string email = 3; + + enum PhoneType { + MOBILE = 0; + HOME = 1; + WORK = 2; + } + + message PhoneNumber { + string number = 1; + PhoneType type = 2; + } + + repeated PhoneNumber phones = 4; + +} + +// Our address book file is just one of these. +message AddressBook { + int32 version = 1; + repeated Person people = 2; +} +// [END messages] \ No newline at end of file From 1f07b4a26bfcfb77491c83b4b625f3f1ce1c7662 Mon Sep 17 00:00:00 2001 From: Meysam Zarezadeh Date: Tue, 7 Sep 2021 11:35:33 +0430 Subject: [PATCH 2/4] fixed checkstyle problems --- .../kafka/ui/config/ClustersProperties.java | 2 +- .../kafka/ui/model/KafkaCluster.java | 2 +- .../ui/serde/ProtobufFileRecordSerDeTest.java | 118 ++++++++++-------- 3 files changed, 65 insertions(+), 57 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index b44393dff51..e788e282ecc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -26,7 +26,7 @@ public static class Cluster { String schemaNameTemplate = "%s-value"; String keySchemaNameTemplate = "%s-key"; String protobufFile; - Map protobufMessageName; + Map protobufMessageName; List kafkaConnect; int jmxPort; boolean jmxSsl; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 3fb6b36addc..167135f39f3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -34,7 +34,7 @@ public class KafkaCluster { private final Throwable lastKafkaException; private final Throwable lastZookeeperException; private final Path protobufFile; - private final Map protobufMessageName; + private final Map protobufMessageName; private final Properties properties; private final Boolean readOnly; private final Boolean disableLogDirsCollection; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java index 7c5afd3818b..79c5565c317 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java @@ -1,73 +1,81 @@ package com.provectus.kafka.ui.serde; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.utils.Bytes; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Base64; import java.util.Collections; -import java.util.HashMap; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; class ProtobufFileRecordSerDeTest { - // Two sample messages used here are generated using `address-book.proto` and contain a person with following email address - public static final String TEST_USER_EMAIL = "user@example.com"; - // Sample message of type `test.Person` - private final byte[] personMessage = Base64.getDecoder().decode("CgRVc2VyEGUaEHVzZXJAZXhhbXBsZS5jb20iCgoIMTEyMjMzNDQ="); - // Sample message of type `test.AddressBook` - private final byte[] addressBookMessage = Base64.getDecoder().decode("CAESJgoEVXNlchBlGhB1c2VyQGV4YW1wbGUuY29tIgoKCDExMjIzMzQ0"); - private static Path protobufSchema; + // Two sample messages used here are generated using `address-book.proto` and contain a person + // with following email address + public static final String TEST_USER_EMAIL = "user@example.com"; + // Sample message of type `test.Person` + private final byte[] personMessage = Base64.getDecoder() + .decode("CgRVc2VyEGUaEHVzZXJAZXhhbXBsZS5jb20iCgoIMTEyMjMzNDQ="); + // Sample message of type `test.AddressBook` + private final byte[] addressBookMessage = Base64.getDecoder() + .decode("CAESJgoEVXNlchBlGhB1c2VyQGV4YW1wbGUuY29tIgoKCDExMjIzMzQ0"); + private static Path protobufSchema; - @BeforeAll - static void setUp() throws URISyntaxException { - protobufSchema = Paths.get(ProtobufFileRecordSerDeTest.class.getClassLoader().getResource("address-book.proto").toURI()); - } + @BeforeAll + static void setUp() throws URISyntaxException { + protobufSchema = Paths.get(ProtobufFileRecordSerDeTest.class.getClassLoader() + .getResource("address-book.proto").toURI()); + } - @Test - void testDeserialize() throws IOException { - var messageNameMap = new HashMap() {{ - put("topic1", "test.Person"); - put("topic2", "test.AddressBook"); - }}; - var deserializer = new ProtobufFileRecordSerDe(protobufSchema, messageNameMap, new ObjectMapper()); - var msg1 = deserializer.deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()), - Bytes.wrap(personMessage))); - assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat()); - assertTrue(msg1.getValue().contains(TEST_USER_EMAIL)); + @Test + void testDeserialize() throws IOException { + var messageNameMap = Map.of( + "topic1", "test.Person", + "topic2", "test.AddressBook"); + var deserializer = + new ProtobufFileRecordSerDe(protobufSchema, messageNameMap, new ObjectMapper()); + var msg1 = deserializer + .deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()), + Bytes.wrap(personMessage))); + assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat()); + assertTrue(msg1.getValue().contains(TEST_USER_EMAIL)); - var msg2 = deserializer.deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()), - Bytes.wrap(addressBookMessage))); - assertTrue(msg2.getValue().contains(TEST_USER_EMAIL)); - } + var msg2 = deserializer + .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()), + Bytes.wrap(addressBookMessage))); + assertTrue(msg2.getValue().contains(TEST_USER_EMAIL)); + } - @Test - void testNoDefaultMessageName() throws IOException { - // by default the first message type defined in proto definition is used - var deserializer = new ProtobufFileRecordSerDe(protobufSchema, Collections.emptyMap(), new ObjectMapper()); - var msg = deserializer.deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), - Bytes.wrap(personMessage))); - assertTrue(msg.getValue().contains(TEST_USER_EMAIL)); - } + @Test + void testNoDefaultMessageName() throws IOException { + // by default the first message type defined in proto definition is used + var deserializer = + new ProtobufFileRecordSerDe(protobufSchema, Collections.emptyMap(), new ObjectMapper()); + var msg = deserializer + .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), + Bytes.wrap(personMessage))); + assertTrue(msg.getValue().contains(TEST_USER_EMAIL)); + } - @Test - void testDefaultMessageName() throws IOException { - var messageNameMap = new HashMap() {{ - put("topic1", "test.Person"); - put("_default", "test.AddressBook"); - }}; - var deserializer = new ProtobufFileRecordSerDe(protobufSchema, messageNameMap, new ObjectMapper()); - var msg = deserializer.deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()), - Bytes.wrap(addressBookMessage))); - assertTrue(msg.getValue().contains(TEST_USER_EMAIL)); - } -} \ No newline at end of file + @Test + void testDefaultMessageName() throws IOException { + var messageNameMap = Map.of( + "topic1", "test.Person", + "_default", "test.AddressBook"); + var deserializer = + new ProtobufFileRecordSerDe(protobufSchema, messageNameMap, new ObjectMapper()); + var msg = deserializer + .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()), + Bytes.wrap(addressBookMessage))); + assertTrue(msg.getValue().contains(TEST_USER_EMAIL)); + } +} From dffb272c4851037c90552d7f31d423d41997fc84 Mon Sep 17 00:00:00 2001 From: Meysam Zarezadeh Date: Tue, 7 Sep 2021 17:30:06 +0430 Subject: [PATCH 3/4] make config backward-compatible --- .../kafka/ui/config/ClustersProperties.java | 3 +- .../kafka/ui/model/KafkaCluster.java | 3 +- .../ui/serde/DeserializationService.java | 3 +- .../ui/serde/ProtobufFileRecordSerDe.java | 15 +++--- .../src/main/resources/application-local.yml | 4 +- .../ui/serde/ProtobufFileRecordSerDeTest.java | 52 +++++++++++-------- 6 files changed, 48 insertions(+), 32 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index e788e282ecc..04efdae3c37 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -26,7 +26,8 @@ public static class Cluster { String schemaNameTemplate = "%s-value"; String keySchemaNameTemplate = "%s-key"; String protobufFile; - Map protobufMessageName; + String protobufMessageName; + Map protobufMessageNameByTopic; List kafkaConnect; int jmxPort; boolean jmxSsl; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 167135f39f3..0b517d0cc76 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -34,7 +34,8 @@ public class KafkaCluster { private final Throwable lastKafkaException; private final Throwable lastZookeeperException; private final Path protobufFile; - private final Map protobufMessageName; + private final String protobufMessageName; + private final Map protobufMessageNameByTopic; private final Properties properties; private final Boolean readOnly; private final Boolean disableLogDirsCollection; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java index 816b0d9d87f..51e44915913 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java @@ -35,7 +35,8 @@ private RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) { if (cluster.getProtobufFile() != null) { log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName()); return new ProtobufFileRecordSerDe(cluster.getProtobufFile(), - cluster.getProtobufMessageName(), objectMapper); + cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName(), + objectMapper); } else { log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName()); return new SchemaRegistryAwareRecordSerDe(cluster); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java index d7184772180..a29b5d323d0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -28,7 +29,6 @@ //TODO: currently we assume that keys for this serde are always string - need to discuss if it is ok public class ProtobufFileRecordSerDe implements RecordSerDe { - public static final String TOPICS_DEFAULT_MESSAGE_NAME = "_default"; private final ProtobufSchema protobufSchema; private final ObjectMapper objectMapper; private final Path protobufSchemaPath; @@ -36,15 +36,15 @@ public class ProtobufFileRecordSerDe implements RecordSerDe { private final Map messageNameMap; public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map messageNameMap, - ObjectMapper objectMapper) throws IOException { + String defaultMessageName, ObjectMapper objectMapper) + throws IOException { this.objectMapper = objectMapper; this.protobufSchemaPath = protobufSchemaPath; - this.messageNameMap = messageNameMap; + this.messageNameMap = messageNameMap != null ? messageNameMap : Collections.emptyMap(); try (final Stream lines = Files.lines(protobufSchemaPath)) { var schema = new ProtobufSchema( - lines.collect(Collectors.joining("\n")) + lines.collect(Collectors.joining("\n")) ); - var defaultMessageName = messageNameMap.get(TOPICS_DEFAULT_MESSAGE_NAME); if (defaultMessageName != null) { this.protobufSchema = schema.copy(defaultMessageName); } else { @@ -97,7 +97,10 @@ public ProducerRecord serialize(String topic, if (data == null) { return new ProducerRecord<>(topic, partition, Objects.requireNonNull(key).getBytes(), null); } - DynamicMessage.Builder builder = protobufSchema.newMessageBuilder(); + var messageName = messageNameMap.get(topic); + DynamicMessage.Builder builder = messageName != null + ? protobufSchema.newMessageBuilder(messageName) + : protobufSchema.newMessageBuilder(); try { JsonFormat.parser().merge(data, builder); final DynamicMessage message = builder.build(); diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index ce31a519676..93a5745c388 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -23,8 +23,8 @@ kafka: # name: localUsingProtobufFile # bootstrapServers: localhost:9092 # protobufFile: messages.proto - # protobufMessageName: - # _default: GenericMessage + # protobufMessageName: GenericMessage + # protobufMessageNameByTopic: # input-topic: InputMessage # output-topic: OutputMessage admin-client-timeout: 5000 diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java index 79c5565c317..7765f1b7100 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java @@ -4,12 +4,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.util.JsonFormat; import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import java.io.IOException; import java.net.URISyntaxException; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Base64; import java.util.Collections; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -19,21 +22,28 @@ class ProtobufFileRecordSerDeTest { - // Two sample messages used here are generated using `address-book.proto` and contain a person - // with following email address - public static final String TEST_USER_EMAIL = "user@example.com"; // Sample message of type `test.Person` - private final byte[] personMessage = Base64.getDecoder() - .decode("CgRVc2VyEGUaEHVzZXJAZXhhbXBsZS5jb20iCgoIMTEyMjMzNDQ="); + private static byte[] personMessage; // Sample message of type `test.AddressBook` - private final byte[] addressBookMessage = Base64.getDecoder() - .decode("CAESJgoEVXNlchBlGhB1c2VyQGV4YW1wbGUuY29tIgoKCDExMjIzMzQ0"); - private static Path protobufSchema; + private static byte[] addressBookMessage; + private static Path protobufSchemaPath; @BeforeAll - static void setUp() throws URISyntaxException { - protobufSchema = Paths.get(ProtobufFileRecordSerDeTest.class.getClassLoader() + static void setUp() throws URISyntaxException, IOException { + protobufSchemaPath = Paths.get(ProtobufFileRecordSerDeTest.class.getClassLoader() .getResource("address-book.proto").toURI()); + ProtobufSchema protobufSchema = new ProtobufSchema(Files.readString(protobufSchemaPath)); + + DynamicMessage.Builder builder = protobufSchema.newMessageBuilder("test.Person"); + JsonFormat.parser().merge( + "{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }", builder); + personMessage = builder.build().toByteArray(); + + builder = protobufSchema.newMessageBuilder("test.AddressBook"); + JsonFormat.parser().merge( + "{\"version\": 1, \"people\": [" + + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"user2@example.com\" }]}", builder); + addressBookMessage = builder.build().toByteArray(); } @Test @@ -42,40 +52,40 @@ void testDeserialize() throws IOException { "topic1", "test.Person", "topic2", "test.AddressBook"); var deserializer = - new ProtobufFileRecordSerDe(protobufSchema, messageNameMap, new ObjectMapper()); + new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, new ObjectMapper()); var msg1 = deserializer .deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()), Bytes.wrap(personMessage))); assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat()); - assertTrue(msg1.getValue().contains(TEST_USER_EMAIL)); + assertTrue(msg1.getValue().contains("user1@example.com")); var msg2 = deserializer .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()), Bytes.wrap(addressBookMessage))); - assertTrue(msg2.getValue().contains(TEST_USER_EMAIL)); + assertTrue(msg2.getValue().contains("user2@example.com")); } @Test void testNoDefaultMessageName() throws IOException { // by default the first message type defined in proto definition is used var deserializer = - new ProtobufFileRecordSerDe(protobufSchema, Collections.emptyMap(), new ObjectMapper()); + new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null, + new ObjectMapper()); var msg = deserializer .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), Bytes.wrap(personMessage))); - assertTrue(msg.getValue().contains(TEST_USER_EMAIL)); + assertTrue(msg.getValue().contains("user1@example.com")); } @Test void testDefaultMessageName() throws IOException { - var messageNameMap = Map.of( - "topic1", "test.Person", - "_default", "test.AddressBook"); + var messageNameMap = Map.of("topic1", "test.Person"); var deserializer = - new ProtobufFileRecordSerDe(protobufSchema, messageNameMap, new ObjectMapper()); + new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook", + new ObjectMapper()); var msg = deserializer .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()), Bytes.wrap(addressBookMessage))); - assertTrue(msg.getValue().contains(TEST_USER_EMAIL)); + assertTrue(msg.getValue().contains("user2@example.com")); } } From fd76a2559ee72ffeed4b477accb7ff7ee739591b Mon Sep 17 00:00:00 2001 From: Meysam Zarezadeh Date: Sat, 11 Sep 2021 11:30:48 +0430 Subject: [PATCH 4/4] Validate the given message names for the topics in ProtobufFileRecordSerDe --- .../ui/serde/ProtobufFileRecordSerDe.java | 29 +++++++++++-------- .../ui/serde/ProtobufFileRecordSerDeTest.java | 11 +++++++ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java index a29b5d323d0..d8491b11794 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java @@ -15,7 +15,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -33,14 +33,14 @@ public class ProtobufFileRecordSerDe implements RecordSerDe { private final ObjectMapper objectMapper; private final Path protobufSchemaPath; private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter(); - private final Map messageNameMap; + private final Map messageDescriptorMap; + private final Descriptor defaultMessageDescriptor; public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map messageNameMap, String defaultMessageName, ObjectMapper objectMapper) throws IOException { this.objectMapper = objectMapper; this.protobufSchemaPath = protobufSchemaPath; - this.messageNameMap = messageNameMap != null ? messageNameMap : Collections.emptyMap(); try (final Stream lines = Files.lines(protobufSchemaPath)) { var schema = new ProtobufSchema( lines.collect(Collectors.joining("\n")) @@ -50,6 +50,18 @@ public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map mess } else { this.protobufSchema = schema; } + this.messageDescriptorMap = new HashMap<>(); + if (messageNameMap != null) { + for (Map.Entry entry : messageNameMap.entrySet()) { + var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()), + "The given message type is not found in protobuf definition: " + + entry.getValue()); + messageDescriptorMap.put(entry.getKey(), descriptor); + } + } + defaultMessageDescriptor = Objects.requireNonNull(protobufSchema.toDescriptor(), + "The given message type is not found in protobuf definition: " + + defaultMessageName); } } @@ -72,11 +84,7 @@ public DeserializedKeyValue deserialize(ConsumerRecord msg) { } private Descriptor getDescriptor(String topic) { - var messageName = messageNameMap.get(topic); - if (messageName != null) { - return protobufSchema.toDescriptor(messageName); - } - return protobufSchema.toDescriptor(); + return messageDescriptorMap.getOrDefault(topic, defaultMessageDescriptor); } @SneakyThrows @@ -97,10 +105,7 @@ public ProducerRecord serialize(String topic, if (data == null) { return new ProducerRecord<>(topic, partition, Objects.requireNonNull(key).getBytes(), null); } - var messageName = messageNameMap.get(topic); - DynamicMessage.Builder builder = messageName != null - ? protobufSchema.newMessageBuilder(messageName) - : protobufSchema.newMessageBuilder(); + DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic)); try { JsonFormat.parser().merge(data, builder); final DynamicMessage message = builder.build(); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java index 7765f1b7100..0f87efe08f6 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.serde; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; @@ -88,4 +89,14 @@ void testDefaultMessageName() throws IOException { Bytes.wrap(addressBookMessage))); assertTrue(msg.getValue().contains("user2@example.com")); } + + @Test + void testSerialize() throws IOException { + var messageNameMap = Map.of("topic1", "test.Person"); + var serializer = + new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook", + new ObjectMapper()); + var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0); + assertNotNull(serialized.value()); + } }