Feature/10 record deserialization #57

Merged — 6 commits merged on Jun 11, 2020
21 changes: 20 additions & 1 deletion docker/kafka-clusters-only.yaml
@@ -95,4 +95,23 @@ services:
KAFKA_BROKER_ID: ignored
KAFKA_ZOOKEEPER_CONNECT: ignored
networks:
- default
- default

schemaregistry0:
image: confluentinc/cp-schema-registry:5.1.0
depends_on:
- zookeeper0
- kafka0
- kafka01
ports:
- 8085:8085
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085

SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
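
Side note (not part of this change): a minimal Java check can confirm the registry container above is reachable. The localhost URL is an assumption based on the `8085:8085` port mapping in this compose file.

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Sketch only: assumes the compose stack above is running and port 8085
// is published on localhost as configured in this file.
public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8085", 100);
        // Lists registered subjects, e.g. "my-topic-value" once schemas exist.
        System.out.println(client.getAllSubjects());
    }
}
```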
17 changes: 17 additions & 0 deletions docker/kafka-ui.yaml
@@ -12,6 +12,7 @@ services:
- zookeeper1
- kafka0
- kafka1
- schemaregistry0
command: [ "java", "-jar", "kafka-ui-api.jar", "--spring.profiles.active=sdp"]

zookeeper0:
@@ -53,3 +54,19 @@ services:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997

schemaregistry0:
image: confluentinc/cp-schema-registry:5.1.0
depends_on:
- zookeeper0
- kafka0
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085

SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
15 changes: 15 additions & 0 deletions kafka-ui-api/pom.xml
@@ -69,6 +69,21 @@
<artifactId>mapstruct</artifactId>
<version>${org.mapstruct.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
@@ -19,5 +19,6 @@ public static class Cluster {
String name;
String bootstrapServers;
String zookeeper;
String schemaRegistry;
}
}
@@ -0,0 +1,43 @@
package com.provectus.kafka.ui.cluster.deserialization;

import lombok.RequiredArgsConstructor;

import java.util.Map;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;

@Component
@RequiredArgsConstructor
public class DeserializationService {

private final ClustersStorage clustersStorage;
private Map<String, RecordDeserializer> clusterDeserializers;

@PostConstruct
public void init() {
this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
.collect(Collectors.toMap(
KafkaCluster::getName,
this::createRecordDeserializerForCluster
));
}

private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
if (StringUtils.isEmpty(cluster.getSchemaRegistry())) {
return new SimpleRecordDeserializer();
} else {
return new SchemaRegistryRecordDeserializer(cluster.getSchemaRegistry());
}
}

public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
return clusterDeserializers.get(cluster.getName());
}
}
@@ -0,0 +1,9 @@
package com.provectus.kafka.ui.cluster.deserialization;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

public interface RecordDeserializer {

Object deserialize(ConsumerRecord<Bytes, Bytes> record);
}
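
For illustration only (this class is not part of the PR): a hypothetical implementation showing the minimal contract of the interface — render the raw record value as Base64.

```java
package com.provectus.kafka.ui.cluster.deserialization;

import java.util.Base64;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

// Hypothetical example, not part of this change: renders the raw record
// value as a Base64 string.
public class Base64RecordDeserializer implements RecordDeserializer {

    @Override
    public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
        // Bytes#get() exposes the underlying byte[] of the record value.
        return Base64.getEncoder().encodeToString(record.value().get());
    }
}
```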
@@ -0,0 +1,128 @@
package com.provectus.kafka.ui.cluster.deserialization;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;

import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

@Log4j2
@RequiredArgsConstructor
public class SchemaRegistryRecordDeserializer implements RecordDeserializer {

private final static String AVRO_SCHEMA_TEMPLATE = "%s-value";

private SchemaRegistryClient schemaRegistryClient;
private KafkaAvroDeserializer avroDeserializer;
private ObjectMapper objectMapper;
private StringDeserializer stringDeserializer;

private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();

public SchemaRegistryRecordDeserializer(String schemaRegistryUrl) {
List<String> endpoints = Collections.singletonList(schemaRegistryUrl);
List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, 100, providers, Collections.emptyMap());

this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
this.objectMapper = new ObjectMapper();
this.stringDeserializer = new StringDeserializer();
}

public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
MessageFormat format = getMessageFormat(record);

try {
Object parsedValue;
switch (format) {
case AVRO:
parsedValue = parseAvroRecord(record);
break;
case JSON:
parsedValue = parseJsonRecord(record);
break;
case STRING:
parsedValue = parseStringRecord(record);
break;
default:
throw new IllegalArgumentException("Unknown message format " + format + " for topic " + record.topic());
}
return parsedValue;
} catch (IOException e) {
throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
}
}

private MessageFormat getMessageFormat(ConsumerRecord<Bytes, Bytes> record) {
return topicFormatMap.computeIfAbsent(record.topic(), k -> detectFormat(record));
}

private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
String avroSchema = String.format(AVRO_SCHEMA_TEMPLATE, record.topic());
try {
schemaRegistryClient.getAllVersions(avroSchema);
return MessageFormat.AVRO;
} catch (RestClientException | IOException e) {
log.info("Failed to get Avro schema for topic {}", record.topic());
}

try {
parseJsonRecord(record);
return MessageFormat.JSON;
} catch (IOException e) {
log.info("Failed to parse json from topic {}", record.topic());
}

return MessageFormat.STRING;
}

private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
String topic = record.topic();
byte[] valueBytes = record.value().get();
GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
return parseJson(bytes);
}

private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
byte[] valueBytes = record.value().get();
return parseJson(valueBytes);
}

private Object parseJson(byte[] bytes) throws IOException {
return objectMapper.readValue(bytes, new TypeReference<Map<String, Object>>() {
});
}

private Object parseStringRecord(ConsumerRecord<Bytes, Bytes> record) {
String topic = record.topic();
byte[] valueBytes = record.value().get();
return stringDeserializer.deserialize(topic, valueBytes);
}

public enum MessageFormat {
AVRO,
JSON,
STRING
}
}
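
A hedged usage sketch (not part of the diff) of the detection flow above: with no `<topic>-value` subject registered, detection falls back to JSON and the parsed value comes back as a `Map<String, Object>`. The registry URL and topic name below are placeholders.

```java
package com.provectus.kafka.ui.cluster.deserialization;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

// Sketch only: registry URL and topic name are assumptions.
public class DeserializerSmokeTest {

    public static void main(String[] args) {
        RecordDeserializer deserializer =
                new SchemaRegistryRecordDeserializer("http://schemaregistry0:8085");

        Bytes value = Bytes.wrap("{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8));
        ConsumerRecord<Bytes, Bytes> record =
                new ConsumerRecord<>("some-topic", 0, 0L, null, value);

        // No Avro schema is registered for "some-topic-value", so detectFormat()
        // falls back to JSON and deserialize() returns a Map<String, Object>.
        System.out.println(deserializer.deserialize(record));
    }
}
```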
@@ -0,0 +1,15 @@
package com.provectus.kafka.ui.cluster.deserialization;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;

public class SimpleRecordDeserializer implements RecordDeserializer {

private final StringDeserializer stringDeserializer = new StringDeserializer();

@Override
public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
return stringDeserializer.deserialize(record.topic(), record.value().get());
}
}
@@ -15,11 +15,11 @@ public class KafkaCluster {
private final String jmxPort;
private final String bootstrapServers;
private final String zookeeper;
private final String schemaRegistry;
private final ServerStatus status;
private final ServerStatus zookeeperStatus;
private final InternalClusterMetrics metrics;
private final Map<String, InternalTopic> topics;
private final Throwable lastKafkaException;
private final Throwable lastZookeeperException;

}
@@ -16,6 +16,8 @@
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;

import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
@@ -37,15 +39,17 @@ public class ConsumingService {
private static final int MAX_POLLS_COUNT = 30;

private final KafkaService kafkaService;
private final DeserializationService deserializationService;

public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
int recordsLimit = Optional.ofNullable(limit)
.map(s -> Math.min(s, MAX_RECORD_LIMIT))
.orElse(DEFAULT_RECORD_LIMIT);
RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
RecordDeserializer recordDeserializer = deserializationService.getRecordDeserializerForCluster(cluster);
return Flux.create(emitter::emit)
.subscribeOn(Schedulers.boundedElastic())
.map(ClusterUtil::mapToTopicMessage)
.map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
.limitRequest(recordsLimit);
}

@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.cluster.util;

import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.model.*;
import lombok.extern.slf4j.Slf4j;
import com.provectus.kafka.ui.model.TopicMessage;
@@ -149,7 +150,7 @@ public static int convertToIntServerStatus(ServerStatus serverStatus) {
return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
}

public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord) {
public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord, RecordDeserializer recordDeserializer) {
OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType());
Map<String, String> headers = new HashMap<>();
@@ -166,7 +167,8 @@ public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consum
topicMessage.setKey(consumerRecord.key().toString());
}
topicMessage.setHeaders(headers);
topicMessage.setContent(consumerRecord.value().toString());
Object parsedValue = recordDeserializer.deserialize(consumerRecord);
topicMessage.setContent(parsedValue);

return topicMessage;
}
1 change: 1 addition & 0 deletions kafka-ui-api/src/main/resources/application-local.yml
@@ -4,6 +4,7 @@ kafka:
name: local
bootstrapServers: localhost:29091
zookeeper: localhost:2183
schemaRegistry: http://localhost:8085
-
name: secondLocal
bootstrapServers: localhost:29092
1 change: 1 addition & 0 deletions kafka-ui-api/src/main/resources/application-sdp.yml
@@ -4,6 +4,7 @@ kafka:
name: local
bootstrapServers: kafka0:29092
zookeeper: zookeeper0:2181
schemaRegistry: http://schemaregistry0:8085
-
name: secondLocal
zookeeper: zookeeper1:2181
@@ -458,7 +458,7 @@ components:
additionalProperties:
type: string
content:
type: string
type: object
required:
- partition
- offset
16 changes: 16 additions & 0 deletions pom.xml
@@ -28,8 +28,24 @@
<swagger-annotations.version>1.6.0</swagger-annotations.version>
<springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
<kafka.version>2.4.1</kafka.version>
<avro.version>1.9.2</avro.version>
<confluent.version>5.5.0</confluent.version>
</properties>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<pluginRepositories>
<pluginRepository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</pluginRepository>
</pluginRepositories>

<groupId>com.provectus</groupId>
<artifactId>kafka-ui</artifactId>
<version>0.0.1-SNAPSHOT</version>