diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
index cd5336cf2b5..ba52586ee09 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
@@ -3,6 +3,7 @@
 import com.provectus.kafka.ui.api.ConsumerGroupsApi;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.ConsumerGroupDetails;
+import com.provectus.kafka.ui.model.TopicConsumerGroups;
 import com.provectus.kafka.ui.service.ClusterService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
@@ -34,4 +35,11 @@ public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroups(String cluste
         .map(ResponseEntity::ok)
         .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
   }
+
+  @Override
+  public Mono<ResponseEntity<TopicConsumerGroups>> getTopicConsumerGroups(
+      String clusterName, String topicName, ServerWebExchange exchange) {
+    return clusterService.getTopicConsumerGroupDetail(clusterName, topicName)
+        .map(ResponseEntity::ok);
+  }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
index 6f07f070ce0..70743d5f6f6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
@@ -15,33 +15,28 @@
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Topic;
 import com.provectus.kafka.ui.model.TopicConfig;
+import com.provectus.kafka.ui.model.TopicConsumerGroups;
 import com.provectus.kafka.ui.model.TopicCreation;
 import com.provectus.kafka.ui.model.TopicDetails;
 import com.provectus.kafka.ui.model.TopicMessage;
 import com.provectus.kafka.ui.model.TopicUpdate;
 import com.provectus.kafka.ui.model.TopicsResponse;
 import com.provectus.kafka.ui.util.ClusterUtil;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
-import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuples;

 @Service
 @RequiredArgsConstructor
@@ -142,42 +137,29 @@ public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName,
     return kafkaService.getOrCreateAdminClient(cluster).map(ac ->
         ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
     ).flatMap(groups ->
-        groupMetadata(cluster, consumerGroupId)
+        kafkaService.groupMetadata(cluster, consumerGroupId)
             .flatMap(offsets -> {
               Map<TopicPartition, Long> endOffsets =
-                  topicPartitionsEndOffsets(cluster, offsets.keySet());
-              return ClusterUtil.toMono(groups).map(s -> s.get(consumerGroupId).members().stream()
-                  .flatMap(c -> Stream.of(ClusterUtil
-                      .convertToConsumerTopicPartitionDetails(c, offsets, endOffsets)))
-                  .collect(Collectors.toList()).stream()
-                  .flatMap(t -> t.stream().flatMap(Stream::of)).collect(Collectors.toList()));
-            })
-    )
-        .map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId));
-
-  }
-
-  public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
-                                                                    String consumerGroupId) {
-    return
-        kafkaService.getOrCreateAdminClient(cluster)
-            .map(ac -> ac.getAdminClient().listConsumerGroupOffsets(consumerGroupId)
-                .partitionsToOffsetAndMetadata())
-            .flatMap(ClusterUtil::toMono);
-  }
-
-  public Map<TopicPartition, Long> topicPartitionsEndOffsets(
-      KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
-    Properties properties = new Properties();
-    properties.putAll(cluster.getProperties());
-    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
-    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-
-    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
-      return consumer.endOffsets(topicPartitions);
-    }
+                  kafkaService.topicPartitionsEndOffsets(cluster, offsets.keySet());
+              return ClusterUtil.toMono(groups).map(s ->
+                  Tuples.of(
+                      s.get(consumerGroupId),
+                      s.get(consumerGroupId).members().stream()
+                          .flatMap(c ->
+                              Stream.of(
+                                  ClusterUtil.convertToConsumerTopicPartitionDetails(
+                                      c, offsets, endOffsets, consumerGroupId
+                                  )
+                              )
+                          )
+                          .collect(Collectors.toList()).stream()
+                          .flatMap(t ->
+                              t.stream().flatMap(Stream::of)
+                          ).collect(Collectors.toList())
+                  )
+              );
+            }).map(c -> ClusterUtil.convertToConsumerGroupDetails(c.getT1(), c.getT2()))
+    );
   }

   public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
@@ -186,6 +168,13 @@ public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
         .flatMap(kafkaService::getConsumerGroups);
   }

+  public Mono<TopicConsumerGroups> getTopicConsumerGroupDetail(
+      String clusterName, String topicName) {
+    return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
+        .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
+        .flatMap(c -> kafkaService.getTopicConsumerGroups(c, topicName));
+  }
+
   public Flux<Broker> getBrokers(String clusterName) {
     return kafkaService
         .getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
@@ -251,4 +240,6 @@ public Mono<Void> deleteTopicMessages(String clusterName, String topicName,
     return consumingService.offsetsForDeletion(cluster, topicName, partitions)
         .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
   }
+
+
 }
\ No newline at end of file
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
index 4ceb965d412..259351ef029 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
@@ -12,6 +12,7 @@
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Metric;
 import com.provectus.kafka.ui.model.ServerStatus;
+import com.provectus.kafka.ui.model.TopicConsumerGroups;
 import com.provectus.kafka.ui.model.TopicCreation;
 import com.provectus.kafka.ui.model.TopicUpdate;
 import com.provectus.kafka.ui.util.ClusterUtil;
@@ -38,12 +39,14 @@
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
@@ -296,15 +299,71 @@ private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(
     );
   }

+  public Mono<Collection<ConsumerGroupDescription>> getConsumerGroupsInternal(
+      KafkaCluster cluster) {
+    return getOrCreateAdminClient(cluster).flatMap(ac ->
+        ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
+            .flatMap(s ->
+                ClusterUtil.toMono(
+                    ac.getAdminClient().describeConsumerGroups(
+                        s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())
+                    ).all()
+                ).map(Map::values)
+            )
+    );
+  }
+
   public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
-    return getOrCreateAdminClient(cluster)
-        .flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
-            .flatMap(s -> ClusterUtil.toMono(ac.getAdminClient()
-                .describeConsumerGroups(
-                    s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()))
-                .all()))
-            .map(s -> s.values().stream()
-                .map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList())));
+    return getConsumerGroupsInternal(cluster)
+        .map(c -> c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList()));
+  }
+
+  public Mono<TopicConsumerGroups> getTopicConsumerGroups(KafkaCluster cluster, String topic) {
+    final Map<TopicPartition, Long> endOffsets = topicEndOffsets(cluster, topic);
+
+    return getConsumerGroupsInternal(cluster)
+        .flatMapIterable(c ->
+            c.stream()
+                .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .map(d ->
+                    groupMetadata(cluster, d.groupId())
+                        .flatMapIterable(meta ->
+                            d.members().stream().flatMap(m ->
+                                ClusterUtil.convertToConsumerTopicPartitionDetails(
+                                    m, meta, endOffsets, d.groupId()
+                                ).stream()
+                            ).collect(Collectors.toList())
+                        )
+                ).collect(Collectors.toList())
+        ).flatMap(f -> f).collectList().map(l -> new TopicConsumerGroups().consumers(l));
+  }
+
+  public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
+                                                                    String consumerGroupId) {
+    return getOrCreateAdminClient(cluster).map(ac ->
+        ac.getAdminClient()
+            .listConsumerGroupOffsets(consumerGroupId)
+            .partitionsToOffsetAndMetadata()
+    ).flatMap(ClusterUtil::toMono);
+  }
+
+  public Map<TopicPartition, Long> topicEndOffsets(
+      KafkaCluster cluster, String topic) {
+    try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
+      final List<TopicPartition> topicPartitions = consumer.partitionsFor(topic).stream()
+          .map(i -> new TopicPartition(i.topic(), i.partition()))
+          .collect(Collectors.toList());
+      return consumer.endOffsets(topicPartitions);
+    }
+  }
+
+  public Map<TopicPartition, Long> topicPartitionsEndOffsets(
+      KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
+    try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
+      return consumer.endOffsets(topicPartitions);
+    }
   }

   public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
@@ -571,4 +630,6 @@ public Mono<Void> deleteTopicMessages(KafkaCluster cluster, Map<TopicPartition,
         .map(ac -> ac.deleteRecords(records)).then();
   }
+
+
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java
index 3bad2dcea4d..7abd4b64dde 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java
@@ -5,6 +5,7 @@

 import com.provectus.kafka.ui.deserialization.RecordDeserializer;
 import com.provectus.kafka.ui.model.ConsumerGroup;
+import com.provectus.kafka.ui.model.ConsumerGroupDetails;
 import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail;
 import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.InternalPartition;
@@ -30,6 +31,7 @@
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.MemberAssignment;
 import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -77,20 +79,40 @@ public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c) {
         .flatMap(m -> m.assignment().topicPartitions().stream().flatMap(t -> Stream.of(t.topic())))
         .collect(Collectors.toSet()).size();
     consumerGroup.setNumTopics(numTopics);
+    consumerGroup.setSimple(c.isSimpleConsumerGroup());
+    Optional.ofNullable(c.state())
+        .ifPresent(s -> consumerGroup.setState(s.name()));
+    Optional.ofNullable(c.coordinator())
+        .ifPresent(coord -> consumerGroup.setCoordintor(coord.host()));
+    consumerGroup.setPartitionAssignor(c.partitionAssignor());
     return consumerGroup;
   }

+  public static ConsumerGroupDetails convertToConsumerGroupDetails(
+      ConsumerGroupDescription desc, List<ConsumerTopicPartitionDetail> consumers
+  ) {
+    return new ConsumerGroupDetails()
+        .consumers(consumers)
+        .consumerGroupId(desc.groupId())
+        .simple(desc.isSimpleConsumerGroup())
+        .coordintor(Optional.ofNullable(desc.coordinator()).map(Node::host).orElse(""))
+        .state(Optional.ofNullable(desc.state()).map(Enum::name).orElse(""))
+        .partitionAssignor(desc.partitionAssignor());
+  }
+
   public static List<ConsumerTopicPartitionDetail> convertToConsumerTopicPartitionDetails(
       MemberDescription consumer,
       Map<TopicPartition, OffsetAndMetadata> groupOffsets,
-      Map<TopicPartition, Long> endOffsets
+      Map<TopicPartition, Long> endOffsets,
+      String groupId
   ) {
     return consumer.assignment().topicPartitions().stream()
         .map(tp -> {
-          Long currentOffset = Optional.ofNullable(
-              groupOffsets.get(tp)).map(o -> o.offset()).orElse(0L);
-          Long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L);
+          long currentOffset = Optional.ofNullable(groupOffsets.get(tp))
+              .map(OffsetAndMetadata::offset).orElse(0L);
+          long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L);
           ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail();
+          cd.setGroupId(groupId);
           cd.setConsumerId(consumer.consumerId());
           cd.setHost(consumer.host());
           cd.setTopic(tp.topic());
@@ -250,4 +272,42 @@ public static <T, S> Map<T, S> toSingleMap(Stream<Map<T, S>> streamOfMaps) {
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
   }

+  public static Optional<ConsumerGroupDescription> filterConsumerGroupTopic(
+      ConsumerGroupDescription description, String topic) {
+    final List<MemberDescription> members = description.members().stream()
+        .map(m -> filterConsumerMemberTopic(m, topic))
+        .filter(m -> !m.assignment().topicPartitions().isEmpty())
+        .collect(Collectors.toList());
+
+    if (!members.isEmpty()) {
+      return Optional.of(
+          new ConsumerGroupDescription(
+              description.groupId(),
+              description.isSimpleConsumerGroup(),
+              members,
+              description.partitionAssignor(),
+              description.state(),
+              description.coordinator()
+          )
+      );
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  public static MemberDescription filterConsumerMemberTopic(
+      MemberDescription description, String topic) {
+    final Set<TopicPartition> topicPartitions = description.assignment().topicPartitions()
+        .stream().filter(tp -> tp.topic().equals(topic))
+        .collect(Collectors.toSet());
+    MemberAssignment assignment = new MemberAssignment(topicPartitions);
+    return new MemberDescription(
+        description.consumerId(),
+        description.groupInstanceId(),
+        description.clientId(),
+        description.host(),
+        assignment
+    );
+  }
+
 }
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index c1531a62aed..86bd3cd0ece 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -346,6 +346,31 @@ paths:
         404:
           description: Not found

+  /api/clusters/{clusterName}/topics/{topicName}/consumergroups:
+    get:
+      tags:
+        - Consumer Groups
+      summary: get Consumer Groups By Topics
+      operationId: getTopicConsumerGroups
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/TopicConsumerGroups'
+
   /api/clusters/{clusterName}/consumer-groups/{id}:
     get:
       tags:
@@ -1330,6 +1355,14 @@ components:
           type: integer
         numTopics:
           type: integer
+        simple:
+          type: boolean
+        partitionAssignor:
+          type: string
+        state:
+          type: string
+        coordintor:
+          type: string
       required:
         - clusterId
         - consumerGroupId
@@ -1397,6 +1430,8 @@ components:
     ConsumerTopicPartitionDetail:
       type: object
      properties:
+        groupId:
+          type: string
         consumerId:
           type: string
         topic:
@@ -1416,12 +1451,28 @@ components:
           format: int64
       required:
         - consumerId
-
+
+    TopicConsumerGroups:
+      type: object
+      properties:
+        consumers:
+          type: array
+          items:
+            $ref: '#/components/schemas/ConsumerTopicPartitionDetail'
+
     ConsumerGroupDetails:
       type: object
       properties:
         consumerGroupId:
           type: string
+        simple:
+          type: boolean
+        partitionAssignor:
+          type: string
+        state:
+          type: string
+        coordintor:
+          type: string
         consumers:
           type: array
           items: