Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-849: Use a map between topics and message-names when using ProtobufFile #854

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand All @@ -26,6 +27,7 @@ public static class Cluster {
String keySchemaNameTemplate = "%s-key";
String protobufFile;
String protobufMessageName;
Map<String, String> protobufMessageNameByTopic;
List<ConnectCluster> kafkaConnect;
int jmxPort;
boolean jmxSsl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class KafkaCluster {
private final Throwable lastZookeeperException;
private final Path protobufFile;
private final String protobufMessageName;
private final Map<String, String> protobufMessageNameByTopic;
private final Properties properties;
private final Boolean readOnly;
private final Boolean disableLogDirsCollection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ private RecordSerDe createRecordDeserializerForCluster(KafkaCluster cluster) {
if (cluster.getProtobufFile() != null) {
log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName());
return new ProtobufFileRecordSerDe(cluster.getProtobufFile(),
cluster.getProtobufMessageName(), objectMapper);
cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName(),
objectMapper);
} else {
log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName());
return new SchemaRegistryAwareRecordSerDe(cluster);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.serde;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import com.provectus.kafka.ui.model.MessageSchema;
Expand All @@ -14,6 +15,8 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
Expand All @@ -30,15 +33,23 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
private final ObjectMapper objectMapper;
private final Path protobufSchemaPath;
private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter();
private final Map<String, String> messageNameMap;

public ProtobufFileRecordSerDe(Path protobufSchemaPath, String messageName,
ObjectMapper objectMapper) throws IOException {
public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> messageNameMap,
String defaultMessageName, ObjectMapper objectMapper)
throws IOException {
this.objectMapper = objectMapper;
this.protobufSchemaPath = protobufSchemaPath;
this.messageNameMap = messageNameMap != null ? messageNameMap : Collections.emptyMap();
try (final Stream<String> lines = Files.lines(protobufSchemaPath)) {
this.protobufSchema = new ProtobufSchema(
var schema = new ProtobufSchema(
lines.collect(Collectors.joining("\n"))
).copy(messageName);
);
if (defaultMessageName != null) {
this.protobufSchema = schema.copy(defaultMessageName);
} else {
this.protobufSchema = schema;
}
}
}

Expand All @@ -51,7 +62,7 @@ public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
builder.keyFormat(MessageFormat.UNKNOWN);
}
if (msg.value() != null) {
builder.value(parse(msg.value().get()));
builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
builder.valueFormat(MessageFormat.PROTOBUF);
}
return builder.build();
Expand All @@ -60,11 +71,19 @@ public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
}
}

private Descriptor getDescriptor(String topic) {
var messageName = messageNameMap.get(topic);
if (messageName != null) {
return protobufSchema.toDescriptor(messageName);
}
return protobufSchema.toDescriptor();
}

@SneakyThrows
private String parse(byte[] value) {
private String parse(byte[] value, Descriptor descriptor) {
DynamicMessage protoMsg = DynamicMessage.parseFrom(
protobufSchema.toDescriptor(),
new ByteArrayInputStream(value)
descriptor,
new ByteArrayInputStream(value)
);
byte[] jsonFromProto = ProtobufSchemaUtils.toJson(protoMsg);
return new String(jsonFromProto);
Expand All @@ -78,7 +97,10 @@ public ProducerRecord<byte[], byte[]> serialize(String topic,
if (data == null) {
return new ProducerRecord<>(topic, partition, Objects.requireNonNull(key).getBytes(), null);
}
DynamicMessage.Builder builder = protobufSchema.newMessageBuilder();
var messageName = messageNameMap.get(topic);
DynamicMessage.Builder builder = messageName != null
? protobufSchema.newMessageBuilder(messageName)
: protobufSchema.newMessageBuilder();
try {
JsonFormat.parser().merge(data, builder);
final DynamicMessage message = builder.build();
Expand All @@ -97,8 +119,8 @@ public ProducerRecord<byte[], byte[]> serialize(String topic,
public TopicMessageSchema getTopicSchema(String topic) {

final JsonSchema jsonSchema = schemaConverter.convert(
protobufSchemaPath.toUri(),
protobufSchema.toDescriptor()
protobufSchemaPath.toUri(),
getDescriptor(topic)
);
final MessageSchema keySchema = new MessageSchema()
.name(protobufSchema.fullName())
Expand Down
8 changes: 8 additions & 0 deletions kafka-ui-api/src/main/resources/application-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ kafka:
# address: http://localhost:8083
# jmxPort: 9998
# read-only: true
# -
# name: localUsingProtobufFile
# bootstrapServers: localhost:9092
# protobufFile: messages.proto
# protobufMessageName: GenericMessage
# protobufMessageNameByTopic:
# input-topic: InputMessage
# output-topic: OutputMessage
admin-client-timeout: 5000
zookeeper:
connection-timeout: 1000
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.provectus.kafka.ui.serde;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;
import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class ProtobufFileRecordSerDeTest {

// Sample message of type `test.Person`
private static byte[] personMessage;
// Sample message of type `test.AddressBook`
private static byte[] addressBookMessage;
private static Path protobufSchemaPath;

@BeforeAll
static void setUp() throws URISyntaxException, IOException {
protobufSchemaPath = Paths.get(ProtobufFileRecordSerDeTest.class.getClassLoader()
.getResource("address-book.proto").toURI());
ProtobufSchema protobufSchema = new ProtobufSchema(Files.readString(protobufSchemaPath));

DynamicMessage.Builder builder = protobufSchema.newMessageBuilder("test.Person");
JsonFormat.parser().merge(
"{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }", builder);
personMessage = builder.build().toByteArray();

builder = protobufSchema.newMessageBuilder("test.AddressBook");
JsonFormat.parser().merge(
"{\"version\": 1, \"people\": ["
+ "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"user2@example.com\" }]}", builder);
addressBookMessage = builder.build().toByteArray();
}

@Test
void testDeserialize() throws IOException {
var messageNameMap = Map.of(
"topic1", "test.Person",
"topic2", "test.AddressBook");
var deserializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, new ObjectMapper());
var msg1 = deserializer
.deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()),
Bytes.wrap(personMessage)));
assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat());
assertTrue(msg1.getValue().contains("user1@example.com"));

var msg2 = deserializer
.deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()),
Bytes.wrap(addressBookMessage)));
assertTrue(msg2.getValue().contains("user2@example.com"));
}

@Test
void testNoDefaultMessageName() throws IOException {
// by default the first message type defined in proto definition is used
var deserializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null,
new ObjectMapper());
var msg = deserializer
.deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()),
Bytes.wrap(personMessage)));
assertTrue(msg.getValue().contains("user1@example.com"));
}

@Test
void testDefaultMessageName() throws IOException {
var messageNameMap = Map.of("topic1", "test.Person");
var deserializer =
new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook",
new ObjectMapper());
var msg = deserializer
.deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()),
Bytes.wrap(addressBookMessage)));
assertTrue(msg.getValue().contains("user2@example.com"));
}
}
39 changes: 39 additions & 0 deletions kafka-ui-api/src/test/resources/address-book.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// [START declaration]
syntax = "proto3";
package test;

// [END declaration]

// [START java_declaration]
option java_multiple_files = true;
option java_package = "com.example.tutorial.protos";
option java_outer_classname = "AddressBookProtos";
// [END java_declaration]

// [START messages]
message Person {
string name = 1;
int32 id = 2; // Unique ID number for this person.
string email = 3;

enum PhoneType {
MOBILE = 0;
HOME = 1;
WORK = 2;
}

message PhoneNumber {
string number = 1;
PhoneType type = 2;
}

repeated PhoneNumber phones = 4;

}

// Our address book file is just one of these.
message AddressBook {
int32 version = 1;
repeated Person people = 2;
}
// [END messages]