Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPA: expose the metrics "reconciliations_total" and "reconciliation_duration_seconds" from HPA controller #116010

Merged
merged 1 commit into from Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 71 additions & 13 deletions pkg/controller/podautoscaler/horizontal.go
Expand Up @@ -18,6 +18,7 @@ package podautoscaler

import (
"context"
"errors"
"fmt"
"math"
"sync"
Expand All @@ -27,7 +28,7 @@ import (
autoscalingv2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -50,6 +51,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/controller/podautoscaler/monitor"
"k8s.io/kubernetes/pkg/controller/util/selectors"
)

Expand All @@ -58,6 +60,13 @@ var (
scaleUpLimitMinimum = 4.0
)

var (
// errSpec is used to determine if the error comes from the spec of HPA object in reconcileAutoscaler.
// All such errors should have this error as a root error so that the upstream function can distinguish spec errors from internal errors.
// e.g., fmt.Errorf("invalid spec%w", errSpec)
errSpec error = errors.New("")
)

type timestampedRecommendation struct {
recommendation int32
timestamp time.Time
Expand All @@ -82,6 +91,8 @@ type HorizontalController struct {

downscaleStabilisationWindow time.Duration

monitor monitor.Monitor

// hpaLister is able to list/get HPAs from the shared cache from the informer passed in to
// NewHorizontalController.
hpaLister autoscalinglisters.HorizontalPodAutoscalerLister
Expand Down Expand Up @@ -139,6 +150,7 @@ func NewHorizontalController(
scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer,
downscaleStabilisationWindow: downscaleStabilisationWindow,
monitor: monitor.New(),
queue: workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
mapper: mapper,
recommendations: map[string][]timestampedRecommendation{},
Expand Down Expand Up @@ -174,6 +186,8 @@ func NewHorizontalController(
)
hpaController.replicaCalc = replicaCalc

monitor.Register()

return hpaController
}

Expand Down Expand Up @@ -276,14 +290,17 @@ func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
}

// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
// all metrics computed.
// It may return both valid metricDesiredReplicas and an error,
// when some metrics still work and HPA should perform scaling based on them.
// If HPA cannot do anything due to error, it returns -1 in metricDesiredReplicas as a failure signal.
func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {

selector, err := a.validateAndParseSelector(hpa, scale.Status.Selector)
if err != nil {
return 0, "", nil, time.Time{}, err
return -1, "", nil, time.Time{}, err
}

specReplicas := scale.Spec.Replicas
Expand All @@ -303,23 +320,29 @@ func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hp
invalidMetricError = err
}
invalidMetricsCount++
continue
}
if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
if replicas == 0 || replicaCountProposal > replicas {
timestamp = timestampProposal
replicas = replicaCountProposal
metric = metricNameProposal
}
}

if invalidMetricError != nil {
invalidMetricError = fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
}

// If all metrics are invalid or some are invalid and we would scale down,
// return an error and set the condition of the hpa based on the first invalid metric.
// Otherwise set the condition as scaling active as we're going to scale
if invalidMetricsCount >= len(metricSpecs) || (invalidMetricsCount > 0 && replicas < specReplicas) {
setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
return 0, "", statuses, time.Time{}, fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
return -1, "", statuses, time.Time{}, invalidMetricError
}
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
return replicas, metric, statuses, timestamp, nil

return replicas, metric, statuses, timestamp, invalidMetricError
}

// hpasControllingPodsUnderSelector returns a list of keys of all HPAs that control a given list of pods.
Expand Down Expand Up @@ -445,8 +468,8 @@ func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
}
default:
errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
err = fmt.Errorf(errMsg)
// It shouldn't reach here as invalid metric source type is filtered out in the api-server's validation.
err = fmt.Errorf("unknown metric source type %q%w", string(spec.Type), errSpec)
condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
return 0, "", time.Time{}, condition, err
}
Expand All @@ -462,7 +485,7 @@ func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (de
logger := klog.FromContext(ctx)

hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
if errors.IsNotFound(err) {
if k8serrors.IsNotFound(err) {
logger.Info("Horizontal Pod Autoscaler has been deleted", "HPA", klog.KRef(namespace, name))

a.recommendationsLock.Lock()
Expand Down Expand Up @@ -691,7 +714,23 @@ func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32
}
}

func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) error {
func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) (retErr error) {
// actionLabel is used to report which actions this reconciliation has taken.
actionLabel := monitor.ActionLabelNone
start := time.Now()
defer func() {
errorLabel := monitor.ErrorLabelNone
if retErr != nil {
// In case of error, set "internal" as default.
errorLabel = monitor.ErrorLabelInternal
}
if errors.Is(retErr, errSpec) {
errorLabel = monitor.ErrorLabelSpec
}

a.monitor.ObserveReconciliationResult(actionLabel, errorLabel, time.Since(start))
}()

// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
hpa := hpaShared.DeepCopy()
hpaStatusOriginal := hpa.Status.DeepCopy()
Expand All @@ -705,7 +744,7 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("invalid API version in scale target reference: %v", err)
return fmt.Errorf("invalid API version in scale target reference: %v%w", err, errSpec)
}

targetGK := schema.GroupKind{
Expand Down Expand Up @@ -771,14 +810,20 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
} else {
var metricTimestamp time.Time
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
if err != nil {
// computeReplicasForMetrics may return both non-zero metricDesiredReplicas and an error.
// That means some metrics still work and HPA should perform scaling based on them.
if err != nil && metricDesiredReplicas == -1 {
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
}
if err != nil {
// We proceed to scaling, but return this error from reconcileAutoscaler() finally.
retErr = err
}

logger.V(4).Info("Proposing desired replicas",
"desiredReplicas", metricDesiredReplicas,
Expand Down Expand Up @@ -825,6 +870,12 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
"currentReplicas", currentReplicas,
"desiredReplicas", desiredReplicas,
"reason", rescaleReason)

if desiredReplicas > currentReplicas {
actionLabel = monitor.ActionLabelScaleUp
} else {
actionLabel = monitor.ActionLabelScaleDown
}
} else {
logger.V(4).Info("Decided not to scale",
"scaleTarget", reference,
Expand All @@ -834,7 +885,14 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
}

a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)

err = a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
if err != nil {
// we can overwrite retErr in this case because it's an internal error.
return err
}

return retErr
}

// stabilizeRecommendation:
Expand Down