diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java index 7e963c64e46..85256c08ef5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java @@ -6,6 +6,9 @@ import org.mapstruct.Mapper; import org.mapstruct.Mapping; +import java.util.Map; +import java.util.stream.Collectors; + @Mapper(componentModel = "spring") public interface ClusterMapper { @@ -19,7 +22,13 @@ public interface ClusterMapper { ClusterMetrics toClusterMetrics(InternalClusterMetrics metrics); BrokerMetrics toBrokerMetrics(InternalBrokerMetrics metrics); Topic toTopic(InternalTopic topic); + Partition toPartition(InternalPartition topic); TopicDetails toTopicDetails(InternalTopic topic); TopicConfig toTopicConfig(InternalTopicConfig topic); Replica toReplica(InternalReplica replica); + + default java.util.List map(Map map) { + return map.values().stream().map(this::toPartition).collect(Collectors.toList()); + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java index 6603816d441..128316cda9e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java @@ -22,7 +22,7 @@ public class InternalClusterMetrics { private final int outOfSyncReplicasCount; private final Map bytesInPerSec; private final Map bytesOutPerSec; - private final int segmentCount; + private final long segmentCount; private final long segmentSize; private final Map internalBrokerMetrics; private final List metrics; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalPartition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalPartition.java index 21502f216a4..87395e71ca4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalPartition.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalPartition.java @@ -15,4 +15,6 @@ public class InternalPartition { private final int replicasCount; private final long offsetMin; private final long offsetMax; + private final long segmentSize; + private final long segmentCount; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java index 164c1073250..36c0741b20e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java @@ -13,7 +13,7 @@ public class InternalTopic { private final String name; private final boolean internal; - private final List partitions; + private final Map partitions; private final List topicConfigs; private final int replicas; @@ -22,6 +22,6 @@ public class InternalTopic { private final int replicationFactor; private final int underReplicatedPartitions; private final long segmentSize; - private final int segmentCount; - private final Map partitionSegmentSize; + private final long segmentCount; +// private final Map partitionSegmentSize; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 78b7c99b4f3..32e95bfe1b7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -3,6 +3,7 @@ import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.ConsumerPosition; +import com.provectus.kafka.ui.cluster.model.InternalTopic; import com.provectus.kafka.ui.cluster.model.KafkaCluster; import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.kafka.KafkaService; @@ -83,10 +84,12 @@ public Optional> getTopicConfigs(String name, String topicName .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList())); } - public Mono createTopic(String name, Mono topicFormData) { - return clustersStorage.getClusterByName(name).map( - cluster -> kafkaService.createTopic(cluster, topicFormData) - ).orElse(Mono.empty()).map(clusterMapper::toTopic); + public Mono createTopic(String clusterName, Mono topicFormData) { + return clustersStorage.getClusterByName(clusterName).map(cluster -> + kafkaService.createTopic(cluster, topicFormData) + .doOnNext(t -> updateCluster(t, clusterName, cluster)) + .map(clusterMapper::toTopic) + ).orElse(Mono.empty()); } @SneakyThrows @@ -151,18 +154,15 @@ public Mono updateTopic(String clusterName, String topicName, Mono topicFormData .flatMap(t -> kafkaService.updateTopic(cl, topicName, t)) + .doOnNext(t -> updateCluster(t, clusterName, cl)) .map(clusterMapper::toTopic) - .flatMap(t -> updateCluster(t, clusterName, cl)) - ) - .orElse(Mono.empty()); + ).orElse(Mono.empty()); } - private Mono updateCluster(T topic, String clusterName, KafkaCluster cluster) { - return kafkaService.getUpdatedCluster(cluster) - .map(c -> { - clustersStorage.setKafkaCluster(clusterName, c); - return topic; - }); + private KafkaCluster updateCluster(InternalTopic topic, String clusterName, KafkaCluster cluster) { + final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic); + clustersStorage.setKafkaCluster(clusterName, updatedCluster); + return updatedCluster; } public Flux getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java index e6491698ce9..652aaec04a1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java @@ -1,25 +1,5 @@ package com.provectus.kafka.ui.cluster.service; -import lombok.RequiredArgsConstructor; -import lombok.extern.log4j.Log4j2; - -import java.time.Duration; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Bytes; -import org.springframework.stereotype.Service; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.cluster.deserialization.DeserializationService; @@ -30,11 +10,26 @@ import com.provectus.kafka.ui.kafka.KafkaService; import com.provectus.kafka.ui.model.SeekType; import com.provectus.kafka.ui.model.TopicMessage; - +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.scheduler.Schedulers; +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + @Service @Log4j2 @RequiredArgsConstructor @@ -128,7 +123,7 @@ private List getRequestedPartitions() { return Optional.ofNullable(cluster.getTopics().get(topic)) .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic)) - .getPartitions().stream() + .getPartitions().values().stream() .filter(internalPartition -> partitionPositions.isEmpty() || partitionPositions.containsKey(internalPartition.getPartition())) .map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition())) .collect(Collectors.toList()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java index 59d18f622ba..9fa45a71dab 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java @@ -129,7 +129,10 @@ public static InternalTopic mapToInternalTopic(TopicDescription topicDescription .mapToInt(InternalPartition::getReplicasCount) .sum(); - topic.partitions(partitions); + topic.partitions(partitions.stream().collect(Collectors.toMap( + InternalPartition::getPartition, + t -> t + ))); topic.replicas(replicasCount); topic.partitionCount(topicDescription.partitions().size()); topic.inSyncReplicas(inSyncReplicasCount); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java index b5572bd41fe..38b69c9dcda 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java @@ -3,7 +3,10 @@ import com.provectus.kafka.ui.cluster.model.*; import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.cluster.util.JmxClusterUtil; -import com.provectus.kafka.ui.model.*; +import com.provectus.kafka.ui.model.ConsumerGroup; +import com.provectus.kafka.ui.model.Metric; +import com.provectus.kafka.ui.model.ServerStatus; +import com.provectus.kafka.ui.model.TopicFormData; import com.provectus.kafka.ui.zookeeper.ZookeeperService; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -11,7 +14,6 @@ import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; @@ -19,8 +21,10 @@ import org.apache.kafka.common.utils.Bytes; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; import reactor.util.function.Tuples; import java.util.*; @@ -40,10 +44,15 @@ public class KafkaService { private final ZookeeperService zookeeperService; private final Map adminClientCache = new ConcurrentHashMap<>(); - private final Map> leadersCache = new ConcurrentHashMap<>(); private final JmxClusterUtil jmxClusterUtil; private final ClustersStorage clustersStorage; + public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) { + final Map topics = new HashMap<>(cluster.getTopics()); + topics.put(updatedTopic.getName(), updatedTopic); + return cluster.toBuilder().topics(topics).build(); + } + @SneakyThrows public Mono getUpdatedCluster(KafkaCluster cluster) { return getOrCreateAdminClient(cluster) @@ -51,10 +60,8 @@ public Mono getUpdatedCluster(KafkaCluster cluster) { ac -> getClusterMetrics(ac.getAdminClient()) .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient())) .flatMap( clusterMetrics -> - getTopicsData(ac.getAdminClient()).flatMap( topics -> - loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList())) - .map( configs -> mergeWithConfigs(topics, configs)) - .flatMap(it -> updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)) + getTopicsData(ac.getAdminClient()).flatMap( it -> + updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it) ).map( segmentSizeDto -> buildFromData(cluster, segmentSizeDto)) ) ).onErrorResume( @@ -116,8 +123,8 @@ private InternalClusterMetrics collectTopicsMetrics(Map to underReplicatedPartitions += topic.getUnderReplicatedPartitions(); inSyncReplicasCount += topic.getInSyncReplicas(); outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas()); - onlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1).sum(); - offlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1).sum(); + onlinePartitionCount += topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1).sum(); + offlinePartitionCount += topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1).sum(); } return InternalClusterMetrics.builder() @@ -142,21 +149,20 @@ private Map mergeWithConfigs(List topics, @SneakyThrows private Mono> getTopicsData(AdminClient adminClient) { return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names()) - .flatMap(topics -> ClusterUtil.toMono(adminClient.describeTopics(topics).all())) - .map(topic -> { - var leadersMap = topic.values().stream() - .flatMap(t -> t.partitions().stream() - .flatMap(t1 -> { - Map result = new HashMap<>(); - result.put(new TopicPartition(t.name(), t1.partition()), t1.leader().id()); - return Stream.of(result); - })); - leadersCache.put(adminClient, ClusterUtil.toSingleMap(leadersMap)); - return topic; - }) - .map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList())); + .flatMap(topics -> getTopicsData(adminClient, topics).collectList()); + } + + private Flux getTopicsData(AdminClient adminClient, Collection topics) { + final Mono>> configsMono = loadTopicsConfig(adminClient, topics); + + return ClusterUtil.toMono(adminClient.describeTopics(topics).all()).map( + m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()) + ).flatMap( internalTopics -> configsMono.map(configs -> + mergeWithConfigs(internalTopics, configs).values() + )).flatMapMany(Flux::fromIterable); } + private Mono getClusterMetrics(AdminClient client) { return ClusterUtil.toMono(client.describeCluster().nodes()) .flatMap(brokers -> @@ -182,12 +188,8 @@ public Mono createTopic(AdminClient adminClient, Mono topicData); - }).flatMap(topicData -> { - var tdw = adminClient.describeTopics(Collections.singletonList(topicData.getName())); - return getTopicDescription(tdw.values().get(topicData.getName()), topicData.getName()); - }) + }).flatMap(topicData -> getTopicsData(adminClient, Collections.singleton(topicData.getName())).next()) .switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic"))) - .map(ClusterUtil::mapToInternalTopic) .flatMap( t -> loadTopicsConfig(adminClient, Collections.singletonList(t.getName())) .map( c -> mergeWithConfigs(Collections.singletonList(t), c)) @@ -210,18 +212,8 @@ public Mono createAdminClient(KafkaCluster kafkaCluster) { return ExtendedAdminClient.extendedAdminClient(adminClient); } - - - private Mono getTopicDescription(KafkaFuture entry, String topicName) { - return ClusterUtil.toMono(entry) - .onErrorResume(e -> { - log.error("Can't get topic with name: " + topicName); - return Mono.empty(); - }); - } - @SneakyThrows - private Mono>> loadTopicsConfig(AdminClient adminClient, List topicNames) { + private Mono>> loadTopicsConfig(AdminClient adminClient, Collection topicNames) { List resources = topicNames.stream() .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName)) .collect(Collectors.toList()); @@ -279,8 +271,6 @@ public Mono updateTopic(KafkaCluster cluster, String topicName, T }); } - - private Mono getUpdatedTopic (ExtendedAdminClient ac, String topicName) { return getTopicsData(ac.getAdminClient()) .map(s -> s.stream() @@ -302,45 +292,78 @@ private Mono alterConfig(TopicFormData topicFormData, ConfigResource top } - private Mono updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, Map internalTopic) { - return ClusterUtil.toMono(ac.describeTopics(internalTopic.keySet()).all()).flatMap(topic -> + private InternalTopic mergeWithStats(InternalTopic topic, Map topics, Map partitions) { + final LongSummaryStatistics stats = topics.get(topic.getName()); + + return topic.toBuilder() + .segmentSize(stats.getSum()) + .segmentCount(stats.getCount()) + .partitions( + topic.getPartitions().entrySet().stream().map(e -> + Tuples.of(e.getKey(), mergeWithStats(topic.getName(), e.getValue(), partitions)) + ).collect(Collectors.toMap( + Tuple2::getT1, + Tuple2::getT2 + )) + ).build(); + } + + private InternalPartition mergeWithStats(String topic, InternalPartition partition, Map partitions) { + final LongSummaryStatistics stats = partitions.get(new TopicPartition(topic, partition.getPartition())); + return partition.toBuilder() + .segmentSize(stats.getSum()) + .segmentCount(stats.getCount()) + .build(); + } + + private Mono updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, List internalTopics) { + List names = internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList()); + return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic -> ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all()) .map(log -> { - var partitionSegmentSizeStream = leadersCache.get(ac).entrySet().stream() - .flatMap(l -> { - Map result = new HashMap<>(); - result.put(l.getKey(), log.get(l.getValue()).values().stream().mapToLong(e -> e.replicaInfos.get(l.getKey()).size).sum()); - return Stream.of(result); - }); - var partitionSegmentSize = ClusterUtil.toSingleMap(partitionSegmentSizeStream); - - var resultTopicMetricsStream = internalTopic.keySet().stream().flatMap(k -> { - Map result = new HashMap<>(); - result.put(k, internalTopic.get(k).toBuilder() - .segmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).mapToLong(Map.Entry::getValue).sum()) - .partitionSegmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).build()); - return Stream.of(result); - }); - - var resultBrokerMetricsStream = clusterMetrics.getInternalBrokerMetrics().entrySet().stream().map( - e -> { - var brokerSegmentSize = log.get(e.getKey()).values().stream() - .mapToLong(v -> v.replicaInfos.values().stream() - .mapToLong(r -> r.size).sum()).sum(); - InternalBrokerMetrics tempBrokerMetrics = e.getValue().toBuilder().segmentSize(brokerSegmentSize).build(); - return Collections.singletonMap(e.getKey(), tempBrokerMetrics); - }); - - var resultClusterMetrics = clusterMetrics.toBuilder() - .internalBrokerMetrics(ClusterUtil.toSingleMap(resultBrokerMetricsStream)) - .segmentSize(partitionSegmentSize.values().stream().reduce(Long::sum).orElseThrow()) - .build(); + + final List> topicPartitions = + log.entrySet().stream().flatMap(b -> + b.getValue().entrySet().stream().flatMap(topicMap -> + topicMap.getValue().replicaInfos.entrySet().stream() + .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size)) + ) + ).collect(Collectors.toList()); + + final Map partitionStats = topicPartitions.stream().collect( + Collectors.groupingBy( + Tuple2::getT2, + Collectors.summarizingLong(Tuple3::getT3) + ) + ); + + final Map topicStats = topicPartitions.stream().collect( + Collectors.groupingBy( + t -> t.getT2().topic(), + Collectors.summarizingLong(Tuple3::getT3) + ) + ); + + final LongSummaryStatistics summary = topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3)); + + + final Map resultTopics = internalTopics.stream().map(e -> + Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats)) + ).collect(Collectors.toMap( + Tuple2::getT1, + Tuple2::getT2 + )); return InternalSegmentSizeDto.builder() - .clusterMetricsWithSegmentSize(resultClusterMetrics) - .internalTopicWithSegmentSize(ClusterUtil.toSingleMap(resultTopicMetricsStream)).build(); + .clusterMetricsWithSegmentSize( + clusterMetrics.toBuilder() + .segmentSize(summary.getSum()) + .segmentCount(summary.getCount()) + .build() + ) + .internalTopicWithSegmentSize(resultTopics).build(); }) - ); + ); } public List getJmxMetric(String clusterName, Node node) { @@ -378,12 +401,12 @@ private InternalClusterMetrics calculateClusterMetrics(InternalClusterMetrics in .collect(Collectors.toList())).build(); } - public List getTopicPartitions(KafkaCluster c, InternalTopic topic ) { - var tps = topic.getPartitions().stream() + public Map getTopicPartitions(KafkaCluster c, InternalTopic topic ) { + var tps = topic.getPartitions().values().stream() .map(t -> new TopicPartition(topic.getName(), t.getPartition())) .collect(Collectors.toList()); Map partitions = - topic.getPartitions().stream().collect(Collectors.toMap( + topic.getPartitions().values().stream().collect(Collectors.toMap( InternalPartition::getPartition, tp -> tp )); @@ -397,9 +420,12 @@ public List getTopicPartitions(KafkaCluster c, InternalTopic .offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L)) .offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L)) .build() - ).collect(Collectors.toList()); + ).collect(Collectors.toMap( + InternalPartition::getPartition, + tp -> tp + )); } catch (Exception e) { - return Collections.emptyList(); + return Collections.emptyMap(); } } }