diff --git a/docker/kafka-clusters-only.yaml b/docker/kafka-clusters-only.yaml index ab58d527652..55780f896a8 100644 --- a/docker/kafka-clusters-only.yaml +++ b/docker/kafka-clusters-only.yaml @@ -23,39 +23,35 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 + JMX_PORT: 9997 + KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997 + + kafka01: + image: confluentinc/cp-kafka:5.1.0 + depends_on: + - zookeeper0 + ports: + - 29093:29093 + - 9999:9999 + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 JMX_PORT: 9997 KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997 -# -# kafka01: -# image: confluentinc/cp-kafka:5.1.0 -# depends_on: -# - zookeeper0 -# ports: -# - 29093:29093 -# - 9999:9999 -# environment: -# KAFKA_BROKER_ID: 2 -# KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183 -# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090 -# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT -# KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT -# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 -# JMX_PORT: 9997 -# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997 kafka-init-topics0: image: confluentinc/cp-kafka:5.1.0 depends_on: - kafka0 - - kafka1 - command: - "kafka-console-producer --broker-list kafka1:9092 --topic secondUsers && \ - This is message 1 && \ - This is message 2 && \ - This is message 3 && \ - Message 4 && \ - Message 5" + command: "bash -c 'echo Waiting for Kafka to be ready... && \ + cub kafka-ready -b kafka0:9092 1 20 && \ + kafka-topics --create --topic users --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183 && \ + kafka-topics --create --topic messages --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183'" environment: KAFKA_BROKER_ID: ignored KAFKA_ZOOKEEPER_CONNECT: ignored @@ -101,21 +97,21 @@ services: networks: - default -# schemaregistry0: -# image: confluentinc/cp-schema-registry:5.1.0 -# depends_on: -# - zookeeper0 -# - kafka0 -# - kafka01 -# ports: -# - 8085:8085 -# environment: -# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092 -# SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183 -# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT -# SCHEMA_REGISTRY_HOST_NAME: schemaregistry -# SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085 -# -# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" -# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO -# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas + schemaregistry0: + image: confluentinc/cp-schema-registry:5.1.0 + depends_on: + - zookeeper0 + - kafka0 + - kafka01 + ports: + - 8085:8085 + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092 + SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183 + SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT + SCHEMA_REGISTRY_HOST_NAME: schemaregistry + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085 + + SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" + SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO + SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java index 83c86553c96..5b6a623b006 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.cluster.config; +import com.provectus.kafka.ui.cluster.util.JmxMetricsNames; import com.provectus.kafka.ui.cluster.util.JmxPoolFactory; import org.apache.commons.pool2.KeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; @@ -9,6 +10,9 @@ import org.springframework.jmx.export.MBeanExporter; import javax.management.remote.JMXConnector; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; @Configuration public class Config { @@ -35,4 +39,9 @@ public MBeanExporter exporter() exporter.setExcludedBeans("pool"); return exporter; } + + @Bean + public List jmxMetricsNames() { + return Stream.of(JmxMetricsNames.values()).map(Enum::name).collect(Collectors.toList()); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java index 3aa197a1524..8414fe42f7d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java @@ -9,24 +9,16 @@ import org.mapstruct.Mapper; import org.mapstruct.Mapping; -import java.math.BigDecimal; - @Mapper(componentModel = "spring") public interface ClusterMapper { - KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties); - @Mapping(target = "brokerCount", source = "metrics.brokerCount") @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount") @Mapping(target = "topicCount", source = "metrics.topicCount") - @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec") - @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec") + @Mapping(target = "metrics", source = "metrics.metrics") Cluster toCluster(KafkaCluster cluster); - default BigDecimal map (Number number) { - return new BigDecimal(number.toString()); - } - + KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties); BrokersMetrics toBrokerMetrics(InternalClusterMetrics metrics); Topic toTopic(InternalTopic topic); TopicDetails toTopicDetails(InternalTopic topic); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java index 734ef31b07c..5350aa3c24d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java @@ -1,10 +1,14 @@ package com.provectus.kafka.ui.cluster.model; +import com.provectus.kafka.ui.model.Metric; import lombok.Builder; import lombok.Data; +import java.util.List; + @Data @Builder(toBuilder = true) public class InternalBrokerMetrics { private final Long segmentSize; + private final List jmxMetrics; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java index 8b1fa5e341d..6603816d441 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java @@ -1,9 +1,10 @@ package com.provectus.kafka.ui.cluster.model; +import com.provectus.kafka.ui.model.Metric; import lombok.Builder; import lombok.Data; -import java.math.BigDecimal; +import java.util.List; import java.util.Map; @@ -24,5 +25,6 @@ public class InternalClusterMetrics { private final int segmentCount; private final long segmentSize; private final Map internalBrokerMetrics; + private final List metrics; private final int zooKeeperStatus; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java index 7967f178abe..40deb0d6d9b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java @@ -22,7 +22,6 @@ public class InternalTopic { private final int inSyncReplicas; private final int replicationFactor; private final int underReplicatedPartitions; - //TODO: find way to fill private final long segmentSize; private final int segmentCount; private final Map partitionSegmentSize; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java new file mode 100644 index 00000000000..44b5b896e8f --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java @@ -0,0 +1,14 @@ +package com.provectus.kafka.ui.cluster.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.math.BigDecimal; + +@Getter +@AllArgsConstructor +public class MetricDto { + private String canonicalName; + private String metricName; + private BigDecimal value; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 6107839322e..508247ef8e2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -38,15 +38,20 @@ public List getClusters() { .collect(Collectors.toList()); } - public Optional getBrokersMetrics(String name) { - return clustersStorage.getClusterByName(name) + public Mono getBrokersMetrics(String name, Integer id) { + return Mono.justOrEmpty(clustersStorage.getClusterByName(name) .map(KafkaCluster::getMetrics) - .map(clusterMapper::toBrokerMetrics); + .map(s -> { + var brokerMetrics = clusterMapper.toBrokerMetrics(s); + brokerMetrics.setMetrics(s.getInternalBrokerMetrics().get(id).getJmxMetrics()); + brokerMetrics.setSegmentZise(Long.valueOf(s.getSegmentSize()).intValue()); + return brokerMetrics; + })); } public List getTopics(String name) { return clustersStorage.getClusterByName(name) - .map( c -> + .map(c -> c.getTopics().values().stream() .map(clusterMapper::toTopic) .collect(Collectors.toList()) @@ -127,6 +132,7 @@ public Flux getBrokers (String clusterName) { .map(n -> n.stream().map(node -> { Broker broker = new Broker(); broker.setId(node.idString()); + broker.setHost(node.host()); return broker; }).collect(Collectors.toList()))) .flatMapMany(Flux::fromIterable); @@ -154,6 +160,5 @@ public Flux getMessages(String clusterName, String topicName, Cons return clustersStorage.getClusterByName(clusterName) .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit)) .orElse(Flux.empty()); - } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java index 1b014544bfa..fc593330f10 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java @@ -1,7 +1,7 @@ package com.provectus.kafka.ui.cluster.util; -import com.provectus.kafka.ui.cluster.model.*; import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer; +import com.provectus.kafka.ui.cluster.model.*; import com.provectus.kafka.ui.model.*; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.*; @@ -28,10 +28,6 @@ @Slf4j public class ClusterUtil { - - - - private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version"; private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); @@ -56,7 +52,7 @@ public static Mono toMono(KafkaFuture future, String topicName) { })); } - public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) { + public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c) { ConsumerGroup consumerGroup = new ConsumerGroup(); consumerGroup.setConsumerGroupId(c.groupId()); consumerGroup.setNumConsumers(c.members().size()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java index 224e917a4d6..8554a712c61 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java @@ -1,5 +1,8 @@ package com.provectus.kafka.ui.cluster.util; +import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics; +import com.provectus.kafka.ui.cluster.model.MetricDto; +import com.provectus.kafka.ui.model.Metric; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.pool2.KeyedObjectPool; @@ -10,10 +13,12 @@ import java.io.IOException; import java.math.BigDecimal; import java.net.MalformedURLException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; @Component @Slf4j @@ -21,31 +26,50 @@ public class JmxClusterUtil { private final KeyedObjectPool pool; + private final List jmxMetricsNames; private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://"; private static final String JMX_SERVICE_TYPE = "jmxrmi"; + private static final String KAFKA_SERVER_PARAM = "kafka.server"; + private static final String NAME_METRIC_FIELD = "name="; - public static final String BYTES_IN_PER_SEC = "BytesInPerSec"; - public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec"; - private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC; - private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC; - - private static final List attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate"); - - public Map getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) { + public List getJmxMetrics(int jmxPort, String jmxHost) { String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE; - Map result = new HashMap<>(); + List result = new ArrayList<>(); JMXConnector srv = null; try { srv = pool.borrowObject(jmxUrl); MBeanServerConnection msc = srv.getMBeanServerConnection(); - ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) : - new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME); - for (String attrName : attrNames) { - Number value = (Number) msc.getAttribute(name, attrName); - result.put(attrName, value instanceof Double ? BigDecimal.valueOf((Double) value) : Integer.valueOf(value.toString())); - } + var jmxMetrics = msc.queryNames(null, null).stream().filter(q -> q.getCanonicalName().startsWith(KAFKA_SERVER_PARAM)).collect(Collectors.toList()); + for (ObjectName jmxMetric : jmxMetrics) { + Metric metric = new Metric(); + metric.setCanonicalName(jmxMetric.getCanonicalName()); + metric.setValue(getJmxMetric(jmxMetric.getCanonicalName(), msc, srv, jmxUrl)); + result.add(metric); + }; pool.returnObject(jmxUrl, srv); + } catch (IOException ioe) { + log.error("Cannot get jmxMetricsNames, {}", jmxUrl, ioe); + closeConnectionExceptionally(jmxUrl, srv); + } catch (Exception e) { + log.error("Cannot get JmxConnection from pool, {}", jmxUrl, e); + closeConnectionExceptionally(jmxUrl, srv); + } + return result; + } + + private Map getJmxMetric(String canonicalName, MBeanServerConnection msc, JMXConnector srv, String jmxUrl) { + Map resultAttr = new HashMap<>(); + try { + ObjectName name = new ObjectName(canonicalName); + var attrNames = msc.getMBeanInfo(name).getAttributes(); + for (MBeanAttributeInfo attrName : attrNames) { + var value = msc.getAttribute(name, attrName.getName()); + if (value instanceof Number) { + if (!(value instanceof Double) || !((Double) value).isInfinite()) + resultAttr.put(attrName.getName(), new BigDecimal(value.toString())); + } + } } catch (MalformedURLException url) { log.error("Cannot create JmxServiceUrl from {}", jmxUrl); closeConnectionExceptionally(jmxUrl, srv); @@ -62,7 +86,7 @@ public Map getJmxTrafficMetrics(int jmxPort, String jmxHost, Str log.error("Error while retrieving connection {} from pool", jmxUrl); closeConnectionExceptionally(jmxUrl, srv); } - return result; + return resultAttr; } private void closeConnectionExceptionally(String url, JMXConnector srv) { @@ -72,4 +96,35 @@ private void closeConnectionExceptionally(String url, JMXConnector srv) { log.error("Cannot invalidate object in pool, {}", url); } } + + public List convertToMetricDto(InternalClusterMetrics internalClusterMetrics) { + return internalClusterMetrics.getInternalBrokerMetrics().values().stream() + .map(c -> + c.getJmxMetrics().stream() + .filter(j -> isSameMetric(j.getCanonicalName())) + .map(j -> j.getValue().entrySet().stream() + .map(e -> new MetricDto(j.getCanonicalName(), e.getKey(), e.getValue())))) + .flatMap(Function.identity()).flatMap(Function.identity()).collect(Collectors.toList()); + } + + public Metric reduceJmxMetrics (Metric metric1, Metric metric2) { + var result = new Metric(); + Map jmx1 = new HashMap<>(metric1.getValue()); + Map jmx2 = new HashMap<>(metric2.getValue()); + jmx1.forEach((k, v) -> jmx2.merge(k, v, BigDecimal::add)); + result.setCanonicalName(metric1.getCanonicalName()); + result.setValue(jmx2); + return result; + } + + private boolean isSameMetric (String metric) { + if (metric.contains(NAME_METRIC_FIELD)) { + int beginIndex = metric.indexOf(NAME_METRIC_FIELD); + int endIndex = metric.indexOf(',', beginIndex); + endIndex = endIndex < 0 ? metric.length() - 1 : endIndex; + return jmxMetricsNames.contains(metric.substring(beginIndex + 5, endIndex)); + } else { + return false; + } + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsNames.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsNames.java new file mode 100644 index 00000000000..62197dc60c3 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxMetricsNames.java @@ -0,0 +1,31 @@ +package com.provectus.kafka.ui.cluster.util; + +public enum JmxMetricsNames { + MessagesInPerSec, + BytesInPerSec, + ReplicationBytesInPerSec, + RequestsPerSec, + ErrorsPerSec, + MessageConversionsPerSec, + BytesOutPerSec, + ReplicationBytesOutPerSec, + NoKeyCompactedTopicRecordsPerSec, + InvalidMagicNumberRecordsPerSec, + InvalidMessageCrcRecordsPerSec, + InvalidOffsetOrSequenceRecordsPerSec, + UncleanLeaderElectionsPerSec, + IsrShrinksPerSec, + IsrExpandsPerSec, + ReassignmentBytesOutPerSec, + ReassignmentBytesInPerSec, + ProduceMessageConversionsPerSec, + FailedFetchRequestsPerSec, + ZooKeeperSyncConnectsPerSec, + BytesRejectedPerSec, + ZooKeeperAuthFailuresPerSec, + TotalFetchRequestsPerSec, + FailedIsrUpdatesPerSec, + IncrementalFetchSessionEvictionsPerSec, + FetchMessageConversionsPerSec, + FailedProduceRequestsPerSec +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaConstants.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaConstants.java index 9d13a6c95bc..0454ed57dcf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaConstants.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaConstants.java @@ -10,11 +10,6 @@ public final class KafkaConstants { private KafkaConstants() { } - public static String IN_BYTE_PER_SEC_METRIC = "incoming-byte-rate"; - public static String IN_BYTE_PER_SEC_METRIC_DESCRIPTION = "The number of bytes read off all sockets per second"; - public static String OUT_BYTE_PER_SEC_METRIC = "outgoing-byte-rate"; - public static String OUT_BYTE_PER_SEC_METRIC_DESCRIPTION = "The number of outgoing bytes sent to all servers per second"; - public static Map TOPIC_DEFAULT_CONFIGS = Map.ofEntries( new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE), new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"), diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java index 5d6529acd8d..a3d466bcaf9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java @@ -42,12 +42,14 @@ public class KafkaService { private final Map adminClientCache = new ConcurrentHashMap<>(); private final Map> leadersCache = new ConcurrentHashMap<>(); private final JmxClusterUtil jmxClusterUtil; + private final ClustersStorage clustersStorage; @SneakyThrows public Mono getUpdatedCluster(KafkaCluster cluster) { - return getOrCreateAdminClient(cluster).flatMap( - ac -> getClusterMetrics(cluster, ac.getAdminClient()) - + return getOrCreateAdminClient(cluster) + .flatMap( + ac -> getClusterMetrics(ac.getAdminClient()) + .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient())) .flatMap( clusterMetrics -> getTopicsData(ac.getAdminClient()).flatMap( topics -> loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList())) @@ -155,19 +157,13 @@ private Mono> getTopicsData(AdminClient adminClient) { .map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList())); } - private Mono getClusterMetrics(KafkaCluster cluster, AdminClient client) { + private Mono getClusterMetrics(AdminClient client) { return ClusterUtil.toMono(client.describeCluster().nodes()) .flatMap(brokers -> ClusterUtil.toMono(client.describeCluster().controller()).map( c -> { InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder(); metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0); - Map bytesInPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_IN_PER_SEC); - Map bytesOutPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_OUT_PER_SEC); - metricsBuilder - .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build())))) - .bytesOutPerSec(bytesOutPerSec) - .bytesInPerSec(bytesInPerSec); return metricsBuilder.build(); } ) @@ -249,7 +245,7 @@ public Mono> getConsumerGroups(KafkaCluster cluster) { .flatMap(s -> ClusterUtil.toMono(ac.getAdminClient() .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all())) .map(s -> s.values().stream() - .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))); + .map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList()))); } public KafkaConsumer createConsumer(KafkaCluster cluster) { @@ -332,7 +328,7 @@ private Mono updateSegmentMetrics(AdminClient ac, Intern var brokerSegmentSize = log.get(e.getKey()).values().stream() .mapToLong(v -> v.replicaInfos.values().stream() .mapToLong(r -> r.size).sum()).sum(); - InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build(); + InternalBrokerMetrics tempBrokerMetrics = e.getValue().toBuilder().segmentSize(brokerSegmentSize).build(); return Collections.singletonMap(e.getKey(), tempBrokerMetrics); }); @@ -348,6 +344,39 @@ private Mono updateSegmentMetrics(AdminClient ac, Intern ); } + public List getJmxMetric(String clusterName, Node node) { + return clustersStorage.getClusterByName(clusterName) + .map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host())).orElse(Collections.emptyList()); + } + + private Mono fillJmxMetrics (InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) { + return fillBrokerMetrics(internalClusterMetrics, clusterName, ac).map(this::calculateClusterMetrics); + } + + private Mono fillBrokerMetrics(InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) { + return ClusterUtil.toMono(ac.describeCluster().nodes()) + .flatMapIterable(nodes -> nodes) + .map(broker -> Map.of(broker.id(), InternalBrokerMetrics.builder(). + jmxMetrics(getJmxMetric(clusterName, broker)).build())) + .collectList() + .map(s -> internalClusterMetrics.toBuilder().internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build()); + } + + private InternalClusterMetrics calculateClusterMetrics(InternalClusterMetrics internalClusterMetrics) { + return internalClusterMetrics.toBuilder().metrics( + jmxClusterUtil.convertToMetricDto(internalClusterMetrics) + .stream().map(c -> { + Metric jmx = new Metric(); + jmx.setCanonicalName(c.getCanonicalName()); + jmx.setValue(Map.of(c.getMetricName(), c.getValue())); + return jmx; + }).collect(Collectors.groupingBy(Metric::getCanonicalName, Collectors.reducing(jmxClusterUtil::reduceJmxMetrics))) + .values().stream() + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList())).build(); + } + public List partitionDtoList (InternalTopic topic, KafkaCluster cluster) { var topicPartitions = topic.getPartitions().stream().map(t -> new TopicPartition(topic.getName(), t.getPartition())).collect(Collectors.toList()); return getTopicPartitionOffset(cluster, topicPartitions); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index e566ee37382..f7308fdc69e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -30,12 +30,10 @@ public Mono>> getClusters(ServerWebExchange exchang } @Override - public Mono> getBrokersMetrics(String clusterName, ServerWebExchange exchange) { - return Mono.just( - clusterService.getBrokersMetrics(clusterName) + public Mono> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) { + return clusterService.getBrokersMetrics(clusterName, id) .map(ResponseEntity::ok) - .orElse(ResponseEntity.notFound().build()) - ); + .onErrorReturn(ResponseEntity.notFound().build()); } @Override @@ -97,7 +95,8 @@ public Mono> getConsumerGroup(String cluste public Mono> updateTopic(String clusterId, String topicName, @Valid Mono topicFormData, ServerWebExchange exchange) { return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok); } - + + private Mono parseConsumerPosition(SeekType seekType, List seekTo) { return Mono.justOrEmpty(seekTo) .defaultIfEmpty(Collections.emptyList()) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java index c116dc5ad85..26b6f300b2a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java @@ -1,6 +1,5 @@ package com.provectus.kafka.ui.zookeeper; -import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.KafkaCluster; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 6fe0f3a0910..8e2212b7b11 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -52,7 +52,7 @@ paths: items: $ref: '#/components/schemas/Broker' - /api/clusters/{clusterName}/metrics/broker: + /api/clusters/{clusterName}/metrics/broker/{id}: get: tags: - /api/clusters @@ -64,6 +64,11 @@ paths: required: true schema: type: string + - name: id + in: path + required: true + schema: + type: integer responses: 200: description: OK @@ -307,14 +312,10 @@ components: type: integer topicCount: type: integer - bytesInPerSec: - type: object - additionalProperties: - type: number - bytesOutPerSec: - type: object - additionalProperties: - type: number + metrics: + type: array + items: + $ref: '#/components/schemas/Metric' required: - id - name @@ -345,6 +346,10 @@ components: type: integer segmentZise: type: integer + metrics: + type: array + items: + $ref: '#/components/schemas/Metric' Topic: type: object @@ -433,6 +438,8 @@ components: properties: id: type: string + host: + type: string ConsumerGroup: type: object @@ -526,4 +533,14 @@ components: consumers: type: array items: - $ref: '#/components/schemas/ConsumerTopicPartitionDetail' \ No newline at end of file + $ref: '#/components/schemas/ConsumerTopicPartitionDetail' + + Metric: + type: object + properties: + canonicalName: + type: string + value: + type: string + additionalProperties: + type: number \ No newline at end of file