HPA: expose the metrics "reconciliations_total" and "reconciliation_duration_seconds" from HPA controller (#116010)
sanposhiho committed Mar 14, 2023
1 parent c2ad27a commit b49b34c
Showing 4 changed files with 415 additions and 26 deletions.
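Only horizontal.go is reproduced below; the new monitor package it calls into is among the changed files not shown here. As orientation, here is a sketch of what that package plausibly looks like, reconstructed purely from the call sites in this diff (monitor.New(), monitor.Register(), ObserveReconciliationResult, and the ActionLabel*/ErrorLabel* constants). The metric names follow the commit title; the subsystem prefix, help strings, label values, and choice of registry are assumptions, not copied from the real monitor.go.

package monitor

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

type ActionLabel string
type ErrorLabel string

const (
	ActionLabelScaleUp   ActionLabel = "scale_up"
	ActionLabelScaleDown ActionLabel = "scale_down"
	ActionLabelNone      ActionLabel = "none"

	// ErrorLabelSpec marks errors caused by an invalid HPA spec,
	// ErrorLabelInternal everything else.
	ErrorLabelSpec     ErrorLabel = "spec"
	ErrorLabelInternal ErrorLabel = "internal"
	ErrorLabelNone     ErrorLabel = "none"
)

var (
	reconciliationsTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Subsystem: "horizontal_pod_autoscaler_controller", // assumed subsystem
			Name:      "reconciliations_total",
			Help:      "Number of reconciliations of the HPA controller.",
		}, []string{"action", "error"})

	reconciliationDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Subsystem: "horizontal_pod_autoscaler_controller", // assumed subsystem
			Name:      "reconciliation_duration_seconds",
			Help:      "Reconciliation latency of the HPA controller.",
		}, []string{"action", "error"})
)

// Register registers both metrics; the real code presumably goes through
// k8s.io/component-base/metrics/legacyregistry rather than the default registry.
func Register() {
	prometheus.MustRegister(reconciliationsTotal, reconciliationDuration)
}

// Monitor records reconciliation results in the HPA controller.
type Monitor interface {
	ObserveReconciliationResult(action ActionLabel, err ErrorLabel, duration time.Duration)
}

type monitor struct{}

// New returns a Monitor backed by the package-level metrics.
func New() Monitor { return monitor{} }

func (monitor) ObserveReconciliationResult(action ActionLabel, err ErrorLabel, duration time.Duration) {
	reconciliationsTotal.WithLabelValues(string(action), string(err)).Inc()
	reconciliationDuration.WithLabelValues(string(action), string(err)).Observe(duration.Seconds())
}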
84 changes: 71 additions & 13 deletions pkg/controller/podautoscaler/horizontal.go
@@ -18,6 +18,7 @@ package podautoscaler

import (
"context"
"errors"
"fmt"
"math"
"sync"
@@ -27,7 +28,7 @@ import (
autoscalingv2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -50,6 +51,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/controller/podautoscaler/monitor"
"k8s.io/kubernetes/pkg/controller/util/selectors"
)

@@ -58,6 +60,13 @@ var (
scaleUpLimitMinimum = 4.0
)

var (
// errSpec is used to determine if the error comes from the spec of the HPA object in reconcileAutoscaler.
// All such errors should have this error as a root error so that the upstream function can distinguish spec errors from internal errors.
// e.g., fmt.Errorf("invalid spec%w", errSpec)
errSpec error = errors.New("")
)
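A minimal, standalone illustration (not part of the commit) of why the sentinel's message is empty: wrapping with %w and no separator leaves the printed error text unchanged, while errors.Is can still recognize the error as spec-caused.

package main

import (
	"errors"
	"fmt"
)

var errSpec = errors.New("")

func main() {
	err := fmt.Errorf("invalid spec%w", errSpec)
	fmt.Println(err)                     // prints "invalid spec" — the sentinel adds no text
	fmt.Println(errors.Is(err, errSpec)) // true — the wrap chain still carries the sentinel
}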

type timestampedRecommendation struct {
recommendation int32
timestamp time.Time
@@ -82,6 +91,8 @@ type HorizontalController struct {

downscaleStabilisationWindow time.Duration

monitor monitor.Monitor

// hpaLister is able to list/get HPAs from the shared cache from the informer passed in to
// NewHorizontalController.
hpaLister autoscalinglisters.HorizontalPodAutoscalerLister
@@ -139,6 +150,7 @@ func NewHorizontalController(
scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer,
downscaleStabilisationWindow: downscaleStabilisationWindow,
monitor: monitor.New(),
queue: workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
mapper: mapper,
recommendations: map[string][]timestampedRecommendation{},
@@ -174,6 +186,8 @@ func NewHorizontalController(
)
hpaController.replicaCalc = replicaCalc

monitor.Register()

return hpaController
}

@@ -276,14 +290,17 @@ func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
}

// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
// all metrics computed.
// It may return both a valid metricDesiredReplicas and an error,
// when some metrics still work and the HPA should perform scaling based on them.
// If HPA cannot do anything due to error, it returns -1 in metricDesiredReplicas as a failure signal.
func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {

selector, err := a.validateAndParseSelector(hpa, scale.Status.Selector)
if err != nil {
- return 0, "", nil, time.Time{}, err
+ return -1, "", nil, time.Time{}, err
}

specReplicas := scale.Spec.Replicas
@@ -303,23 +320,29 @@ func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hp
invalidMetricError = err
}
invalidMetricsCount++
continue
}
- if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
+ if replicas == 0 || replicaCountProposal > replicas {
timestamp = timestampProposal
replicas = replicaCountProposal
metric = metricNameProposal
}
}

if invalidMetricError != nil {
invalidMetricError = fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
}

// If all metrics are invalid or some are invalid and we would scale down,
// return an error and set the condition of the hpa based on the first invalid metric.
// Otherwise, set the condition as scaling active, since we're going to scale.
if invalidMetricsCount >= len(metricSpecs) || (invalidMetricsCount > 0 && replicas < specReplicas) {
setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
- return 0, "", statuses, time.Time{}, fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
+ return -1, "", statuses, time.Time{}, invalidMetricError
}
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
- return replicas, metric, statuses, timestamp, nil
+ return replicas, metric, statuses, timestamp, invalidMetricError
}

// hpasControllingPodsUnderSelector returns a list of keys of all HPAs that control a given list of pods.
@@ -445,8 +468,8 @@ func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
}
default:
- errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
- err = fmt.Errorf(errMsg)
+ // It shouldn't reach here, as invalid metric source types are filtered out by the api-server's validation.
+ err = fmt.Errorf("unknown metric source type %q%w", string(spec.Type), errSpec)
condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
return 0, "", time.Time{}, condition, err
}
@@ -462,7 +485,7 @@ func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (de
logger := klog.FromContext(ctx)

hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
- if errors.IsNotFound(err) {
+ if k8serrors.IsNotFound(err) {
logger.Info("Horizontal Pod Autoscaler has been deleted", "HPA", klog.KRef(namespace, name))

a.recommendationsLock.Lock()
@@ -691,7 +714,23 @@ func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32
}
}

- func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) error {
+ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) (retErr error) {
// actionLabel is used to report which actions this reconciliation has taken.
actionLabel := monitor.ActionLabelNone
start := time.Now()
defer func() {
errorLabel := monitor.ErrorLabelNone
if retErr != nil {
// In case of error, set "internal" as default.
errorLabel = monitor.ErrorLabelInternal
}
if errors.Is(retErr, errSpec) {
errorLabel = monitor.ErrorLabelSpec
}

a.monitor.ObserveReconciliationResult(actionLabel, errorLabel, time.Since(start))
}()
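The signature change to a named retErr above is what makes this defer work: the deferred closure reads the function's final return value. A hypothetical counter-example (not from the commit; doWork is a stand-in) showing what would happen had the return stayed unnamed:

package main

import (
	"errors"
	"fmt"
)

func doWork() error { return errors.New("boom") } // stand-in for the real reconciliation logic

// With an unnamed return, the deferred closure can only read the local err,
// which "return doWork()" never assigns — the failure would be recorded as
// ErrorLabelNone. The named (retErr error) result avoids exactly this.
func reconcileWithoutNamedReturn() error {
	var err error
	defer func() {
		fmt.Println("observed:", err) // prints "observed: <nil>" despite the failure
	}()
	return doWork() // writes directly to the anonymous return slot, not to err
}

func main() {
	fmt.Println("returned:", reconcileWithoutNamedReturn()) // "returned: boom"
}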

// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
hpa := hpaShared.DeepCopy()
hpaStatusOriginal := hpa.Status.DeepCopy()
@@ -705,7 +744,7 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("invalid API version in scale target reference: %v", err)
return fmt.Errorf("invalid API version in scale target reference: %v%w", err, errSpec)
}

targetGK := schema.GroupKind{
@@ -771,14 +810,20 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
} else {
var metricTimestamp time.Time
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
- if err != nil {
+ // computeReplicasForMetrics may return both non-zero metricDesiredReplicas and an error.
+ // That means some metrics still work and HPA should perform scaling based on them.
+ if err != nil && metricDesiredReplicas == -1 {
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
}
if err != nil {
// We proceed with scaling, but ultimately return this error from reconcileAutoscaler().
retErr = err
}

logger.V(4).Info("Proposing desired replicas",
"desiredReplicas", metricDesiredReplicas,
@@ -825,6 +870,12 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
"currentReplicas", currentReplicas,
"desiredReplicas", desiredReplicas,
"reason", rescaleReason)

if desiredReplicas > currentReplicas {
actionLabel = monitor.ActionLabelScaleUp
} else {
actionLabel = monitor.ActionLabelScaleDown
}
} else {
logger.V(4).Info("Decided not to scale",
"scaleTarget", reference,
@@ -834,7 +885,14 @@ func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShare
}

a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
- return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
+ err = a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
if err != nil {
// we can overwrite retErr in this case because it's an internal error.
return err
}

return retErr
}

// stabilizeRecommendation:
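To close the loop, a smoke test against the monitor sketch from the top of this page — record one reconciliation and check that the counter moved. The package-level reconciliationsTotal and the label values are the assumptions made in that sketch, not names from the real monitor.go.

package monitor

import (
	"testing"
	"time"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestObserveReconciliationResult(t *testing.T) {
	m := New()
	m.ObserveReconciliationResult(ActionLabelScaleUp, ErrorLabelNone, 250*time.Millisecond)

	got := testutil.ToFloat64(reconciliationsTotal.WithLabelValues("scale_up", "none"))
	if got != 1 {
		t.Fatalf("reconciliations_total = %v, want 1", got)
	}
}

Operators would then typically watch something like rate(horizontal_pod_autoscaler_controller_reconciliations_total{error!="none"}[5m]) — again assuming the sketched subsystem prefix.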
