Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hpa-controller supports custom metrics #3673

Merged
merged 1 commit into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 121 additions & 3 deletions pkg/controllers/federatedhpa/federatedhpa_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (c *FederatedHPAController) buildPodInformerForCluster(clusterScaleClient *
return singleClusterInformerManager, nil
}

// L565-L1292 is partly lifted from https://github.com/kubernetes/kubernetes/blob/d8e9a7b33a25e97b0880558fc94318a5d2fb3664/pkg/controller/podautoscaler/horizontal.go.
// L565-L1410 is partly lifted from https://github.com/kubernetes/kubernetes/blob/release-1.27/pkg/controller/podautoscaler/horizontal.go.
// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
// all metrics computed.
Expand All @@ -576,6 +576,7 @@ func (c *FederatedHPAController) computeReplicasForMetrics(ctx context.Context,
}

specReplicas := templateReplicas
statusReplicas := scale.Status.Replicas
calibration := float64(scale.Spec.Replicas) / float64(specReplicas)
statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))

Expand All @@ -584,7 +585,7 @@ func (c *FederatedHPAController) computeReplicasForMetrics(ctx context.Context,
var invalidMetricCondition autoscalingv2.HorizontalPodAutoscalerCondition

for i, metricSpec := range metricSpecs {
replicaCountProposal, metricNameProposal, timestampProposal, condition, err := c.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, 0, selector, &statuses[i], podList, calibration)
replicaCountProposal, metricNameProposal, timestampProposal, condition, err := c.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i], podList, calibration)

if err != nil {
if invalidMetricsCount <= 0 {
Expand Down Expand Up @@ -678,6 +679,8 @@ func (c *FederatedHPAController) validateAndParseSelector(hpa *autoscalingv1alph

// Computes the desired number of replicas for a specific hpa and metric specification,
// returning the metric status and a proposed condition to be set on the HPA object.
//
//nolint:gocyclo
func (c *FederatedHPAController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv1alpha1.FederatedHPA, spec autoscalingv2.MetricSpec,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many parameters on this method, please open an issue to optimize it and remove nolint:gocyclo.

specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus, podList []*corev1.Pod, calibration float64) (replicaCountProposal int32, metricNameProposal string,
timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
Expand Down Expand Up @@ -706,12 +709,36 @@ func (c *FederatedHPAController) computeReplicasForMetric(ctx context.Context, h
}()

switch spec.Type {
// TODO(Poor12): only support resource metrics source type now
case autoscalingv2.ObjectMetricSourceType:
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
if err != nil {
condition := c.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
}
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = c.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, status, metricSelector, podList, calibration)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
}
case autoscalingv2.PodsMetricSourceType:
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
if err != nil {
condition := c.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
}
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = c.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector, podList, calibration)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
}
case autoscalingv2.ResourceMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = c.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status, podList, calibration)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err)
}
case autoscalingv2.ContainerResourceMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = c.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status, podList, calibration)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err)
}
default:
// It shouldn't reach here as invalid metric source type is filtered out in the api-server's validation.
err = fmt.Errorf("unknown metric source type %q%w", string(spec.Type), errSpec)
Expand All @@ -721,6 +748,77 @@ func (c *FederatedHPAController) computeReplicasForMetric(ctx context.Context, h
return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

// computeStatusForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType.
func (c *FederatedHPAController) computeStatusForObjectMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv1alpha1.FederatedHPA, status *autoscalingv2.MetricStatus, metricSelector labels.Selector, podList []*corev1.Pod, calibration float64) (replicas int32, timestamp time.Time, metricName string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
if metricSpec.Object.Target.Type == autoscalingv2.ValueMetricType {
replicaCountProposal, usageProposal, timestampProposal, err := c.ReplicaCalc.GetObjectMetricReplicas(specReplicas, metricSpec.Object.Target.Value.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, metricSelector, podList, calibration)
if err != nil {
condition := c.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
return 0, timestampProposal, "", condition, err
}
*status = autoscalingv2.MetricStatus{
Type: autoscalingv2.ObjectMetricSourceType,
Object: &autoscalingv2.ObjectMetricStatus{
DescribedObject: metricSpec.Object.DescribedObject,
Metric: autoscalingv2.MetricIdentifier{
Name: metricSpec.Object.Metric.Name,
Selector: metricSpec.Object.Metric.Selector,
},
Current: autoscalingv2.MetricValueStatus{
Value: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
},
},
}
return replicaCountProposal, timestampProposal, fmt.Sprintf("%s metric %s", metricSpec.Object.DescribedObject.Kind, metricSpec.Object.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
} else if metricSpec.Object.Target.Type == autoscalingv2.AverageValueMetricType {
replicaCountProposal, usageProposal, timestampProposal, err := c.ReplicaCalc.GetObjectPerPodMetricReplicas(statusReplicas, metricSpec.Object.Target.AverageValue.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, metricSelector, calibration)
if err != nil {
condition := c.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s object metric: %v", metricSpec.Object.Metric.Name, err)
}
*status = autoscalingv2.MetricStatus{
Type: autoscalingv2.ObjectMetricSourceType,
Object: &autoscalingv2.ObjectMetricStatus{
Metric: autoscalingv2.MetricIdentifier{
Name: metricSpec.Object.Metric.Name,
Selector: metricSpec.Object.Metric.Selector,
},
Current: autoscalingv2.MetricValueStatus{
AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
},
},
}
return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.Object.Metric.Name, metricSpec.Object.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
errMsg := "invalid object metric source: neither a value target nor an average value target was set"
err = fmt.Errorf(errMsg)
condition = c.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
return 0, time.Time{}, "", condition, err
}

// computeStatusForPodsMetric computes the desired number of replicas for the specified metric of type PodsMetricSourceType.
func (c *FederatedHPAController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv1alpha1.FederatedHPA, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector, podList []*corev1.Pod, calibration float64) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
replicaCountProposal, usageProposal, timestampProposal, err := c.ReplicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector, podList, calibration)
if err != nil {
condition = c.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
return 0, timestampProposal, "", condition, err
}
*status = autoscalingv2.MetricStatus{
Type: autoscalingv2.PodsMetricSourceType,
Pods: &autoscalingv2.PodsMetricStatus{
Metric: autoscalingv2.MetricIdentifier{
Name: metricSpec.Pods.Metric.Name,
Selector: metricSpec.Pods.Metric.Selector,
},
Current: autoscalingv2.MetricValueStatus{
AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
},
},
}

return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

func (c *FederatedHPAController) computeStatusForResourceMetricGeneric(ctx context.Context, currentReplicas int32, target autoscalingv2.MetricTarget,
resourceName corev1.ResourceName, namespace string, container string, selector labels.Selector, sourceType autoscalingv2.MetricSourceType, podList []*corev1.Pod, calibration float64) (replicaCountProposal int32,
metricStatus *autoscalingv2.MetricValueStatus, timestampProposal time.Time, metricNameProposal string,
Expand Down Expand Up @@ -779,6 +877,26 @@ func (c *FederatedHPAController) computeStatusForResourceMetric(ctx context.Cont
return replicaCountProposal, timestampProposal, metricNameProposal, condition, nil
}

// computeStatusForContainerResourceMetric computes the desired number of replicas for the specified metric of type ResourceMetricSourceType.
func (c *FederatedHPAController) computeStatusForContainerResourceMetric(ctx context.Context, currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv1alpha1.FederatedHPA,
selector labels.Selector, status *autoscalingv2.MetricStatus, podList []*corev1.Pod, calibration float64) (replicaCountProposal int32, timestampProposal time.Time,
metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := c.computeStatusForResourceMetricGeneric(ctx, currentReplicas, metricSpec.ContainerResource.Target, metricSpec.ContainerResource.Name, hpa.Namespace, metricSpec.ContainerResource.Container, selector, autoscalingv2.ContainerResourceMetricSourceType, podList, calibration)
if err != nil {
condition = c.getUnableComputeReplicaCountCondition(hpa, "FailedGetContainerResourceMetric", err)
return replicaCountProposal, timestampProposal, metricNameProposal, condition, err
}
*status = autoscalingv2.MetricStatus{
Type: autoscalingv2.ContainerResourceMetricSourceType,
ContainerResource: &autoscalingv2.ContainerResourceMetricStatus{
Name: metricSpec.ContainerResource.Name,
Container: metricSpec.ContainerResource.Container,
Current: *metricValueStatus,
},
}
return replicaCountProposal, timestampProposal, metricNameProposal, condition, nil
}

func (c *FederatedHPAController) recordInitialRecommendation(currentReplicas int32, key string) {
c.recommendationsLock.Lock()
defer c.recommendationsLock.Unlock()
Expand Down
87 changes: 87 additions & 0 deletions pkg/controllers/federatedhpa/replica_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"time"

autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -139,6 +140,19 @@ func (c *ReplicaCalculator) GetRawResourceReplicas(ctx context.Context, currentR
return replicaCount, usage, timestamp, err
}

// GetMetricReplicas calculates the desired replica count based on a target metric usage
// (as a milli-value) for pods matching the given selector in the given namespace, and the
// current replica count
func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUsage int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector, podList []*corev1.Pod, calibration float64) (replicaCount int32, usage int64, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
}

replicaCount, usage, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUsage, corev1.ResourceName(""), podList, calibration)
return replicaCount, usage, timestamp, err
}

// calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics.
//
//nolint:gocyclo
Expand Down Expand Up @@ -210,6 +224,79 @@ func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMet
return newReplicas, usage, nil
}

// GetObjectMetricReplicas calculates the desired replica count based on a target metric usage (as a milli-value)
// for the given object in the given namespace, and the current replica count.
func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targetUsage int64, metricName string, namespace string, objectRef *autoscalingv2.CrossVersionObjectReference, metricSelector labels.Selector, podList []*corev1.Pod, calibration float64) (replicaCount int32, usage int64, timestamp time.Time, err error) {
usage, _, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err)
}

usageRatio := float64(usage) / float64(targetUsage)
replicaCount, timestamp, err = c.getUsageRatioReplicaCount(currentReplicas, usageRatio, podList, calibration)
return replicaCount, usage, timestamp, err
}

// GetObjectPerPodMetricReplicas calculates the desired replica count based on a target metric usage (as a milli-value)
// for the given object in the given namespace, and the current replica count.
func (c *ReplicaCalculator) GetObjectPerPodMetricReplicas(statusReplicas int32, targetAverageUsage int64, metricName string, namespace string, objectRef *autoscalingv2.CrossVersionObjectReference, metricSelector labels.Selector, calibration float64) (replicaCount int32, usage int64, timestamp time.Time, err error) {
// The usage here refers to the total value of all metrics from the pods.
usage, timestamp, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err)
}

replicaCount = statusReplicas
usageRatio := float64(usage) / (float64(targetAverageUsage) * float64(replicaCount))
Poor12 marked this conversation as resolved.
Show resolved Hide resolved
if math.Abs(1.0-usageRatio) > c.tolerance {
// update number of replicas if change is large enough
Poor12 marked this conversation as resolved.
Show resolved Hide resolved
replicaCount = int32(math.Ceil(float64(usage) / float64(targetAverageUsage) / calibration))
}
usage = int64(math.Ceil(float64(usage) / float64(statusReplicas)))
Poor12 marked this conversation as resolved.
Show resolved Hide resolved
return int32(math.Ceil(float64(replicaCount) / calibration)), usage, timestamp, nil
}

// getUsageRatioReplicaCount calculates the desired replica count based on usageRatio and ready pods count.
// For currentReplicas=0 doesn't take into account ready pods count and tolerance to support scaling to zero pods.
func (c *ReplicaCalculator) getUsageRatioReplicaCount(currentReplicas int32, usageRatio float64, podList []*corev1.Pod, calibration float64) (replicaCount int32, timestamp time.Time, err error) {
if currentReplicas != 0 {
if math.Abs(1.0-usageRatio) <= c.tolerance {
// return the current replicas if the change would be too small
return currentReplicas, timestamp, nil
}
readyPodCount := int64(0)
readyPodCount, err = c.getReadyPodsCount(podList)
if err != nil {
return 0, time.Time{}, fmt.Errorf("unable to calculate ready pods: %s", err)
}
replicaCount = int32(math.Ceil(usageRatio * float64(readyPodCount) / calibration))
} else {
// Scale to zero or n pods depending on usageRatio
replicaCount = int32(math.Ceil(usageRatio))
}

return replicaCount, timestamp, err
}

// @TODO(mattjmcnaughton) Many different functions in this module use variations
// of this function. Make this function generic, so we don't repeat the same
// logic in multiple places.
func (c *ReplicaCalculator) getReadyPodsCount(podList []*corev1.Pod) (int64, error) {
if len(podList) == 0 {
return 0, fmt.Errorf("no pods returned by selector while calculating replica count")
}

readyPodCount := 0
Poor12 marked this conversation as resolved.
Show resolved Hide resolved

for _, pod := range podList {
if pod.Status.Phase == corev1.PodRunning && helper.IsPodReady(pod) {
readyPodCount++
}
}

return int64(readyPodCount), nil
}

func groupPods(pods []*corev1.Pod, metrics metricsclient.PodMetricsInfo, resource corev1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, unreadyPods, missingPods, ignoredPods sets.Set[string]) {
missingPods = sets.New[string]()
unreadyPods = sets.New[string]()
Expand Down
Loading