Update topic info after create/update ops (provectus#114)
germanosin committed Nov 9, 2020
1 parent 7cecc21 commit f8a693c
Showing 8 changed files with 154 additions and 119 deletions.
@@ -6,6 +6,9 @@
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;

import java.util.Map;
import java.util.stream.Collectors;

@Mapper(componentModel = "spring")
public interface ClusterMapper {

@@ -19,7 +22,13 @@ public interface ClusterMapper {
ClusterMetrics toClusterMetrics(InternalClusterMetrics metrics);
BrokerMetrics toBrokerMetrics(InternalBrokerMetrics metrics);
Topic toTopic(InternalTopic topic);
Partition toPartition(InternalPartition topic);
TopicDetails toTopicDetails(InternalTopic topic);
TopicConfig toTopicConfig(InternalTopicConfig topic);
Replica toReplica(InternalReplica replica);

default java.util.List<Partition> map(Map<Integer, InternalPartition> map) {
return map.values().stream().map(this::toPartition).collect(Collectors.toList());
}

}
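
MapStruct picks up default methods declared on a mapper interface as custom mappings, so the new map method is what the generated code will call whenever a Map<Integer, InternalPartition> field (such as the reworked InternalTopic.partitions below) has to become a List<Partition>. A minimal standalone sketch of the same conversion, using hypothetical stand-in types rather than the project's models:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionMapSketch {
    // Hypothetical stand-ins for the internal and API partition models.
    record SourcePartition(int id) {}
    record ApiPartition(int id) {}

    // Mirrors the mapper's default method: keep the values, drop the integer keys.
    static List<ApiPartition> map(Map<Integer, SourcePartition> partitions) {
        return partitions.values().stream()
                .map(p -> new ApiPartition(p.id()))
                .collect(Collectors.toList());
    }
}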
@@ -22,7 +22,7 @@ public class InternalClusterMetrics {
private final int outOfSyncReplicasCount;
private final Map<String, Number> bytesInPerSec;
private final Map<String, Number> bytesOutPerSec;
private final int segmentCount;
private final long segmentCount;
private final long segmentSize;
private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
private final List<Metric> metrics;
@@ -15,4 +15,6 @@ public class InternalPartition {
private final int replicasCount;
private final long offsetMin;
private final long offsetMax;
private final long segmentSize;
private final long segmentCount;
}
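
With segment size and segment count now tracked per partition, topic- and cluster-level totals can be derived by summing over the partition map. A small illustrative sketch of that aggregation; the types and names here are stand-ins, not the project's code:

import java.util.Map;

class SegmentStatsSketch {
    // Stand-in for per-partition segment statistics.
    record PartitionStats(long segmentSize, long segmentCount) {}

    // Sums per-partition segment sizes into a topic-level total.
    static long totalSegmentSize(Map<Integer, PartitionStats> partitions) {
        return partitions.values().stream().mapToLong(PartitionStats::segmentSize).sum();
    }

    // Same idea for the number of segments.
    static long totalSegmentCount(Map<Integer, PartitionStats> partitions) {
        return partitions.values().stream().mapToLong(PartitionStats::segmentCount).sum();
    }
}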
@@ -13,7 +13,7 @@ public class InternalTopic {

private final String name;
private final boolean internal;
private final List<InternalPartition> partitions;
private final Map<Integer,InternalPartition> partitions;
private final List<InternalTopicConfig> topicConfigs;

private final int replicas;
@@ -22,6 +22,6 @@ public class InternalTopic {
private final int replicationFactor;
private final int underReplicatedPartitions;
private final long segmentSize;
private final int segmentCount;
private final Map<TopicPartition, Long> partitionSegmentSize;
private final long segmentCount;
// private final Map<TopicPartition, Long> partitionSegmentSize;
}
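
Keying partitions by partition number turns "find partition N" from a linear scan over a List into a direct map lookup. Roughly, with a hypothetical stand-in type:

import java.util.Map;
import java.util.Optional;

class PartitionLookupSketch {
    record P(int id, long offsetMax) {}

    // With List<P>, finding partition N means scanning and filtering;
    // with Map<Integer, P>, it is a single get by key.
    static Optional<P> find(Map<Integer, P> partitions, int id) {
        return Optional.ofNullable(partitions.get(id));
    }
}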
@@ -3,6 +3,7 @@
import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.model.InternalTopic;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
@@ -83,10 +84,12 @@ public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName
.map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
}

public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
return clustersStorage.getClusterByName(name).map(
cluster -> kafkaService.createTopic(cluster, topicFormData)
).orElse(Mono.empty()).map(clusterMapper::toTopic);
public Mono<Topic> createTopic(String clusterName, Mono<TopicFormData> topicFormData) {
return clustersStorage.getClusterByName(clusterName).map(cluster ->
kafkaService.createTopic(cluster, topicFormData)
.doOnNext(t -> updateCluster(t, clusterName, cluster))
.map(clusterMapper::toTopic)
).orElse(Mono.empty());
}
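
The create path now refreshes the cached cluster as a side effect before the internal topic is mapped to the API model. The ordering relies on Reactor's doOnNext, which runs a callback for each emitted value without changing it; a minimal sketch of that pattern, with illustrative names rather than the project's:

import reactor.core.publisher.Mono;

class DoOnNextSketch {
    static Mono<String> create() {
        return Mono.just("internal-topic")
                // Side effect only: update a cache, log, etc. The value passes through unchanged.
                .doOnNext(t -> System.out.println("refreshing cluster cache for " + t))
                // Only then is the value transformed for the caller.
                .map(t -> "api:" + t);
    }
}

Here create().block() would print the side-effect line and return "api:internal-topic", mirroring how createTopic updates the stored cluster before returning the mapped Topic.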

@SneakyThrows
@@ -151,18 +154,15 @@ public Mono<Topic> updateTopic(String clusterName, String topicName, Mono<TopicF
return clustersStorage.getClusterByName(clusterName).map(cl ->
topicFormData
.flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
.doOnNext(t -> updateCluster(t, clusterName, cl))
.map(clusterMapper::toTopic)
.flatMap(t -> updateCluster(t, clusterName, cl))
)
.orElse(Mono.empty());
).orElse(Mono.empty());
}

private <T> Mono<T> updateCluster(T topic, String clusterName, KafkaCluster cluster) {
return kafkaService.getUpdatedCluster(cluster)
.map(c -> {
clustersStorage.setKafkaCluster(clusterName, c);
return topic;
});
private KafkaCluster updateCluster(InternalTopic topic, String clusterName, KafkaCluster cluster) {
final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic);
clustersStorage.setKafkaCluster(clusterName, updatedCluster);
return updatedCluster;
}

public Flux<TopicMessage> getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) {
@@ -1,25 +1,5 @@
package com.provectus.kafka.ui.cluster.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
@@ -30,11 +10,26 @@
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.SeekType;
import com.provectus.kafka.ui.model.TopicMessage;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@Service
@Log4j2
@RequiredArgsConstructor
@@ -128,7 +123,7 @@ private List<TopicPartition> getRequestedPartitions() {

return Optional.ofNullable(cluster.getTopics().get(topic))
.orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic))
.getPartitions().stream()
.getPartitions().values().stream()
.filter(internalPartition -> partitionPositions.isEmpty() || partitionPositions.containsKey(internalPartition.getPartition()))
.map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition()))
.collect(Collectors.toList());
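
Since partitions are now a map, the requested partitions are taken from its values() view before filtering. A standalone sketch of the same selection step, here using the map keys (which equal the partition numbers) and the real org.apache.kafka.common.TopicPartition; the method and parameter names are stand-ins:

import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class RequestedPartitionsSketch {
    // Keep only the partition ids the caller asked for; an empty selection means "all".
    static List<TopicPartition> select(String topic, Map<Integer, ?> partitions, Map<Integer, Long> requested) {
        return partitions.keySet().stream()
                .filter(id -> requested.isEmpty() || requested.containsKey(id))
                .map(id -> new TopicPartition(topic, id))
                .collect(Collectors.toList());
    }
}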
@@ -129,7 +129,10 @@ public static InternalTopic mapToInternalTopic(TopicDescription topicDescription
.mapToInt(InternalPartition::getReplicasCount)
.sum();

topic.partitions(partitions);
topic.partitions(partitions.stream().collect(Collectors.toMap(
InternalPartition::getPartition,
t -> t
)));
topic.replicas(replicasCount);
topic.partitionCount(topicDescription.partitions().size());
topic.inSyncReplicas(inSyncReplicasCount);
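
Collectors.toMap keyed by InternalPartition::getPartition relies on partition numbers being unique within a topic, since toMap throws IllegalStateException on duplicate keys. A tiny sketch of the same collection step with a stand-in type:

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class ToMapSketch {
    record P(int partition) {}

    // Builds the partition-number -> partition map; throws if two elements share a partition number.
    static Map<Integer, P> byPartition(List<P> partitions) {
        return partitions.stream().collect(Collectors.toMap(P::partition, Function.identity()));
    }
}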