From 27467e45c702ac1e6bee43b92604b40b6c122d1e Mon Sep 17 00:00:00 2001 From: German Osin Date: Wed, 2 Dec 2020 21:47:57 +0300 Subject: [PATCH] Added custom properties & fixed reset policy (#134) * Added custom properties support --- .../kafka/ui/cluster/config/ClustersProperties.java | 2 ++ .../SchemaRegistryRecordDeserializer.java | 3 ++- .../kafka/ui/cluster/mapper/ClusterMapper.java | 10 ++++++++++ .../kafka/ui/cluster/model/KafkaCluster.java | 2 ++ .../kafka/ui/cluster/service/ClusterService.java | 1 + .../kafka/ui/cluster/service/ConsumingService.java | 13 +++++++------ .../com/provectus/kafka/ui/kafka/KafkaService.java | 3 +++ kafka-ui-api/src/main/resources/application.yml | 4 ++-- 8 files changed, 29 insertions(+), 9 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java index b140de24d83..093bfefef80 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.cluster.config; +import java.util.Properties; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @@ -24,5 +25,6 @@ public static class Cluster { String protobufFile; String protobufMessageName; int jmxPort; + Properties properties; } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java index e8024b3017e..a64aded30a7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java @@ -97,7 +97,8 @@ private MessageFormat detectFormat(ConsumerRecord record) { final List versions = schemaRegistryClient.getAllVersions(schemaName); if (!versions.isEmpty()) { final Integer version = versions.iterator().next(); - final Schema schema = schemaRegistryClient.getByVersion(record.topic(), version, false); + final String subjectName = String.format(cluster.getSchemaNameTemplate(), record.topic()); + final Schema schema = schemaRegistryClient.getByVersion(subjectName, version, false); if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) { try { protobufDeserializer.deserialize(record.topic(), record.value().get()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java index 0c66055e44d..b4f6aff75e8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java @@ -3,6 +3,7 @@ import com.provectus.kafka.ui.cluster.config.ClustersProperties; import com.provectus.kafka.ui.cluster.model.*; import com.provectus.kafka.ui.model.*; +import java.util.Properties; import org.mapstruct.Mapper; import org.mapstruct.Mapping; @@ -22,6 +23,7 @@ public interface ClusterMapper { @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics") Cluster toCluster(KafkaCluster cluster); @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName="resolvePath") + @Mapping(target = "properties", source = "properties", qualifiedByName="setProperties") KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties); @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName="mapDiskUsage") ClusterStats toClusterStats(InternalClusterMetrics metrics); @@ -73,4 +75,12 @@ default Path resolvePath(String path) { } } + default Properties setProperties(Properties properties) { + Properties copy = new Properties(); + if (properties!=null) { + copy.putAll(properties); + } + return copy; + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java index 03a514fe842..a4a833fddfb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.cluster.model; import com.provectus.kafka.ui.model.ServerStatus; +import java.util.Properties; import lombok.Builder; import lombok.Data; @@ -24,4 +25,5 @@ public class KafkaCluster { private final Throwable lastZookeeperException; private final Path protobufFile; private final String protobufMessageName; + private final Properties properties; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 95bf52b7195..e18c6328943 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -128,6 +128,7 @@ public Mono> groupMetadata(KafkaCluster c public Map topicPartitionsEndOffsets(KafkaCluster cluster, Collection topicPartitions) { Properties properties = new Properties(); + properties.putAll(cluster.getProperties()); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java index 652aaec04a1..39edf074356 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java @@ -88,7 +88,7 @@ private boolean treeContainsValue(JsonNode tree, String query) { @RequiredArgsConstructor private static class RecordEmitter { - private static final int MAX_POLLS_COUNT = 30; + private static final int MAX_EMPTY_POLLS_COUNT = 3; private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L); private final KafkaService kafkaService; @@ -98,15 +98,16 @@ private static class RecordEmitter { public void emit(FluxSink> sink) { try (KafkaConsumer consumer = kafkaService.createConsumer(cluster)) { -// assignPartitions(consumer); -// seekOffsets(consumer); assignAndSeek(consumer); - int pollsCount = 0; - while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) { + int emptyPollsCount = 0; + log.info("assignment: {}", consumer.assignment()); + while (!sink.isCancelled()) { ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); log.info("{} records polled", records.count()); - if (records.count() == 0) { + if (records.count() == 0 && emptyPollsCount < MAX_EMPTY_POLLS_COUNT) { break; + } else { + emptyPollsCount++; } records.iterator() .forEachRemaining(sink::next); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java index 49f8281b394..82ff081ada0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java @@ -209,6 +209,7 @@ public Mono getOrCreateAdminClient(KafkaCluster cluster) { public Mono createAdminClient(KafkaCluster kafkaCluster) { Properties properties = new Properties(); + properties.putAll(kafkaCluster.getProperties()); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()); properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); AdminClient adminClient = AdminClient.create(properties); @@ -245,10 +246,12 @@ public Mono> getConsumerGroups(KafkaCluster cluster) { public KafkaConsumer createConsumer(KafkaCluster cluster) { Properties props = new Properties(); + props.putAll(cluster.getProperties()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new KafkaConsumer<>(props); } diff --git a/kafka-ui-api/src/main/resources/application.yml b/kafka-ui-api/src/main/resources/application.yml index 26c8c43ea8a..0bb3c197c5d 100644 --- a/kafka-ui-api/src/main/resources/application.yml +++ b/kafka-ui-api/src/main/resources/application.yml @@ -1,8 +1,8 @@ kafka: clusters: - name: local - bootstrapServers: b-1.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:9092 - zookeeper: z-2.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:2181 + bootstrapServers: localhost:9092 + zookeeper: localhost:2181 schemaRegistry: http://localhost:8081 admin-client-timeout: 5000 zookeeper: