HPA: Consider unready pods separately #33593

Merged
3 changes: 2 additions & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -427,7 +427,8 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
metrics.DefaultHeapsterService,
metrics.DefaultHeapsterPort,
)
go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), metricsClient, s.HorizontalPodAutoscalerSyncPeriod.Duration).
replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core())
go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, s.HorizontalPodAutoscalerSyncPeriod.Duration).
Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
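The replicaCalc constructed above comes from the new pkg/controller/podautoscaler/replica_calculator.go that this PR adds (the file itself is not shown in this excerpt). A minimal sketch of the shape the call site implies; the field names, and the assumption that hpaClient.Core() is consumed as a PodsGetter, are guesses rather than the PR's actual code:

package podautoscaler

import (
    unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"

    "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
)

// ReplicaCalculator bundles the Heapster-backed metrics client with a pod
// client so the horizontal controller can delegate the replica math to it.
type ReplicaCalculator struct {
    metricsClient metrics.MetricsClient
    podsGetter    unversionedcore.PodsGetter
}

func NewReplicaCalculator(metricsClient metrics.MetricsClient, podsGetter unversionedcore.PodsGetter) *ReplicaCalculator {
    return &ReplicaCalculator{
        metricsClient: metricsClient,
        podsGetter:    podsGetter,
    }
}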
9 changes: 8 additions & 1 deletion pkg/controller/podautoscaler/BUILD
@@ -15,6 +15,7 @@ go_library(
srcs = [
"doc.go",
"horizontal.go",
"replica_calculator.go",
],
tags = ["automanaged"],
deps = [
@@ -29,16 +30,21 @@
"//pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/controller/podautoscaler/metrics:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
],
)

go_test(
name = "go_default_test",
srcs = ["horizontal_test.go"],
srcs = [
"horizontal_test.go",
"replica_calculator_test.go",
],
library = "go_default_library",
tags = ["automanaged"],
deps = [
@@ -58,6 +64,7 @@ go_test(
"//pkg/runtime:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:github.com/stretchr/testify/require",
"//vendor:k8s.io/heapster/metrics/api/v1/types",
"//vendor:k8s.io/heapster/metrics/apis/metrics/v1alpha1",
],
43 changes: 13 additions & 30 deletions pkg/controller/podautoscaler/horizontal.go
@@ -33,7 +33,6 @@ import (
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/watch"
@@ -61,7 +60,7 @@ type HorizontalController struct {
scaleNamespacer unversionedextensions.ScalesGetter
hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter

metricsClient metrics.MetricsClient
replicaCalc *ReplicaCalculator
eventRecorder record.EventRecorder

// A store of HPA objects, populated by the controller.
@@ -110,13 +109,13 @@ func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (
)
}

func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController {
func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, replicaCalc *ReplicaCalculator, resyncPeriod time.Duration) *HorizontalController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: evtNamespacer.Events("")})
recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"})

controller := &HorizontalController{
metricsClient: metricsClient,
replicaCalc: replicaCalc,
eventRecorder: recorder,
scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer,
@@ -164,9 +163,8 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa *autoscaling
a.eventRecorder.Event(hpa, api.EventTypeWarning, "InvalidSelector", errMsg)
return 0, nil, time.Time{}, fmt.Errorf(errMsg)
}
currentUtilization, numRunningPods, timestamp, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, selector)

// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
desiredReplicas, utilization, timestamp, err := a.replicaCalc.GetResourceReplicas(currentReplicas, targetUtilization, api.ResourceCPU, hpa.Namespace, selector)
if err != nil {
lastScaleTime := getLastScaleTime(hpa)
if time.Now().After(lastScaleTime.Add(upscaleForbiddenWindow)) {
@@ -178,20 +176,13 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa *autoscaling
return 0, nil, time.Time{}, fmt.Errorf("failed to get CPU utilization: %v", err)
}

utilization := int32(*currentUtilization)

usageRatio := float64(utilization) / float64(targetUtilization)
if math.Abs(1.0-usageRatio) <= tolerance {
return currentReplicas, &utilization, timestamp, nil
if desiredReplicas != currentReplicas {
a.eventRecorder.Eventf(hpa, api.EventTypeNormal, "DesiredReplicasComputed",
"Computed the desired num of replicas: %d (avgCPUutil: %d, current replicas: %d)",
desiredReplicas, utilization, scale.Status.Replicas)
}

desiredReplicas := math.Ceil(usageRatio * float64(numRunningPods))

a.eventRecorder.Eventf(hpa, api.EventTypeNormal, "DesiredReplicasComputed",
"Computed the desired num of replicas: %d, on a base of %d report(s) (avgCPUutil: %d, current replicas: %d)",
int32(desiredReplicas), numRunningPods, utilization, scale.Status.Replicas)

return int32(desiredReplicas), &utilization, timestamp, nil
return desiredReplicas, &utilization, timestamp, nil
}
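The removed lines above show the calculation that GetResourceReplicas now performs on the controller's behalf: take the ratio of observed to target utilization and, outside the tolerance band, multiply the ready-pod count by it. The sketch below illustrates that rule together with the behavior the PR title names, counting not-yet-ready pods as zero usage during a scale-up so that pods which are still starting do not trigger a further scale-up. It is an illustration under those assumptions, not the PR's actual replica_calculator.go:

package main

import (
    "fmt"
    "math"
)

// tolerance mirrors the controller's dead band around the target (~10%).
const tolerance = 0.1

// proposeReplicas is an illustrative sketch, not the PR's GetResourceReplicas:
// it applies the old ceil(usageRatio * readyPods) rule from the removed code,
// plus the assumption that unready pods contribute zero usage on scale-up.
func proposeReplicas(currentReplicas, targetUtilization int32, readyUtilizations []int32, unreadyPods int32) int32 {
    var sum int32
    for _, u := range readyUtilizations {
        sum += u
    }
    readyPods := int32(len(readyUtilizations))
    if readyPods == 0 {
        // No usable metrics: leave the current scale untouched.
        return currentReplicas
    }

    usageRatio := float64(sum) / float64(readyPods) / float64(targetUtilization)
    if math.Abs(1.0-usageRatio) <= tolerance {
        // Within tolerance of the target: no change.
        return currentReplicas
    }

    if usageRatio > 1.0 && unreadyPods > 0 {
        // Scale-up while pods are still starting: fold the unready pods in at
        // 0% usage, and only propose more replicas if even those pods will not
        // cover the excess load.
        allPods := readyPods + unreadyPods
        adjustedRatio := float64(sum) / (float64(targetUtilization) * float64(allPods))
        if adjustedRatio <= 1.0+tolerance {
            return currentReplicas
        }
        return int32(math.Ceil(adjustedRatio * float64(allPods)))
    }

    // Plain case from the removed code: scale the ready-pod count by the ratio.
    return int32(math.Ceil(usageRatio * float64(readyPods)))
}

func main() {
    // 3 ready pods averaging 80% CPU against a 50% target, with 3 more pods
    // still starting: folding the starting pods in at 0% gives a ratio of
    // 240/(50*6) = 0.8, so the current scale of 6 is kept instead of scaling up.
    fmt.Println(proposeReplicas(6, 50, []int32{80, 85, 75}, 3))
}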

// computeReplicasForCustomMetrics computes the desired number of replicas based on the CustomMetrics passed in cmAnnotation
@@ -233,8 +224,8 @@ func (a *HorizontalController) computeReplicasForCustomMetrics(hpa *autoscaling.
a.eventRecorder.Event(hpa, api.EventTypeWarning, "InvalidSelector", errMsg)
Contributor
So, the computeReplicasForCustomMetrics method will not assume 0 for not-ready pods during scale-up, but will instead take the average from ready pods only? That is a difference between scaling based on CPU and scaling based on a custom metric. I think it would be more correct if we also assumed 0 for not-ready pods here, but I don't have a strong opinion. Either way, it should definitely be documented.

Contributor Author
Whoops, that was not intentional ;-). I'll fix it so both behave the same way.

return 0, "", "", time.Time{}, fmt.Errorf("couldn't convert selector string to a corresponding selector object: %v", err)
}
value, currentTimestamp, err := a.metricsClient.GetCustomMetric(customMetricTarget.Name, hpa.Namespace, selector)
// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0
replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, floatTarget, fmt.Sprintf("custom/%s", customMetricTarget.Name), hpa.Namespace, selector)
if err != nil {
lastScaleTime := getLastScaleTime(hpa)
if time.Now().After(lastScaleTime.Add(upscaleForbiddenWindow)) {
@@ -245,21 +236,13 @@ func (a *HorizontalController) computeReplicasForCustomMetrics(hpa *autoscaling.

return 0, "", "", time.Time{}, fmt.Errorf("failed to get custom metric value: %v", err)
}
floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0
usageRatio := *value / floatTarget

replicaCountProposal := int32(0)
if math.Abs(1.0-usageRatio) > tolerance {
replicaCountProposal = int32(math.Ceil(usageRatio * float64(currentReplicas)))
} else {
replicaCountProposal = currentReplicas
}
if replicaCountProposal > replicas {
Contributor
I don't understand this comparison: replicas is not initialized here.

Contributor Author
replicas is 0 the first time through the loop, I believe (it's a named return parameter, so it starts at its zero value). This code is mostly unchanged from the old code.

timestamp = currentTimestamp
timestamp = timestampProposal
replicas = replicaCountProposal
Contributor
How can we ever scale down if we only set replicas when it is smaller than the proposal?

Contributor Author
replicas starts at zero -- this just chooses the biggest of the replica counts from all the custom metrics.
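Both threads come down to Go's named result parameters: replicas is declared in the function's result list, so it starts at its zero value, and each custom metric's proposal only replaces it when larger. A tiny self-contained illustration of that pattern (the metricProposal type is invented for the example, it is not taken from the controller):

package main

import "fmt"

// metricProposal is a made-up type for the example: one custom metric's name
// and the replica count it proposes.
type metricProposal struct {
    name     string
    replicas int32
}

// maxProposal mirrors the pattern from the diff: "replicas" is a named result,
// so it starts at its zero value (0), and each iteration only overwrites it
// when a metric proposes a larger replica count.
func maxProposal(proposals []metricProposal) (replicas int32, metric string) {
    for _, p := range proposals {
        if p.replicas > replicas {
            replicas = p.replicas
            metric = fmt.Sprintf("Custom metric %s", p.name)
        }
    }
    return replicas, metric
}

func main() {
    replicas, metric := maxProposal([]metricProposal{
        {name: "qps", replicas: 4},
        {name: "queue-depth", replicas: 7},
        {name: "latency", replicas: 3},
    })
    fmt.Println(replicas, metric) // 7 Custom metric queue-depth
}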

metric = fmt.Sprintf("Custom metric %s", customMetricTarget.Name)
}
quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", *value))
quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", utilizationProposal))
if err != nil {
return 0, "", "", time.Time{}, fmt.Errorf("failed to set custom metric value: %v", err)
}