diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index cab60a256ea..04efdae3c37 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Properties; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -26,6 +27,7 @@ public static class Cluster { String keySchemaNameTemplate = "%s-key"; String protobufFile; String protobufMessageName; + Map protobufMessageNameByTopic; List kafkaConnect; int jmxPort; boolean jmxSsl; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index dce53c0b9b1..0b517d0cc76 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -35,6 +35,7 @@ public class KafkaCluster { private final Throwable lastZookeeperException; private final Path protobufFile; private final String protobufMessageName; + private final Map protobufMessageNameByTopic; private final Properties properties; private final Boolean readOnly; private final Boolean disableLogDirsCollection; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java index 816b0d9d87f..51e44915913 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java @@ -35,7 +35,8 @@ private RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) { if (cluster.getProtobufFile() != null) { log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName()); return new ProtobufFileRecordSerDe(cluster.getProtobufFile(), - cluster.getProtobufMessageName(), objectMapper); + cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName(), + objectMapper); } else { log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName()); return new SchemaRegistryAwareRecordSerDe(cluster); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java index 69b4f1d1553..d8491b11794 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.serde; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.DynamicMessage; import com.google.protobuf.util.JsonFormat; import com.provectus.kafka.ui.model.MessageSchema; @@ -14,6 +15,8 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -30,15 +33,35 @@ public class ProtobufFileRecordSerDe implements RecordSerDe { private final ObjectMapper objectMapper; private final Path protobufSchemaPath; private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter(); + private final Map messageDescriptorMap; + private final Descriptor defaultMessageDescriptor; - public ProtobufFileRecordSerDe(Path protobufSchemaPath, String messageName, - ObjectMapper objectMapper) throws IOException { + public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map messageNameMap, + String defaultMessageName, ObjectMapper objectMapper) + throws IOException { this.objectMapper = objectMapper; this.protobufSchemaPath = protobufSchemaPath; try (final Stream lines = Files.lines(protobufSchemaPath)) { - this.protobufSchema = new ProtobufSchema( + var schema = new ProtobufSchema( lines.collect(Collectors.joining("\n")) - ).copy(messageName); + ); + if (defaultMessageName != null) { + this.protobufSchema = schema.copy(defaultMessageName); + } else { + this.protobufSchema = schema; + } + this.messageDescriptorMap = new HashMap<>(); + if (messageNameMap != null) { + for (Map.Entry entry : messageNameMap.entrySet()) { + var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()), + "The given message type is not found in protobuf definition: " + + entry.getValue()); + messageDescriptorMap.put(entry.getKey(), descriptor); + } + } + defaultMessageDescriptor = Objects.requireNonNull(protobufSchema.toDescriptor(), + "The given message type is not found in protobuf definition: " + + defaultMessageName); } } @@ -51,7 +74,7 @@ public DeserializedKeyValue deserialize(ConsumerRecord msg) { builder.keyFormat(MessageFormat.UNKNOWN); } if (msg.value() != null) { - builder.value(parse(msg.value().get())); + builder.value(parse(msg.value().get(), getDescriptor(msg.topic()))); builder.valueFormat(MessageFormat.PROTOBUF); } return builder.build(); @@ -60,11 +83,15 @@ public DeserializedKeyValue deserialize(ConsumerRecord msg) { } } + private Descriptor getDescriptor(String topic) { + return messageDescriptorMap.getOrDefault(topic, defaultMessageDescriptor); + } + @SneakyThrows - private String parse(byte[] value) { + private String parse(byte[] value, Descriptor descriptor) { DynamicMessage protoMsg = DynamicMessage.parseFrom( - protobufSchema.toDescriptor(), - new ByteArrayInputStream(value) + descriptor, + new ByteArrayInputStream(value) ); byte[] jsonFromProto = ProtobufSchemaUtils.toJson(protoMsg); return new String(jsonFromProto); @@ -78,7 +105,7 @@ public ProducerRecord serialize(String topic, if (data == null) { return new ProducerRecord<>(topic, partition, Objects.requireNonNull(key).getBytes(), null); } - DynamicMessage.Builder builder = protobufSchema.newMessageBuilder(); + DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic)); try { JsonFormat.parser().merge(data, builder); final DynamicMessage message = builder.build(); @@ -97,8 +124,8 @@ public ProducerRecord serialize(String topic, public TopicMessageSchema getTopicSchema(String topic) { final JsonSchema jsonSchema = schemaConverter.convert( - protobufSchemaPath.toUri(), - protobufSchema.toDescriptor() + protobufSchemaPath.toUri(), + getDescriptor(topic) ); final MessageSchema keySchema = new MessageSchema() .name(protobufSchema.fullName()) diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index 9b1458e1eda..93a5745c388 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -19,6 +19,14 @@ kafka: # address: http://localhost:8083 # jmxPort: 9998 # read-only: true + # - + # name: localUsingProtobufFile + # bootstrapServers: localhost:9092 + # protobufFile: messages.proto + # protobufMessageName: GenericMessage + # protobufMessageNameByTopic: + # input-topic: InputMessage + # output-topic: OutputMessage admin-client-timeout: 5000 zookeeper: connection-timeout: 1000 diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java new file mode 100644 index 00000000000..0f87efe08f6 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java @@ -0,0 +1,102 @@ +package com.provectus.kafka.ui.serde; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.util.JsonFormat; +import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class ProtobufFileRecordSerDeTest { + + // Sample message of type `test.Person` + private static byte[] personMessage; + // Sample message of type `test.AddressBook` + private static byte[] addressBookMessage; + private static Path protobufSchemaPath; + + @BeforeAll + static void setUp() throws URISyntaxException, IOException { + protobufSchemaPath = Paths.get(ProtobufFileRecordSerDeTest.class.getClassLoader() + .getResource("address-book.proto").toURI()); + ProtobufSchema protobufSchema = new ProtobufSchema(Files.readString(protobufSchemaPath)); + + DynamicMessage.Builder builder = protobufSchema.newMessageBuilder("test.Person"); + JsonFormat.parser().merge( + "{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }", builder); + personMessage = builder.build().toByteArray(); + + builder = protobufSchema.newMessageBuilder("test.AddressBook"); + JsonFormat.parser().merge( + "{\"version\": 1, \"people\": [" + + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"user2@example.com\" }]}", builder); + addressBookMessage = builder.build().toByteArray(); + } + + @Test + void testDeserialize() throws IOException { + var messageNameMap = Map.of( + "topic1", "test.Person", + "topic2", "test.AddressBook"); + var deserializer = + new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, new ObjectMapper()); + var msg1 = deserializer + .deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()), + Bytes.wrap(personMessage))); + assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat()); + assertTrue(msg1.getValue().contains("user1@example.com")); + + var msg2 = deserializer + .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()), + Bytes.wrap(addressBookMessage))); + assertTrue(msg2.getValue().contains("user2@example.com")); + } + + @Test + void testNoDefaultMessageName() throws IOException { + // by default the first message type defined in proto definition is used + var deserializer = + new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null, + new ObjectMapper()); + var msg = deserializer + .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), + Bytes.wrap(personMessage))); + assertTrue(msg.getValue().contains("user1@example.com")); + } + + @Test + void testDefaultMessageName() throws IOException { + var messageNameMap = Map.of("topic1", "test.Person"); + var deserializer = + new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook", + new ObjectMapper()); + var msg = deserializer + .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()), + Bytes.wrap(addressBookMessage))); + assertTrue(msg.getValue().contains("user2@example.com")); + } + + @Test + void testSerialize() throws IOException { + var messageNameMap = Map.of("topic1", "test.Person"); + var serializer = + new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook", + new ObjectMapper()); + var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0); + assertNotNull(serialized.value()); + } +} diff --git a/kafka-ui-api/src/test/resources/address-book.proto b/kafka-ui-api/src/test/resources/address-book.proto new file mode 100644 index 00000000000..72eab7aab8c --- /dev/null +++ b/kafka-ui-api/src/test/resources/address-book.proto @@ -0,0 +1,39 @@ +// [START declaration] +syntax = "proto3"; +package test; + +// [END declaration] + +// [START java_declaration] +option java_multiple_files = true; +option java_package = "com.example.tutorial.protos"; +option java_outer_classname = "AddressBookProtos"; +// [END java_declaration] + +// [START messages] +message Person { + string name = 1; + int32 id = 2; // Unique ID number for this person. + string email = 3; + + enum PhoneType { + MOBILE = 0; + HOME = 1; + WORK = 2; + } + + message PhoneNumber { + string number = 1; + PhoneType type = 2; + } + + repeated PhoneNumber phones = 4; + +} + +// Our address book file is just one of these. +message AddressBook { + int32 version = 1; + repeated Person people = 2; +} +// [END messages] \ No newline at end of file