Added common deserializer (#109)
germanosin committed Nov 2, 2020
1 parent e9a6b52 commit 88cc301
Showing 2 changed files with 49 additions and 51 deletions.
com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java

@@ -1,25 +1,24 @@
 package com.provectus.kafka.ui.cluster.deserialization;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import javax.annotation.PostConstruct;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.stereotype.Component;
-
-import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-
 @Component
 @RequiredArgsConstructor
 public class DeserializationService {
 
     private final ClustersStorage clustersStorage;
+    private final ObjectMapper objectMapper;
     private Map<String, RecordDeserializer> clusterDeserializers;
 
+
     @PostConstruct
     public void init() {
         this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
@@ -30,11 +29,7 @@ public void init() {
     }
 
     private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
-        if (StringUtils.isEmpty(cluster.getSchemaRegistry())) {
-            return new SimpleRecordDeserializer();
-        } else {
-            return new SchemaRegistryRecordDeserializer(cluster);
-        }
+        return new SchemaRegistryRecordDeserializer(cluster, objectMapper);
    }
 
     public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
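For context, a minimal caller-side sketch of how the service above is typically used. The RecordDeserializer interface itself is not shown in this diff, so the deserialize signature below is an assumption for illustration only:

    // Hypothetical caller, e.g. a consuming service with DeserializationService
    // injected by Spring. The deserialize signature is assumed, not confirmed
    // by this diff.
    RecordDeserializer deserializer =
            deserializationService.getRecordDeserializerForCluster(cluster);
    // record is a ConsumerRecord<Bytes, Bytes> obtained from a poll loop.
    Object value = deserializer.deserialize(record);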
com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java

@@ -1,55 +1,56 @@
 package com.provectus.kafka.ui.cluster.deserialization;
 
-import lombok.RequiredArgsConstructor;
-import lombok.extern.log4j.Log4j2;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Bytes;
-
-import io.confluent.kafka.schemaregistry.SchemaProvider;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import lombok.extern.log4j.Log4j2;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Log4j2
-@RequiredArgsConstructor
 public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
     private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100;
 
     private final KafkaCluster cluster;
-    private final SchemaRegistryClient schemaRegistryClient;
-    private KafkaAvroDeserializer avroDeserializer;
-    private ObjectMapper objectMapper;
-    private StringDeserializer stringDeserializer;
+    private final SchemaRegistryClient schemaRegistryClient;
+    private final KafkaAvroDeserializer avroDeserializer;
+    private final ObjectMapper objectMapper;
+    private final StringDeserializer stringDeserializer;
 
     private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
 
-    public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
+    public SchemaRegistryRecordDeserializer(KafkaCluster cluster, ObjectMapper objectMapper) {
         this.cluster = cluster;
 
-        List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
-        List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
-        this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
-
-        this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
-        this.objectMapper = new ObjectMapper();
+        this.objectMapper = objectMapper;
+
+        this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e ->
+                new CachedSchemaRegistryClient(
+                        Collections.singletonList(e),
+                        CLIENT_IDENTITY_MAP_CAPACITY,
+                        Collections.singletonList(new AvroSchemaProvider()),
+                        Collections.emptyMap()
+                )
+        ).orElse(null);
+
+        this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
+                .map(KafkaAvroDeserializer::new)
+                .orElse(null);
         this.stringDeserializer = new StringDeserializer();
     }
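With this change the deserializer tolerates clusters that have no schema registry configured: schemaRegistryClient and avroDeserializer are simply left null instead of a client being constructed against a missing URL, and the nulls are handled at the point of use (see the guards in the hunks below). A condensed, standalone sketch of the wiring pattern, with shortened arguments for illustration:

    // Build an optional dependency only when its configuration is present;
    // every downstream use must then null-check (as detectFormat does below).
    String registryUrl = cluster.getSchemaRegistry();        // may be null
    SchemaRegistryClient client = Optional.ofNullable(registryUrl)
            .map(url -> new CachedSchemaRegistryClient(
                    Collections.singletonList(url),
                    CLIENT_IDENTITY_MAP_CAPACITY))           // two-arg overload, no providers
            .orElse(null);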
@@ -83,11 +84,13 @@ private MessageFormat getMessageFormat(ConsumerRecord<Bytes, Bytes> record) {
 
     private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
         String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
-        try {
-            schemaRegistryClient.getAllVersions(avroSchema);
-            return MessageFormat.AVRO;
-        } catch (RestClientException | IOException e) {
-            log.info("Failed to get Avro schema for topic {}", record.topic());
+        if (schemaRegistryClient != null) {
+            try {
+                schemaRegistryClient.getAllVersions(avroSchema);
+                return MessageFormat.AVRO;
+            } catch (RestClientException | IOException e) {
+                log.info("Failed to get Avro schema for topic {}", record.topic());
+            }
         }
 
         try {
@@ -102,7 +105,7 @@ private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
 
     private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
         String topic = record.topic();
-        if (record.value()!=null) {
+        if (record.value()!=null && avroDeserializer !=null) {
             byte[] valueBytes = record.value().get();
             GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
             byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
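Taken together, the two guards make format detection degrade gracefully when no registry is configured: the Avro lookup is skipped and detection falls through to the non-Avro paths. A hedged sketch of the resulting flow; the JSON and STRING branches are inferred from the visible try/catch structure and are not shown in full in this diff:

    // Assumed shape of the full fallback chain. MessageFormat values other
    // than AVRO are inferred, and the JSON probe is a stand-in.
    private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
        if (schemaRegistryClient != null) {        // null when no registry configured
            try {
                schemaRegistryClient.getAllVersions(
                        String.format(cluster.getSchemaNameTemplate(), record.topic()));
                return MessageFormat.AVRO;
            } catch (RestClientException | IOException e) {
                log.info("Failed to get Avro schema for topic {}", record.topic());
            }
        }
        try {
            // Assumes record.value() != null in this sketch.
            objectMapper.readTree(record.value().get());  // parses as JSON?
            return MessageFormat.JSON;
        } catch (IOException e) {
            return MessageFormat.STRING;                  // final fallback
        }
    }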
