Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed forbidden window enforcement in horizontal pod autoscaler. #18065

Merged
merged 1 commit into from
Dec 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 10 additions & 11 deletions pkg/controller/podautoscaler/horizontal.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,27 @@ func (a *HorizontalController) Run(syncPeriod time.Duration) {
}, syncPeriod, util.NeverStop)
}

func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale) (int, *int, error) {
func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale) (int, *int, time.Time, error) {
if hpa.Spec.CPUUtilization == nil {
// If CPUTarget is not specified than we should return some default values.
// Since we always take maximum number of replicas from all policies it is safe
// to just return 0.
return 0, nil, nil
return 0, nil, time.Time{}, nil
}
currentReplicas := scale.Status.Replicas
currentUtilization, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, scale.Status.Selector)
currentUtilization, timestamp, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, scale.Status.Selector)

// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
if err != nil {
a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedGetMetrics", err.Error())
return 0, nil, fmt.Errorf("failed to get cpu utilization: %v", err)
return 0, nil, time.Time{}, fmt.Errorf("failed to get cpu utilization: %v", err)
}

usageRatio := float64(*currentUtilization) / float64(hpa.Spec.CPUUtilization.TargetPercentage)
if math.Abs(1.0-usageRatio) > tolerance {
return int(math.Ceil(usageRatio * float64(currentReplicas))), currentUtilization, nil
return int(math.Ceil(usageRatio * float64(currentReplicas))), currentUtilization, timestamp, nil
} else {
return currentReplicas, currentUtilization, nil
return currentReplicas, currentUtilization, timestamp, nil
}
}

Expand All @@ -102,7 +102,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
}
currentReplicas := scale.Status.Replicas

desiredReplicas, currentUtilization, err := a.computeReplicasForCPUUtilization(hpa, scale)
desiredReplicas, currentUtilization, timestamp, err := a.computeReplicasForCPUUtilization(hpa, scale)
if err != nil {
a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeReplicas", err.Error())
return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
Expand All @@ -120,23 +120,22 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
if desiredReplicas > hpa.Spec.MaxReplicas {
desiredReplicas = hpa.Spec.MaxReplicas
}
now := time.Now()
rescale := false

if desiredReplicas != currentReplicas {
// Going down only if the usageRatio dropped significantly below the target
// and there was no rescaling in the last downscaleForbiddenWindow.
if desiredReplicas < currentReplicas &&
(hpa.Status.LastScaleTime == nil ||
hpa.Status.LastScaleTime.Add(downscaleForbiddenWindow).Before(now)) {
hpa.Status.LastScaleTime.Add(downscaleForbiddenWindow).Before(timestamp)) {
rescale = true
}

// Going up only if the usage ratio increased significantly above the target
// and there was no rescaling in the last upscaleForbiddenWindow.
if desiredReplicas > currentReplicas &&
(hpa.Status.LastScaleTime == nil ||
hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(now)) {
hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(timestamp)) {
rescale = true
}
}
Expand All @@ -162,7 +161,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
LastScaleTime: hpa.Status.LastScaleTime,
}
if rescale {
now := unversioned.NewTime(now)
now := unversioned.NewTime(time.Now())
hpa.Status.LastScaleTime = &now
}

Expand Down
78 changes: 43 additions & 35 deletions pkg/controller/podautoscaler/metrics/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ var heapsterQueryStart = -5 * time.Minute

// MetricsClient is an interface for getting metrics for pods.
type MetricsClient interface {
// GetCPUUtilization returns average utilization over all pods
// represented as a percent of requested CPU, e.g. 70 means that
// an average pod uses 70% of the requested CPU.
GetCPUUtilization(namespace string, selector map[string]string) (*int, error)
// GetCPUUtilization returns the average utilization over all pods represented as a percent of requested CPU
// (e.g. 70 means that an average pod uses 70% of the requested CPU)
// and the time of generation of the oldest of utilization reports for pods.
GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error)
}

// ResourceConsumption specifies consumption of a particular resource.
Expand All @@ -55,9 +55,8 @@ type ResourceConsumption struct {
Quantity resource.Quantity
}

// Aggregates results into ResourceConsumption. Also returns number of
// pods included in the aggregation.
type metricAggregator func(heapster.MetricResultList) (ResourceConsumption, int)
// Aggregates results into ResourceConsumption. Also returns number of pods included in the aggregation.
type metricAggregator func(heapster.MetricResultList) (ResourceConsumption, int, time.Time)

type metricDefinition struct {
name string
Expand All @@ -76,23 +75,23 @@ type HeapsterMetricsClient struct {

var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
api.ResourceCPU: {"cpu-usage",
func(metrics heapster.MetricResultList) (ResourceConsumption, int) {
sum, count := calculateSumFromLatestSample(metrics)
func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) {
sum, count, timestamp := calculateSumFromLatestSample(metrics)
value := "0"
if count > 0 {
// assumes that cpu usage is in millis
value = fmt.Sprintf("%dm", sum/uint64(count))
}
return ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count
return ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count, timestamp
}},
api.ResourceMemory: {"memory-usage",
func(metrics heapster.MetricResultList) (ResourceConsumption, int) {
sum, count := calculateSumFromLatestSample(metrics)
func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) {
sum, count, timestamp := calculateSumFromLatestSample(metrics)
value := int64(0)
if count > 0 {
value = int64(sum) / int64(count)
}
return ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count
return ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count, timestamp
}},
}

Expand All @@ -108,22 +107,22 @@ func NewHeapsterMetricsClient(client client.Interface, namespace, scheme, servic
}
}

func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector map[string]string) (*int, error) {
consumption, request, err := h.GetResourceConsumptionAndRequest(api.ResourceCPU, namespace, selector)
func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error) {
consumption, request, timestamp, err := h.GetResourceConsumptionAndRequest(api.ResourceCPU, namespace, selector)
if err != nil {
return nil, fmt.Errorf("failed to get CPU consumption and request: %v", err)
return nil, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err)
}
utilization := new(int)
*utilization = int(float64(consumption.Quantity.MilliValue()) / float64(request.MilliValue()) * 100)
return utilization, nil
return utilization, timestamp, nil
}

func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName api.ResourceName, namespace string, selector map[string]string) (consumption *ResourceConsumption, request *resource.Quantity, err error) {
func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName api.ResourceName, namespace string, selector map[string]string) (consumption *ResourceConsumption, request *resource.Quantity, timestamp time.Time, err error) {
podList, err := h.client.Pods(namespace).
List(labels.SelectorFromSet(labels.Set(selector)), fields.Everything())

if err != nil {
return nil, nil, fmt.Errorf("failed to get pod list: %v", err)
return nil, nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
}
podNames := []string{}
sum := resource.MustParse("0")
Expand All @@ -140,22 +139,22 @@ func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName ap
}
}
if missing || sum.Cmp(resource.MustParse("0")) == 0 {
return nil, nil, fmt.Errorf("some pods do not have request for %s", resourceName)
return nil, nil, time.Time{}, fmt.Errorf("some pods do not have request for %s", resourceName)
}
glog.Infof("Sum of %s requested: %v", resourceName, sum)
avg := resource.MustParse(fmt.Sprintf("%dm", sum.MilliValue()/int64(len(podList.Items))))
request = &avg
consumption, err = h.getForPods(resourceName, namespace, podNames)
consumption, timestamp, err = h.getForPods(resourceName, namespace, podNames)
if err != nil {
return nil, nil, err
return nil, nil, time.Time{}, err
}
return consumption, request, nil
return consumption, request, timestamp, nil
}

func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namespace string, podNames []string) (*ResourceConsumption, error) {
func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namespace string, podNames []string) (*ResourceConsumption, time.Time, error) {
metricSpec, metricDefined := h.resourceDefinitions[resourceName]
if !metricDefined {
return nil, fmt.Errorf("heapster metric not defined for %v", resourceName)
return nil, time.Time{}, fmt.Errorf("heapster metric not defined for %v", resourceName)
}
now := time.Now()

Expand All @@ -170,40 +169,49 @@ func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namesp
DoRaw()

if err != nil {
return nil, fmt.Errorf("failed to get pods metrics: %v", err)
return nil, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err)
}

var metrics heapster.MetricResultList
err = json.Unmarshal(resultRaw, &metrics)
if err != nil {
return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err)
return nil, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
}

glog.Infof("Metrics available: %s", string(resultRaw))

currentConsumption, count := metricSpec.aggregator(metrics)
currentConsumption, count, timestamp := metricSpec.aggregator(metrics)
if count != len(podNames) {
return nil, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames))
return nil, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames))
}

return &currentConsumption, nil
return &currentConsumption, timestamp, nil
}

func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) {
sum := uint64(0)
count := 0
func calculateSumFromLatestSample(metrics heapster.MetricResultList) (sum uint64, count int, timestamp time.Time) {
sum = uint64(0)
count = 0
timestamp = time.Time{}
var oldest *time.Time // creation time of the oldest of used samples across pods
oldest = nil
for _, metrics := range metrics.Items {
var newest *heapster.MetricPoint
var newest *heapster.MetricPoint // creation time of the newest sample for pod
newest = nil
for i, metricPoint := range metrics.Metrics {
if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
newest = &metrics.Metrics[i]
}
}
if newest != nil {
if oldest == nil || newest.Timestamp.Before(*oldest) {
oldest = &newest.Timestamp
}
sum += newest.Value
count++
}
}
return sum, count
if oldest != nil {
timestamp = *oldest
}
return sum, count, timestamp
}