Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#188 added pagination for get topics api #249

Merged
merged 4 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions kafka-ui-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,25 @@
<version>${junit-jupiter-engine.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import reactor.core.publisher.Mono;

import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Service
@RequiredArgsConstructor
public class ClusterService {
private static final Integer DEFAULT_PAGE_SIZE = 20;

private final ClustersStorage clustersStorage;
private final ClusterMapper clusterMapper;
Expand Down Expand Up @@ -64,14 +66,22 @@ public Mono<ClusterMetrics> getClusterMetrics(String name) {
}


public List<Topic> getTopics(String name) {
return clustersStorage.getClusterByName(name)
.map(c ->
c.getTopics().values().stream()
public TopicsResponse getTopics(String name, Optional<Integer> page, Optional<Integer> nullablePageSize) {
Predicate<Integer> positiveInt = i -> i > 0;
int pageSize = nullablePageSize.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * pageSize;
var cluster = clustersStorage.getClusterByName(name).orElseThrow(() -> new NotFoundException("No such cluster"));
var totalPages = (cluster.getTopics().size() / pageSize) + (cluster.getTopics().size() % pageSize == 0 ? 0 : 1);
return new TopicsResponse()
.pageCount(totalPages)
.topics(
cluster.getTopics().values().stream()
.sorted(Comparator.comparing(InternalTopic::getName))
.skip(topicsToSkip)
.limit(pageSize)
.map(clusterMapper::toTopic)
.sorted(Comparator.comparing(Topic::getName))
.collect(Collectors.toList())
).orElse(Collections.emptyList());
);
}

public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import javax.validation.Valid;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

@RestController
Expand Down Expand Up @@ -55,8 +56,8 @@ public Mono<ResponseEntity<ClusterStats>> getClusterStats(String clusterName, Se
}

@Override
public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));
public Mono<ResponseEntity<TopicsResponse>> getTopics(String clusterName, @Valid Integer page, @Valid Integer pageSize, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(clusterService.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(pageSize))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.provectus.kafka.ui.cluster.service;

import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.InternalTopic;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.Topic;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mapstruct.factory.Mappers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ClusterServiceTest {
@InjectMocks
private ClusterService clusterService;

@Mock
private ClustersStorage clustersStorage;
@Spy
private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class);
@Mock
private KafkaService kafkaService;
@Mock
private ConsumingService consumingService;

@Test
public void shouldListFirst20Topics() {
var topicName = UUID.randomUUID().toString();

when(clustersStorage.getClusterByName(topicName))
.thenReturn(Optional.of(KafkaCluster.builder()
.topics(
IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
.partitions(Map.of())
.name(e)
.build()))
)
.build()));

var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty());
assertThat(topics.getPageCount()).isEqualTo(5);
assertThat(topics.getTopics()).hasSize(20);
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
}

@Test
public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
var topicName = UUID.randomUUID().toString();

when(clustersStorage.getClusterByName(topicName))
.thenReturn(Optional.of(KafkaCluster.builder()
.topics(
IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
.partitions(Map.of())
.name(e)
.build()))
)
.build()));

var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33));
assertThat(topics.getPageCount()).isEqualTo(4);
assertThat(topics.getTopics()).hasSize(1)
.first().extracting(Topic::getName).isEqualTo("99");
}

@Test
public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
var topicName = UUID.randomUUID().toString();

when(clustersStorage.getClusterByName(topicName))
.thenReturn(Optional.of(KafkaCluster.builder()
.topics(
IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
.partitions(Map.of())
.name(e)
.build()))
)
.build()));

var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1));
assertThat(topics.getPageCount()).isEqualTo(5);
assertThat(topics.getTopics()).hasSize(20);
assertThat(topics.getTopics()).map(Topic::getName).isSorted();
}
}
24 changes: 21 additions & 3 deletions kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,23 @@ paths:
required: true
schema:
type: string
- name: page
in: query
required: false
schema:
type: integer
- name: pageSize
in: query
required: false
schema:
type: integer
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/Topic'
$ref: '#/components/schemas/TopicsResponse'
post:
tags:
- /api/clusters
Expand Down Expand Up @@ -1112,6 +1120,16 @@ components:
items:
$ref: '#/components/schemas/Metric'

TopicsResponse:
type: object
properties:
pageCount:
type: integer
topics:
type: array
items:
$ref: '#/components/schemas/Topic'

Topic:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion kafka-ui-react-app/src/redux/actions/thunks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export const fetchTopicsList = (
dispatch(actions.fetchTopicsListAction.request());
try {
const topics = await apiClient.getTopics({ clusterName });
dispatch(actions.fetchTopicsListAction.success(topics));
dispatch(actions.fetchTopicsListAction.success(topics.topics || []));
} catch (e) {
dispatch(actions.fetchTopicsListAction.failure());
}
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
<apache.commons.version>2.2</apache.commons.version>
<test.containers.version>1.15.1</test.containers.version>
<junit-jupiter-engine.version>5.4.0</junit-jupiter-engine.version>
<mockito.version>2.21.0</mockito.version>
<assertj.version>3.19.0</assertj.version>

<frontend-generated-sources-directory>..//kafka-ui-react-app/src/generated-sources</frontend-generated-sources-directory>
</properties>
Expand Down