Feature/10 seek offsets (provectus#54)
* Seek offsets

* Seek and limit
apetrovs committed Jun 2, 2020
1 parent 76a9786 commit 790e485
Showing 5 changed files with 118 additions and 31 deletions.
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ConsumerPosition.java (new file)
@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Value;
+
+import java.util.Map;
+
+import com.provectus.kafka.ui.model.SeekType;
+
+@Value
+public class ConsumerPosition {
+
+private SeekType seekType;
+private Map<Integer, Long> seekTo;
+
+}
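ConsumerPosition is the value object this change threads through the stack: a SeekType plus a map from partition number to target position (an offset for OFFSET seeks, epoch millis for TIMESTAMP seeks). A minimal construction sketch, assuming Java 9+ Map.of; the values are purely illustrative (Lombok's @Value supplies the all-args constructor):

// Start partition 0 at offset 42 and partition 1 at offset 100
ConsumerPosition byOffset = new ConsumerPosition(SeekType.OFFSET, Map.of(0, 42L, 1, 100L));
// For TIMESTAMP seeks the map values are epoch millis, not offsets
ConsumerPosition byTime = new ConsumerPosition(SeekType.TIMESTAMP, Map.of(0, 1591056000000L));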
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java
@@ -2,6 +2,7 @@

import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
@@ -61,7 +62,7 @@ public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
.map(t -> t.get(topicName))
.map(clusterMapper::toTopicDetails);
}

public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
return clustersStorage.getClusterByName(name)
.map(KafkaCluster::getTopics)
@@ -149,9 +150,9 @@ private <T> Mono<T> updateCluster(T topic, String clusterName, KafkaCluster clus
});
}

-public Flux<TopicMessage> getMessages(String clusterName, String topicName, Integer partition, Long offset, OffsetDateTime timestamp) {
+public Flux<TopicMessage> getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, Integer limit) {
return clustersStorage.getClusterByName(clusterName)
-.map(c -> consumingService.loadMessages(c, topicName))
+.map(c -> consumingService.loadMessages(c, topicName, consumerPosition, limit))
.orElse(Flux.empty());

}
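Note that getMessages resolves the cluster by name and falls back to Flux.empty() when the name is unknown, so a bad cluster name produces an empty stream rather than an error.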
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java
@@ -5,6 +5,7 @@

import java.time.Duration;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@@ -15,10 +16,11 @@
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;

-import com.provectus.kafka.ui.cluster.model.InternalTopic;
+import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
+import com.provectus.kafka.ui.model.SeekType;
import com.provectus.kafka.ui.model.TopicMessage;

import reactor.core.publisher.Flux;
@@ -30,18 +32,21 @@
@RequiredArgsConstructor
public class ConsumingService {


// TODO: make this configurable
-private static final int BATCH_SIZE = 20;
+private static final int MAX_RECORD_LIMIT = 100;
+private static final int DEFAULT_RECORD_LIMIT = 20;
+private static final int MAX_POLLS_COUNT = 30;

private final KafkaService kafkaService;

-public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic) {
-RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic);
+public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
+int recordsLimit = Optional.ofNullable(limit)
+.map(s -> Math.min(s, MAX_RECORD_LIMIT))
+.orElse(DEFAULT_RECORD_LIMIT);
+RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
return Flux.create(emitter::emit)
.subscribeOn(Schedulers.boundedElastic())
.map(ClusterUtil::mapToTopicMessage)
-.limitRequest(BATCH_SIZE);
+.limitRequest(recordsLimit);
}

@RequiredArgsConstructor
@@ -52,11 +57,14 @@ private static class RecordEmitter {
private final KafkaService kafkaService;
private final KafkaCluster cluster;
private final String topic;
+private final ConsumerPosition consumerPosition;

public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
-assignPartitions(consumer, topic);
-while (!sink.isCancelled()) {
+assignPartitions(consumer);
+seekOffsets(consumer);
+int pollsCount = 0;
+while (!sink.isCancelled() && ++pollsCount <= MAX_POLLS_COUNT) {
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
log.info("{} records polled", records.count());
records.iterator()
@@ -68,16 +76,50 @@ public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
}
}

-private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer, String topicName) {
-List<TopicPartition> partitions = Optional.ofNullable(cluster.getTopics().get(topicName))
-.orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topicName))
+private List<TopicPartition> getRequestedPartitions() {
+Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
+
+return Optional.ofNullable(cluster.getTopics().get(topic))
+.orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic))
.getPartitions().stream()
-.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.getPartition()))
+.filter(internalPartition -> partitionPositions.isEmpty() || partitionPositions.containsKey(internalPartition.getPartition()))
+.map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition()))
.collect(Collectors.toList());
}

+private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer) {
+List<TopicPartition> partitions = getRequestedPartitions();
+
consumer.assign(partitions);
-// TODO: seek to requested offsets
-consumer.seekToBeginning(partitions);
}

+private void seekOffsets(KafkaConsumer<Bytes, Bytes> consumer) {
+SeekType seekType = consumerPosition.getSeekType();
+switch (seekType) {
+case OFFSET:
+consumerPosition.getSeekTo().forEach((partition, offset) -> {
+TopicPartition topicPartition = new TopicPartition(topic, partition);
+consumer.seek(topicPartition, offset);
+});
+break;
+case TIMESTAMP:
+Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream()
+.collect(Collectors.toMap(
+partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
+Map.Entry::getValue
+));
+consumer.offsetsForTimes(timestampsToSearch)
+.forEach((topicPartition, offsetAndTimestamp) -> {
+// offsetsForTimes maps a partition to null when no record at or after the timestamp exists, so guard before seeking
+if (offsetAndTimestamp != null) {
+consumer.seek(topicPartition, offsetAndTimestamp.offset());
+}
+});
+break;
+case BEGINNING:
+List<TopicPartition> partitions = getRequestedPartitions();
+consumer.seekToBeginning(partitions);
+break;
+default:
+throw new IllegalArgumentException("Unknown seekType: " + seekType);
+}
+}
}
}
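Two details of RecordEmitter worth calling out. In the TIMESTAMP branch, KafkaConsumer.offsetsForTimes returns, for each partition, the earliest offset whose record timestamp is at or after the requested millis, and a null entry when no such record exists, which is why the seek is guarded above. And as a rough sketch of how ClusterService drives this (the TopicMessage getter names are assumptions based on the fields ClusterUtil.mapToTopicMessage would map):

// Read up to 50 messages from the beginning of every partition of "my-topic";
// an empty seekTo map means "all partitions", and the limit is clamped to MAX_RECORD_LIMIT (100)
Flux<TopicMessage> messages = consumingService.loadMessages(
cluster, // a KafkaCluster previously resolved from ClustersStorage
"my-topic",
new ConsumerPosition(SeekType.BEGINNING, Collections.emptyMap()),
50);
messages.subscribe(m -> log.info("partition {} offset {}", m.getPartition(), m.getOffset()));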
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java
@@ -1,18 +1,24 @@
package com.provectus.kafka.ui.rest;

import com.provectus.kafka.ui.api.ApiClustersApi;
+import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;

+import org.apache.commons.lang3.tuple.Pair;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;

import javax.validation.Valid;
-import java.time.OffsetDateTime;

@RestController
@RequiredArgsConstructor
@@ -59,10 +65,9 @@ public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterNam
}

@Override
-public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid Integer partition, @Valid Long offset, @Valid OffsetDateTime timestamp, ServerWebExchange exchange) {
-return Mono.just(
-ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, partition, offset, timestamp))
-);
+public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo, @Valid Integer limit, ServerWebExchange exchange) {
+return parseConsumerPosition(seekType, seekTo)
+.map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, limit)));
}

@Override
@@ -94,4 +99,20 @@ public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(String cluste
public Mono<ResponseEntity<Topic>> updateTopic(String clusterId, String topicName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
}

+private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
+return Mono.justOrEmpty(seekTo)
+.defaultIfEmpty(Collections.emptyList())
+.flatMapIterable(Function.identity())
+.map(p -> {
+String[] split = p.split("::");
+if (split.length != 2) {
+throw new IllegalArgumentException("Wrong seekTo argument format. See API docs for details");
+}
+
+return Pair.of(Integer.parseInt(split[0]), Long.parseLong(split[1]));
+})
+.collectMap(Pair::getKey, Pair::getValue)
+.map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING, positions));
+}
}
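As a worked example of parseConsumerPosition (query values hypothetical): seekType=OFFSET&seekTo=0::10&seekTo=1::20 splits each seekTo entry on "::" into a (partition, position) pair and produces ConsumerPosition(OFFSET, {0=10, 1=20}); omitting seekType falls back to SeekType.BEGINNING, and a malformed entry such as 0:10 fails with IllegalArgumentException.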
24 changes: 16 additions & 8 deletions kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -214,20 +214,21 @@ paths:
required: true
schema:
type: string
-- name: partition
+- name: seekType
in: query
schema:
-type: integer
-- name: offset
+$ref: "#/components/schemas/SeekType"
+- name: seekTo
in: query
schema:
-type: integer
-format: int64
-- name: timestamp
+type: array
+items:
+type: string
+description: The format is [partition]::[offset] for specifying offsets or [partition]::[timestamp in millis] for specifying timestamps
+- name: limit
in: query
schema:
-type: string
-format: date-time
+type: integer
responses:
200:
description: OK
@@ -463,6 +464,13 @@ components:
- offset
- timestamp

+SeekType:
+type: string
+enum:
+- BEGINNING
+- OFFSET
+- TIMESTAMP
+
TopicPartitionDto:
type: object
properties:
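Putting the contract together, a request to this endpoint might look like the following; the /api/clusters/{clusterName}/topics/{topicName}/messages path is an assumption based on the clusters and topics resources defined elsewhere in this spec:

GET /api/clusters/local/topics/orders/messages?seekType=OFFSET&seekTo=0::42&seekTo=1::100&limit=50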
