From a8ed4ff37f5389808afb0550611c93ab1e313a83 Mon Sep 17 00:00:00 2001 From: Ramazan Yapparov Date: Mon, 15 Mar 2021 21:37:36 +0300 Subject: [PATCH] #188 added pagination for get topics api (#249) * added pagination for get topics api * frontend fix * - fixed merge conflicts - renamed pageSize to perPage --- kafka-ui-api/pom.xml | 19 ++++ .../kafka/ui/controller/TopicsController.java | 13 ++- .../kafka/ui/service/ClusterService.java | 22 ++-- .../kafka/ui/service/ClusterServiceTest.java | 101 ++++++++++++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 24 ++++- .../src/redux/actions/thunks.ts | 2 +- pom.xml | 2 + 7 files changed, 166 insertions(+), 17 deletions(-) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index 3f8d23a8098..443ce32ca26 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -147,6 +147,25 @@ ${junit-jupiter-engine.version} test + + org.mockito + mockito-core + ${mockito.version} + test + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + + + org.assertj + assertj-core + ${assertj.version} + test + + diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index f4a05fa1db7..034ddd9011f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -1,12 +1,8 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.TopicsApi; +import com.provectus.kafka.ui.model.*; import com.provectus.kafka.ui.service.ClusterService; -import com.provectus.kafka.ui.model.Topic; -import com.provectus.kafka.ui.model.TopicConfig; -import com.provectus.kafka.ui.model.TopicDetails; -import com.provectus.kafka.ui.model.TopicFormData; -import javax.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.http.HttpStatus; @@ -16,6 +12,9 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import javax.validation.Valid; +import java.util.Optional; + @RestController @RequiredArgsConstructor @Log4j2 @@ -59,8 +58,8 @@ public Mono> getTopicDetails( } @Override - public Mono>> getTopics(String clusterName, ServerWebExchange exchange) { - return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName)))); + public Mono> getTopics(String clusterName, @Valid Integer page, @Valid Integer perPage, ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(clusterService.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(perPage)))); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 17da7d271f9..cc49a6d2bcf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -19,12 +19,14 @@ import reactor.core.publisher.Mono; import java.util.*; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @Service @RequiredArgsConstructor public class ClusterService { + private static final Integer DEFAULT_PAGE_SIZE = 20; private final ClustersStorage clustersStorage; private final ClusterMapper clusterMapper; @@ -62,14 +64,22 @@ public Mono getClusterMetrics(String name) { } - public List getTopics(String name) { - return clustersStorage.getClusterByName(name) - .map(c -> - c.getTopics().values().stream() + public TopicsResponse getTopics(String name, Optional page, Optional nullablePerPage) { + Predicate positiveInt = i -> i > 0; + int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE); + var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage; + var cluster = clustersStorage.getClusterByName(name).orElseThrow(() -> new NotFoundException("No such cluster")); + var totalPages = (cluster.getTopics().size() / perPage) + (cluster.getTopics().size() % perPage == 0 ? 0 : 1); + return new TopicsResponse() + .pageCount(totalPages) + .topics( + cluster.getTopics().values().stream() + .sorted(Comparator.comparing(InternalTopic::getName)) + .skip(topicsToSkip) + .limit(perPage) .map(clusterMapper::toTopic) - .sorted(Comparator.comparing(Topic::getName)) .collect(Collectors.toList()) - ).orElse(Collections.emptyList()); + ); } public Optional getTopicDetails(String name, String topicName) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java new file mode 100644 index 00000000000..23643b4064c --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java @@ -0,0 +1,101 @@ +package com.provectus.kafka.ui.service; + +import com.provectus.kafka.ui.mapper.ClusterMapper; +import com.provectus.kafka.ui.model.InternalTopic; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.Topic; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mapstruct.factory.Mappers; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ClusterServiceTest { + @InjectMocks + private ClusterService clusterService; + + @Mock + private ClustersStorage clustersStorage; + @Spy + private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class); + + @Test + public void shouldListFirst20Topics() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty()); + assertThat(topics.getPageCount()).isEqualTo(5); + assertThat(topics.getTopics()).hasSize(20); + assertThat(topics.getTopics()).map(Topic::getName).isSorted(); + } + + @Test + public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33)); + assertThat(topics.getPageCount()).isEqualTo(4); + assertThat(topics.getTopics()).hasSize(1) + .first().extracting(Topic::getName).isEqualTo("99"); + } + + @Test + public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1)); + assertThat(topics.getPageCount()).isEqualTo(5); + assertThat(topics.getTopics()).hasSize(20); + assertThat(topics.getTopics()).map(Topic::getName).isSorted(); + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index e20fb81d90a..80c34ce4c43 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -130,15 +130,23 @@ paths: required: true schema: type: string + - name: page + in: query + required: false + schema: + type: integer + - name: perPage + in: query + required: false + schema: + type: integer responses: 200: description: OK content: application/json: schema: - type: array - items: - $ref: '#/components/schemas/Topic' + $ref: '#/components/schemas/TopicsResponse' post: tags: - Topics @@ -1140,6 +1148,16 @@ components: items: $ref: '#/components/schemas/Metric' + TopicsResponse: + type: object + properties: + pageCount: + type: integer + topics: + type: array + items: + $ref: '#/components/schemas/Topic' + Topic: type: object properties: diff --git a/kafka-ui-react-app/src/redux/actions/thunks.ts b/kafka-ui-react-app/src/redux/actions/thunks.ts index 23171cc1d46..1c49beeb04a 100644 --- a/kafka-ui-react-app/src/redux/actions/thunks.ts +++ b/kafka-ui-react-app/src/redux/actions/thunks.ts @@ -104,7 +104,7 @@ export const fetchTopicsList = ( dispatch(actions.fetchTopicsListAction.request()); try { const topics = await topicsApiClient.getTopics({ clusterName }); - dispatch(actions.fetchTopicsListAction.success(topics)); + dispatch(actions.fetchTopicsListAction.success(topics.topics || [])); } catch (e) { dispatch(actions.fetchTopicsListAction.failure()); } diff --git a/pom.xml b/pom.xml index 62106bc7a20..c0741d2007b 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,8 @@ 2.2 1.15.1 5.4.0 + 2.21.0 + 3.19.0 ..//kafka-ui-react-app/src/generated-sources