From 6a3c61d5c310deb2f4ac44ce59a3cb0dbbd61a44 Mon Sep 17 00:00:00 2001 From: Ramazan Yapparov Date: Tue, 27 Apr 2021 13:32:08 +0300 Subject: [PATCH] #187 added search and sorting (#388) * - added search by topic name - added filter by internal topics - added sort by multiple columns - added tests * moved sorting later in the stream --- .../kafka/ui/controller/TopicsController.java | 13 +- .../kafka/ui/service/ClusterService.java | 43 ++++++- .../kafka/ui/service/ClusterServiceTest.java | 112 +++++++++++++++++- .../main/resources/swagger/kafka-ui-api.yaml | 22 ++++ 4 files changed, 180 insertions(+), 10 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index be76f78c46a..b7e934e7b85 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -2,6 +2,7 @@ import com.provectus.kafka.ui.api.TopicsApi; import com.provectus.kafka.ui.model.Topic; +import com.provectus.kafka.ui.model.TopicColumnsToSort; import com.provectus.kafka.ui.model.TopicConfig; import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicDetails; @@ -64,9 +65,19 @@ public Mono> getTopicDetails( @Override public Mono> getTopics(String clusterName, @Valid Integer page, @Valid Integer perPage, + @Valid Boolean showInternal, + @Valid String search, + @Valid TopicColumnsToSort orderBy, ServerWebExchange exchange) { return Mono.just(ResponseEntity.ok(clusterService - .getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(perPage)))); + .getTopics( + clusterName, + Optional.ofNullable(page), + Optional.ofNullable(perPage), + Optional.ofNullable(showInternal), + Optional.ofNullable(search), + Optional.ofNullable(orderBy) + ))); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 70743d5f6f6..63dd249200b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -14,6 +14,7 @@ import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.Topic; +import com.provectus.kafka.ui.model.TopicColumnsToSort; import com.provectus.kafka.ui.model.TopicConfig; import com.provectus.kafka.ui.model.TopicConsumerGroups; import com.provectus.kafka.ui.model.TopicCreation; @@ -32,6 +33,7 @@ import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; @@ -80,26 +82,55 @@ public Mono getClusterMetrics(String name) { public TopicsResponse getTopics(String name, Optional page, - Optional nullablePerPage) { + Optional nullablePerPage, + Optional showInternal, + Optional search, + Optional sortBy) { Predicate positiveInt = i -> i > 0; int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE); var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage; var cluster = clustersStorage.getClusterByName(name) .orElseThrow(ClusterNotFoundException::new); - var totalPages = (cluster.getTopics().size() / perPage) - + (cluster.getTopics().size() % perPage == 0 ? 0 : 1); + List topics = cluster.getTopics().values().stream() + .filter(topic -> !topic.isInternal() + || showInternal + .map(i -> topic.isInternal() == i) + .orElse(true)) + .filter(topic -> + search + .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s)) + .orElse(true)) + .sorted(getComparatorForTopic(sortBy)) + .map(clusterMapper::toTopic) + .collect(Collectors.toList()); + var totalPages = (topics.size() / perPage) + + (topics.size() % perPage == 0 ? 0 : 1); return new TopicsResponse() .pageCount(totalPages) .topics( - cluster.getTopics().values().stream() - .sorted(Comparator.comparing(InternalTopic::getName)) + topics.stream() .skip(topicsToSkip) .limit(perPage) - .map(clusterMapper::toTopic) .collect(Collectors.toList()) ); } + private Comparator getComparatorForTopic(Optional sortBy) { + var defaultComparator = Comparator.comparing(InternalTopic::getName); + if (sortBy.isEmpty()) { + return defaultComparator; + } + switch (sortBy.get()) { + case TOTAL_PARTITIONS: + return Comparator.comparing(InternalTopic::getPartitionCount); + case OUT_OF_SYNC_REPLICAS: + return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas()); + case NAME: + default: + return defaultComparator; + } + } + public Optional getTopicDetails(String name, String topicName) { return clustersStorage.getClusterByName(name) .flatMap(c -> diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java index 0281c5a4c16..76e3e79944d 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java @@ -7,6 +7,7 @@ import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.Topic; +import com.provectus.kafka.ui.model.TopicColumnsToSort; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -47,7 +48,9 @@ public void shouldListFirst20Topics() { ) .build())); - var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty()); + var topics = clusterService.getTopics(topicName, + Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); assertThat(topics.getPageCount()).isEqualTo(5); assertThat(topics.getTopics()).hasSize(20); assertThat(topics.getTopics()).map(Topic::getName).isSorted(); @@ -69,7 +72,8 @@ public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() { ) .build())); - var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33)); + var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33), + Optional.empty(), Optional.empty(), Optional.empty()); assertThat(topics.getPageCount()).isEqualTo(4); assertThat(topics.getTopics()).hasSize(1) .first().extracting(Topic::getName).isEqualTo("99"); @@ -91,9 +95,111 @@ public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() { ) .build())); - var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1)); + var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1), + Optional.empty(), Optional.empty(), Optional.empty()); assertThat(topics.getPageCount()).isEqualTo(5); assertThat(topics.getTopics()).hasSize(20); assertThat(topics.getTopics()).map(Topic::getName).isSorted(); } + + @Test + public void shouldListBotInternalAndNonInternalTopics() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .internal(Integer.parseInt(e) % 10 == 0) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, + Optional.empty(), Optional.empty(), Optional.of(true), + Optional.empty(), Optional.empty()); + assertThat(topics.getPageCount()).isEqualTo(5); + assertThat(topics.getTopics()).hasSize(20); + assertThat(topics.getTopics()).map(Topic::getName).isSorted(); + } + + + @Test + public void shouldListOnlyNonInternalTopics() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .internal(Integer.parseInt(e) % 10 == 0) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, + Optional.empty(), Optional.empty(), Optional.of(true), + Optional.empty(), Optional.empty()); + assertThat(topics.getPageCount()).isEqualTo(5); + assertThat(topics.getTopics()).hasSize(20); + assertThat(topics.getTopics()).map(Topic::getName).isSorted(); + } + + + @Test + public void shouldListOnlyTopicsContainingOne() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, + Optional.empty(), Optional.empty(), Optional.empty(), + Optional.of("1"), Optional.empty()); + assertThat(topics.getPageCount()).isEqualTo(1); + assertThat(topics.getTopics()).hasSize(20); + assertThat(topics.getTopics()).map(Topic::getName).isSorted(); + } + + @Test + public void shouldListTopicsOrderedByPartitionsCount() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .partitionCount(100 - Integer.parseInt(e)) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, + Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.of(TopicColumnsToSort.TOTAL_PARTITIONS)); + assertThat(topics.getPageCount()).isEqualTo(5); + assertThat(topics.getTopics()).hasSize(20); + assertThat(topics.getTopics()).map(Topic::getPartitionCount).isSorted(); + } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index a89bbbc55af..ded5da35eff 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -140,6 +140,21 @@ paths: required: false schema: type: integer + - name: showInternal + in: query + required: false + schema: + type: boolean + - name: search + in: query + required: false + schema: + type: string + - name: orderBy + in: query + required: false + schema: + $ref: '#/components/schemas/TopicColumnsToSort' responses: 200: description: OK @@ -1245,6 +1260,13 @@ components: items: $ref: '#/components/schemas/Topic' + TopicColumnsToSort: + type: string + enum: + - NAME + - OUT_OF_SYNC_REPLICAS + - TOTAL_PARTITIONS + Topic: type: object properties: