Automated cherry pick of #34955 (#35046)

Merged
27 changes: 23 additions & 4 deletions pkg/controller/podautoscaler/horizontal.go
@@ -48,8 +48,15 @@ const (

HpaCustomMetricsTargetAnnotationName = "alpha/target.custom-metrics.podautoscaler.kubernetes.io"
HpaCustomMetricsStatusAnnotationName = "alpha/status.custom-metrics.podautoscaler.kubernetes.io"

+ scaleUpLimitFactor = 2
+ scaleUpLimitMinimum = 4
)

+ func calculateScaleUpLimit(currentReplicas int32) int32 {
+ return int32(math.Max(scaleUpLimitFactor*float64(currentReplicas), scaleUpLimitMinimum))
+ }

type HorizontalController struct {
scaleNamespacer unversionedextensions.ScalesGetter
hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter
@@ -148,7 +155,7 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa *autoscaling
a.eventRecorder.Event(hpa, api.EventTypeWarning, "InvalidSelector", errMsg)
return 0, nil, time.Time{}, fmt.Errorf(errMsg)
}
- currentUtilization, timestamp, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, selector)
+ currentUtilization, numRunningPods, timestamp, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, selector)

// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
if err != nil {
@@ -159,11 +166,17 @@
utilization := int32(*currentUtilization)

usageRatio := float64(utilization) / float64(targetUtilization)
- if math.Abs(1.0-usageRatio) > tolerance {
- return int32(math.Ceil(usageRatio * float64(currentReplicas))), &utilization, timestamp, nil
+ if math.Abs(1.0-usageRatio) <= tolerance {
+ return currentReplicas, &utilization, timestamp, nil
}

- return currentReplicas, &utilization, timestamp, nil
+ desiredReplicas := math.Ceil(usageRatio * float64(numRunningPods))

+ a.eventRecorder.Eventf(hpa, api.EventTypeNormal, "DesiredReplicasComputed",
+ "Computed the desired num of replicas: %d, on a base of %d report(s) (avgCPUutil: %d, current replicas: %d)",
+ int32(desiredReplicas), numRunningPods, utilization, scale.Status.Replicas)

+ return int32(desiredReplicas), &utilization, timestamp, nil
}

// Computes the desired number of replicas based on the CustomMetrics passed in cmAnnotation as json-serialized
@@ -329,6 +342,12 @@ func (a *HorizontalController) reconcileAutoscaler(hpa *autoscaling.HorizontalPo
if desiredReplicas > hpa.Spec.MaxReplicas {
desiredReplicas = hpa.Spec.MaxReplicas
}

+ // Do not upscale too much to prevent incorrect rapid increase of the number of master replicas caused by
+ // bogus CPU usage report from heapster/kubelet (like in issue #32304).
+ if desiredReplicas > calculateScaleUpLimit(currentReplicas) {
+ desiredReplicas = calculateScaleUpLimit(currentReplicas)
+ }
}

rescale := shouldScale(hpa, currentReplicas, desiredReplicas, timestamp)
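In short, the horizontal.go change does two things: computeReplicasForCPUUtilization now multiplies the usage ratio by numRunningPods (the number of pods whose CPU was actually collected) instead of currentReplicas, and reconcileAutoscaler caps any single scale-up at calculateScaleUpLimit(currentReplicas) = max(2 * currentReplicas, 4). The standalone sketch below reproduces just that arithmetic; the desiredReplicas helper, the hard-coded tolerance of 0.1, and the omission of the min/max-replica clamp are simplifications for illustration, not part of the patch.

package main

import (
    "fmt"
    "math"
)

const (
    tolerance           = 0.1 // assumed HPA tolerance, for illustration only
    scaleUpLimitFactor  = 2
    scaleUpLimitMinimum = 4
)

// calculateScaleUpLimit mirrors the helper added in the patch.
func calculateScaleUpLimit(currentReplicas int32) int32 {
    return int32(math.Max(scaleUpLimitFactor*float64(currentReplicas), scaleUpLimitMinimum))
}

// desiredReplicas is an illustrative helper combining the patched CPU-based
// computation with the new scale-up cap; it skips the min/max-replica clamp.
func desiredReplicas(currentReplicas int32, numRunningPods int, utilization, target int32) int32 {
    usageRatio := float64(utilization) / float64(target)
    if math.Abs(1.0-usageRatio) <= tolerance {
        return currentReplicas
    }
    desired := int32(math.Ceil(usageRatio * float64(numRunningPods)))
    if limit := calculateScaleUpLimit(currentReplicas); desired > limit {
        desired = limit
    }
    return desired
}

func main() {
    // A single bogus report of 1000% CPU against a 100% target would previously
    // have requested ceil(10*3) = 30 replicas; now it asks for ceil(10*1) = 10
    // and is further capped at max(2*3, 4) = 6.
    fmt.Println(desiredReplicas(3, 1, 1000, 100)) // prints 6
}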
41 changes: 39 additions & 2 deletions pkg/controller/podautoscaler/horizontal_test.go
@@ -382,8 +382,17 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {

obj := action.(core.CreateAction).GetObject().(*api.Event)
if tc.verifyEvents {
assert.Equal(t, "SuccessfulRescale", obj.Reason)
assert.Equal(t, fmt.Sprintf("New size: %d; reason: CPU utilization above target", tc.desiredReplicas), obj.Message)
switch obj.Reason {
case "SuccessfulRescale":
assert.Equal(t, fmt.Sprintf("New size: %d; reason: CPU utilization above target", tc.desiredReplicas), obj.Message)
case "DesiredReplicasComputed":
assert.Equal(t, fmt.Sprintf(
"Computed the desired num of replicas: %d, on a base of %d report(s) (avgCPUutil: %d, current replicas: %d)",
tc.desiredReplicas, len(tc.reportedLevels),
(int64(tc.reportedLevels[0])*100)/tc.reportedCPURequests[0].MilliValue(), tc.initialReplicas), obj.Message)
default:
assert.False(t, true, fmt.Sprintf("Unexpected event: %s / %s", obj.Reason, obj.Message))
}
}
tc.eventCreated = true
return true, obj, nil
@@ -801,6 +810,34 @@ func TestEventNotCreated(t *testing.T) {
tc.runTest(t)
}

+ func TestMissingReports(t *testing.T) {
+ tc := testCase{
+ minReplicas: 1,
+ maxReplicas: 5,
+ initialReplicas: 4,
+ desiredReplicas: 2,
+ CPUTarget: 50,
+ reportedLevels: []uint64{200},
+ reportedCPURequests: []resource.Quantity{resource.MustParse("0.2")},
+ useMetricsApi: true,
+ }
+ tc.runTest(t)
+ }

+ func TestUpscaleCap(t *testing.T) {
+ tc := testCase{
+ minReplicas: 1,
+ maxReplicas: 100,
+ initialReplicas: 3,
+ desiredReplicas: 6,
+ CPUTarget: 10,
+ reportedLevels: []uint64{100, 200, 300},
+ reportedCPURequests: []resource.Quantity{resource.MustParse("0.1"), resource.MustParse("0.1"), resource.MustParse("0.1")},
+ useMetricsApi: true,
+ }
+ tc.runTest(t)
+ }

// TestComputedToleranceAlgImplementation is a regression test which
// back-calculates a minimal percentage for downscaling based on a small percentage
// increase in pod utilization which is calibrated against the tolerance value.
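A quick sanity check of the two new tests above: in TestMissingReports only one of the four replicas reports metrics (200m used against a 200m request is 100% utilization, a usage ratio of 100/50 = 2), so the desired count is ceil(2 * 1) = 2 rather than the ceil(2 * 4) = 8 the pre-patch formula would have produced. In TestUpscaleCap the three pods average 200m against 100m requests (200% utilization, usage ratio 200/10 = 20), so the uncapped result would be ceil(20 * 3) = 60; the new cap of max(2 * 3, 4) = 6 gives the expected desiredReplicas of 6. The intermediate averaging step here is inferred from the metrics client below rather than stated in the tests themselves.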
28 changes: 14 additions & 14 deletions pkg/controller/podautoscaler/metrics/metrics_client.go
@@ -45,9 +45,9 @@ var heapsterQueryStart = -5 * time.Minute
// MetricsClient is an interface for getting metrics for pods.
type MetricsClient interface {
// GetCPUUtilization returns the average utilization over all pods represented as a percent of requested CPU
- // (e.g. 70 means that an average pod uses 70% of the requested CPU)
- // and the time of generation of the oldest of utilization reports for pods.
- GetCPUUtilization(namespace string, selector labels.Selector) (*int, time.Time, error)
+ // (e.g. 70 means that an average pod uses 70% of the requested CPU),
+ // the number of running pods from which CPU usage was collected and the time of generation of the oldest of utilization reports for pods.
+ GetCPUUtilization(namespace string, selector labels.Selector) (*int, int, time.Time, error)

// GetCustomMetric returns the average value of the given custom metrics from the
// pods picked using the namespace and selector passed as arguments.
@@ -101,23 +101,23 @@ func NewHeapsterMetricsClient(client clientset.Interface, namespace, scheme, ser
}
}

- func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector labels.Selector) (*int, time.Time, error) {
- avgConsumption, avgRequest, timestamp, err := h.GetCpuConsumptionAndRequestInMillis(namespace, selector)
+ func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector labels.Selector) (utilization *int, numRunningPods int, timestamp time.Time, err error) {
+ avgConsumption, avgRequest, numRunningPods, timestamp, err := h.GetCpuConsumptionAndRequestInMillis(namespace, selector)
if err != nil {
- return nil, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err)
+ return nil, 0, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err)
}
- utilization := int((avgConsumption * 100) / avgRequest)
- return &utilization, timestamp, nil
+ tmp := int((avgConsumption * 100) / avgRequest)
+ return &tmp, numRunningPods, timestamp, nil
}

func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace string, selector labels.Selector) (avgConsumption int64,
- avgRequest int64, timestamp time.Time, err error) {
+ avgRequest int64, numRunningPods int, timestamp time.Time, err error) {

podList, err := h.client.Core().Pods(namespace).
List(api.ListOptions{LabelSelector: selector})

if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
return 0, 0, 0, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
}
podNames := map[string]struct{}{}
requestSum := int64(0)
@@ -138,19 +138,19 @@ func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace st
}
}
if len(podNames) == 0 && len(podList.Items) > 0 {
return 0, 0, time.Time{}, fmt.Errorf("no running pods")
return 0, 0, 0, time.Time{}, fmt.Errorf("no running pods")
}
if missing || requestSum == 0 {
return 0, 0, time.Time{}, fmt.Errorf("some pods do not have request for cpu")
return 0, 0, 0, time.Time{}, fmt.Errorf("some pods do not have request for cpu")
}
glog.V(4).Infof("%s %s - sum of CPU requested: %d", namespace, selector, requestSum)
requestAvg := requestSum / int64(len(podNames))
// Consumption is already averaged and in millis.
consumption, timestamp, err := h.getCpuUtilizationForPods(namespace, selector, podNames)
if err != nil {
- return 0, 0, time.Time{}, err
+ return 0, 0, 0, time.Time{}, err
}
- return consumption, requestAvg, timestamp, nil
+ return consumption, requestAvg, len(podNames), timestamp, nil
}

func (h *HeapsterMetricsClient) getCpuUtilizationForPods(namespace string, selector labels.Selector, podNames map[string]struct{}) (int64, time.Time, error) {
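The metrics_client.go change threads one extra value, the count of running pods, through GetCpuConsumptionAndRequestInMillis and GetCPUUtilization so the controller can scale against the pods that actually contributed samples. The rough standalone sketch below illustrates that averaging; the podSample type and the literal numbers are made up for the example, and the real client pulls usage from Heapster and requests from the pod specs rather than from a single struct.

package main

import "fmt"

type podSample struct {
    running      bool
    requestMilli int64 // CPU request in millicores
    usageMilli   int64 // reported CPU usage in millicores
}

// consumptionAndRequest roughly mirrors the shape of the updated return values:
// average consumption, average request, and the running-pod count.
func consumptionAndRequest(pods []podSample) (avgConsumption, avgRequest int64, numRunningPods int) {
    var usageSum, requestSum int64
    for _, p := range pods {
        if !p.running {
            continue // pending pods are skipped, as in the real client
        }
        usageSum += p.usageMilli
        requestSum += p.requestMilli
        numRunningPods++
    }
    if numRunningPods == 0 {
        return 0, 0, 0
    }
    return usageSum / int64(numRunningPods), requestSum / int64(numRunningPods), numRunningPods
}

func main() {
    pods := []podSample{{true, 200, 100}, {true, 200, 300}, {false, 200, 0}}
    c, r, n := consumptionAndRequest(pods)
    fmt.Printf("avg consumption %dm, avg request %dm, utilization %d%%, over %d running pods\n",
        c, r, (c*100)/r, n)
    // Output: avg consumption 200m, avg request 200m, utilization 100%, over 2 running pods
}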
14 changes: 10 additions & 4 deletions pkg/controller/podautoscaler/metrics/metrics_client_test.go
@@ -69,6 +69,7 @@ type testCase struct {
desiredValue float64
desiredRequest *float64
desiredError error
+ desiredRunningPods int
targetResource string
targetTimestamp int
reportedMetricsPoints [][]metricPoint
@@ -184,7 +185,7 @@ func buildPod(namespace, podName string, podLabels map[string]string, phase api.
}
}

- func (tc *testCase) verifyResults(t *testing.T, val *float64, req *float64, timestamp time.Time, err error) {
+ func (tc *testCase) verifyResults(t *testing.T, val *float64, req *float64, pods int, timestamp time.Time, err error) {
if tc.desiredError != nil {
assert.Error(t, err)
assert.Contains(t, fmt.Sprintf("%v", err), fmt.Sprintf("%v", tc.desiredError))
@@ -194,6 +195,7 @@ func (tc *testCase) verifyResults(t *testing.T, val *float64, req *float64, time
assert.NotNil(t, val)
assert.True(t, tc.desiredValue-0.001 < *val)
assert.True(t, tc.desiredValue+0.001 > *val)
+ assert.Equal(t, tc.desiredRunningPods, pods)

if tc.desiredRequest != nil {
assert.True(t, *tc.desiredRequest-0.001 < *req)
@@ -208,20 +210,21 @@ func (tc *testCase) runTest(t *testing.T) {
testClient := tc.prepareTestClient(t)
metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterScheme, DefaultHeapsterService, DefaultHeapsterPort)
if tc.targetResource == "cpu-usage" {
- val, req, timestamp, err := metricsClient.GetCpuConsumptionAndRequestInMillis(tc.namespace, tc.selector)
+ val, req, pods, timestamp, err := metricsClient.GetCpuConsumptionAndRequestInMillis(tc.namespace, tc.selector)
fval := float64(val)
freq := float64(req)
tc.verifyResults(t, &fval, &freq, timestamp, err)
tc.verifyResults(t, &fval, &freq, pods, timestamp, err)
} else {
val, timestamp, err := metricsClient.GetCustomMetric(tc.targetResource, tc.namespace, tc.selector)
- tc.verifyResults(t, val, nil, timestamp, err)
+ tc.verifyResults(t, val, nil, 0, timestamp, err)
}
}

func TestCPU(t *testing.T) {
tc := testCase{
replicas: 3,
desiredValue: 5000,
+ desiredRunningPods: 3,
targetResource: "cpu-usage",
targetTimestamp: 1,
reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
@@ -236,6 +239,7 @@ func TestCPUPending(t *testing.T) {
replicas: 5,
desiredValue: 5000,
desiredRequest: &desiredRequest,
+ desiredRunningPods: 3,
targetResource: "cpu-usage",
targetTimestamp: 1,
reportedPodMetrics: [][]int64{{5000}, {5000}, {5000}},
@@ -339,6 +343,7 @@ func TestCPUSumEqualZero(t *testing.T) {
tc := testCase{
replicas: 3,
desiredValue: 0,
+ desiredRunningPods: 3,
targetResource: "cpu-usage",
targetTimestamp: 0,
reportedPodMetrics: [][]int64{{0}, {0}, {0}},
@@ -362,6 +367,7 @@ func TestCPUMoreMetrics(t *testing.T) {
tc := testCase{
replicas: 5,
desiredValue: 5000,
+ desiredRunningPods: 5,
targetResource: "cpu-usage",
targetTimestamp: 10,
reportedPodMetrics: [][]int64{{1000, 2000, 2000}, {5000}, {1000, 1000, 1000, 2000}, {4000, 1000}, {5000}},
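The metrics client test updates mirror the interface change: verifyResults now also asserts the returned running-pod count, and each CPU test case declares the expected desiredRunningPods. As a worked check, TestCPUPending creates five replicas but expects desiredRunningPods of 3, which is consistent with two of the pods being non-running and therefore excluded from both the request average and the returned count.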