Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/10 seek offsets #54

Merged
merged 3 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.provectus.kafka.ui.cluster.model;

import lombok.Value;

import java.util.Map;

import com.provectus.kafka.ui.model.SeekType;

@Value
public class ConsumerPosition {

private SeekType seekType;
private Map<Integer, Long> seekTo;

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
Expand Down Expand Up @@ -61,7 +62,7 @@ public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
.map(t -> t.get(topicName))
.map(clusterMapper::toTopicDetails);
}

public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
return clustersStorage.getClusterByName(name)
.map(KafkaCluster::getTopics)
Expand Down Expand Up @@ -149,9 +150,9 @@ private <T> Mono<T> updateCluster(T topic, String clusterName, KafkaCluster clus
});
}

public Flux<TopicMessage> getMessages(String clusterName, String topicName, Integer partition, Long offset, OffsetDateTime timestamp) {
public Flux<TopicMessage> getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, Integer limit) {
return clustersStorage.getClusterByName(clusterName)
.map(c -> consumingService.loadMessages(c, topicName))
.map(c -> consumingService.loadMessages(c, topicName, consumerPosition, limit))
.orElse(Flux.empty());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

Expand All @@ -15,10 +16,11 @@
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;

import com.provectus.kafka.ui.cluster.model.InternalTopic;
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.SeekType;
import com.provectus.kafka.ui.model.TopicMessage;

import reactor.core.publisher.Flux;
Expand All @@ -30,18 +32,21 @@
@RequiredArgsConstructor
public class ConsumingService {


// TODO: make this configurable
private static final int BATCH_SIZE = 20;
private static final int MAX_RECORD_LIMIT = 100;
private static final int DEFAULT_RECORD_LIMIT = 20;
private static final int MAX_POLLS_COUNT = 30;

private final KafkaService kafkaService;

public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic) {
RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic);
public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
int recordsLimit = Optional.ofNullable(limit)
.map(s -> Math.min(s, MAX_RECORD_LIMIT))
.orElse(DEFAULT_RECORD_LIMIT);
RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
return Flux.create(emitter::emit)
.subscribeOn(Schedulers.boundedElastic())
.map(ClusterUtil::mapToTopicMessage)
.limitRequest(BATCH_SIZE);
.limitRequest(recordsLimit);
}

@RequiredArgsConstructor
Expand All @@ -52,11 +57,14 @@ private static class RecordEmitter {
private final KafkaService kafkaService;
private final KafkaCluster cluster;
private final String topic;
private final ConsumerPosition consumerPosition;

public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
assignPartitions(consumer, topic);
while (!sink.isCancelled()) {
assignPartitions(consumer);
seekOffsets(consumer);
int pollsCount = 0;
while (!sink.isCancelled() || ++pollsCount > MAX_POLLS_COUNT) {
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
log.info("{} records polled", records.count());
records.iterator()
Expand All @@ -68,16 +76,50 @@ public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
}
}

private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer, String topicName) {
List<TopicPartition> partitions = Optional.ofNullable(cluster.getTopics().get(topicName))
.orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topicName))
private List<TopicPartition> getRequestedPartitions() {
Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();

return Optional.ofNullable(cluster.getTopics().get(topic))
.orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic))
.getPartitions().stream()
.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.getPartition()))
.filter(internalPartition -> partitionPositions.isEmpty() || partitionPositions.containsKey(internalPartition.getPartition()))
.map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition()))
.collect(Collectors.toList());
}

private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer) {
List<TopicPartition> partitions = getRequestedPartitions();

consumer.assign(partitions);
// TODO: seek to requested offsets
consumer.seekToBeginning(partitions);
}

private void seekOffsets(KafkaConsumer<Bytes, Bytes> consumer) {
SeekType seekType = consumerPosition.getSeekType();
switch (seekType) {
case OFFSET:
consumerPosition.getSeekTo().forEach((partition, offset) -> {
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.seek(topicPartition, offset);
});
break;
case TIMESTAMP:
Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream()
.collect(Collectors.toMap(
partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
Map.Entry::getValue
));
consumer.offsetsForTimes(timestampsToSearch)
.forEach((topicPartition, offsetAndTimestamp) ->
consumer.seek(topicPartition, offsetAndTimestamp.offset())
);
break;
case BEGINNING:
List<TopicPartition> partitions = getRequestedPartitions();
consumer.seekToBeginning(partitions);
break;
default:
throw new IllegalArgumentException("Unknown seekType: " + seekType);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package com.provectus.kafka.ui.rest;

import com.provectus.kafka.ui.api.ApiClustersApi;
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;

import org.apache.commons.lang3.tuple.Pair;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import javax.validation.Valid;
import java.time.OffsetDateTime;

@RestController
@RequiredArgsConstructor
Expand Down Expand Up @@ -59,10 +65,9 @@ public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterNam
}

@Override
public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid Integer partition, @Valid Long offset, @Valid OffsetDateTime timestamp, ServerWebExchange exchange) {
return Mono.just(
ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, partition, offset, timestamp))
);
public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo, @Valid Integer limit, ServerWebExchange exchange) {
return parseConsumerPosition(seekType, seekTo)
.map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, limit)));
}

@Override
Expand Down Expand Up @@ -94,4 +99,20 @@ public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(String cluste
public Mono<ResponseEntity<Topic>> updateTopic(String clusterId, String topicName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
}

private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
return Mono.justOrEmpty(seekTo)
.defaultIfEmpty(Collections.emptyList())
.flatMapIterable(Function.identity())
.map(p -> {
String[] splited = p.split("::");
if (splited.length != 2) {
throw new IllegalArgumentException("Wrong seekTo argument format. See API docs for details");
}

return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1]));
})
.collectMap(Pair::getKey, Pair::getValue)
.map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING, positions));
}
}
24 changes: 16 additions & 8 deletions kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,21 @@ paths:
required: true
schema:
type: string
- name: partition
- name: seekType
in: query
schema:
type: integer
- name: offset
$ref: "#/components/schemas/SeekType"
- name: seekTo
in: query
schema:
type: integer
format: int64
- name: timestamp
type: array
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is bad idea, let's make these fields explicit
[ {partition: 1, offset: 0}.. ]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's possible only if we'll use POST for search. And that will broke our restfulness.

items:
type: string
description: The format is [partition]::[offset] for specifying offsets or [partition]::[timstamp in millis] for specifying timestamps
- name: limit
in: query
schema:
type: string
format: date-time
type: integer
responses:
200:
description: OK
Expand Down Expand Up @@ -463,6 +464,13 @@ components:
- offset
- timestamp

SeekType:
type: string
enum:
- BEGINNING
- OFFSET
- TIMESTAMP

TopicPartitionDto:
type: object
properties:
Expand Down