Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#121 Topic level consumer groups & consumer group details #360

Merged
merged 1 commit into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.provectus.kafka.ui.api.ConsumerGroupsApi;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ConsumerGroupDetails;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.service.ClusterService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
Expand Down Expand Up @@ -34,4 +35,11 @@ public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroups(String cluste
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
}

/**
 * Returns the consumer groups reading from the given topic, with per-partition details.
 *
 * @param clusterName name of the cluster to query
 * @param topicName   topic whose consumer groups are requested
 * @param exchange    current exchange (required by the generated API, not used here)
 * @return 200 OK with the topic's consumer groups
 */
@Override
public Mono<ResponseEntity<TopicConsumerGroups>> getTopicConsumerGroups(
    String clusterName, String topicName, ServerWebExchange exchange) {
  // Delegate the lookup to the service layer and wrap the payload in an OK response.
  return clusterService
      .getTopicConsumerGroupDetail(clusterName, topicName)
      .map(groups -> ResponseEntity.ok(groups));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,28 @@
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicConfig;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicDetails;
import com.provectus.kafka.ui.model.TopicMessage;
import com.provectus.kafka.ui.model.TopicUpdate;
import com.provectus.kafka.ui.model.TopicsResponse;
import com.provectus.kafka.ui.util.ClusterUtil;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

@Service
@RequiredArgsConstructor
Expand Down Expand Up @@ -142,42 +137,29 @@ public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName,
return kafkaService.getOrCreateAdminClient(cluster).map(ac ->
ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
).flatMap(groups ->
groupMetadata(cluster, consumerGroupId)
kafkaService.groupMetadata(cluster, consumerGroupId)
.flatMap(offsets -> {
Map<TopicPartition, Long> endOffsets =
topicPartitionsEndOffsets(cluster, offsets.keySet());
return ClusterUtil.toMono(groups).map(s -> s.get(consumerGroupId).members().stream()
.flatMap(c -> Stream.of(ClusterUtil
.convertToConsumerTopicPartitionDetails(c, offsets, endOffsets)))
.collect(Collectors.toList()).stream()
.flatMap(t -> t.stream().flatMap(Stream::of)).collect(Collectors.toList()));
})
)
.map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId));

}

public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
String consumerGroupId) {
return
kafkaService.getOrCreateAdminClient(cluster)
.map(ac -> ac.getAdminClient().listConsumerGroupOffsets(consumerGroupId)
.partitionsToOffsetAndMetadata())
.flatMap(ClusterUtil::toMono);
}

public Map<TopicPartition, Long> topicPartitionsEndOffsets(
KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
Properties properties = new Properties();
properties.putAll(cluster.getProperties());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
return consumer.endOffsets(topicPartitions);
}
kafkaService.topicPartitionsEndOffsets(cluster, offsets.keySet());
return ClusterUtil.toMono(groups).map(s ->
Tuples.of(
s.get(consumerGroupId),
s.get(consumerGroupId).members().stream()
.flatMap(c ->
Stream.of(
ClusterUtil.convertToConsumerTopicPartitionDetails(
c, offsets, endOffsets, consumerGroupId
)
)
)
.collect(Collectors.toList()).stream()
.flatMap(t ->
t.stream().flatMap(Stream::of)
).collect(Collectors.toList())
)
);
}).map(c -> ClusterUtil.convertToConsumerGroupDetails(c.getT1(), c.getT2()))
);
}

public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
Expand All @@ -186,6 +168,13 @@ public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
.flatMap(kafkaService::getConsumerGroups);
}

/**
 * Resolves the cluster by name and fetches the consumer groups of the given topic.
 * Emits {@code ClusterNotFoundException} when no cluster with that name is registered.
 *
 * @param clusterName name of the cluster to look up
 * @param topicName   topic whose consumer groups are requested
 */
public Mono<TopicConsumerGroups> getTopicConsumerGroupDetail(
    String clusterName, String topicName) {
  final Mono<KafkaCluster> cluster =
      Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
          .switchIfEmpty(Mono.error(ClusterNotFoundException::new));
  return cluster.flatMap(kafkaCluster ->
      kafkaService.getTopicConsumerGroups(kafkaCluster, topicName));
}

public Flux<Broker> getBrokers(String clusterName) {
return kafkaService
.getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
Expand Down Expand Up @@ -251,4 +240,6 @@ public Mono<Void> deleteTopicMessages(String clusterName, String topicName,
return consumingService.offsetsForDeletion(cluster, topicName, partitions)
.flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metric;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicUpdate;
import com.provectus.kafka.ui.util.ClusterUtil;
Expand All @@ -38,12 +39,14 @@
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
Expand Down Expand Up @@ -296,15 +299,71 @@ private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(
);
}

/**
 * Lists every consumer group id known to the cluster and asks the broker to
 * describe all of them in a single request.
 *
 * @param cluster cluster to query
 * @return descriptions of all consumer groups in the cluster
 */
public Mono<Collection<ConsumerGroupDescription>> getConsumerGroupsInternal(
    KafkaCluster cluster) {
  return getOrCreateAdminClient(cluster).flatMap(ac ->
      ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
          .flatMap(listings -> {
            // Collect the ids first so describeConsumerGroups gets one batched call.
            final List<String> groupIds = listings.stream()
                .map(ConsumerGroupListing::groupId)
                .collect(Collectors.toList());
            return ClusterUtil.toMono(
                ac.getAdminClient().describeConsumerGroups(groupIds).all());
          })
          .map(Map::values));
}

/**
 * Fetches all consumer groups of the cluster and maps them to the REST model.
 *
 * <p>Note: the visible span contained both the pre-refactor body (inline
 * listConsumerGroups/describeConsumerGroups calls) and the refactored body that
 * delegates to {@code getConsumerGroupsInternal}; only the delegating version is
 * kept so the shared admin-client round-trips live in one place.
 *
 * @param cluster cluster to query
 * @return one {@code ConsumerGroup} entry per group id
 */
public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
  return getConsumerGroupsInternal(cluster)
      .map(descriptions -> descriptions.stream()
          .map(ClusterUtil::convertToConsumerGroup)
          .collect(Collectors.toList()));
}

/**
 * Builds the consumer-group view for a single topic: for every group that has at
 * least one member assigned to the topic, emits per-partition consumer details
 * (committed offset vs. end offset).
 *
 * @param cluster cluster to query
 * @param topic   topic to filter groups/assignments by
 * @return all matching per-partition consumer details, wrapped in {@code TopicConsumerGroups}
 */
public Mono<TopicConsumerGroups> getTopicConsumerGroups(KafkaCluster cluster, String topic) {
  // NOTE(review): end offsets are fetched eagerly here, at method-call time, with a
  // blocking short-lived consumer (see topicEndOffsets) — not inside the reactive
  // chain. Consider deferring if this method is called on a non-blocking thread.
  final Map<TopicPartition, Long> endOffsets = topicEndOffsets(cluster, topic);

  return getConsumerGroupsInternal(cluster)
      .flatMapIterable(c ->
          c.stream()
              // Keep only groups with members assigned to this topic; the filter also
              // strips each member's assignment down to this topic's partitions.
              .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
              .filter(Optional::isPresent)
              .map(Optional::get)
              .map(d ->
                  // For each surviving group, fetch its committed offsets and expand
                  // every member into per-partition detail rows.
                  groupMetadata(cluster, d.groupId())
                      .flatMapIterable(meta ->
                          d.members().stream().flatMap(m ->
                              ClusterUtil.convertToConsumerTopicPartitionDetails(
                                  m, meta, endOffsets, d.groupId()
                              ).stream()
                          ).collect(Collectors.toList())
                      )
              ).collect(Collectors.toList())
      // Flatten the per-group Flux<ConsumerTopicPartitionDetail> publishers into one
      // list; interleaving order across groups is whatever flatMap produces.
      ).flatMap(f -> f).collectList().map(l -> new TopicConsumerGroups().consumers(l));
}

/**
 * Fetches the committed offsets of a consumer group, keyed by topic-partition.
 *
 * @param cluster         cluster hosting the group
 * @param consumerGroupId id of the consumer group to inspect
 */
public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
                                                                  String consumerGroupId) {
  // Single flatMap: adapt the admin client's KafkaFuture straight into a Mono.
  return getOrCreateAdminClient(cluster)
      .flatMap(ac -> ClusterUtil.toMono(
          ac.getAdminClient()
              .listConsumerGroupOffsets(consumerGroupId)
              .partitionsToOffsetAndMetadata()));
}

/**
 * Resolves the end (latest) offset of every partition of the given topic.
 * Uses a short-lived consumer that is closed before returning.
 *
 * @param cluster cluster to query
 * @param topic   topic whose partition end offsets are requested
 */
public Map<TopicPartition, Long> topicEndOffsets(
    KafkaCluster cluster, String topic) {
  try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
    // Translate the topic's partition metadata into TopicPartition keys.
    final List<TopicPartition> partitions = consumer.partitionsFor(topic)
        .stream()
        .map(info -> new TopicPartition(info.topic(), info.partition()))
        .collect(Collectors.toList());
    return consumer.endOffsets(partitions);
  }
}

/**
 * Resolves the end (latest) offset for each of the supplied topic-partitions,
 * using a short-lived consumer that is closed before returning.
 *
 * @param cluster         cluster to query
 * @param topicPartitions partitions whose end offsets are requested
 */
public Map<TopicPartition, Long> topicPartitionsEndOffsets(
    KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
  try (KafkaConsumer<Bytes, Bytes> tempConsumer = createConsumer(cluster)) {
    return tempConsumer.endOffsets(topicPartitions);
  }
}

public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
Expand Down Expand Up @@ -571,4 +630,6 @@ public Mono<Void> deleteTopicMessages(KafkaCluster cluster, Map<TopicPartition,
return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
.map(ac -> ac.deleteRecords(records)).then();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.provectus.kafka.ui.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ConsumerGroupDetails;
import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail;
import com.provectus.kafka.ui.model.ExtendedAdminClient;
import com.provectus.kafka.ui.model.InternalPartition;
Expand All @@ -30,6 +31,7 @@
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -77,20 +79,40 @@ public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c) {
.flatMap(m -> m.assignment().topicPartitions().stream().flatMap(t -> Stream.of(t.topic())))
.collect(Collectors.toSet()).size();
consumerGroup.setNumTopics(numTopics);
consumerGroup.setSimple(c.isSimpleConsumerGroup());
Optional.ofNullable(c.state())
.ifPresent(s -> consumerGroup.setState(s.name()));
Optional.ofNullable(c.coordinator())
.ifPresent(coord -> consumerGroup.setCoordintor(coord.host()));
consumerGroup.setPartitionAssignor(c.partitionAssignor());
return consumerGroup;
}

/**
 * Maps a Kafka {@code ConsumerGroupDescription} plus its per-partition consumer
 * rows to the REST {@code ConsumerGroupDetails} model. (The closing brace was
 * missing from the extracted span and is restored here.)
 *
 * @param desc      description returned by the admin client
 * @param consumers per-partition consumer details belonging to the group
 */
public static ConsumerGroupDetails convertToConsumerGroupDetails(
    ConsumerGroupDescription desc, List<ConsumerTopicPartitionDetail> consumers
) {
  // Coordinator and state can be absent; the API model uses empty strings instead.
  final String coordinatorHost =
      Optional.ofNullable(desc.coordinator()).map(Node::host).orElse("");
  final String groupState =
      Optional.ofNullable(desc.state()).map(Enum::name).orElse("");
  return new ConsumerGroupDetails()
      .consumerGroupId(desc.groupId())
      .consumers(consumers)
      .simple(desc.isSimpleConsumerGroup())
      .partitionAssignor(desc.partitionAssignor())
      .coordintor(coordinatorHost) // (sic) setter name comes from the generated model
      .state(groupState);
}

public static List<ConsumerTopicPartitionDetail> convertToConsumerTopicPartitionDetails(
MemberDescription consumer,
Map<TopicPartition, OffsetAndMetadata> groupOffsets,
Map<TopicPartition, Long> endOffsets
Map<TopicPartition, Long> endOffsets,
String groupId
) {
return consumer.assignment().topicPartitions().stream()
.map(tp -> {
Long currentOffset = Optional.ofNullable(
groupOffsets.get(tp)).map(o -> o.offset()).orElse(0L);
Long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L);
long currentOffset = Optional.ofNullable(groupOffsets.get(tp))
.map(OffsetAndMetadata::offset).orElse(0L);
long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L);
ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail();
cd.setGroupId(groupId);
cd.setConsumerId(consumer.consumerId());
cd.setHost(consumer.host());
cd.setTopic(tp.topic());
Expand Down Expand Up @@ -250,4 +272,42 @@ public static <T, R> Map<T, R> toSingleMap(Stream<Map<T, R>> streamOfMaps) {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
}

/**
 * Restricts a consumer-group description to the given topic: each member's
 * assignment is filtered down to that topic's partitions, and members left with
 * no matching partitions are dropped.
 *
 * @param description group description to filter
 * @param topic       topic to keep
 * @return a filtered copy, or {@code Optional.empty()} when no member consumes the topic
 */
public static Optional<ConsumerGroupDescription> filterConsumerGroupTopic(
    ConsumerGroupDescription description, String topic) {
  final List<MemberDescription> matchingMembers = description.members().stream()
      .map(member -> filterConsumerMemberTopic(member, topic))
      .filter(member -> !member.assignment().topicPartitions().isEmpty())
      .collect(Collectors.toList());

  // Guard clause: no member touches this topic -> the group is not relevant.
  if (matchingMembers.isEmpty()) {
    return Optional.empty();
  }
  return Optional.of(
      new ConsumerGroupDescription(
          description.groupId(),
          description.isSimpleConsumerGroup(),
          matchingMembers,
          description.partitionAssignor(),
          description.state(),
          description.coordinator()
      )
  );
}

/**
 * Returns a copy of the member description whose assignment contains only the
 * partitions of the given topic (the resulting assignment may be empty).
 *
 * @param description member to filter
 * @param topic       topic to keep
 */
public static MemberDescription filterConsumerMemberTopic(
    MemberDescription description, String topic) {
  final Set<TopicPartition> matchingPartitions =
      description.assignment().topicPartitions().stream()
          .filter(partition -> partition.topic().equals(topic))
          .collect(Collectors.toSet());
  return new MemberDescription(
      description.consumerId(),
      description.groupInstanceId(),
      description.clientId(),
      description.host(),
      new MemberAssignment(matchingPartitions)
  );
}

}
Loading