Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backend jmx metrics #64

Merged
merged 27 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d46a39e
Start doing endpoint for jmx metrics
Jun 16, 2020
914360e
Merge branch 'master' into backend-jmx-metrics
Jun 18, 2020
a562c98
Merge branch 'master' into backend-jmx-metrics
Jun 18, 2020
f427d01
Added endpoint for getting jmx metric per broker
Jun 23, 2020
a24b218
Cluster jmx metrics sum endpoint added
Jun 23, 2020
90df65b
Added endpoints for cluster metrics and broker metrics
Jun 29, 2020
b320aa6
Cleared some code
Jun 29, 2020
319d027
Fixed jmxmetrics names
Jun 29, 2020
0c72e07
Changed to all values in metrics
Jun 30, 2020
e5c1bb8
Removed redundant imports
Jun 30, 2020
4e624c1
Renamed param constant
Jun 30, 2020
65bc33e
Merge branch 'master' into backend-jmx-metrics
Jul 1, 2020
40e21c4
Changed to calculate brokers and clusters metrics in one place
Jul 4, 2020
6035003
Removed redundant imports
Jul 4, 2020
bceb161
Fixed some mistakes
Jul 4, 2020
c64579c
Replaced multiple method usage into single
Jul 7, 2020
51a6a74
Fixed multiple calls
Jul 7, 2020
b218600
Removed cluster level metrics, now only broker-level metrics in cluster
Jul 9, 2020
21ca9a0
Just small fixes
Jul 9, 2020
04ca14d
removed redundant variable
Jul 9, 2020
105ef89
Renamed method for cluster level metrics
Jul 9, 2020
4edc5f8
Fixed after PR and added sum for number cluster metrics by num and pe…
Jul 15, 2020
5beb6ae
Added metricdto object
Jul 23, 2020
5ef0269
Added list of metrics to enum
Jul 23, 2020
9b04938
Renames and optimizations
Jul 23, 2020
c81c51d
Renamed jmxmetrics objects param to metrics
Jul 27, 2020
3e212aa
Fixed some issues
Jul 27, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,16 @@
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;

import java.math.BigDecimal;

@Mapper(componentModel = "spring")
public interface ClusterMapper {

KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);

@Mapping(target = "brokerCount", source = "metrics.brokerCount")
@Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
@Mapping(target = "topicCount", source = "metrics.topicCount")
@Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec")
@Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec")
@Mapping(target = "jmxMetrics", source = "metrics.jmxMetrics")
Cluster toCluster(KafkaCluster cluster);

default BigDecimal map (Number number) {
return new BigDecimal(number.toString());
}

KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
BrokersMetrics toBrokerMetrics(InternalClusterMetrics metrics);
Topic toTopic(InternalTopic topic);
TopicDetails toTopicDetails(InternalTopic topic);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.provectus.kafka.ui.cluster.model;

import com.provectus.kafka.ui.model.JmxMetric;
import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;


Expand All @@ -24,5 +26,6 @@ public class InternalClusterMetrics {
private final int segmentCount;
private final long segmentSize;
private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
private final List<JmxMetric> jmxMetrics;
private final int zooKeeperStatus;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
Expand All @@ -30,6 +31,7 @@ public class ClusterService {
private final ClusterMapper clusterMapper;
private final KafkaService kafkaService;
private final ConsumingService consumingService;
private final JmxClusterUtil jmxClusterUtil;

public List<Cluster> getClusters() {
return clustersStorage.getKafkaClusters()
Expand All @@ -38,10 +40,12 @@ public List<Cluster> getClusters() {
.collect(Collectors.toList());
}

public Optional<BrokersMetrics> getBrokersMetrics(String name) {
public Mono<BrokersMetrics> getBrokersMetrics(String name, Integer id) {
return clustersStorage.getClusterByName(name)
.map(KafkaCluster::getMetrics)
.map(clusterMapper::toBrokerMetrics);
.map(s -> kafkaService.getJmxMetric(name, id)
.map(j -> s.toBuilder().jmxMetrics(j).build()))
.map(s -> s.map(clusterMapper::toBrokerMetrics)).orElseThrow();
}

public List<Topic> getTopics(String name) {
Expand Down Expand Up @@ -124,6 +128,7 @@ public Flux<Broker> getBrokers (String clusterName) {
.map(n -> n.stream().map(node -> {
Broker broker = new Broker();
broker.setId(node.idString());
broker.setHost(node.host());
return broker;
}).collect(Collectors.toList())))
.flatMapMany(Flux::fromIterable);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.provectus.kafka.ui.cluster.util;

import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.model.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
Expand All @@ -28,10 +28,6 @@
@Slf4j
public class ClusterUtil {





private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";

private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.cluster.util;

import com.provectus.kafka.ui.model.JmxMetric;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.KeyedObjectPool;
Expand All @@ -10,10 +11,11 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Component
@Slf4j
Expand All @@ -24,26 +26,46 @@ public class JmxClusterUtil {

private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
private static final String JMX_SERVICE_TYPE = "jmxrmi";
private static final String KAFKA_SERVER_PARAM = "kafka.server";

public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;

private static final List<String> attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
public List<JmxMetric> getJmxMetrics(int jmxPort, String jmxHost) {
String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
List<JmxMetric> result = new ArrayList<>();
JMXConnector srv = null;
try {
srv = pool.borrowObject(jmxUrl);
MBeanServerConnection msc = srv.getMBeanServerConnection();
var jmxMetrics = msc.queryNames(null, null).stream().filter(q -> q.getCanonicalName().startsWith(KAFKA_SERVER_PARAM)).collect(Collectors.toList());
jmxMetrics.forEach(j -> {
JmxMetric metric = new JmxMetric();
metric.setCanonicalName(j.getCanonicalName());
metric.setValue(getJmxMetric(jmxPort, jmxHost, j.getCanonicalName()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks strange

result.add(metric);
});
pool.returnObject(jmxUrl, srv);
} catch (IOException ioe) {
log.error("Cannot get jmxMetricsNames, {}", jmxUrl, ioe);
closeConnectionExceptionally(jmxUrl, srv);
} catch (Exception e) {
log.error("Cannot get JmxConnection from pool, {}", jmxUrl, e);
closeConnectionExceptionally(jmxUrl, srv);
}
return result;
}

public Map<String, Number> getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) {
private Map<String, Object> getJmxMetric(int jmxPort, String jmxHost, String canonicalName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why object here?

String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
Map<String, Number> result = new HashMap<>();

Map<String, Object> resultAttr = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to collect metrics once, instead of doing it cluster-wide and then per broker

JMXConnector srv = null;
try {
srv = pool.borrowObject(jmxUrl);
MBeanServerConnection msc = srv.getMBeanServerConnection();
ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
for (String attrName : attrNames) {
Number value = (Number) msc.getAttribute(name, attrName);
result.put(attrName, value instanceof Double ? BigDecimal.valueOf((Double) value) : Integer.valueOf(value.toString()));

ObjectName name = new ObjectName(canonicalName);
var attrNames = msc.getMBeanInfo(name).getAttributes();
for (MBeanAttributeInfo attrName : attrNames) {
resultAttr.put(attrName.getName(), msc.getAttribute(name, attrName.getName()));
}
pool.returnObject(jmxUrl, srv);
} catch (MalformedURLException url) {
Expand All @@ -62,7 +84,7 @@ public Map<String, Number> getJmxTrafficMetrics(int jmxPort, String jmxHost, Str
log.error("Error while retrieving connection {} from pool", jmxUrl);
closeConnectionExceptionally(jmxUrl, srv);
}
return result;
return resultAttr;
}

private void closeConnectionExceptionally(String url, JMXConnector srv) {
Expand All @@ -72,4 +94,12 @@ private void closeConnectionExceptionally(String url, JMXConnector srv) {
log.error("Cannot invalidate object in pool, {}", url);
}
}

/**
 * Reduces two metric attribute values into one: numeric values are summed via
 * BigDecimal (exact, regardless of the concrete Number subtype), anything else
 * keeps the first value unchanged.
 *
 * Fix: the original summed whenever value1 alone was a Number, so a non-numeric
 * (or null) value2 blew up inside new BigDecimal(value2.toString()) with a
 * NumberFormatException/NPE. Both operands must be numeric before summing;
 * `instanceof` is also null-safe, so null inputs fall through to value1.
 */
public static Object metricValueReduce(Object value1, Object value2) {
    if (value1 instanceof Number && value2 instanceof Number) {
        return new BigDecimal(value1.toString()).add(new BigDecimal(value2.toString()));
    } else {
        return value1;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicFormData;
import com.provectus.kafka.ui.model.*;
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
Expand All @@ -26,7 +23,6 @@
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand All @@ -46,6 +42,7 @@ public class KafkaService {
private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
private final JmxClusterUtil jmxClusterUtil;
private final ClustersStorage clustersStorage;

@SneakyThrows
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
Expand Down Expand Up @@ -162,20 +159,18 @@ private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
private Mono<InternalClusterMetrics> getClusterMetrics(KafkaCluster cluster, AdminClient client) {
return ClusterUtil.toMono(client.describeCluster().nodes())
.flatMap(brokers ->
ClusterUtil.toMono(client.describeCluster().controller()).map(
c -> {
ClusterUtil.toMono(client.describeCluster().controller()).flatMap(
c ->
getClusterJmxMetric(cluster.getName()).map(jmxMetric -> {
InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
Map<String, Number> bytesInPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_IN_PER_SEC);
Map<String, Number> bytesOutPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_OUT_PER_SEC);
metricsBuilder
.internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
.bytesOutPerSec(bytesOutPerSec)
.bytesInPerSec(bytesInPerSec);
.jmxMetrics(jmxMetric);
return metricsBuilder.build();
}
)
);
));
}


Expand Down Expand Up @@ -351,4 +346,32 @@ private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, Intern
})
);
}

/**
 * Aggregates JMX metrics across all brokers of the cluster: metrics sharing a
 * canonical name have their numeric attribute values summed.
 *
 * Fixes two bugs in the previous reduction:
 *  1. It passed the JmxMetric object j1 itself to metricValueReduce instead of
 *     the attribute VALUE, so the "is it a Number" check never passed and
 *     nothing was ever summed.
 *  2. It accumulated results into j2's map but returned s1, discarding every
 *     accumulated value. We now fold into the accumulator we return.
 */
public Mono<List<JmxMetric>> getClusterJmxMetric(String clusterName) {
    return clustersStorage.getClusterByName(clusterName)
            .map(c -> getOrCreateAdminClient(c)
                    .flatMap(eac -> ClusterUtil.toMono(eac.getAdminClient().describeCluster().nodes()))
                    .flatMapIterable(nodes -> nodes.stream().map(Node::host).collect(Collectors.toList()))
                    .map(host -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), host))
                    .collectList()
                    .map(perBroker -> perBroker.stream().reduce((acc, next) -> {
                        // Fold `next` into `acc`, attribute by attribute, for
                        // metrics with matching canonical names.
                        acc.forEach(accMetric -> next.stream()
                                .filter(m -> m.getCanonicalName().equals(accMetric.getCanonicalName()))
                                .forEach(m -> accMetric.getValue().keySet().forEach(key ->
                                        accMetric.getValue().compute(key, (k, accVal) ->
                                                JmxClusterUtil.metricValueReduce(accVal, m.getValue().get(k))))));
                        return acc;
                    }).orElseThrow()))
            .orElseThrow();
}

/**
 * Fetches all JMX metrics of a single broker, identified by node id, in the
 * given cluster. Throws NoSuchElementException when the cluster is unknown or
 * no node with that id exists.
 */
public Mono<List<JmxMetric>> getJmxMetric(String clusterName, Integer nodeId) {
    return clustersStorage.getClusterByName(clusterName)
            .map(cluster -> getOrCreateAdminClient(cluster)
                    // Resolve the broker's host from the cluster node list by id …
                    .flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes())
                            .map(nodes -> nodes.stream()
                                    .filter(node -> node.id() == nodeId)
                                    .findFirst()
                                    .orElseThrow()
                                    .host()))
                    // … then scrape that host's JMX endpoint.
                    .map(brokerHost -> jmxClusterUtil.getJmxMetrics(cluster.getJmxPort(), brokerHost)))
            .orElseThrow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;

import org.apache.commons.lang3.tuple.Pair;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
Expand All @@ -14,12 +13,11 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.validation.Valid;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import javax.validation.Valid;

@RestController
@RequiredArgsConstructor
public class MetricsRestController implements ApiClustersApi {
Expand All @@ -32,12 +30,10 @@ public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchang
}

@Override
public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterName, ServerWebExchange exchange) {
return Mono.just(
clusterService.getBrokersMetrics(clusterName)
public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
return clusterService.getBrokersMetrics(clusterName, id)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build())
);
.onErrorReturn(ResponseEntity.notFound().build());
}

@Override
Expand Down Expand Up @@ -99,7 +95,8 @@ public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(String cluste
public Mono<ResponseEntity<Topic>> updateTopic(String clusterId, String topicName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
}



private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
return Mono.justOrEmpty(seekTo)
.defaultIfEmpty(Collections.emptyList())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.provectus.kafka.ui.zookeeper;

import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
Expand Down
Loading