From 8fac60589935c7065d6fe2f522d45b77ee7b91b7 Mon Sep 17 00:00:00 2001 From: Anton Petrov Date: Thu, 28 May 2020 14:33:14 +0300 Subject: [PATCH 1/2] Seek offsets --- .../ui/cluster/model/ConsumerPosition.java | 15 +++++++ .../ui/cluster/model/PartitionPosition.java | 9 ++++ .../ui/cluster/service/ClusterService.java | 7 +-- .../ui/cluster/service/ConsumingService.java | 45 ++++++++++++++++--- .../kafka/ui/rest/MetricsRestController.java | 26 ++++++++--- .../main/resources/swagger/kafka-ui-api.yaml | 25 ++++++----- 6 files changed, 103 insertions(+), 24 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/PartitionPosition.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java new file mode 100644 index 00000000000..d3f27196eae --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java @@ -0,0 +1,15 @@ +package com.provectus.kafka.ui.cluster.model; + +import lombok.Value; + +import java.util.List; + +import com.provectus.kafka.ui.model.PositionType; + +@Value +public class ConsumerPosition { + + private PositionType positionType; + private List partitionPositions; + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/PartitionPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/PartitionPosition.java new file mode 100644 index 00000000000..1515eb5c29c --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/PartitionPosition.java @@ -0,0 +1,9 @@ +package com.provectus.kafka.ui.cluster.model; + +import lombok.Value; + +@Value +public class PartitionPosition { + private int partition; + private Long position; +} diff --git 
a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 13067af3367..b5815b24cab 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -2,6 +2,7 @@ import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; import com.provectus.kafka.ui.cluster.model.ClustersStorage; +import com.provectus.kafka.ui.cluster.model.ConsumerPosition; import com.provectus.kafka.ui.cluster.model.KafkaCluster; import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.kafka.KafkaService; @@ -61,7 +62,7 @@ public Optional getTopicDetails(String name, String topicName) { .map(t -> t.get(topicName)) .map(clusterMapper::toTopicDetails); } - + public Optional> getTopicConfigs(String name, String topicName) { return clustersStorage.getClusterByName(name) .map(KafkaCluster::getTopics) @@ -149,9 +150,9 @@ private Mono updateCluster(T topic, String clusterName, KafkaCluster clus }); } - public Flux getMessages(String clusterName, String topicName, Integer partition, Long offset, OffsetDateTime timestamp) { + public Flux getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition) { return clustersStorage.getClusterByName(clusterName) - .map(c -> consumingService.loadMessages(c, topicName)) + .map(c -> consumingService.loadMessages(c, topicName, consumerPosition)) .orElse(Flux.empty()); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java index 0fcf15f23b1..4a6f13d2190 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java +++ 
b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java @@ -5,6 +5,7 @@ import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -15,10 +16,12 @@ import org.apache.kafka.common.utils.Bytes; import org.springframework.stereotype.Service; -import com.provectus.kafka.ui.cluster.model.InternalTopic; +import com.provectus.kafka.ui.cluster.model.ConsumerPosition; import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.cluster.model.PartitionPosition; import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.kafka.KafkaService; +import com.provectus.kafka.ui.model.PositionType; import com.provectus.kafka.ui.model.TopicMessage; import reactor.core.publisher.Flux; @@ -30,14 +33,13 @@ @RequiredArgsConstructor public class ConsumingService { - // TODO: make this configurable private static final int BATCH_SIZE = 20; private final KafkaService kafkaService; - public Flux loadMessages(KafkaCluster cluster, String topic) { - RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic); + public Flux loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition) { + RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition); return Flux.create(emitter::emit) .subscribeOn(Schedulers.boundedElastic()) .map(ClusterUtil::mapToTopicMessage) @@ -52,10 +54,12 @@ private static class RecordEmitter { private final KafkaService kafkaService; private final KafkaCluster cluster; private final String topic; + private final ConsumerPosition consumerPosition; public void emit(FluxSink> sink) { try (KafkaConsumer consumer = kafkaService.createConsumer(cluster)) { assignPartitions(consumer, topic); + seekOffsets(consumer, topic, consumerPosition); while (!sink.isCancelled()) { ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); log.info("{} records 
polled", records.count()); @@ -76,8 +80,37 @@ private void assignPartitions(KafkaConsumer consumer, String topic .collect(Collectors.toList()); consumer.assign(partitions); - // TODO: seek to requested offsets - consumer.seekToBeginning(partitions); + } + + private void seekOffsets(KafkaConsumer consumer, String topic, ConsumerPosition consumerPosition) { + PositionType positionType = consumerPosition.getPositionType(); + switch (positionType) { + case OFFSET: + consumerPosition.getPartitionPositions().forEach(partitionPosition -> { + TopicPartition topicPartition = new TopicPartition(topic, partitionPosition.getPartition()); + consumer.seek(topicPartition, partitionPosition.getPosition()); + }); + break; + case TIMESTAMP: + Map timestampsToSearch = consumerPosition.getPartitionPositions().stream() + .collect(Collectors.toMap( + partitionPosition -> new TopicPartition(topic, partitionPosition.getPartition()), + PartitionPosition::getPosition + )); + consumer.offsetsForTimes(timestampsToSearch) + .forEach((topicPartition, offsetAndTimestamp) -> + consumer.seek(topicPartition, offsetAndTimestamp.offset()) + ); + break; + case LATEST: + List partitions = consumerPosition.getPartitionPositions().stream() + .map(partitionPosition -> new TopicPartition(topic, partitionPosition.getPartition())) + .collect(Collectors.toList()); + Map endOffsets = consumer.endOffsets(partitions); + endOffsets.forEach(((topicPartition, offset) -> consumer.seek(topicPartition, offset - BATCH_SIZE))); + default: + throw new IllegalArgumentException("Unknown type of positionType: " + positionType); + } } } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 439e23b4cec..9c75b61a9aa 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java 
@@ -1,6 +1,8 @@ package com.provectus.kafka.ui.rest; import com.provectus.kafka.ui.api.ApiClustersApi; +import com.provectus.kafka.ui.cluster.model.ConsumerPosition; +import com.provectus.kafka.ui.cluster.model.PartitionPosition; import com.provectus.kafka.ui.cluster.service.ClusterService; import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; @@ -11,8 +13,9 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.List; import javax.validation.Valid; -import java.time.OffsetDateTime; +import javax.validation.constraints.NotNull; @RestController @RequiredArgsConstructor @@ -59,10 +62,9 @@ public Mono>> getTopicConfigs(String clusterNam } @Override - public Mono>> getTopicMessages(String clusterName, String topicName, @Valid Integer partition, @Valid Long offset, @Valid OffsetDateTime timestamp, ServerWebExchange exchange) { - return Mono.just( - ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, partition, offset, timestamp)) - ); + public Mono>> getTopicMessages(String clusterName, String topicName, @NotNull @Valid PositionType positionType, @Valid List position, ServerWebExchange exchange) { + return parseConsumerPosition(positionType, position) + .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition))); } @Override @@ -94,4 +96,18 @@ public Mono> getConsumerGroup(String cluste public Mono> updateTopic(String clusterId, String topicName, @Valid Mono topicFormData, ServerWebExchange exchange) { return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok); } + + private Mono parseConsumerPosition(PositionType positionType, List positionParam) { + return Flux.fromIterable(positionParam) + .map(p -> { + String[] splited = p.split("::"); + if (splited.length != 2) { + throw new IllegalArgumentException("Wrong position argument format. 
See API docs for details"); + } + + return new PartitionPosition(Integer.parseInt(splited[0]), Long.parseLong(splited[1])); + }) + .collectList() + .map(positions -> new ConsumerPosition(positionType, positions)); + } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 7bb211376c6..17a3c14af1e 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -214,20 +214,18 @@ paths: required: true schema: type: string - - name: partition - in: query - schema: - type: integer - - name: offset + - name: positionType in: query + required: true schema: - type: integer - format: int64 - - name: timestamp + $ref: "#/components/schemas/PositionType" + - name: position in: query schema: - type: string - format: date-time + type: array + items: + type: string + description: The format is [partition]::[offset] for specifying offsets or [partition]::[timstamp in millis] for specifying timestamps responses: 200: description: OK @@ -463,6 +461,13 @@ components: - offset - timestamp + PositionType: + type: string + enum: + - LATEST + - OFFSET + - TIMESTAMP + TopicPartitionDto: type: object properties: From 60913a2bc9b829df891b135f5938ed42b60c7176 Mon Sep 17 00:00:00 2001 From: Anton Petrov Date: Mon, 1 Jun 2020 17:37:08 +0300 Subject: [PATCH 2/2] Seek and limit --- .../ui/cluster/model/ConsumerPosition.java | 8 +-- .../ui/cluster/model/PartitionPosition.java | 9 --- .../ui/cluster/service/ClusterService.java | 4 +- .../ui/cluster/service/ConsumingService.java | 69 +++++++++++-------- .../kafka/ui/rest/MetricsRestController.java | 27 +++++--- .../main/resources/swagger/kafka-ui-api.yaml | 15 ++-- 6 files changed, 70 insertions(+), 62 deletions(-) delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/PartitionPosition.java diff --git 
a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java index d3f27196eae..36e934f8263 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java @@ -2,14 +2,14 @@ import lombok.Value; -import java.util.List; +import java.util.Map; -import com.provectus.kafka.ui.model.PositionType; +import com.provectus.kafka.ui.model.SeekType; @Value public class ConsumerPosition { - private PositionType positionType; - private List partitionPositions; + private SeekType seekType; + private Map seekTo; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/PartitionPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/PartitionPosition.java deleted file mode 100644 index 1515eb5c29c..00000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/PartitionPosition.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.provectus.kafka.ui.cluster.model; - -import lombok.Value; - -@Value -public class PartitionPosition { - private int partition; - private Long position; -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index b5815b24cab..bad9625f12f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -150,9 +150,9 @@ private Mono updateCluster(T topic, String clusterName, KafkaCluster clus }); } - public Flux getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition) { + public Flux getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, Integer 
limit) { return clustersStorage.getClusterByName(clusterName) - .map(c -> consumingService.loadMessages(c, topicName, consumerPosition)) + .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, limit)) .orElse(Flux.empty()); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java index 4a6f13d2190..1bf345ca0a5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java @@ -18,10 +18,9 @@ import com.provectus.kafka.ui.cluster.model.ConsumerPosition; import com.provectus.kafka.ui.cluster.model.KafkaCluster; -import com.provectus.kafka.ui.cluster.model.PartitionPosition; import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.kafka.KafkaService; -import com.provectus.kafka.ui.model.PositionType; +import com.provectus.kafka.ui.model.SeekType; import com.provectus.kafka.ui.model.TopicMessage; import reactor.core.publisher.Flux; @@ -33,17 +32,21 @@ @RequiredArgsConstructor public class ConsumingService { - // TODO: make this configurable - private static final int BATCH_SIZE = 20; + private static final int MAX_RECORD_LIMIT = 100; + private static final int DEFAULT_RECORD_LIMIT = 20; + private static final int MAX_POLLS_COUNT = 30; private final KafkaService kafkaService; - public Flux loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition) { + public Flux loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) { + int recordsLimit = Optional.ofNullable(limit) + .map(s -> Math.min(s, MAX_RECORD_LIMIT)) + .orElse(DEFAULT_RECORD_LIMIT); RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition); return Flux.create(emitter::emit) 
.subscribeOn(Schedulers.boundedElastic()) .map(ClusterUtil::mapToTopicMessage) - .limitRequest(BATCH_SIZE); + .limitRequest(recordsLimit); } @RequiredArgsConstructor @@ -58,9 +61,10 @@ private static class RecordEmitter { public void emit(FluxSink> sink) { try (KafkaConsumer consumer = kafkaService.createConsumer(cluster)) { - assignPartitions(consumer, topic); - seekOffsets(consumer, topic, consumerPosition); - while (!sink.isCancelled()) { + assignPartitions(consumer); + seekOffsets(consumer); + int pollsCount = 0; + while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) { ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); log.info("{} records polled", records.count()); records.iterator() @@ -72,44 +76,49 @@ public void emit(FluxSink> sink) { } } - private void assignPartitions(KafkaConsumer consumer, String topicName) { - List partitions = Optional.ofNullable(cluster.getTopics().get(topicName)) - .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topicName)) + private List getRequestedPartitions() { + Map partitionPositions = consumerPosition.getSeekTo(); + + return Optional.ofNullable(cluster.getTopics().get(topic)) + .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic)) .getPartitions().stream() - .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.getPartition())) + .filter(internalPartition -> partitionPositions.isEmpty() || partitionPositions.containsKey(internalPartition.getPartition())) + .map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition())) + .collect(Collectors.toList()); + } + + private void assignPartitions(KafkaConsumer consumer) { + List partitions = getRequestedPartitions(); consumer.assign(partitions); } - private void seekOffsets(KafkaConsumer consumer, String topic, ConsumerPosition consumerPosition) { - PositionType positionType = consumerPosition.getPositionType(); - switch (positionType) { + private void seekOffsets(KafkaConsumer consumer) { + SeekType
seekType = consumerPosition.getSeekType(); + switch (seekType) { case OFFSET: - consumerPosition.getPartitionPositions().forEach(partitionPosition -> { - TopicPartition topicPartition = new TopicPartition(topic, partitionPosition.getPartition()); - consumer.seek(topicPartition, partitionPosition.getPosition()); + consumerPosition.getSeekTo().forEach((partition, offset) -> { + TopicPartition topicPartition = new TopicPartition(topic, partition); + consumer.seek(topicPartition, offset); }); break; case TIMESTAMP: - Map timestampsToSearch = consumerPosition.getPartitionPositions().stream() + Map timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream() .collect(Collectors.toMap( - partitionPosition -> new TopicPartition(topic, partitionPosition.getPartition()), - PartitionPosition::getPosition - )); + partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()), + Map.Entry::getValue + )); consumer.offsetsForTimes(timestampsToSearch) .forEach((topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset()) ); break; - case LATEST: - List partitions = consumerPosition.getPartitionPositions().stream() - .map(partitionPosition -> new TopicPartition(topic, partitionPosition.getPartition())) - .collect(Collectors.toList()); - Map endOffsets = consumer.endOffsets(partitions); - endOffsets.forEach(((topicPartition, offset) -> consumer.seek(topicPartition, offset - BATCH_SIZE))); + case BEGINNING: + List partitions = getRequestedPartitions(); + consumer.seekToBeginning(partitions); + break; default: - throw new IllegalArgumentException("Unknown type of positionType: " + positionType); + throw new IllegalArgumentException("Unknown seekType: " + seekType); } } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 9c75b61a9aa..0b8d9396c69 100644 --- 
a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -2,10 +2,11 @@ import com.provectus.kafka.ui.api.ApiClustersApi; import com.provectus.kafka.ui.cluster.model.ConsumerPosition; -import com.provectus.kafka.ui.cluster.model.PartitionPosition; import com.provectus.kafka.ui.cluster.service.ClusterService; import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; + +import org.apache.commons.lang3.tuple.Pair; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; @@ -13,9 +14,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Collections; import java.util.List; +import java.util.function.Function; + import javax.validation.Valid; -import javax.validation.constraints.NotNull; @RestController @RequiredArgsConstructor @@ -62,9 +65,9 @@ public Mono>> getTopicConfigs(String clusterNam } @Override - public Mono>> getTopicMessages(String clusterName, String topicName, @NotNull @Valid PositionType positionType, @Valid List position, ServerWebExchange exchange) { - return parseConsumerPosition(positionType, position) - .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition))); + public Mono>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List seekTo, @Valid Integer limit, ServerWebExchange exchange) { + return parseConsumerPosition(seekType, seekTo) + .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, limit))); } @Override @@ -97,17 +100,19 @@ public Mono> updateTopic(String clusterId, String topicNam return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok); } - private Mono 
parseConsumerPosition(PositionType positionType, List positionParam) { - return Flux.fromIterable(positionParam) + private Mono parseConsumerPosition(SeekType seekType, List seekTo) { + return Mono.justOrEmpty(seekTo) + .defaultIfEmpty(Collections.emptyList()) + .flatMapIterable(Function.identity()) .map(p -> { String[] splited = p.split("::"); if (splited.length != 2) { - throw new IllegalArgumentException("Wrong position argument format. See API docs for details"); + throw new IllegalArgumentException("Wrong seekTo argument format. See API docs for details"); } - return new PartitionPosition(Integer.parseInt(splited[0]), Long.parseLong(splited[1])); + return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1])); }) - .collectList() - .map(positions -> new ConsumerPosition(positionType, positions)); + .collectMap(Pair::getKey, Pair::getValue) + .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING, positions)); } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 17a3c14af1e..57cad5dddfe 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -214,18 +214,21 @@ paths: required: true schema: type: string - - name: positionType + - name: seekType in: query - required: true schema: - $ref: "#/components/schemas/PositionType" - - name: position + $ref: "#/components/schemas/SeekType" + - name: seekTo in: query schema: type: array items: type: string description: The format is [partition]::[offset] for specifying offsets or [partition]::[timstamp in millis] for specifying timestamps + - name: limit + in: query + schema: + type: integer responses: 200: description: OK @@ -461,10 +464,10 @@ components: - offset - timestamp - PositionType: + SeekType: type: string enum: - - LATEST + - BEGINNING - OFFSET - TIMESTAMP