Skip to content

Commit

Permalink
Added custom properties & fixed reset policy (provectus#134)
Browse files Browse the repository at this point in the history
* Added custom properties support
  • Loading branch information
germanosin committed Dec 2, 2020
1 parent 9bb2e28 commit 27467e4
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.cluster.config;

import java.util.Properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
Expand All @@ -24,5 +25,6 @@ public static class Cluster {
String protobufFile;
String protobufMessageName;
int jmxPort;
Properties properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
final List<Integer> versions = schemaRegistryClient.getAllVersions(schemaName);
if (!versions.isEmpty()) {
final Integer version = versions.iterator().next();
final Schema schema = schemaRegistryClient.getByVersion(record.topic(), version, false);
final String subjectName = String.format(cluster.getSchemaNameTemplate(), record.topic());
final Schema schema = schemaRegistryClient.getByVersion(subjectName, version, false);
if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
try {
protobufDeserializer.deserialize(record.topic(), record.value().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.provectus.kafka.ui.cluster.config.ClustersProperties;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.model.*;
import java.util.Properties;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;

Expand All @@ -22,6 +23,7 @@ public interface ClusterMapper {
@Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics")
Cluster toCluster(KafkaCluster cluster);
@Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName="resolvePath")
@Mapping(target = "properties", source = "properties", qualifiedByName="setProperties")
KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
@Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName="mapDiskUsage")
ClusterStats toClusterStats(InternalClusterMetrics metrics);
Expand Down Expand Up @@ -73,4 +75,12 @@ default Path resolvePath(String path) {
}
}

/**
 * Produces a defensive copy of the cluster's custom properties.
 * Guarantees a non-null result: a {@code null} input maps to an
 * empty {@link Properties} instance, so downstream consumers can
 * call {@code putAll} on the result without a null check.
 */
default Properties setProperties(Properties properties) {
    Properties snapshot = new Properties();
    if (properties == null) {
        return snapshot;
    }
    snapshot.putAll(properties);
    return snapshot;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.cluster.model;

import com.provectus.kafka.ui.model.ServerStatus;
import java.util.Properties;
import lombok.Builder;
import lombok.Data;

Expand All @@ -24,4 +25,5 @@ public class KafkaCluster {
private final Throwable lastZookeeperException;
private final Path protobufFile;
private final String protobufMessageName;
private final Properties properties;
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster c

public Map<TopicPartition, Long> topicPartitionsEndOffsets(KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
Properties properties = new Properties();
properties.putAll(cluster.getProperties());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private boolean treeContainsValue(JsonNode tree, String query) {

@RequiredArgsConstructor
private static class RecordEmitter {
private static final int MAX_POLLS_COUNT = 30;
private static final int MAX_EMPTY_POLLS_COUNT = 3;
private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);

private final KafkaService kafkaService;
Expand All @@ -98,15 +98,16 @@ private static class RecordEmitter {

public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
// assignPartitions(consumer);
// seekOffsets(consumer);
assignAndSeek(consumer);
int pollsCount = 0;
while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
int emptyPollsCount = 0;
log.info("assignment: {}", consumer.assignment());
while (!sink.isCancelled()) {
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
log.info("{} records polled", records.count());
if (records.count() == 0) {
if (records.count() == 0 && emptyPollsCount < MAX_EMPTY_POLLS_COUNT) {
break;
} else {
emptyPollsCount++;
}
records.iterator()
.forEachRemaining(sink::next);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {

public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
Properties properties = new Properties();
properties.putAll(kafkaCluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
AdminClient adminClient = AdminClient.create(properties);
Expand Down Expand Up @@ -245,10 +246,12 @@ public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {

/**
 * Builds a {@link KafkaConsumer} of raw {@link Bytes} for the given cluster.
 * Cluster-level custom properties are applied first, so the explicit
 * settings below always take precedence over user-supplied overrides.
 */
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
    Properties consumerConfig = new Properties();
    // Custom per-cluster properties go in first; explicit puts below win on conflict.
    consumerConfig.putAll(cluster.getProperties());
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(consumerConfig);
}
Expand Down
4 changes: 2 additions & 2 deletions kafka-ui-api/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
kafka:
clusters:
- name: local
bootstrapServers: b-1.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:9092
zookeeper: z-2.sandbox.kbqc5i.c3.kafka.eu-central-1.amazonaws.com:2181
bootstrapServers: localhost:9092
zookeeper: localhost:2181
schemaRegistry: http://localhost:8081
admin-client-timeout: 5000
zookeeper:
Expand Down

0 comments on commit 27467e4

Please sign in to comment.