From 20e4e94b5f35eb7b0f3b71528aaa1fea8dffcfb5 Mon Sep 17 00:00:00 2001 From: Ramazan Yapparov Date: Mon, 15 Mar 2021 10:24:22 +0300 Subject: [PATCH 1/3] added pagination for get topics api --- kafka-ui-api/pom.xml | 19 ++++ .../ui/cluster/service/ClusterService.java | 22 +++- .../kafka/ui/rest/MetricsRestController.java | 5 +- .../cluster/service/ClusterServiceTest.java | 107 ++++++++++++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 24 +++- .../src/redux/actions/thunks.ts | 3 +- pom.xml | 2 + 7 files changed, 170 insertions(+), 12 deletions(-) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/cluster/service/ClusterServiceTest.java diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index 3f8d23a8098..443ce32ca26 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -147,6 +147,25 @@ ${junit-jupiter-engine.version} test + + org.mockito + mockito-core + ${mockito.version} + test + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + + + org.assertj + assertj-core + ${assertj.version} + test + + diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index ca38ff6185e..26a36ae4701 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -21,12 +21,14 @@ import reactor.core.publisher.Mono; import java.util.*; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @Service @RequiredArgsConstructor public class ClusterService { + private static final Integer DEFAULT_PAGE_SIZE = 20; private final ClustersStorage clustersStorage; private final ClusterMapper clusterMapper; @@ -64,14 +66,22 @@ public Mono getClusterMetrics(String name) { } - public List getTopics(String name) { - return clustersStorage.getClusterByName(name) - .map(c -> - c.getTopics().values().stream() + public TopicsResponse getTopics(String name, Optional page, Optional nullablePageSize) { + Predicate positiveInt = i -> i > 0; + int pageSize = nullablePageSize.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE); + var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * pageSize; + var cluster = clustersStorage.getClusterByName(name).orElseThrow(() -> new NotFoundException("No such cluster")); + var totalPages = (cluster.getTopics().size() / pageSize) + (cluster.getTopics().size() % pageSize == 0 ? 0 : 1); + return new TopicsResponse() + .pageCount(totalPages) + .topics( + cluster.getTopics().values().stream() + .sorted(Comparator.comparing(InternalTopic::getName)) + .skip(topicsToSkip) + .limit(pageSize) .map(clusterMapper::toTopic) - .sorted(Comparator.comparing(Topic::getName)) .collect(Collectors.toList()) - ).orElse(Collections.emptyList()); + ); } public Optional getTopicDetails(String name, String topicName) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 9b86451b8fe..ff83d155c9f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -18,6 +18,7 @@ import javax.validation.Valid; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.function.Function; @RestController @@ -55,8 +56,8 @@ public Mono> getClusterStats(String clusterName, Se } @Override - public Mono>> getTopics(String clusterName, ServerWebExchange exchange) { - return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName)))); + public Mono> getTopics(String clusterName, @Valid Integer page, @Valid Integer pageSize, ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(clusterService.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(pageSize)))); } @Override diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/cluster/service/ClusterServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/cluster/service/ClusterServiceTest.java new file mode 100644 index 00000000000..97cc57f4f4e --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/cluster/service/ClusterServiceTest.java @@ -0,0 +1,107 @@ +package com.provectus.kafka.ui.cluster.service; + +import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; +import com.provectus.kafka.ui.cluster.model.ClustersStorage; +import com.provectus.kafka.ui.cluster.model.InternalTopic; +import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.kafka.KafkaService; +import com.provectus.kafka.ui.model.Topic; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mapstruct.factory.Mappers; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ClusterServiceTest { + @InjectMocks + private ClusterService clusterService; + + @Mock + private ClustersStorage clustersStorage; + @Spy + private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class); + @Mock + private KafkaService kafkaService; + @Mock + private ConsumingService consumingService; + + @Test + public void shouldListFirst20Topics() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty()); + assertThat(topics.getPageCount()).isEqualTo(5); + assertThat(topics.getTopics()).hasSize(20); + assertThat(topics.getTopics()).map(Topic::getName).isSorted(); + } + + @Test + public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33)); + assertThat(topics.getPageCount()).isEqualTo(4); + assertThat(topics.getTopics()).hasSize(1) + .first().extracting(Topic::getName).isEqualTo("99"); + } + + @Test + public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() { + var topicName = UUID.randomUUID().toString(); + + when(clustersStorage.getClusterByName(topicName)) + .thenReturn(Optional.of(KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .build())) + ) + .build())); + + var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1)); + assertThat(topics.getPageCount()).isEqualTo(5); + assertThat(topics.getTopics()).hasSize(20); + assertThat(topics.getTopics()).map(Topic::getName).isSorted(); + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index f50695abfc1..6b148c86664 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -130,15 +130,23 @@ paths: required: true schema: type: string + - name: page + in: query + required: false + schema: + type: integer + - name: pageSize + in: query + required: false + schema: + type: integer responses: 200: description: OK content: application/json: schema: - type: array - items: - $ref: '#/components/schemas/Topic' + $ref: '#/components/schemas/TopicsResponse' post: tags: - /api/clusters @@ -1112,6 +1120,16 @@ components: items: $ref: '#/components/schemas/Metric' + TopicsResponse: + type: object + properties: + pageCount: + type: integer + topics: + type: array + items: + $ref: '#/components/schemas/Topic' + Topic: type: object properties: diff --git a/kafka-ui-react-app/src/redux/actions/thunks.ts b/kafka-ui-react-app/src/redux/actions/thunks.ts index 36e1ccaeb0b..fb2a4d7050e 100644 --- a/kafka-ui-react-app/src/redux/actions/thunks.ts +++ b/kafka-ui-react-app/src/redux/actions/thunks.ts @@ -94,7 +94,8 @@ export const fetchTopicsList = ( dispatch(actions.fetchTopicsListAction.request()); try { const topics = await apiClient.getTopics({ clusterName }); - dispatch(actions.fetchTopicsListAction.success(topics)); + // todo: fix needed from FE person + // dispatch(actions.fetchTopicsListAction.success(topics)); } catch (e) { dispatch(actions.fetchTopicsListAction.failure()); } diff --git a/pom.xml b/pom.xml index 62106bc7a20..c0741d2007b 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,8 @@ 2.2 1.15.1 5.4.0 + 2.21.0 + 3.19.0 ..//kafka-ui-react-app/src/generated-sources From 4d7eb9bafae6352e28a8b1f6442dac8f33157125 Mon Sep 17 00:00:00 2001 From: Ramazan Yapparov Date: Mon, 15 Mar 2021 11:06:34 +0300 Subject: [PATCH 2/3] frontend fix --- kafka-ui-react-app/src/redux/actions/thunks.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka-ui-react-app/src/redux/actions/thunks.ts b/kafka-ui-react-app/src/redux/actions/thunks.ts index fb2a4d7050e..ba8e7da46b2 100644 --- a/kafka-ui-react-app/src/redux/actions/thunks.ts +++ b/kafka-ui-react-app/src/redux/actions/thunks.ts @@ -94,8 +94,7 @@ export const fetchTopicsList = ( dispatch(actions.fetchTopicsListAction.request()); try { const topics = await apiClient.getTopics({ clusterName }); - // todo: fix needed from FE person - // dispatch(actions.fetchTopicsListAction.success(topics)); + dispatch(actions.fetchTopicsListAction.success(topics.topics || [])); } catch (e) { dispatch(actions.fetchTopicsListAction.failure()); } From 774c0a75b2364965971a8ab2a05a609265bd9a23 Mon Sep 17 00:00:00 2001 From: Ramazan Yapparov Date: Mon, 15 Mar 2021 19:35:23 +0300 Subject: [PATCH 3/3] - fixed merge conflicts - renamed pageSize to perPage --- .../kafka/ui/controller/TopicsController.java | 13 ++++++------- .../provectus/kafka/ui/service/ClusterService.java | 10 +++++----- .../kafka/ui/service/ClusterServiceTest.java | 14 ++++---------- .../src/main/resources/swagger/kafka-ui-api.yaml | 2 +- 4 files changed, 16 insertions(+), 23 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index f4a05fa1db7..034ddd9011f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -1,12 +1,8 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.TopicsApi; +import com.provectus.kafka.ui.model.*; import com.provectus.kafka.ui.service.ClusterService; -import com.provectus.kafka.ui.model.Topic; -import com.provectus.kafka.ui.model.TopicConfig; -import com.provectus.kafka.ui.model.TopicDetails; -import com.provectus.kafka.ui.model.TopicFormData; -import javax.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.http.HttpStatus; @@ -16,6 +12,9 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import javax.validation.Valid; +import java.util.Optional; + @RestController @RequiredArgsConstructor @Log4j2 @@ -59,8 +58,8 @@ public Mono> getTopicDetails( } @Override - public Mono>> getTopics(String clusterName, ServerWebExchange exchange) { - return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName)))); + public Mono> getTopics(String clusterName, @Valid Integer page, @Valid Integer perPage, ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(clusterService.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(perPage)))); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 690aa662271..cc49a6d2bcf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -64,19 +64,19 @@ public Mono getClusterMetrics(String name) { } - public TopicsResponse getTopics(String name, Optional page, Optional nullablePageSize) { + public TopicsResponse getTopics(String name, Optional page, Optional nullablePerPage) { Predicate positiveInt = i -> i > 0; - int pageSize = nullablePageSize.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE); - var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * pageSize; + int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE); + var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage; var cluster = clustersStorage.getClusterByName(name).orElseThrow(() -> new NotFoundException("No such cluster")); - var totalPages = (cluster.getTopics().size() / pageSize) + (cluster.getTopics().size() % pageSize == 0 ? 0 : 1); + var totalPages = (cluster.getTopics().size() / perPage) + (cluster.getTopics().size() % perPage == 0 ? 0 : 1); return new TopicsResponse() .pageCount(totalPages) .topics( cluster.getTopics().values().stream() .sorted(Comparator.comparing(InternalTopic::getName)) .skip(topicsToSkip) - .limit(pageSize) + .limit(perPage) .map(clusterMapper::toTopic) .collect(Collectors.toList()) ); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java index 97cc57f4f4e..23643b4064c 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java @@ -1,10 +1,8 @@ -package com.provectus.kafka.ui.cluster.service; +package com.provectus.kafka.ui.service; -import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; -import com.provectus.kafka.ui.cluster.model.ClustersStorage; -import com.provectus.kafka.ui.cluster.model.InternalTopic; -import com.provectus.kafka.ui.cluster.model.KafkaCluster; -import com.provectus.kafka.ui.kafka.KafkaService; +import com.provectus.kafka.ui.mapper.ClusterMapper; +import com.provectus.kafka.ui.model.InternalTopic; +import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.Topic; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -34,10 +32,6 @@ class ClusterServiceTest { private ClustersStorage clustersStorage; @Spy private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class); - @Mock - private KafkaService kafkaService; - @Mock - private ConsumingService consumingService; @Test public void shouldListFirst20Topics() { diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 5458f197fc0..80c34ce4c43 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -135,7 +135,7 @@ paths: required: false schema: type: integer - - name: pageSize + - name: perPage in: query required: false schema: