Skip to content

Commit

Permalink
refactor standby monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed May 17, 2024
1 parent 7b120ed commit 73ead9e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -62,6 +63,12 @@ public int registeredPartitions() {
return partitions.size();
}

public Optional<StandbyTopicPartitionMetrics> lagInfoForPartition(int partition) {
return partitions.stream()
.filter(standbyTopicPartitionMetrics -> standbyTopicPartitionMetrics.getPartition() == partition)
.findFirst();
}

/**
* Suspend offset tracking for the topic partition
* @param topicPartition suspended partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import io.littlehorse.common.LHServerConfig;
import io.littlehorse.server.monitoring.StandbyStoresOnInstance;
import io.littlehorse.server.monitoring.StandbyTopicPartitionMetrics;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.Data;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -73,24 +75,28 @@ public ServerHealthState(
this.coreStandbyTasks.addAll(coreStreams.metadataForLocalThreads().stream()
.flatMap(thread -> thread.standbyTasks().stream())
.filter(standbyTask -> fromTask(standbyTask, config) == LHProcessorType.CORE)
.map(standbyTask -> new StandbyTaskState(
standbyTask, restorations, config, standbyTasks.get(LHProcessorType.CORE.getStoreName())))
.filter(standbyTask -> !standbyTask.topicPartitions().isEmpty())
.map(standbyTask -> createStandbyState(LHProcessorType.CORE.getStoreName(), standbyTasks, standbyTask))
.filter(Optional::isPresent)
.map(Optional::get)
.toList());

this.repartitionStandbyTasks.addAll(coreStreams.metadataForLocalThreads().stream()
.flatMap(thread -> thread.standbyTasks().stream())
.filter(standbyTask -> fromTask(standbyTask, config) == LHProcessorType.REPARTITION)
.map(standbyTask -> new StandbyTaskState(
standbyTask,
restorations,
config,
standbyTasks.get(LHProcessorType.REPARTITION.getStoreName())))
.filter(standbyTask -> !standbyTask.topicPartitions().isEmpty())
.map(standbyTask ->
createStandbyState(LHProcessorType.REPARTITION.getStoreName(), standbyTasks, standbyTask))
.filter(Optional::isPresent)
.map(Optional::get)
.toList());

this.timerStandbyTasks.addAll(timerStreams.metadataForLocalThreads().stream()
.flatMap(thread -> thread.standbyTasks().stream())
.map(timerTask -> new StandbyTaskState(
timerTask, restorations, config, standbyTasks.get(LHProcessorType.TIMER.getStoreName())))
.filter(timerTask -> !timerTask.topicPartitions().isEmpty())
.map(timerTask -> createStandbyState(LHProcessorType.TIMER.getStoreName(), standbyTasks, timerTask))
.filter(Optional::isPresent)
.map(Optional::get)
.toList());

this.coreState = coreStreams.state();
Expand All @@ -105,6 +111,20 @@ public static LHProcessorType fromTask(TaskMetadata task, LHServerConfig config)
return fromTopic(topics.stream().findFirst().get().topic(), config);
}

private Optional<StandbyTaskState> createStandbyState(
String storeName, Map<String, StandbyStoresOnInstance> standbyTasks, TaskMetadata metadata) {
StandbyStoresOnInstance registeredStandbyTasks = standbyTasks.get(storeName);
StandbyTopicPartitionMetrics lagInfo = registeredStandbyTasks
.lagInfoForPartition(
metadata.topicPartitions().stream().findFirst().get().partition())
.orElse(null);
if (lagInfo == null) {
return Optional.of(new StandbyTaskState(metadata, lagInfo));
} else {
return Optional.empty();
}
}

public static LHProcessorType fromTopic(String topic, LHServerConfig config) {
String truncated = topic.substring(config.getLHClusterId().length());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package io.littlehorse.server.monitoring.health;

import io.littlehorse.common.LHServerConfig;
import io.littlehorse.server.monitoring.StandbyStoresOnInstance;
import io.littlehorse.server.monitoring.StandbyTopicPartitionMetrics;
import java.util.Map;
import java.util.Set;
import lombok.Data;
import lombok.Getter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.TaskMetadata;

@Data
@Getter
public class StandbyTaskState {

private int partition;
Expand All @@ -18,20 +15,7 @@ public class StandbyTaskState {

public StandbyTaskState() {}

public StandbyTaskState(
TaskMetadata meta,
Map<TopicPartition, InProgressRestoration> restorations,
LHServerConfig config,
StandbyStoresOnInstance storeLagInfos) {

StandbyTopicPartitionMetrics storeLagInfo = storeLagInfos.getPartitions().stream()
.filter(lagInfo -> {
return lagInfo.getPartition()
== meta.topicPartitions().stream().findFirst().get().partition();
})
.findFirst()
.get();

public StandbyTaskState(TaskMetadata meta, StandbyTopicPartitionMetrics storeLagInfo) {
Set<TopicPartition> topics = meta.topicPartitions();
if (topics.size() != 1) {
throw new IllegalStateException("Impossible. All LH processors have only one input topic");
Expand Down

0 comments on commit 73ead9e

Please sign in to comment.