Skip to content

Commit

Permalink
provectus#187 added search and sorting (provectus#388)
Browse files Browse the repository at this point in the history
* - added search by topic name
- added filter by internal topics
- added sort by multiple columns
- added tests

* moved sorting later in the stream
  • Loading branch information
RamazanYapparov committed Apr 27, 2021
1 parent 0d2cbf2 commit 6a3c61d
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.provectus.kafka.ui.api.TopicsApi;
import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicColumnsToSort;
import com.provectus.kafka.ui.model.TopicConfig;
import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicDetails;
Expand Down Expand Up @@ -64,9 +65,19 @@ public Mono<ResponseEntity<TopicDetails>> getTopicDetails(
@Override
public Mono<ResponseEntity<TopicsResponse>> getTopics(String clusterName, @Valid Integer page,
@Valid Integer perPage,
@Valid Boolean showInternal,
@Valid String search,
@Valid TopicColumnsToSort orderBy,
ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(clusterService
.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(perPage))));
.getTopics(
clusterName,
Optional.ofNullable(page),
Optional.ofNullable(perPage),
Optional.ofNullable(showInternal),
Optional.ofNullable(search),
Optional.ofNullable(orderBy)
)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicColumnsToSort;
import com.provectus.kafka.ui.model.TopicConfig;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.model.TopicCreation;
Expand All @@ -32,6 +33,7 @@
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -80,26 +82,55 @@ public Mono<ClusterMetrics> getClusterMetrics(String name) {


public TopicsResponse getTopics(String name, Optional<Integer> page,
Optional<Integer> nullablePerPage) {
Optional<Integer> nullablePerPage,
Optional<Boolean> showInternal,
Optional<String> search,
Optional<TopicColumnsToSort> sortBy) {
Predicate<Integer> positiveInt = i -> i > 0;
int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
var cluster = clustersStorage.getClusterByName(name)
.orElseThrow(ClusterNotFoundException::new);
var totalPages = (cluster.getTopics().size() / perPage)
+ (cluster.getTopics().size() % perPage == 0 ? 0 : 1);
List<Topic> topics = cluster.getTopics().values().stream()
.filter(topic -> !topic.isInternal()
|| showInternal
.map(i -> topic.isInternal() == i)
.orElse(true))
.filter(topic ->
search
.map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
.orElse(true))
.sorted(getComparatorForTopic(sortBy))
.map(clusterMapper::toTopic)
.collect(Collectors.toList());
var totalPages = (topics.size() / perPage)
+ (topics.size() % perPage == 0 ? 0 : 1);
return new TopicsResponse()
.pageCount(totalPages)
.topics(
cluster.getTopics().values().stream()
.sorted(Comparator.comparing(InternalTopic::getName))
topics.stream()
.skip(topicsToSkip)
.limit(perPage)
.map(clusterMapper::toTopic)
.collect(Collectors.toList())
);
}

/**
 * Resolves the comparator used to order topics in the listing.
 * Falls back to ordering by topic name when no column (or NAME) is requested.
 */
private Comparator<InternalTopic> getComparatorForTopic(Optional<TopicColumnsToSort> sortBy) {
  var defaultComparator = Comparator.comparing(InternalTopic::getName);
  if (sortBy.isEmpty()) {
    return defaultComparator;
  }
  switch (sortBy.get()) {
    case TOTAL_PARTITIONS:
      // comparingInt avoids boxing every key into an Integer during the sort.
      return Comparator.comparingInt(InternalTopic::getPartitionCount);
    case OUT_OF_SYNC_REPLICAS:
      // Out-of-sync count = total replicas minus in-sync replicas.
      return Comparator.comparingInt(t -> t.getReplicas() - t.getInSyncReplicas());
    case NAME:
    default:
      return defaultComparator;
  }
}

public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
return clustersStorage.getClusterByName(name)
.flatMap(c ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicColumnsToSort;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -47,7 +48,9 @@ public void shouldListFirst20Topics() {
)
.build()));

var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty());
var topics = clusterService.getTopics(topicName,
Optional.empty(), Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty());
assertThat(topics.getPageCount()).isEqualTo(5);
assertThat(topics.getTopics()).hasSize(20);
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
Expand All @@ -69,7 +72,8 @@ public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
)
.build()));

var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33));
var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33),
Optional.empty(), Optional.empty(), Optional.empty());
assertThat(topics.getPageCount()).isEqualTo(4);
assertThat(topics.getTopics()).hasSize(1)
.first().extracting(Topic::getName).isEqualTo("99");
Expand All @@ -91,9 +95,111 @@ public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
)
.build()));

var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1));
var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1),
Optional.empty(), Optional.empty(), Optional.empty());
assertThat(topics.getPageCount()).isEqualTo(5);
assertThat(topics.getTopics()).hasSize(20);
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
}

@Test
public void shouldListBotInternalAndNonInternalTopics() {
  // NOTE(review): method name has a typo ("Bot" -> "Both"); kept as-is to
  // preserve the discoverable test name.
  var clusterName = UUID.randomUUID().toString();

  // 100 topics named "1".."100"; every 10th one is flagged internal.
  var topicsByName = IntStream.rangeClosed(1, 100).boxed()
      .map(Objects::toString)
      .collect(Collectors.toMap(
          Function.identity(),
          name -> InternalTopic.builder()
              .partitions(Map.of())
              .name(name)
              .internal(Integer.parseInt(name) % 10 == 0)
              .build()));

  when(clustersStorage.getClusterByName(clusterName))
      .thenReturn(Optional.of(KafkaCluster.builder().topics(topicsByName).build()));

  // showInternal=true: internal topics are listed alongside regular ones,
  // so all 100 topics remain -> 5 pages of 20.
  var firstPage = clusterService.getTopics(clusterName,
      Optional.empty(), Optional.empty(), Optional.of(true),
      Optional.empty(), Optional.empty());
  assertThat(firstPage.getPageCount()).isEqualTo(5);
  assertThat(firstPage.getTopics()).hasSize(20);
  assertThat(firstPage.getTopics()).map(Topic::getName).isSorted();
}


@Test
public void shouldListOnlyNonInternalTopics() {
  var topicName = UUID.randomUUID().toString();

  when(clustersStorage.getClusterByName(topicName))
      .thenReturn(Optional.of(KafkaCluster.builder()
          .topics(
              IntStream.rangeClosed(1, 100).boxed()
                  .map(Objects::toString)
                  .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
                      .partitions(Map.of())
                      .name(e)
                      .internal(Integer.parseInt(e) % 10 == 0)
                      .build()))
          )
          .build()));

  // FIX: showInternal must be false here — Optional.of(true) includes internal
  // topics, which contradicts this test's name and duplicates the
  // "both internal and non-internal" test above. With false, the 10 internal
  // topics are filtered out, leaving 90 -> still ceil(90/20) = 5 pages.
  var topics = clusterService.getTopics(topicName,
      Optional.empty(), Optional.empty(), Optional.of(false),
      Optional.empty(), Optional.empty());
  assertThat(topics.getPageCount()).isEqualTo(5);
  assertThat(topics.getTopics()).hasSize(20);
  assertThat(topics.getTopics()).map(Topic::getName).isSorted();
}


@Test
public void shouldListOnlyTopicsContainingOne() {
  var clusterName = UUID.randomUUID().toString();

  // Topics "1".."100"; none are marked internal.
  var topicsByName = IntStream.rangeClosed(1, 100).boxed()
      .map(Objects::toString)
      .collect(Collectors.toMap(
          Function.identity(),
          name -> InternalTopic.builder()
              .partitions(Map.of())
              .name(name)
              .build()));

  when(clustersStorage.getClusterByName(clusterName))
      .thenReturn(Optional.of(KafkaCluster.builder().topics(topicsByName).build()));

  // Searching for "1" matches exactly 20 names
  // (1, 10-19, 21, 31, ..., 91, 100) -> a single full page.
  var firstPage = clusterService.getTopics(clusterName,
      Optional.empty(), Optional.empty(), Optional.empty(),
      Optional.of("1"), Optional.empty());
  assertThat(firstPage.getPageCount()).isEqualTo(1);
  assertThat(firstPage.getTopics()).hasSize(20);
  assertThat(firstPage.getTopics()).map(Topic::getName).isSorted();
}

@Test
public void shouldListTopicsOrderedByPartitionsCount() {
  var clusterName = UUID.randomUUID().toString();

  // Topic "e" gets partitionCount = 100 - e, so name order and
  // partition-count order are deliberately opposite.
  var topicsByName = IntStream.rangeClosed(1, 100).boxed()
      .map(Objects::toString)
      .collect(Collectors.toMap(
          Function.identity(),
          name -> InternalTopic.builder()
              .partitions(Map.of())
              .name(name)
              .partitionCount(100 - Integer.parseInt(name))
              .build()));

  when(clustersStorage.getClusterByName(clusterName))
      .thenReturn(Optional.of(KafkaCluster.builder().topics(topicsByName).build()));

  // Sorting by TOTAL_PARTITIONS must yield ascending partition counts.
  var firstPage = clusterService.getTopics(clusterName,
      Optional.empty(), Optional.empty(), Optional.empty(),
      Optional.empty(), Optional.of(TopicColumnsToSort.TOTAL_PARTITIONS));
  assertThat(firstPage.getPageCount()).isEqualTo(5);
  assertThat(firstPage.getTopics()).hasSize(20);
  assertThat(firstPage.getTopics()).map(Topic::getPartitionCount).isSorted();
}
}
22 changes: 22 additions & 0 deletions kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,21 @@ paths:
required: false
schema:
type: integer
- name: showInternal
in: query
required: false
schema:
type: boolean
- name: search
in: query
required: false
schema:
type: string
- name: orderBy
in: query
required: false
schema:
$ref: '#/components/schemas/TopicColumnsToSort'
responses:
200:
description: OK
Expand Down Expand Up @@ -1245,6 +1260,13 @@ components:
items:
$ref: '#/components/schemas/Topic'

TopicColumnsToSort:
type: string
enum:
- NAME
- OUT_OF_SYNC_REPLICAS
- TOTAL_PARTITIONS

Topic:
type: object
properties:
Expand Down

0 comments on commit 6a3c61d

Please sign in to comment.