
Commit 60a33bb
feat(topic-data): use another scheduler for sse
also upgrade to the latest Micronaut
tchiotludo committed Nov 17, 2020
1 parent 19d59d4 commit 60a33bb
Showing 6 changed files with 53 additions and 28 deletions.
build.gradle (2 changes: 1 addition & 1 deletion)

```diff
@@ -84,7 +84,7 @@ dependencies {
     // micronaut
     annotationProcessor "io.micronaut:micronaut-inject-java"
     annotationProcessor "io.micronaut:micronaut-validation"
-    annotationProcessor 'io.micronaut.configuration:micronaut-openapi:1.5.2'
+    annotationProcessor 'io.micronaut.openapi:micronaut-openapi'
     implementation "io.micronaut:micronaut-inject"
     implementation "io.micronaut:micronaut-validation"
     implementation "io.micronaut:micronaut-runtime"
```
client/src/containers/Topic/TopicList/TopicList.jsx (3 changes: 0 additions & 3 deletions)

```diff
@@ -198,12 +198,9 @@ class TopicList extends Root {
         setState();
       });

-    console.log(encodeURIComponent(topicsName))
     this.getApi(uriTopicLastRecord(selectedCluster, encodeURIComponent(topicsName)))
       .then(value => {
         topics.forEach((topic) => {
-          console.log(topic.name)
-          console.log(tableTopics[topic.name]);
           tableTopics[topic.name].lastWrite = value.data[topic.name] ? value.data[topic.name].timestamp : ''
         });
         setState();
```
gradle.properties (2 changes: 1 addition & 1 deletion)

```diff
@@ -1,4 +1,4 @@
-micronautVersion=2.0.1
+micronautVersion=2.1.3
 confluentVersion=5.5.1
 kafkaVersion=2.6.0
 lombokVersion=1.18.16
```
src/main/java/org/akhq/controllers/TailController.java (5 changes: 5 additions & 0 deletions)

```diff
@@ -5,7 +5,10 @@
 import io.micronaut.http.annotation.Controller;
 import io.micronaut.http.annotation.Get;
 import io.micronaut.http.sse.Event;
+import io.micronaut.scheduling.TaskExecutors;
+import io.micronaut.scheduling.annotation.ExecuteOn;
 import io.micronaut.security.annotation.Secured;
+import io.reactivex.schedulers.Schedulers;
 import io.swagger.v3.oas.annotations.Operation;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
@@ -33,6 +36,7 @@ public TailController(RecordRepository recordRepository) {
     }

     @Secured(Role.ROLE_TOPIC_DATA_READ)
+    @ExecuteOn(TaskExecutors.IO)
     @Get(value = "api/{cluster}/tail/sse", produces = MediaType.TEXT_EVENT_STREAM)
     @Operation(tags = {"topic data"}, summary = "Tail for data on multiple topic")
     public Publisher<Event<TailRecord>> sse(
@@ -47,6 +51,7 @@ public Publisher<Event<TailRecord>> sse(

         return recordRepository
             .tail(cluster, options)
+            .observeOn(Schedulers.io())
             .map(event -> {
                 TailRecord tailRecord = new TailRecord();
                 tailRecord.offsets = getOffsets(event);
```
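For context, a minimal self-contained sketch of how the two additions cooperate (this controller and endpoint are made up, not from the commit): `@ExecuteOn(TaskExecutors.IO)` moves the controller method itself off the Netty event loop onto Micronaut's IO thread pool, while `observeOn(Schedulers.io())` shifts the downstream operators of the stream onto RxJava's IO scheduler, so neither the stream setup nor the per-event mapping can block an event-loop thread.

```java
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

@Controller("/demo")
public class SchedulerDemoController {

    @ExecuteOn(TaskExecutors.IO)          // run the method body on Micronaut's IO thread pool
    @Get(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM)
    public Flowable<Event<String>> sse() {
        // Flowable.interval stands in for recordRepository.tail(...)
        return Flowable.interval(1, TimeUnit.SECONDS)
            .observeOn(Schedulers.io())   // downstream map() runs on RxJava's IO scheduler
            .map(i -> Event.of("tick " + i));
    }
}
```

Without these two lines, a long-lived SSE stream would tie up an event-loop thread and could starve other requests; that reading matches the "use another scheduler" commit title.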
src/main/java/org/akhq/controllers/TopicController.java (7 changes: 6 additions & 1 deletion)

```diff
@@ -16,7 +16,10 @@
 import io.micronaut.http.annotation.Post;
 import io.micronaut.http.annotation.QueryValue;
 import io.micronaut.http.sse.Event;
+import io.micronaut.scheduling.TaskExecutors;
+import io.micronaut.scheduling.annotation.ExecuteOn;
 import io.micronaut.security.annotation.Secured;
+import io.reactivex.schedulers.Schedulers;
 import io.swagger.v3.oas.annotations.Operation;
 import java.time.Instant;
 import java.util.Base64;
@@ -305,6 +308,7 @@ public HttpResponse<?> delete(String cluster, String topicName) throws ExecutionException, InterruptedException {
     }

     @Secured(Role.ROLE_TOPIC_DATA_READ)
+    @ExecuteOn(TaskExecutors.IO)
     @Get(value = "api/{cluster}/topic/{topicName}/data/search/{search}", produces = MediaType.TEXT_EVENT_STREAM)
     @Operation(tags = {"topic data"}, summary = "Search for data for a topic")
     public Publisher<Event<SearchRecord>> sse(
@@ -315,7 +319,7 @@ public Publisher<Event<SearchRecord>> sse(
         Optional<RecordRepository.Options.Sort> sort,
         Optional<String> timestamp,
         Optional<String> search
-    ) throws ExecutionException, InterruptedException {
+    ) {
         RecordRepository.Options options = dataSearchOptions(
             cluster,
             topicName,
@@ -328,6 +332,7 @@ public Publisher<Event<SearchRecord>> sse(

         return recordRepository
             .search(cluster, options)
+            .observeOn(Schedulers.io())
             .map(event -> {
                 SearchRecord searchRecord = new SearchRecord(
                     event.getData().getPercent(),
```
src/main/java/org/akhq/repositories/RecordRepository.java (62 changes: 40 additions & 22 deletions)

```diff
@@ -540,36 +540,44 @@ public RecordMetadata delete(String clusterId, String topic, Integer partition,
         )).get();
     }

-    public Flowable<Event<SearchEvent>> search(String clusterId, Options options) throws ExecutionException, InterruptedException {
-        KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId);
-        Topic topic = topicRepository.findByName(clusterId, options.topic);
-        Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);
-
+    public Flowable<Event<SearchEvent>> search(String clusterId, Options options) {
         AtomicInteger matchesCount = new AtomicInteger();

-        if (partitions.size() == 0) {
-            return Flowable.just(new SearchEvent(topic).end());
-        }
+        return Flowable.generate(() -> {
+            KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId);
+            Topic topic = topicRepository.findByName(clusterId, options.topic);
+            Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);

-        consumer.assign(partitions.keySet());
-        partitions.forEach(consumer::seek);
+            if (partitions.size() == 0) {
+                return new SearchState(consumer, null);
+            }

-        partitions.forEach((topicPartition, first) ->
-            log.trace(
-                "Search [topic: {}] [partition: {}] [start: {}]",
-                topicPartition.topic(),
-                topicPartition.partition(),
-                first
-            )
-        );
+            consumer.assign(partitions.keySet());
+            partitions.forEach(consumer::seek);

+            partitions.forEach((topicPartition, first) ->
+                log.trace(
+                    "Search [topic: {}] [partition: {}] [start: {}]",
+                    topicPartition.topic(),
+                    topicPartition.partition(),
+                    first
+                )
+            );

-        return Flowable.generate(() -> new SearchEvent(topic), (searchEvent, emitter) -> {
+            return new SearchState(consumer, new SearchEvent(topic));
+        }, (searchState, emitter) -> {
+            SearchEvent searchEvent = searchState.getSearchEvent();
+            KafkaConsumer<byte[], byte[]> consumer = searchState.getConsumer();
+
             // end
-            if (searchEvent.emptyPoll == 666) {
+            if (searchEvent == null || searchEvent.emptyPoll == 666) {
+                Topic topic = topicRepository.findByName(clusterId, options.topic);
+
                 emitter.onNext(new SearchEvent(topic).end());
                 emitter.onComplete();
                 consumer.close();

-                return searchEvent;
+                return new SearchState(consumer, searchEvent);
             }

             SearchEvent currentEvent = new SearchEvent(searchEvent);
@@ -614,7 +622,7 @@ public Flowable<Event<SearchEvent>> search(String clusterId, Options options) throws ExecutionException, InterruptedException {
                 emitter.onNext(currentEvent.progress(options));
             }

-            return currentEvent;
+            return new SearchState(consumer, currentEvent);
         });
     }
```
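The essence of this change: consumer setup used to run eagerly inside `search()`, which is why the method declared checked exceptions; it now runs lazily inside `Flowable.generate`'s state supplier, and the new `SearchState` threads both the consumer and the current event from one generator step to the next. Below is a stripped-down illustration of that RxJava 2 pattern; the names are illustrative, not from AKHQ:

```java
import io.reactivex.Flowable;

public class GenerateDemo {
    // Tiny stand-in for AKHQ's SearchState: a resource plus a cursor.
    static class State {
        final String connection;
        final int next;
        State(String connection, int next) { this.connection = connection; this.next = next; }
    }

    static String openConnection() throws Exception { // pretend this blocks and throws
        return "conn";
    }

    public static Flowable<String> stream() {
        return Flowable.generate(
            // State supplier: runs lazily, once per subscriber. It is a Callable,
            // whose call() declares "throws Exception", so checked exceptions no
            // longer bubble up into the enclosing method's signature; RxJava
            // routes them to onError instead.
            () -> new State(openConnection(), 0),
            // Generator: invoked per downstream request; returns the next state,
            // exactly like the (searchState, emitter) -> ... lambda in the diff.
            (state, emitter) -> {
                if (state.next >= 3) {
                    emitter.onComplete();   // terminal branch, like emptyPoll == 666
                    return state;
                }
                emitter.onNext("item " + state.next);
                return new State(state.connection, state.next + 1);
            }
        );
    }

    public static void main(String[] args) {
        stream().blockingForEach(System.out::println); // prints item 0, item 1, item 2
    }
}
```

RxJava 2 also offers a three-argument `generate(initialState, generator, disposeState)` overload whose dispose callback would close the consumer even when the subscriber cancels early; the commit instead closes it in the terminal branch.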

```diff
@@ -849,6 +857,16 @@ public static class TailState {
         private TailEvent tailEvent;
     }

+    @ToString
+    @EqualsAndHashCode
+    @Getter
+    @AllArgsConstructor
+    public static class SearchState {
+        private final KafkaConsumer<byte[], byte[]> consumer;
+        private final SearchEvent searchEvent;
+    }
+
+
     @ToString
     @EqualsAndHashCode
     @Getter
```
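For readers unfamiliar with Lombok, the four annotations on the new `SearchState` expand at compile time to roughly the following hand-written equivalent (a sketch of Lombok's standard output, not code from the commit):

```java
// Roughly what Lombok generates for SearchState:
public static class SearchState {
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final SearchEvent searchEvent;

    // @AllArgsConstructor
    public SearchState(KafkaConsumer<byte[], byte[]> consumer, SearchEvent searchEvent) {
        this.consumer = consumer;
        this.searchEvent = searchEvent;
    }

    // @Getter
    public KafkaConsumer<byte[], byte[]> getConsumer() { return consumer; }
    public SearchEvent getSearchEvent() { return searchEvent; }

    // @ToString and @EqualsAndHashCode additionally generate toString(),
    // equals(), and hashCode() over both fields.
}
```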
