Skip to content

Commit

Permalink
provectus#211 Feature/clear topic messages (provectus#241)
Browse files Browse the repository at this point in the history
* added delete action

* added test for delete action

* added 404 status in contract

* fixed typo

* added partition parameter

* big refactoring
  • Loading branch information
RamazanYapparov committed Mar 15, 2021
1 parent 2b88d1b commit c4e146a
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,14 @@ public Flux<TopicMessage> getMessages(String clusterName, String topicName, Cons
.map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
.orElse(Flux.empty());
}

public Mono<Void> deleteTopicMessages(String clusterName, String topicName, List<Integer> partitions) {
var cluster = clustersStorage.getClusterByName(clusterName)
.orElseThrow(() -> new NotFoundException("No such cluster"));
if (!cluster.getTopics().containsKey(topicName)) {
throw new NotFoundException("No such topic");
}
return consumingService.loadOffsets(cluster, topicName, partitions)
.flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
Expand Down Expand Up @@ -55,6 +56,25 @@ public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, Consu
.limitRequest(recordsLimit);
}

public Mono<Map<TopicPartition, Long>> loadOffsets(KafkaCluster cluster, String topicName, List<Integer> partitionsToInclude) {
return Mono.fromSupplier(() -> {
try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
var partitions = consumer.partitionsFor(topicName).stream()
.filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
.map(p -> new TopicPartition(topicName, p.partition()))
.collect(Collectors.toList());
var beginningOffsets = consumer.beginningOffsets(partitions);
var endOffsets = consumer.endOffsets(partitions);
return endOffsets.entrySet().stream()
.filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
throw new RuntimeException(e);
}
});
}

private boolean filterTopicMessage(TopicMessage message, String query) {
if (StringUtils.isEmpty(query)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,12 @@ public Map<Integer, InternalPartition> getTopicPartitions(KafkaCluster c, Intern
return Collections.emptyMap();
}
}

public Mono<Void> deleteTopicMessages(KafkaCluster cluster, Map<TopicPartition, Long> offsets) {
var records = offsets.entrySet().stream()
.map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
.map(ac -> ac.deleteRecords(records)).then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import javax.validation.Valid;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

@RestController
Expand Down Expand Up @@ -84,6 +85,12 @@ public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterN
.map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
}

@Override
public Mono<ResponseEntity<Void>> deleteTopicMessages(String clusterName, String topicName, @Valid List<Integer> partitions, ServerWebExchange exchange) {
return clusterService.deleteTopicMessages(clusterName, topicName, Optional.ofNullable(partitions).orElse(List.of()))
.map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<Topic>> createTopic(String clusterName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
return clusterService.createTopic(clusterName, topicFormData)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.provectus.kafka.ui;

import com.provectus.kafka.ui.container.KafkaConnectContainer;
import com.provectus.kafka.ui.container.SchemaRegistryContainer;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.context.SpringBootTest;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.provectus.kafka.ui;

import com.provectus.kafka.ui.model.TopicFormData;
import com.provectus.kafka.ui.model.TopicMessage;
import com.provectus.kafka.ui.producer.KafkaTestProducer;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.web.reactive.server.WebTestClient;

import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
@Log4j2
@AutoConfigureWebTestClient(timeout = "60000")
public class KafkaConsumerTests extends AbstractBaseTest {

@Autowired
private WebTestClient webTestClient;


@Test
public void shouldDeleteRecords() {
var topicName = UUID.randomUUID().toString();
webTestClient.post()
.uri("/api/clusters/{clusterName}/topics", LOCAL)
.bodyValue(new TopicFormData()
.name(topicName)
.partitions(1)
.replicationFactor(1)
.configs(Map.of())
)
.exchange()
.expectStatus()
.isOk();

try(KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
Stream.of("one", "two", "three", "four")
.forEach(value -> producer.send(topicName, value));
}

webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
.exchange()
.expectStatus()
.isOk()
.expectBodyList(TopicMessage.class)
.hasSize(4);

webTestClient.delete()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
.exchange()
.expectStatus()
.isOk();

webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
.exchange()
.expectStatus()
.isOk()
.expectBodyList(TopicMessage.class)
.hasSize(0);
}

@Test
public void shouldReturn404ForNonExistingTopic() {
var topicName = UUID.randomUUID().toString();

webTestClient.delete()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
.exchange()
.expectStatus()
.isNotFound();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.provectus.kafka.ui;
package com.provectus.kafka.ui.container;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.provectus.kafka.ui;
package com.provectus.kafka.ui.container;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.provectus.kafka.ui.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.KafkaContainer;

import java.util.Map;

public class KafkaTestProducer<KeyT, ValueT> implements AutoCloseable {
private final KafkaProducer<KeyT, ValueT> producer;

private KafkaTestProducer(KafkaProducer<KeyT, ValueT> producer) {
this.producer = producer;
}

public static KafkaTestProducer<String, String> forKafka(KafkaContainer kafkaContainer) {
return new KafkaTestProducer<>(new KafkaProducer<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(),
ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
)));
}

public void send(String topic, ValueT value) {
producer.send(new ProducerRecord<>(topic, value));
}

@Override
public void close() {
producer.close();
}
}
28 changes: 28 additions & 0 deletions kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,34 @@ paths:
type: array
items:
$ref: '#/components/schemas/TopicMessage'
delete:
tags:
- /api/clusters
summary: deleteTopicMessages
operationId: deleteTopicMessages
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: topicName
in: path
required: true
schema:
type: string
- name: partitions
in: query
required: false
schema:
type: array
items:
type: integer
responses:
200:
description: OK
404:
description: Not found

/api/clusters/{clusterName}/consumer-groups/{id}:
get:
Expand Down

0 comments on commit c4e146a

Please sign in to comment.