diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
index 4811d960317..1fdeddca247 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
@@ -1,25 +1,24 @@
 package com.provectus.kafka.ui.cluster.deserialization;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import javax.annotation.PostConstruct;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.stereotype.Component;
-
-import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-
 @Component
 @RequiredArgsConstructor
 public class DeserializationService {
 
     private final ClustersStorage clustersStorage;
+    private final ObjectMapper objectMapper;
     private Map<String, RecordDeserializer> clusterDeserializers;
 
+
     @PostConstruct
     public void init() {
         this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
@@ -30,11 +29,7 @@ public void init() {
     }
 
     private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
-        if (StringUtils.isEmpty(cluster.getSchemaRegistry())) {
-            return new SimpleRecordDeserializer();
-        } else {
-            return new SchemaRegistryRecordDeserializer(cluster);
-        }
+        return new SchemaRegistryRecordDeserializer(cluster, objectMapper);
     }
 
     public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
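Note on the hunks above: the `@@ -30,11 +29,7 @@` header skips the body of `init()`, so only its first line is visible. From the visible stream pipeline and the `Collectors` import, it evidently collects every configured cluster into a name-to-deserializer map. A hedged reconstruction of the elided body follows (the `KafkaCluster::getName` key extractor is an assumption; it does not appear in the hunk):

    // Hedged sketch of the elided init() body, not the verbatim source:
    // one RecordDeserializer per configured cluster, keyed by cluster name.
    this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
            .collect(Collectors.toMap(
                    KafkaCluster::getName,                       // assumed key extractor
                    this::createRecordDeserializerForCluster));  // factory shown in the diff above
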
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
index 06b2bf10471..5b3393a39d7 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
@@ -1,55 +1,56 @@
 package com.provectus.kafka.ui.cluster.deserialization;
 
-import lombok.RequiredArgsConstructor;
-import lombok.extern.log4j.Log4j2;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Bytes;
-
-import io.confluent.kafka.schemaregistry.SchemaProvider;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import lombok.extern.log4j.Log4j2;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Log4j2
-@RequiredArgsConstructor
 public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
     private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100;
 
     private final KafkaCluster cluster;
-    private final SchemaRegistryClient schemaRegistryClient;
-    private KafkaAvroDeserializer avroDeserializer;
-    private ObjectMapper objectMapper;
-    private StringDeserializer stringDeserializer;
+    private final SchemaRegistryClient schemaRegistryClient;
+    private final KafkaAvroDeserializer avroDeserializer;
+    private final ObjectMapper objectMapper;
+    private final StringDeserializer stringDeserializer;
 
     private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
 
-    public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
+    public SchemaRegistryRecordDeserializer(KafkaCluster cluster, ObjectMapper objectMapper) {
         this.cluster = cluster;
-
-        List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
-        List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
-        this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
-
-        this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
-        this.objectMapper = new ObjectMapper();
+        this.objectMapper = objectMapper;
+
+        this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e ->
+                new CachedSchemaRegistryClient(
+                        Collections.singletonList(e),
+                        CLIENT_IDENTITY_MAP_CAPACITY,
+                        Collections.singletonList(new AvroSchemaProvider()),
+                        Collections.emptyMap()
+                )
+        ).orElse(null);
+
+        this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
+                .map(KafkaAvroDeserializer::new)
+                .orElse(null);
         this.stringDeserializer = new StringDeserializer();
     }
 
@@ -83,11 +84,13 @@ private MessageFormat getMessageFormat(ConsumerRecord<Bytes, Bytes> record) {
 
     private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
         String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
-        try {
-            schemaRegistryClient.getAllVersions(avroSchema);
-            return MessageFormat.AVRO;
-        } catch (RestClientException | IOException e) {
-            log.info("Failed to get Avro schema for topic {}", record.topic());
+        if (schemaRegistryClient != null) {
+            try {
+                schemaRegistryClient.getAllVersions(avroSchema);
+                return MessageFormat.AVRO;
+            } catch (RestClientException | IOException e) {
+                log.info("Failed to get Avro schema for topic {}", record.topic());
+            }
         }
 
         try {
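The hunk above ends at the opening `try {`, so the rest of `detectFormat` is not shown. Given the `ObjectMapper`/`TypeReference` imports and the `MessageFormat.AVRO` branch, the elided continuation presumably probes the payload as JSON and finally falls back to a plain string. A hedged sketch of that continuation, assuming `MessageFormat` also defines `JSON` and `STRING` constants (only `AVRO` is visible in the diff):

    // Hedged sketch of the elided tail of detectFormat(), not the verbatim source:
    try {
        // If the payload parses as JSON, report it as such.
        objectMapper.readValue(record.value().get(), new TypeReference<Map<String, Object>>() {});
        return MessageFormat.JSON;
    } catch (IOException e) {
        log.info("Failed to parse json from topic {}", record.topic());
    }
    // Neither Avro nor JSON: fall back to plain string.
    return MessageFormat.STRING;
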
@@ -102,7 +105,7 @@ private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
 
     private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
         String topic = record.topic();
-        if (record.value()!=null) {
+        if (record.value()!=null && avroDeserializer !=null) {
             byte[] valueBytes = record.value().get();
             GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
             byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
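Taken together, this file now treats the schema registry as optional: the `CachedSchemaRegistryClient` and `KafkaAvroDeserializer` are only constructed when a registry URL is configured, and every use-site guards against `null`. A minimal, self-contained sketch of the same pattern in plain Java (`RegistryClient` and `lookupSchema` are hypothetical stand-ins for `CachedSchemaRegistryClient`, purely for illustration):

    import java.util.Optional;

    public class NullableClientSketch {

        // Hypothetical stand-in for CachedSchemaRegistryClient (illustration only).
        static class RegistryClient {
            private final String url;
            RegistryClient(String url) { this.url = url; }
            String lookupSchema(String subject) { return url + "/subjects/" + subject; }
        }

        private final RegistryClient client;

        NullableClientSketch(String registryUrl) {
            // Same construction pattern as the diff: the field stays null
            // when no schema registry URL is configured for the cluster.
            this.client = Optional.ofNullable(registryUrl)
                    .map(RegistryClient::new)
                    .orElse(null);
        }

        String detect(String subject) {
            // Same use-site guard as detectFormat(): skip the registry
            // lookup entirely when the client was never created.
            if (client != null) {
                return client.lookupSchema(subject);
            }
            return "no registry configured, fall back to JSON/String detection";
        }

        public static void main(String[] args) {
            System.out.println(new NullableClientSketch("http://localhost:8081").detect("topic-value"));
            System.out.println(new NullableClientSketch(null).detect("topic-value"));
        }
    }

One design note: `Optional.ofNullable(...).orElse(null)` trades the type-level guarantee back for a plain nullable field, so every caller must remember the `!= null` guard; keeping the field itself as `Optional<SchemaRegistryClient>` would make the guard impossible to forget, at the cost of a noisier field declaration.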