Skip to content

Commit

Permalink
feat(all): add an empty topic button (#258)
Browse files Browse the repository at this point in the history
Co-authored-by: Clement Allavena <FP17690@adgroup.michelin.com>
  • Loading branch information
clallavena and Clement Allavena committed Sep 14, 2020
1 parent cdbccd1 commit c42b6e9
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -57,6 +57,7 @@
- Configurations view
- Logs view
- Delete a record
- Empty a Topic (Delete all the records from one topic)
- Sort view
- Filter per partitions
- Filter with a starting time
Expand Down
57 changes: 55 additions & 2 deletions client/src/containers/Topic/Topic/Topic.jsx
Expand Up @@ -6,8 +6,10 @@ import TopicGroups from './TopicGroups';
import TopicConfigs from './TopicConfigs';
import TopicAcls from './TopicAcls';
import TopicLogs from './TopicLogs';
import { get } from '../../../utils/api';
import { uriTopicsConfigs } from '../../../utils/endpoints';
import { get, remove } from '../../../utils/api';
import { uriTopicsConfigs, uriTopicDataEmpty } from '../../../utils/endpoints';
import ConfirmModal from '../../../components/Modal/ConfirmModal';
import { toast } from 'react-toastify';
import {getSelectedTab} from "../../../utils/functions";

class Topic extends Component {
Expand All @@ -16,6 +18,9 @@ class Topic extends Component {
topicId: this.props.topicId,
topic: {},
selectedTab: '',
showDeleteModal: false,
deleteMessage: '',
compactMessageToDelete: '',
roles: JSON.parse(sessionStorage.getItem('roles')),
topicInternal: false,
configs: {}
Expand Down Expand Up @@ -52,6 +57,41 @@ class Topic extends Component {
);
}

handleOnEmpty() {
this.setState(() => {
this.showDeleteModal(
<React.Fragment>
Do you want to empty the Topic: {<code>{this.state.topicId}</code>} ?
</React.Fragment>
);
});
}

// Opens the delete-confirmation modal, storing the message it should display.
showDeleteModal = deleteMessage => {
  this.setState({ deleteMessage, showDeleteModal: true });
};

// Hides the confirmation modal and clears any pending message.
closeDeleteModal = () => {
  this.setState({ deleteMessage: '', showDeleteModal: false });
};

// Issues the DELETE request that empties the current topic, then reloads the
// message list. The modal is closed on success and on failure alike.
// NOTE(review): assumes `this.getMessages()` is defined elsewhere in this
// component — confirm against the full file.
emptyTopic = () => {
  const { clusterId, topicId } = this.props.match.params;
  const uri = uriTopicDataEmpty(clusterId, topicId);

  remove(uri)
    .then(() => {
      toast.success(`Topic '${topicId}' will be emptied`);
      this.setState({ showDeleteModal: false }, () => this.getMessages());
    })
    .catch(() => this.setState({ showDeleteModal: false }));
};

componentDidUpdate(prevProps, prevState) {
if (this.props.location.pathname !== prevProps.location.pathname) {
const tabSelected = getSelectedTab(this.props, this.tabs);
Expand Down Expand Up @@ -225,9 +265,22 @@ class Topic extends Component {
className="btn btn-primary">
<i className="fa fa-plus" aria-hidden={true} /> Produce to topic
</div>

<div
onClick={() => {
this.handleOnEmpty();
}}
className="btn btn-secondary mr-2">
<i className="fas fa-erase" aria-hidden={true} /> Empty Topic
</div>
</li>
</aside>
)}
<ConfirmModal show={this.state.showDeleteModal}
handleCancel={this.closeDeleteModal}
handleConfirm={this.emptyTopic}
message={this.state.deleteMessage}
/>
</div>
);
}
Expand Down
4 changes: 4 additions & 0 deletions client/src/utils/endpoints.js
Expand Up @@ -261,6 +261,10 @@ export const uriTopicDataDelete = (clusterId, topicName, partition, key) => {
return `${apiUrl}/${clusterId}/topic/${topicName}/data?partition=${partition}&key=${key}`;
};

/**
 * Builds the API endpoint used to delete every record from a topic.
 * Kept consistent with the sibling `uriTopicDataDelete` builder
 * (trailing semicolon added — the original omitted it, relying on ASI).
 *
 * @param {string} clusterId the cluster identifier
 * @param {string} topicName the topic to empty
 * @returns {string} the "empty topic data" API URL
 */
export const uriTopicDataEmpty = (clusterId, topicName) => {
  return `${apiUrl}/${clusterId}/topic/${topicName}/data/empty`;
};

export default {
apiUrl,
uriClusters,
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/org/akhq/controllers/TopicController.java
Expand Up @@ -6,13 +6,15 @@
import io.micronaut.context.env.Environment;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Delete;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.sse.Event;
import io.micronaut.security.annotation.Secured;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.Operation;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -240,10 +242,20 @@ public List<Config> updateConfig(String cluster, String topicName, Map<String, S
return updated;
}

/**
 * REST endpoint that removes every record from the given topic.
 * Delegates to {@code RecordRepository#emptyTopic}.
 *
 * @param cluster   the cluster identifier taken from the URL
 * @param topicName the topic whose records are deleted
 * @throws ExecutionException   if the underlying admin request fails
 * @throws InterruptedException if the admin request is interrupted
 */
@Secured(Role.ROLE_TOPIC_DATA_DELETE)
@Delete("api/{cluster}/topic/{topicName}/data/empty")
@Operation(tags = {"topic data"}, summary = "Empty data from a topic")
public void emptyTopicApi(String cluster, String topicName) throws ExecutionException, InterruptedException {
    this.recordRepository.emptyTopic(cluster, topicName);
}

@Secured(Role.ROLE_TOPIC_DATA_DELETE)
@Delete("api/{cluster}/topic/{topicName}/data")
@Operation(tags = {"topic data"}, summary = "Delete data from a topic by key")
public Record deleteRecord(String cluster, String topicName, Integer partition, String key) throws ExecutionException, InterruptedException {
public Record deleteRecordApi(String cluster, String topicName, Integer partition, String key) throws ExecutionException, InterruptedException {
return new Record(
this.recordRepository.delete(
cluster,
Expand Down
49 changes: 49 additions & 0 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Expand Up @@ -9,6 +9,19 @@
import io.reactivex.Flowable;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.codehaus.httpcache4j.uri.URIBuilder;
import org.akhq.models.Partition;
import org.akhq.models.Record;
import org.akhq.models.Topic;
Expand Down Expand Up @@ -378,6 +391,42 @@ private RecordMetadata produce(
.get();
}

/**
 * Deletes every record currently stored in a topic by issuing a
 * deleteRecords request up to each partition's last offset.
 * NOTE(review): assumes {@code Partition#getLastOffset} is the partition end
 * offset (so {@code beforeOffset(lastOffset)} removes everything) — confirm
 * against the Partition model.
 *
 * @param clusterId the cluster identifier
 * @param topicName the topic to empty
 * @throws ExecutionException   if the admin request fails
 * @throws InterruptedException if the admin request is interrupted
 */
public void emptyTopic(String clusterId, String topicName) throws ExecutionException, InterruptedException {
    var topic = topicRepository.findByName(clusterId, topicName);

    Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
    for (var partition : topic.getPartitions()) {
        var topicPartition = new TopicPartition(partition.getTopic(), partition.getId());
        recordsToDelete.put(topicPartition, RecordsToDelete.beforeOffset(partition.getLastOffset()));
    }

    deleteRecords(clusterId, recordsToDelete);
}

/**
 * Deletes all records strictly older than the given timestamp from every
 * partition of a topic.
 *
 * @param clusterId the cluster identifier
 * @param topicName the topic to purge
 * @param timestamp epoch-millis cut-off; records before the first offset whose
 *                  timestamp is &gt;= this value are deleted
 * @throws ExecutionException   if the admin request fails
 * @throws InterruptedException if the admin request is interrupted
 */
public void emptyTopicByTimestamp(String clusterId,
                                  String topicName,
                                  Long timestamp) throws ExecutionException, InterruptedException {
    var topic = topicRepository.findByName(clusterId, topicName);

    Map<TopicPartition, Long> timestamps = new HashMap<>();
    topic.getPartitions().forEach(partition ->
        timestamps.put(new TopicPartition(partition.getTopic(), partition.getId()), timestamp)
    );

    // NOTE(review): assumes kafkaModule.getConsumer() hands back a managed
    // consumer that does not need closing here — confirm against KafkaModule.
    Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaModule.getConsumer(clusterId).offsetsForTimes(timestamps);

    Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
    offsets.forEach((topicPartition, offsetAndTimestamp) -> {
        // offsetsForTimes maps a partition to null when no record has a
        // timestamp >= the requested one; the original code dereferenced it
        // unconditionally and threw a NullPointerException in that case.
        if (offsetAndTimestamp != null) {
            recordsToDelete.put(topicPartition, RecordsToDelete.beforeOffset(offsetAndTimestamp.offset()));
        }
    });

    deleteRecords(clusterId, recordsToDelete);
}

/**
 * Issues the deleteRecords admin request and logs the resulting low watermark
 * of each partition. Blocks on each per-partition future, so failures surface
 * as {@link ExecutionException}.
 *
 * @param clusterId       the cluster identifier
 * @param recordsToDelete per-partition upper bound (exclusive) of offsets to delete
 * @throws ExecutionException   if deletion fails for any partition
 * @throws InterruptedException if waiting on a future is interrupted
 */
private void deleteRecords(String clusterId, Map<TopicPartition, RecordsToDelete> recordsToDelete) throws InterruptedException, ExecutionException {
    var deleted = kafkaModule.getAdminClient(clusterId).deleteRecords(recordsToDelete).lowWatermarks();

    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : deleted.entrySet()) {
        // Parameterized SLF4J logging instead of eager string concatenation.
        log.debug("{} {} {}", entry.getKey().topic(), entry.getKey().partition(), entry.getValue().get().lowWatermark());
    }
}

public RecordMetadata produce(
String clusterId,
String topic,
Expand Down
16 changes: 12 additions & 4 deletions src/test/java/org/akhq/KafkaTestCluster.java
Expand Up @@ -46,6 +46,7 @@ public class KafkaTestCluster implements Runnable, Stoppable {
public static final String CLUSTER_ID = "test";

public static final String TOPIC_RANDOM = "random";
public static final String TOPIC_TOBE_EMPTIED = "emptied";
public static final String TOPIC_COMPACTED = "compacted";
public static final String TOPIC_EMPTY = "empty";
public static final String TOPIC_HUGE = "huge";
Expand All @@ -54,10 +55,10 @@ public class KafkaTestCluster implements Runnable, Stoppable {
public static final String TOPIC_STREAM_COUNT = "stream-count";
public static final String TOPIC_CONNECT = "connect-sink";

public static final int TOPIC_ALL_COUNT = 18;
public static final int TOPIC_HIDE_INTERNAL_COUNT = 10;
public static final int TOPIC_HIDE_INTERNAL_STREAM_COUNT = 8;
public static final int TOPIC_HIDE_STREAM_COUNT = 16;
public static final int TOPIC_ALL_COUNT = 19;
public static final int TOPIC_HIDE_INTERNAL_COUNT = 11;
public static final int TOPIC_HIDE_INTERNAL_STREAM_COUNT = 9;
public static final int TOPIC_HIDE_STREAM_COUNT = 17;

public static final String CONSUMER_STREAM_TEST = "stream-test-example";

Expand Down Expand Up @@ -275,6 +276,13 @@ private void injectTestData() throws InterruptedException, ExecutionException {
}
log.debug("Random topic created");

// random data to be emptied
testUtils.createTopic(TOPIC_TOBE_EMPTIED, 3, (short) 1);
for (int partition = 0; partition < 3; partition++) {
testUtils.produceRecords(randomDatas(100, 0), TOPIC_TOBE_EMPTIED, partition);
}
log.debug("Random topic to be emptied created");

// huge data
testUtils.createTopic(TOPIC_HUGE, 3, (short) 1);
for (int partition = 0; partition < 3; partition++) {
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/akhq/controllers/TopicControllerTest.java
Expand Up @@ -33,8 +33,8 @@ void listApi() {
assertEquals(5, result.getResults().size());

result = this.retrievePagedList(HttpRequest.GET(BASE_URL + "?page=2"), Topic.class);
assertEquals(KafkaTestCluster.TOPIC_HIDE_INTERNAL_COUNT - 5, result.getResults().size());
assertEquals("stream-test-example-count-repartition", result.getResults().get(4).getName());
assertEquals(KafkaTestCluster.TOPIC_HIDE_INTERNAL_COUNT - 6, result.getResults().size());
assertEquals("stream-test-example-count-changelog", result.getResults().get(4).getName());
}

@Test
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/org/akhq/repositories/RecordRepositoryTest.java
Expand Up @@ -3,7 +3,9 @@
import io.micronaut.context.env.Environment;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.httpcache4j.uri.URIBuilder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.akhq.AbstractTest;
import org.akhq.KafkaTestCluster;
Expand All @@ -15,6 +17,7 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -125,6 +128,25 @@ public void consumeAvro() throws ExecutionException, InterruptedException {
avroRecord.ifPresent(record -> assertEquals("{\"id\":1,\"name\":\"WaWa\",\"breed\":\"ABYSSINIAN\"}", record.getValue()));
}

/**
 * Empties the dedicated test topic and verifies no record remains.
 */
@Test
public void emptyTopic() throws ExecutionException, InterruptedException {
    RecordRepository.Options options = new RecordRepository.Options(
        environment,
        KafkaTestCluster.CLUSTER_ID,
        KafkaTestCluster.TOPIC_TOBE_EMPTIED
    );
    options.setSort(RecordRepository.Options.Sort.OLDEST);

    repository.emptyTopic(options.clusterId, options.getTopic());

    assertEquals(0, consumeAll(options));
}

/**
 * Purges the test topic up to "now" and verifies no record remains.
 * Disabled until the timestamp-based purge is finished.
 */
@Disabled("Method not ready yet")
@Test
public void emptyTopicByTimestamp() throws ExecutionException, InterruptedException {
    RecordRepository.Options options = new RecordRepository.Options(
        environment,
        KafkaTestCluster.CLUSTER_ID,
        KafkaTestCluster.TOPIC_TOBE_EMPTIED
    );
    options.setSort(RecordRepository.Options.Sort.OLDEST);

    repository.emptyTopicByTimestamp(options.clusterId, options.getTopic(), System.currentTimeMillis());

    assertEquals(0, consumeAll(options));
}

private List<Record> consumeAllRecord(RecordRepository.Options options) throws ExecutionException, InterruptedException {
boolean hasNext = true;

Expand Down

0 comments on commit c42b6e9

Please sign in to comment.