From eeab59c297266ed2a10cc3a920277e8fe3f6d8c2 Mon Sep 17 00:00:00 2001 From: Poor12 Date: Wed, 7 Jun 2023 16:20:43 +0800 Subject: [PATCH] add custom metrics Signed-off-by: Poor12 --- .../federatedhpa/federatedhpa_controller.go | 124 +++++++++++++++++- .../federatedhpa/replica_calculator.go | 87 ++++++++++++ 2 files changed, 208 insertions(+), 3 deletions(-) diff --git a/pkg/controllers/federatedhpa/federatedhpa_controller.go b/pkg/controllers/federatedhpa/federatedhpa_controller.go index f4ea553d162e..04d24afad2f3 100644 --- a/pkg/controllers/federatedhpa/federatedhpa_controller.go +++ b/pkg/controllers/federatedhpa/federatedhpa_controller.go @@ -561,7 +561,7 @@ func (c *FederatedHPAController) buildPodInformerForCluster(clusterScaleClient * return singleClusterInformerManager, nil } -// L565-L1292 is partly lifted from https://github.com/kubernetes/kubernetes/blob/d8e9a7b33a25e97b0880558fc94318a5d2fb3664/pkg/controller/podautoscaler/horizontal.go. +// L565-L1410 is partly lifted from https://github.com/kubernetes/kubernetes/blob/release-1.27/pkg/controller/podautoscaler/horizontal.go. // computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA, // returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of // all metrics computed. 
@@ -576,6 +576,7 @@ func (c *FederatedHPAController) computeReplicasForMetrics(ctx context.Context, } specReplicas := templateReplicas + statusReplicas := scale.Status.Replicas calibration := float64(scale.Spec.Replicas) / float64(specReplicas) statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs)) @@ -584,7 +585,7 @@ func (c *FederatedHPAController) computeReplicasForMetrics(ctx context.Context, var invalidMetricCondition autoscalingv2.HorizontalPodAutoscalerCondition for i, metricSpec := range metricSpecs { - replicaCountProposal, metricNameProposal, timestampProposal, condition, err := c.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, 0, selector, &statuses[i], podList, calibration) + replicaCountProposal, metricNameProposal, timestampProposal, condition, err := c.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i], podList, calibration) if err != nil { if invalidMetricsCount <= 0 { @@ -678,6 +679,8 @@ func (c *FederatedHPAController) validateAndParseSelector(hpa *autoscalingv1alph // Computes the desired number of replicas for a specific hpa and metric specification, // returning the metric status and a proposed condition to be set on the HPA object. 
+// +//nolint:gocyclo func (c *FederatedHPAController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv1alpha1.FederatedHPA, spec autoscalingv2.MetricSpec, specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus, podList []*corev1.Pod, calibration float64) (replicaCountProposal int32, metricNameProposal string, timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) { @@ -706,12 +709,36 @@ func (c *FederatedHPAController) computeReplicasForMetric(ctx context.Context, h }() switch spec.Type { - // TODO(Poor12): only support resource metrics source type now + case autoscalingv2.ObjectMetricSourceType: + metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector) + if err != nil { + condition := c.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err) + return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err) + } + replicaCountProposal, timestampProposal, metricNameProposal, condition, err = c.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, status, metricSelector, podList, calibration) + if err != nil { + return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err) + } + case autoscalingv2.PodsMetricSourceType: + metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector) + if err != nil { + condition := c.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err) + return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err) + } + replicaCountProposal, timestampProposal, metricNameProposal, condition, err = c.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector, podList, calibration) + if err != nil { + return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err) + } case autoscalingv2.ResourceMetricSourceType: 
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = c.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status, podList, calibration) if err != nil { return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err) } + case autoscalingv2.ContainerResourceMetricSourceType: + replicaCountProposal, timestampProposal, metricNameProposal, condition, err = c.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status, podList, calibration) + if err != nil { + return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err) + } default: // It shouldn't reach here as invalid metric source type is filtered out in the api-server's validation. err = fmt.Errorf("unknown metric source type %q%w", string(spec.Type), errSpec) @@ -721,6 +748,77 @@ func (c *FederatedHPAController) computeReplicasForMetric(ctx context.Context, h return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil } +// computeStatusForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType. 
+func (c *FederatedHPAController) computeStatusForObjectMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv1alpha1.FederatedHPA, status *autoscalingv2.MetricStatus, metricSelector labels.Selector, podList []*corev1.Pod, calibration float64) (replicas int32, timestamp time.Time, metricName string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) { + if metricSpec.Object.Target.Type == autoscalingv2.ValueMetricType { + replicaCountProposal, usageProposal, timestampProposal, err := c.ReplicaCalc.GetObjectMetricReplicas(specReplicas, metricSpec.Object.Target.Value.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, metricSelector, podList, calibration) + if err != nil { + condition := c.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err) + return 0, timestampProposal, "", condition, err + } + *status = autoscalingv2.MetricStatus{ + Type: autoscalingv2.ObjectMetricSourceType, + Object: &autoscalingv2.ObjectMetricStatus{ + DescribedObject: metricSpec.Object.DescribedObject, + Metric: autoscalingv2.MetricIdentifier{ + Name: metricSpec.Object.Metric.Name, + Selector: metricSpec.Object.Metric.Selector, + }, + Current: autoscalingv2.MetricValueStatus{ + Value: resource.NewMilliQuantity(usageProposal, resource.DecimalSI), + }, + }, + } + return replicaCountProposal, timestampProposal, fmt.Sprintf("%s metric %s", metricSpec.Object.DescribedObject.Kind, metricSpec.Object.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil + } else if metricSpec.Object.Target.Type == autoscalingv2.AverageValueMetricType { + replicaCountProposal, usageProposal, timestampProposal, err := c.ReplicaCalc.GetObjectPerPodMetricReplicas(statusReplicas, metricSpec.Object.Target.AverageValue.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, metricSelector, calibration) + if err != nil { + condition := 
c.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err) + return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s object metric: %v", metricSpec.Object.Metric.Name, err) + } + *status = autoscalingv2.MetricStatus{ + Type: autoscalingv2.ObjectMetricSourceType, + Object: &autoscalingv2.ObjectMetricStatus{ + Metric: autoscalingv2.MetricIdentifier{ + Name: metricSpec.Object.Metric.Name, + Selector: metricSpec.Object.Metric.Selector, + }, + Current: autoscalingv2.MetricValueStatus{ + AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI), + }, + }, + } + return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.Object.Metric.Name, metricSpec.Object.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil + } + errMsg := "invalid object metric source: neither a value target nor an average value target was set" + err = fmt.Errorf(errMsg) + condition = c.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err) + return 0, time.Time{}, "", condition, err +} + +// computeStatusForPodsMetric computes the desired number of replicas for the specified metric of type PodsMetricSourceType. 
+func (c *FederatedHPAController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv1alpha1.FederatedHPA, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector, podList []*corev1.Pod, calibration float64) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) { + replicaCountProposal, usageProposal, timestampProposal, err := c.ReplicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector, podList, calibration) + if err != nil { + condition = c.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err) + return 0, timestampProposal, "", condition, err + } + *status = autoscalingv2.MetricStatus{ + Type: autoscalingv2.PodsMetricSourceType, + Pods: &autoscalingv2.PodsMetricStatus{ + Metric: autoscalingv2.MetricIdentifier{ + Name: metricSpec.Pods.Metric.Name, + Selector: metricSpec.Pods.Metric.Selector, + }, + Current: autoscalingv2.MetricValueStatus{ + AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI), + }, + }, + } + + return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil +} + func (c *FederatedHPAController) computeStatusForResourceMetricGeneric(ctx context.Context, currentReplicas int32, target autoscalingv2.MetricTarget, resourceName corev1.ResourceName, namespace string, container string, selector labels.Selector, sourceType autoscalingv2.MetricSourceType, podList []*corev1.Pod, calibration float64) (replicaCountProposal int32, metricStatus *autoscalingv2.MetricValueStatus, timestampProposal time.Time, metricNameProposal string, @@ -779,6 +877,26 @@ func (c *FederatedHPAController) computeStatusForResourceMetric(ctx context.Cont 
return replicaCountProposal, timestampProposal, metricNameProposal, condition, nil } +// computeStatusForContainerResourceMetric computes the desired number of replicas for the specified metric of type ContainerResourceMetricSourceType. +func (c *FederatedHPAController) computeStatusForContainerResourceMetric(ctx context.Context, currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv1alpha1.FederatedHPA, + selector labels.Selector, status *autoscalingv2.MetricStatus, podList []*corev1.Pod, calibration float64) (replicaCountProposal int32, timestampProposal time.Time, + metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) { + replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := c.computeStatusForResourceMetricGeneric(ctx, currentReplicas, metricSpec.ContainerResource.Target, metricSpec.ContainerResource.Name, hpa.Namespace, metricSpec.ContainerResource.Container, selector, autoscalingv2.ContainerResourceMetricSourceType, podList, calibration) + if err != nil { + condition = c.getUnableComputeReplicaCountCondition(hpa, "FailedGetContainerResourceMetric", err) + return replicaCountProposal, timestampProposal, metricNameProposal, condition, err + } + *status = autoscalingv2.MetricStatus{ + Type: autoscalingv2.ContainerResourceMetricSourceType, + ContainerResource: &autoscalingv2.ContainerResourceMetricStatus{ + Name: metricSpec.ContainerResource.Name, + Container: metricSpec.ContainerResource.Container, + Current: *metricValueStatus, + }, + } + return replicaCountProposal, timestampProposal, metricNameProposal, condition, nil +} + func (c *FederatedHPAController) recordInitialRecommendation(currentReplicas int32, key string) { c.recommendationsLock.Lock() defer c.recommendationsLock.Unlock() diff --git a/pkg/controllers/federatedhpa/replica_calculator.go b/pkg/controllers/federatedhpa/replica_calculator.go index 10f354a63436..b0af74fdedf8 ---
a/pkg/controllers/federatedhpa/replica_calculator.go +++ b/pkg/controllers/federatedhpa/replica_calculator.go @@ -6,6 +6,7 @@ import ( "math" "time" + autoscalingv2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" @@ -139,6 +140,19 @@ func (c *ReplicaCalculator) GetRawResourceReplicas(ctx context.Context, currentR return replicaCount, usage, timestamp, err } +// GetMetricReplicas calculates the desired replica count based on a target metric usage +// (as a milli-value) for pods matching the given selector in the given namespace, and the +// current replica count +func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUsage int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector, podList []*corev1.Pod, calibration float64) (replicaCount int32, usage int64, timestamp time.Time, err error) { + metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector) + if err != nil { + return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err) + } + + replicaCount, usage, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUsage, corev1.ResourceName(""), podList, calibration) + return replicaCount, usage, timestamp, err +} + // calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics. // //nolint:gocyclo @@ -210,6 +224,79 @@ func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMet return newReplicas, usage, nil } +// GetObjectMetricReplicas calculates the desired replica count based on a target metric usage (as a milli-value) +// for the given object in the given namespace, and the current replica count. 
+func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targetUsage int64, metricName string, namespace string, objectRef *autoscalingv2.CrossVersionObjectReference, metricSelector labels.Selector, podList []*corev1.Pod, calibration float64) (replicaCount int32, usage int64, timestamp time.Time, err error) { + usage, _, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector) + if err != nil { + return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err) + } + + usageRatio := float64(usage) / float64(targetUsage) + replicaCount, timestamp, err = c.getUsageRatioReplicaCount(currentReplicas, usageRatio, podList, calibration) + return replicaCount, usage, timestamp, err +} + +// GetObjectPerPodMetricReplicas calculates the desired replica count based on a target metric usage (as a milli-value) +// for the given object in the given namespace, and the current replica count. +func (c *ReplicaCalculator) GetObjectPerPodMetricReplicas(statusReplicas int32, targetAverageUsage int64, metricName string, namespace string, objectRef *autoscalingv2.CrossVersionObjectReference, metricSelector labels.Selector, calibration float64) (replicaCount int32, usage int64, timestamp time.Time, err error) { + // The usage here refers to the total value of all metrics from the pods. 
+ usage, timestamp, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector) + if err != nil { + return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err) + } + + replicaCount = statusReplicas + usageRatio := float64(usage) / (float64(targetAverageUsage) * float64(replicaCount)) + if math.Abs(1.0-usageRatio) > c.tolerance { + // update number of replicas if change is large enough + replicaCount = int32(math.Ceil(float64(usage) / float64(targetAverageUsage) / calibration)) + } + usage = int64(math.Ceil(float64(usage) / float64(statusReplicas))) + return int32(math.Ceil(float64(replicaCount) / calibration)), usage, timestamp, nil +} + +// getUsageRatioReplicaCount calculates the desired replica count based on usageRatio and ready pods count. +// For currentReplicas=0 doesn't take into account ready pods count and tolerance to support scaling to zero pods. +func (c *ReplicaCalculator) getUsageRatioReplicaCount(currentReplicas int32, usageRatio float64, podList []*corev1.Pod, calibration float64) (replicaCount int32, timestamp time.Time, err error) { + if currentReplicas != 0 { + if math.Abs(1.0-usageRatio) <= c.tolerance { + // return the current replicas if the change would be too small + return currentReplicas, timestamp, nil + } + readyPodCount := int64(0) + readyPodCount, err = c.getReadyPodsCount(podList) + if err != nil { + return 0, time.Time{}, fmt.Errorf("unable to calculate ready pods: %s", err) + } + replicaCount = int32(math.Ceil(usageRatio * float64(readyPodCount) / calibration)) + } else { + // Scale to zero or n pods depending on usageRatio + replicaCount = int32(math.Ceil(usageRatio)) + } + + return replicaCount, timestamp, err +} + +// @TODO(mattjmcnaughton) Many different functions in this module use variations +// of this function. Make this function generic, so we don't repeat the same +// logic in multiple places. 
+func (c *ReplicaCalculator) getReadyPodsCount(podList []*corev1.Pod) (int64, error) { + if len(podList) == 0 { + return 0, fmt.Errorf("no pods returned by selector while calculating replica count") + } + + readyPodCount := 0 + + for _, pod := range podList { + if pod.Status.Phase == corev1.PodRunning && helper.IsPodReady(pod) { + readyPodCount++ + } + } + + return int64(readyPodCount), nil +} + func groupPods(pods []*corev1.Pod, metrics metricsclient.PodMetricsInfo, resource corev1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, unreadyPods, missingPods, ignoredPods sets.Set[string]) { missingPods = sets.New[string]() unreadyPods = sets.New[string]()