diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java
new file mode 100644
index 00000000000..36e934f8263
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java
@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Value;
+
+import java.util.Map;
+
+import com.provectus.kafka.ui.model.SeekType;
+
+@Value
+public class ConsumerPosition {
+
+    private SeekType seekType;
+    private Map<Integer, Long> seekTo;
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
index 13067af3367..bad9625f12f 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
@@ -2,6 +2,7 @@
 
 import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.kafka.KafkaService;
@@ -61,7 +62,7 @@ public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
                 .map(t -> t.get(topicName))
                 .map(clusterMapper::toTopicDetails);
     }
-    
+
     public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
         return clustersStorage.getClusterByName(name)
                 .map(KafkaCluster::getTopics)
@@ -149,9 +150,9 @@ private <T> Mono<T> updateCluster(T topic, String clusterName, KafkaCluster clus
         });
     }
 
-    public Flux<TopicMessage> getMessages(String clusterName, String topicName, Integer partition, Long offset, OffsetDateTime timestamp) {
+    public Flux<TopicMessage> getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, Integer limit) {
         return clustersStorage.getClusterByName(clusterName)
-                .map(c -> consumingService.loadMessages(c, topicName))
+                .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, limit))
                 .orElse(Flux.empty());
     }
 
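Note: `ConsumerPosition` is a plain value object pairing a `SeekType` with a map of partition to target, where the target is an offset or an epoch-millis timestamp depending on the seek type. A minimal sketch of how a caller might construct one — the values are illustrative, and `Map.of` assumes Java 9+:

```java
import java.util.Map;

import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.model.SeekType;

class ConsumerPositionExample {
    public static void main(String[] args) {
        // Seek partition 0 to offset 42 and partition 1 to offset 100 (illustrative values).
        ConsumerPosition byOffset = new ConsumerPosition(SeekType.OFFSET, Map.of(0, 42L, 1, 100L));

        // For SeekType.TIMESTAMP the map values are epoch millis rather than offsets.
        ConsumerPosition byTime = new ConsumerPosition(SeekType.TIMESTAMP, Map.of(0, 1598918400000L));

        // Lombok's @Value generates the getters used below.
        System.out.println(byOffset.getSeekType() + " -> " + byOffset.getSeekTo());
        System.out.println(byTime.getSeekType() + " -> " + byTime.getSeekTo());
    }
}
```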
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
index 0fcf15f23b1..1bf345ca0a5 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
@@ -5,6 +5,7 @@
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -15,10 +16,11 @@
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
+import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.kafka.KafkaService;
+import com.provectus.kafka.ui.model.SeekType;
 import com.provectus.kafka.ui.model.TopicMessage;
 
 import reactor.core.publisher.Flux;
@@ -30,18 +32,21 @@
 @RequiredArgsConstructor
 public class ConsumingService {
-
-    // TODO: make this configurable
-    private static final int BATCH_SIZE = 20;
+    private static final int MAX_RECORD_LIMIT = 100;
+    private static final int DEFAULT_RECORD_LIMIT = 20;
+    private static final int MAX_POLLS_COUNT = 30;
 
     private final KafkaService kafkaService;
 
-    public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic) {
-        RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic);
+    public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
+        int recordsLimit = Optional.ofNullable(limit)
+                .map(s -> Math.min(s, MAX_RECORD_LIMIT))
+                .orElse(DEFAULT_RECORD_LIMIT);
+        RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
         return Flux.create(emitter::emit)
                 .subscribeOn(Schedulers.boundedElastic())
                 .map(ClusterUtil::mapToTopicMessage)
-                .limitRequest(BATCH_SIZE);
+                .limitRequest(recordsLimit);
     }
 
     @RequiredArgsConstructor
@@ -52,11 +57,14 @@ private static class RecordEmitter {
         private final KafkaService kafkaService;
         private final KafkaCluster cluster;
         private final String topic;
+        private final ConsumerPosition consumerPosition;
 
         public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
             try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
-                assignPartitions(consumer, topic);
-                while (!sink.isCancelled()) {
+                assignPartitions(consumer);
+                seekOffsets(consumer);
+                int pollsCount = 0;
+                while (!sink.isCancelled() && ++pollsCount <= MAX_POLLS_COUNT) {
                     ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
                     log.info("{} records polled", records.count());
                     records.iterator()
@@ -68,16 +76,50 @@ public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
             }
         }
 
-        private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer, String topicName) {
-            List<TopicPartition> partitions = Optional.ofNullable(cluster.getTopics().get(topicName))
-                    .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topicName))
+        private List<TopicPartition> getRequestedPartitions() {
+            Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
+
+            return Optional.ofNullable(cluster.getTopics().get(topic))
+                    .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic))
                     .getPartitions().stream()
-                    .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.getPartition()))
+                    .filter(internalPartition -> partitionPositions.isEmpty() || partitionPositions.containsKey(internalPartition.getPartition()))
+                    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition()))
                     .collect(Collectors.toList());
+        }
+
+        private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer) {
+            List<TopicPartition> partitions = getRequestedPartitions();
             consumer.assign(partitions);
-            // TODO: seek to requested offsets
-            consumer.seekToBeginning(partitions);
+        }
+
+        private void seekOffsets(KafkaConsumer<Bytes, Bytes> consumer) {
+            SeekType seekType = consumerPosition.getSeekType();
+            switch (seekType) {
+                case OFFSET:
+                    consumerPosition.getSeekTo().forEach((partition, offset) -> {
+                        TopicPartition topicPartition = new TopicPartition(topic, partition);
+                        consumer.seek(topicPartition, offset);
+                    });
+                    break;
+                case TIMESTAMP:
+                    Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream()
+                            .collect(Collectors.toMap(
+                                    partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
+                                    Map.Entry::getValue
+                            ));
+                    consumer.offsetsForTimes(timestampsToSearch)
+                            .forEach((topicPartition, offsetAndTimestamp) ->
+                                    consumer.seek(topicPartition, offsetAndTimestamp.offset())
+                            );
+                    break;
+                case BEGINNING:
+                    List<TopicPartition> partitions = getRequestedPartitions();
+                    consumer.seekToBeginning(partitions);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown seekType: " + seekType);
+            }
         }
     }
 }
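Note: in the `TIMESTAMP` branch above, `KafkaConsumer.offsetsForTimes` maps a partition to `null` when the topic has no record at or after the requested timestamp, so the unconditional `offsetAndTimestamp.offset()` call can throw a `NullPointerException`. A defensive variant — a sketch only, not part of this change — would skip the unmatched partitions:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;

class SafeTimestampSeek {
    // Seeks only the partitions for which offsetsForTimes found a matching offset.
    static void seekToTimestamps(KafkaConsumer<Bytes, Bytes> consumer,
                                 Map<TopicPartition, Long> timestampsToSearch) {
        Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(timestampsToSearch);
        found.forEach((partition, offsetAndTimestamp) -> {
            if (offsetAndTimestamp != null) { // null means no record at/after the timestamp
                consumer.seek(partition, offsetAndTimestamp.offset());
            }
        });
    }
}
```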
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java
index 439e23b4cec..0b8d9396c69 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java
@@ -1,9 +1,12 @@
 package com.provectus.kafka.ui.rest;
 
 import com.provectus.kafka.ui.api.ApiClustersApi;
+import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
 import com.provectus.kafka.ui.cluster.service.ClusterService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
+
+import org.apache.commons.lang3.tuple.Pair;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
@@ -11,8 +14,11 @@
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
 import javax.validation.Valid;
-import java.time.OffsetDateTime;
 
 @RestController
 @RequiredArgsConstructor
@@ -59,10 +65,9 @@ public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterNam
     }
 
     @Override
-    public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid Integer partition, @Valid Long offset, @Valid OffsetDateTime timestamp, ServerWebExchange exchange) {
-        return Mono.just(
-                ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, partition, offset, timestamp))
-        );
+    public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo, @Valid Integer limit, ServerWebExchange exchange) {
+        return parseConsumerPosition(seekType, seekTo)
+                .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, limit)));
     }
 
     @Override
@@ -94,4 +99,20 @@ public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(String cluste
     public Mono<ResponseEntity<Topic>> updateTopic(String clusterId, String topicName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
         return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
     }
+
+    private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
+        return Mono.justOrEmpty(seekTo)
+                .defaultIfEmpty(Collections.emptyList())
+                .flatMapIterable(Function.identity())
+                .map(p -> {
+                    String[] parts = p.split("::");
+                    if (parts.length != 2) {
+                        throw new IllegalArgumentException("Wrong seekTo argument format. See API docs for details");
+                    }
+
+                    return Pair.of(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
+                })
+                .collectMap(Pair::getKey, Pair::getValue)
+                .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING, positions));
+    }
 }
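Note: the `seekTo` wire format is `partition::target`, so parsing reduces to a split plus two numeric conversions. A standalone, non-reactive sketch of the same rule (class and method names are made up for illustration; `Map.entry` assumes Java 9+):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class SeekToParser {
    // Parses entries like "0::42" into a partition -> target map
    // (target is an offset or an epoch-millis timestamp, depending on seekType).
    static Map<Integer, Long> parse(List<String> seekTo) {
        return seekTo.stream()
                .map(SeekToParser::parseEntry)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private static Map.Entry<Integer, Long> parseEntry(String entry) {
        String[] parts = entry.split("::");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Wrong seekTo argument format. See API docs for details");
        }
        return Map.entry(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
    }

    public static void main(String[] args) {
        System.out.println(parse(List.of("0::42", "1::1598918400000")));
    }
}
```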
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 7bb211376c6..57cad5dddfe 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -214,20 +214,21 @@ paths:
           required: true
           schema:
             type: string
-        - name: partition
+        - name: seekType
           in: query
           schema:
-            type: integer
-        - name: offset
+            $ref: "#/components/schemas/SeekType"
+        - name: seekTo
           in: query
           schema:
-            type: integer
-            format: int64
-        - name: timestamp
+            type: array
+            items:
+              type: string
+          description: The format is [partition]::[offset] for specifying offsets or [partition]::[timestamp in millis] for specifying timestamps
+        - name: limit
           in: query
           schema:
-            type: string
-            format: date-time
+            type: integer
       responses:
         200:
           description: OK
@@ -463,6 +464,13 @@ components:
         - offset
         - timestamp
 
+    SeekType:
+      type: string
+      enum:
+        - BEGINNING
+        - OFFSET
+        - TIMESTAMP
+
     TopicPartitionDto:
       type: object
       properties:
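Note: tying the contract together, a request against the new parameters might look like the sketch below, using Spring's `WebClient`. The host, cluster, and topic values are made up, and the `/api/clusters/{cluster}/topics/{topic}/messages` path is an assumption based on the surrounding contract:

```java
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

class TopicMessagesRequestExample {
    public static void main(String[] args) {
        // GET .../messages?seekType=OFFSET&seekTo=0::42&seekTo=1::100&limit=50
        Flux<String> messages = WebClient.create("http://localhost:8080")
                .get()
                .uri(uriBuilder -> uriBuilder
                        .path("/api/clusters/{cluster}/topics/{topic}/messages")
                        .queryParam("seekType", "OFFSET")
                        .queryParam("seekTo", "0::42", "1::100") // partition::offset pairs
                        .queryParam("limit", 50)
                        .build("local", "test-topic"))
                .retrieve()
                .bodyToFlux(String.class); // raw JSON elements; a generated TopicMessage DTO would also work

        messages.doOnNext(System.out::println).blockLast();
    }
}
```

Omitting `seekType` falls back to `BEGINNING`, and omitting `limit` falls back to the 20-record default, capped at 100.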