Peek code review #646

Closed · wants to merge 3 commits
ClusterHpaMetricService.java (new file)
@@ -0,0 +1,172 @@
package com.slack.kaldb.clusterManager;

import com.google.common.util.concurrent.AbstractScheduledService;
import com.slack.kaldb.metadata.cache.CacheSlotMetadataStore;
import com.slack.kaldb.metadata.hpa.HpaMetricMetadata;
import com.slack.kaldb.metadata.hpa.HpaMetricMetadataStore;
import com.slack.kaldb.metadata.replica.ReplicaMetadata;
import com.slack.kaldb.metadata.replica.ReplicaMetadataStore;
import com.slack.kaldb.proto.metadata.Metadata;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The cluster HPA metrics service is intended to run on the manager node, where it makes
* centralized, application-aware decisions that inform a Kubernetes horizontal pod autoscaler
* (HPA).
*
* @see <a
* href="https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/">Kubernetes
* HPA</a>
*/
public class ClusterHpaMetricService extends AbstractScheduledService {
private static final Logger LOG = LoggerFactory.getLogger(ClusterHpaMetricService.class);

// todo - consider making over-provision and lock duration configurable values
protected int CACHE_SLOT_OVER_PROVISION = 1000;
protected Duration CACHE_SCALEDOWN_LOCK = Duration.of(15, ChronoUnit.MINUTES);

private final ReplicaMetadataStore replicaMetadataStore;
private final CacheSlotMetadataStore cacheSlotMetadataStore;
private final HpaMetricMetadataStore hpaMetricMetadataStore;
protected final Map<String, Instant> cacheScalingLock = new ConcurrentHashMap<>();
protected static final String CACHE_HPA_METRIC_NAME = "hpa_cache_demand_factor_%s";

public ClusterHpaMetricService(
ReplicaMetadataStore replicaMetadataStore,
CacheSlotMetadataStore cacheSlotMetadataStore,
HpaMetricMetadataStore hpaMetricMetadataStore) {
this.replicaMetadataStore = replicaMetadataStore;
this.cacheSlotMetadataStore = cacheSlotMetadataStore;
this.hpaMetricMetadataStore = hpaMetricMetadataStore;
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(Duration.ofSeconds(15), Duration.ofSeconds(30));
}

@Override
protected void runOneIteration() {
LOG.info("Running ClusterHpaMetricService");
try {
publishCacheHpaMetrics();
} catch (Exception e) {
LOG.error("Error running ClusterHpaMetricService", e);
}
}

/**
* Calculates and publishes HPA scaling metric(s) to Zookeeper for the Cache nodes. This makes use
* of the Kubernetes HPA formula, which is:
*
* <p>desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]
*
* <p>Using this formula, if we inform the user what value to set for the desiredMetricValue, we
* can vary the currentMetricValue to control the scale of the cluster. This is accomplished by
* producing a metric (hpa_cache_demand_factor_REPLICASET) that targets a value of 1.0. As long as
* the HPA is configured to target a value of 1.0 this will result in an optimized cluster size
* based on cache slots vs replicas, plus a small buffer.
*
* <pre>
* metrics:
* - type: Pods
* pods:
* metric:
* name: hpa_cache_demand_factor_REPLICASET
* target:
* type: AverageValue
* averageValue: 1.0
* </pre>
*/
private void publishCacheHpaMetrics() {
Set<String> replicaSets =
replicaMetadataStore.listSync().stream()
.map(ReplicaMetadata::getReplicaSet)
.collect(Collectors.toSet());

for (String replicaSet : replicaSets) {
long totalCacheSlotCapacity =
cacheSlotMetadataStore.listSync().stream()
.filter(cacheSlotMetadata -> cacheSlotMetadata.replicaSet.equals(replicaSet))
.count();
long totalReplicaDemand =
replicaMetadataStore.listSync().stream()
.filter(replicaMetadata -> replicaMetadata.getReplicaSet().equals(replicaSet))
.count();
double rawDemandFactor =
(double) (totalReplicaDemand + CACHE_SLOT_OVER_PROVISION) / (totalCacheSlotCapacity + 1);
double demandFactor = (double) Math.round(rawDemandFactor * 100) / 100;
LOG.info(
"Cache autoscaler for replicaSet '{}' calculated a demandFactor of '{}' - totalReplicaDemand: '{}', cacheSlotOverProvision: '{}', totalCacheSlotCapacity: '{}'",
replicaSet,
demandFactor,
totalReplicaDemand,
CACHE_SLOT_OVER_PROVISION,
totalCacheSlotCapacity);

if (demandFactor >= 1.0) {
// Publish a scale-up metric
persistCacheConfig(replicaSet, demandFactor);
LOG.debug("Publishing scale-up request for replicaset '{}'", replicaSet);
} else {
if (tryCacheReplicasetLock(replicaSet)) {
// Publish a scale-down metric
persistCacheConfig(replicaSet, demandFactor);
LOG.debug("Acquired scale-down lock for replicaSet '{}'", replicaSet);
} else {
// Publish a no-op scale metric (0) to disable the HPA from applying
// https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#implicit-maintenance-mode-deactivation
persistCacheConfig(replicaSet, 0.0);
LOG.debug("Unable to acquire scale-down lock for replicaSet '{}'", replicaSet);
}
}
}
}

/** Updates or inserts an (ephemeral) HPA metric for the cache nodes. This is NOT threadsafe. */
private void persistCacheConfig(String replicaSet, Double demandFactor) {
String key = String.format(CACHE_HPA_METRIC_NAME, replicaSet);
if (hpaMetricMetadataStore.hasSync(key)) {
hpaMetricMetadataStore.updateSync(
new HpaMetricMetadata(key, Metadata.HpaMetricMetadata.NodeRole.CACHE, demandFactor));
} else {
hpaMetricMetadataStore.createSync(
new HpaMetricMetadata(key, Metadata.HpaMetricMetadata.NodeRole.CACHE, demandFactor));
}
}

/**
* Acquires a new time-based lock, or refreshes an existing one, for the given replicaset. Used to
* prevent scale-down operations from happening too quickly across replicasets, which can cause
* issues with re-balancing.
*/
protected boolean tryCacheReplicasetLock(String replicaset) {
Optional<Instant> lastOtherScaleOperation =
cacheScalingLock.entrySet().stream()
.filter(entry -> !Objects.equals(entry.getKey(), replicaset))
.map(Map.Entry::getValue)
.max(Instant::compareTo);

// if another replicaset was scaled down in the last CACHE_SCALEDOWN_LOCK mins, prevent this one
// from scaling
if (lastOtherScaleOperation.isPresent()) {
if (!lastOtherScaleOperation.get().isBefore(Instant.now().minus(CACHE_SCALEDOWN_LOCK))) {
return false;
}
}

// update the last-updated lock time to now
cacheScalingLock.put(replicaset, Instant.now());
return true;
}
}
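
To make the scaling math concrete, here is a small standalone sketch (not part of this PR) that reproduces the demandFactor calculation from publishCacheHpaMetrics() and feeds it through the Kubernetes HPA formula quoted in the javadoc above; the replica, slot, and pod counts are made-up example values.

public class DemandFactorExample {
  public static void main(String[] args) {
    long totalReplicaDemand = 2500; // replicas assigned to this replicaSet (example value)
    long totalCacheSlotCapacity = 3000; // cache slots currently registered (example value)
    int cacheSlotOverProvision = 1000; // buffer, mirrors CACHE_SLOT_OVER_PROVISION

    // Same arithmetic as publishCacheHpaMetrics(): demand plus buffer, divided by capacity
    // (+1 guards against dividing by zero on an empty replicaSet), rounded to two decimals.
    double rawDemandFactor =
        (double) (totalReplicaDemand + cacheSlotOverProvision) / (totalCacheSlotCapacity + 1);
    double demandFactor = (double) Math.round(rawDemandFactor * 100) / 100; // 1.17

    // Kubernetes HPA formula: desiredReplicas = ceil[currentReplicas * (currentMetricValue / desiredMetricValue)].
    // With the HPA target (desiredMetricValue) set to 1.0, the published demand factor scales the pod count directly.
    int currentReplicas = 30; // example pod count
    long desiredReplicas = (long) Math.ceil(currentReplicas * (demandFactor / 1.0)); // 36

    System.out.printf("demandFactor=%.2f desiredReplicas=%d%n", demandFactor, desiredReplicas);
  }
}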
@@ -4,6 +4,7 @@
import static com.slack.kaldb.util.ArgValidationUtils.ensureTrue;

import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.util.RuntimeHalterImpl;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.curator.RetryPolicy;
@@ -70,6 +71,7 @@ public static AsyncCuratorFramework build(
&& curatorEvent.getWatchedEvent().getState()
== Watcher.Event.KeeperState.Expired) {
LOG.warn("The ZK session has expired {}.", curatorEvent);
new RuntimeHalterImpl().handleFatal(new Throwable("ZK session expired."));
}
});
curator.start();
HpaMetricMetadata.java (new file)
@@ -0,0 +1,68 @@
package com.slack.kaldb.metadata.hpa;

import com.slack.kaldb.metadata.core.KaldbMetadata;
import com.slack.kaldb.proto.metadata.Metadata;

/**
* HPA (horizontal pod autoscaler) metrics are calculated by the manager and then stored in ZK so
* that each node can individually report its scaling metrics locally. This allows use of an HPA
* while still centralizing the decision-making.
*/
public class HpaMetricMetadata extends KaldbMetadata {
public Metadata.HpaMetricMetadata.NodeRole nodeRole;
public Double value;

public HpaMetricMetadata(
String name, Metadata.HpaMetricMetadata.NodeRole nodeRole, Double value) {
super(name);
this.nodeRole = nodeRole;
this.value = value;
}

public Metadata.HpaMetricMetadata.NodeRole getNodeRole() {
return nodeRole;
}

public void setNodeRole(Metadata.HpaMetricMetadata.NodeRole nodeRole) {
this.nodeRole = nodeRole;
}

public Double getValue() {
return value;
}

public void setValue(Double value) {
this.value = value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof HpaMetricMetadata that)) return false;
if (!super.equals(o)) return false;

if (nodeRole != that.nodeRole) return false;
return value.equals(that.value);
}

@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + nodeRole.hashCode();
result = 31 * result + value.hashCode();
return result;
}

@Override
public String toString() {
return "HpaMetricMetadata{"
+ "nodeRole="
+ nodeRole
+ ", value="
+ value
+ ", name='"
+ name
+ '\''
+ '}';
}
}
HpaMetricMetadataSerializer.java (new file)
@@ -0,0 +1,41 @@
package com.slack.kaldb.metadata.hpa;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.slack.kaldb.metadata.core.MetadataSerializer;
import com.slack.kaldb.proto.metadata.Metadata;

public class HpaMetricMetadataSerializer implements MetadataSerializer<HpaMetricMetadata> {
private static HpaMetricMetadata fromAutoscalerMetadataProto(
Metadata.HpaMetricMetadata autoscalerMetadata) {
return new HpaMetricMetadata(
autoscalerMetadata.getName(),
autoscalerMetadata.getNodeRole(),
autoscalerMetadata.getValue());
}

private static Metadata.HpaMetricMetadata toAutoscalerMetadataProto(
HpaMetricMetadata hpaMetricMetadata) {

return Metadata.HpaMetricMetadata.newBuilder()
.setName(hpaMetricMetadata.name)
.setNodeRole(hpaMetricMetadata.nodeRole)
.setValue(hpaMetricMetadata.value)
.build();
}

@Override
public String toJsonStr(HpaMetricMetadata metadata) throws InvalidProtocolBufferException {
if (metadata == null) throw new IllegalArgumentException("metadata object can't be null");

return printer.print(toAutoscalerMetadataProto(metadata));
}

@Override
public HpaMetricMetadata fromJsonStr(String data) throws InvalidProtocolBufferException {
Metadata.HpaMetricMetadata.Builder autoscalerMetadataBuilder =
Metadata.HpaMetricMetadata.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(data, autoscalerMetadataBuilder);
return fromAutoscalerMetadataProto(autoscalerMetadataBuilder.build());
}
}
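
A hypothetical round-trip through the serializer above (not part of this PR): the metric name and value are example data, and NodeRole.CACHE is the proto enum constant already used elsewhere in this PR.

static void roundTripExample() throws InvalidProtocolBufferException {
  HpaMetricMetadataSerializer serializer = new HpaMetricMetadataSerializer();
  HpaMetricMetadata original =
      new HpaMetricMetadata(
          "hpa_cache_demand_factor_rep1", Metadata.HpaMetricMetadata.NodeRole.CACHE, 1.17);

  String json = serializer.toJsonStr(original); // protobuf JsonFormat output
  HpaMetricMetadata parsed = serializer.fromJsonStr(json);

  // equals() compares the inherited name plus nodeRole and value, so the round trip is lossless.
  assert parsed.equals(original);
}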
HpaMetricMetadataStore.java (new file)
@@ -0,0 +1,18 @@
package com.slack.kaldb.metadata.hpa;

import com.slack.kaldb.metadata.core.KaldbMetadataStore;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.zookeeper.CreateMode;

public class HpaMetricMetadataStore extends KaldbMetadataStore<HpaMetricMetadata> {
public static final String AUTOSCALER_METADATA_STORE_ZK_PATH = "/hpa_metrics";

public HpaMetricMetadataStore(AsyncCuratorFramework curator, boolean shouldCache) {
super(
curator,
CreateMode.EPHEMERAL,
shouldCache,
new HpaMetricMetadataSerializer().toModelSerializer(),
AUTOSCALER_METADATA_STORE_ZK_PATH);
}
}
HpaMetricPublisherService.java (new file)
@@ -0,0 +1,67 @@
package com.slack.kaldb.server;

import com.google.common.util.concurrent.AbstractIdleService;
import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
import com.slack.kaldb.metadata.hpa.HpaMetricMetadata;
import com.slack.kaldb.metadata.hpa.HpaMetricMetadataStore;
import com.slack.kaldb.proto.metadata.Metadata;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This service reads the stored HPA (horizontal pod autoscaler) metrics that the manager node
* calculates and writes to Zookeeper, and reports them as pod-level metrics.
*/
public class HpaMetricPublisherService extends AbstractIdleService {
private static final Logger LOG = LoggerFactory.getLogger(HpaMetricPublisherService.class);
private final HpaMetricMetadataStore hpaMetricMetadataStore;
private final Metadata.HpaMetricMetadata.NodeRole nodeRole;
private final MeterRegistry meterRegistry;
private final KaldbMetadataStoreChangeListener<HpaMetricMetadata> listener = changeListener();

public HpaMetricPublisherService(
HpaMetricMetadataStore hpaMetricMetadataStore,
MeterRegistry meterRegistry,
Metadata.HpaMetricMetadata.NodeRole nodeRole) {
this.hpaMetricMetadataStore = hpaMetricMetadataStore;
this.nodeRole = nodeRole;
this.meterRegistry = meterRegistry;
}

private KaldbMetadataStoreChangeListener<HpaMetricMetadata> changeListener() {
return metadata -> {
if (metadata.getNodeRole().equals(nodeRole)) {
meterRegistry.gauge(
metadata.getName(),
hpaMetricMetadataStore,
store -> {
Optional<HpaMetricMetadata> metric =
store.listSync().stream()
.filter(m -> m.getName().equals(metadata.getName()))
.findFirst();
if (metric.isPresent()) {
return metric.get().getValue();
} else {
// store no longer has this metric - report a 0
// https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#implicit-maintenance-mode-deactivation
return 0;
}
});
}
};
}

@Override
protected void startUp() throws Exception {
LOG.info("Starting autoscaler publisher service");
hpaMetricMetadataStore.addListener(listener);
}

@Override
protected void shutDown() throws Exception {
LOG.info("Stopping autoscaler publisher service");
hpaMetricMetadataStore.removeListener(listener);
}
}
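
A hypothetical wiring sketch (not part of this PR) showing how a cache node could host the publisher; curator is assumed to be an already-built AsyncCuratorFramework, and SimpleMeterRegistry stands in for whatever registry the node actually exposes to its metrics scraper.

HpaMetricMetadataStore hpaMetricMetadataStore = new HpaMetricMetadataStore(curator, true);
MeterRegistry meterRegistry = new SimpleMeterRegistry();

HpaMetricPublisherService publisherService =
    new HpaMetricPublisherService(
        hpaMetricMetadataStore, meterRegistry, Metadata.HpaMetricMetadata.NodeRole.CACHE);

// AbstractIdleService lifecycle: once started, any hpa_cache_demand_factor_* metric written by
// the manager for this role is exposed as a gauge on meterRegistry, where a metrics adapter
// (e.g. prometheus-adapter) can surface it to the Kubernetes HPA.
publisherService.startAsync().awaitRunning();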