Feature/10 record deserialization (provectus#57)

* Record deserialization

* Check avro schema for topic

* Fix sdp docker config

* Code cleanup

* Code review changes

* Move Avro schema name template to cluster-level config
apetrovs committed Jun 11, 2020
1 parent 790e485 commit cb2a0ee
Showing 15 changed files with 283 additions and 6 deletions.
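At a glance, the change set routes message consumption through a per-cluster RecordDeserializer: a cluster configured with a schemaRegistry URL gets a registry-aware deserializer that detects Avro, JSON, or plain-string payloads per topic, while any other cluster falls back to plain strings. A minimal sketch of that selection rule, mirroring the DeserializationService diff below (the helper method itself is illustrative, not part of the commit):

import org.apache.commons.lang3.StringUtils;

// Illustrative only: the same rule DeserializationService applies per cluster.
static RecordDeserializer deserializerFor(KafkaCluster cluster) {
    return StringUtils.isEmpty(cluster.getSchemaRegistry())
            ? new SimpleRecordDeserializer()                 // no registry: values stay strings
            : new SchemaRegistryRecordDeserializer(cluster); // registry-backed Avro/JSON/string detection
}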
21 changes: 20 additions & 1 deletion docker/kafka-clusters-only.yaml
@@ -95,4 +95,23 @@ services:
KAFKA_BROKER_ID: ignored
KAFKA_ZOOKEEPER_CONNECT: ignored
networks:
      - default

schemaregistry0:
image: confluentinc/cp-schema-registry:5.1.0
depends_on:
- zookeeper0
- kafka0
- kafka01
ports:
- 8085:8085
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085

SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
17 changes: 17 additions & 0 deletions docker/kafka-ui.yaml
@@ -12,6 +12,7 @@ services:
- zookeeper1
- kafka0
- kafka1
- schemaregistry0
command: [ "java", "-jar", "kafka-ui-api.jar", "--spring.profiles.active=sdp"]

zookeeper0:
@@ -53,3 +54,19 @@ services:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997

schemaregistry0:
image: confluentinc/cp-schema-registry:5.1.0
depends_on:
- zookeeper0
- kafka0
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085

SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
15 changes: 15 additions & 0 deletions kafka-ui-api/pom.xml
@@ -69,6 +69,21 @@
<artifactId>mapstruct</artifactId>
<version>${org.mapstruct.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
@@ -19,5 +19,7 @@ public static class Cluster {
String name;
String bootstrapServers;
String zookeeper;
String schemaRegistry;
String schemaNameTemplate = "%s-value";
}
}
@@ -0,0 +1,43 @@
package com.provectus.kafka.ui.cluster.deserialization;

import lombok.RequiredArgsConstructor;

import java.util.Map;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;

@Component
@RequiredArgsConstructor
public class DeserializationService {

private final ClustersStorage clustersStorage;
private Map<String, RecordDeserializer> clusterDeserializers;

@PostConstruct
public void init() {
this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
.collect(Collectors.toMap(
KafkaCluster::getName,
this::createRecordDeserializerForCluster
));
}

private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
if (StringUtils.isEmpty(cluster.getSchemaRegistry())) {
return new SimpleRecordDeserializer();
} else {
return new SchemaRegistryRecordDeserializer(cluster);
}
}

public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
return clusterDeserializers.get(cluster.getName());
}
}
@@ -0,0 +1,9 @@
package com.provectus.kafka.ui.cluster.deserialization;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

public interface RecordDeserializer {

Object deserialize(ConsumerRecord<Bytes, Bytes> record);
}
@@ -0,0 +1,131 @@
package com.provectus.kafka.ui.cluster.deserialization;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;

import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;

@Log4j2
@RequiredArgsConstructor
public class SchemaRegistryRecordDeserializer implements RecordDeserializer {

private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100;

private final KafkaCluster cluster;
private final SchemaRegistryClient schemaRegistryClient;
private KafkaAvroDeserializer avroDeserializer;
private ObjectMapper objectMapper;
private StringDeserializer stringDeserializer;

private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();

public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
this.cluster = cluster;

List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());

this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
this.objectMapper = new ObjectMapper();
this.stringDeserializer = new StringDeserializer();
}

public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
MessageFormat format = getMessageFormat(record);

try {
Object parsedValue;
switch (format) {
case AVRO:
parsedValue = parseAvroRecord(record);
break;
case JSON:
parsedValue = parseJsonRecord(record);
break;
case STRING:
parsedValue = parseStringRecord(record);
break;
default:
throw new IllegalArgumentException("Unknown message format " + format + " for topic " + record.topic());
}
return parsedValue;
} catch (IOException e) {
throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
}
}

private MessageFormat getMessageFormat(ConsumerRecord<Bytes, Bytes> record) {
return topicFormatMap.computeIfAbsent(record.topic(), k -> detectFormat(record));
}

private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
try {
schemaRegistryClient.getAllVersions(avroSchema);
return MessageFormat.AVRO;
} catch (RestClientException | IOException e) {
log.info("Failed to get Avro schema for topic {}", record.topic());
}

try {
parseJsonRecord(record);
return MessageFormat.JSON;
} catch (IOException e) {
log.info("Failed to parse json from topic {}", record.topic());
}

return MessageFormat.STRING;
}

private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
String topic = record.topic();
byte[] valueBytes = record.value().get();
GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
return parseJson(bytes);
}

private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
byte[] valueBytes = record.value().get();
return parseJson(valueBytes);
}

private Object parseJson(byte[] bytes) throws IOException {
return objectMapper.readValue(bytes, new TypeReference<Map<String, Object>>() {
});
}

private Object parseStringRecord(ConsumerRecord<Bytes, Bytes> record) {
String topic = record.topic();
byte[] valueBytes = record.value().get();
return stringDeserializer.deserialize(topic, valueBytes);
}

public enum MessageFormat {
AVRO,
JSON,
STRING
}
}
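To make detectFormat concrete: with the default schemaNameTemplate of "%s-value", a topic named users maps to the registry subject users-value; if the registry has versions for that subject the topic is treated as Avro, otherwise the value is tried as JSON, and finally falls back to a string. A toy, registry-free sketch of that order — the topics are hypothetical, the registry lookup is replaced by a plain Set, and the JSON check is a crude stand-in for the ObjectMapper parse attempt:

import java.nio.charset.StandardCharsets;
import java.util.Set;

public class FormatDetectionSketch {
    enum MessageFormat { AVRO, JSON, STRING }

    static MessageFormat detect(String topic, byte[] value, Set<String> registrySubjects) {
        String subject = String.format("%s-value", topic); // default schemaNameTemplate
        if (registrySubjects.contains(subject)) {
            return MessageFormat.AVRO;                     // registry knows "<topic>-value"
        }
        String text = new String(value, StandardCharsets.UTF_8);
        if (text.trim().startsWith("{")) {
            return MessageFormat.JSON;                     // stand-in for the JSON parse attempt
        }
        return MessageFormat.STRING;
    }

    public static void main(String[] args) {
        Set<String> subjects = Set.of("users-value");      // hypothetical registry state
        System.out.println(detect("users", new byte[0], subjects));                                            // AVRO
        System.out.println(detect("logs", "{\"level\":\"INFO\"}".getBytes(StandardCharsets.UTF_8), subjects)); // JSON
        System.out.println(detect("raw", "plain text".getBytes(StandardCharsets.UTF_8), subjects));            // STRING
    }
}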
@@ -0,0 +1,15 @@
package com.provectus.kafka.ui.cluster.deserialization;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;

public class SimpleRecordDeserializer implements RecordDeserializer {

private final StringDeserializer stringDeserializer = new StringDeserializer();

@Override
public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
return stringDeserializer.deserialize(record.topic(), record.value().get());
}
}
@@ -15,11 +15,12 @@ public class KafkaCluster {
private final String jmxPort;
private final String bootstrapServers;
private final String zookeeper;
private final String schemaRegistry;
private final String schemaNameTemplate;
private final ServerStatus status;
private final ServerStatus zookeeperStatus;
private final InternalClusterMetrics metrics;
private final Map<String, InternalTopic> topics;
private final Throwable lastKafkaException;
private final Throwable lastZookeeperException;

}
@@ -16,6 +16,8 @@
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;

import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
@@ -37,15 +39,17 @@ public class ConsumingService {
private static final int MAX_POLLS_COUNT = 30;

private final KafkaService kafkaService;
private final DeserializationService deserializationService;

public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
int recordsLimit = Optional.ofNullable(limit)
.map(s -> Math.min(s, MAX_RECORD_LIMIT))
.orElse(DEFAULT_RECORD_LIMIT);
RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
RecordDeserializer recordDeserializer = deserializationService.getRecordDeserializerForCluster(cluster);
return Flux.create(emitter::emit)
.subscribeOn(Schedulers.boundedElastic())
.map(ClusterUtil::mapToTopicMessage)
.map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
.limitRequest(recordsLimit);
}

@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.cluster.util;

import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.model.*;
import lombok.extern.slf4j.Slf4j;
import com.provectus.kafka.ui.model.TopicMessage;
@@ -149,7 +150,7 @@ public static int convertToIntServerStatus(ServerStatus serverStatus) {
return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
}

public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord) {
public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord, RecordDeserializer recordDeserializer) {
OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType());
Map<String, String> headers = new HashMap<>();
@@ -166,7 +167,8 @@ public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consum
topicMessage.setKey(consumerRecord.key().toString());
}
topicMessage.setHeaders(headers);
topicMessage.setContent(consumerRecord.value().toString());
Object parsedValue = recordDeserializer.deserialize(consumerRecord);
topicMessage.setContent(parsedValue);

return topicMessage;
}
2 changes: 2 additions & 0 deletions kafka-ui-api/src/main/resources/application-local.yml
@@ -4,6 +4,8 @@ kafka:
name: local
bootstrapServers: localhost:29091
zookeeper: localhost:2183
schemaRegistry: http://localhost:8085
# schemaNameTemplate: "%s-value"
-
name: secondLocal
bootstrapServers: localhost:29092
1 change: 1 addition & 0 deletions kafka-ui-api/src/main/resources/application-sdp.yml
@@ -4,6 +4,7 @@ kafka:
name: local
bootstrapServers: kafka0:29092
zookeeper: zookeeper0:2181
schemaRegistry: http://schemaregistry0:8085
-
name: secondLocal
zookeeper: zookeeper1:2181
@@ -458,7 +458,7 @@ components:
additionalProperties:
type: string
content:
type: string
type: object
required:
- partition
- offset
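
This contract change follows from the ClusterUtil diff above: content now carries the deserializer's parsed value, so Avro and JSON records arrive as structured objects rather than opaque strings. A hypothetical before/after for a JSON record (the payload is invented):

// Before this commit (always a raw string):
//   topicMessage.setContent(consumerRecord.value().toString());
// After (parsed structure for Avro/JSON, string otherwise):
Object parsedValue = Map.of("id", 42, "name", "alice"); // hypothetical payload
topicMessage.setContent(parsedValue);                   // rendered as a JSON object in the API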