Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CustomMetrics in HPA MetricsClient #20239

Merged
merged 1 commit into from
Jan 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
149 changes: 85 additions & 64 deletions pkg/controller/podautoscaler/metrics/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"

Expand All @@ -46,16 +45,19 @@ type MetricsClient interface {
// (e.g. 70 means that an average pod uses 70% of the requested CPU)
// and the time of generation of the oldest of utilization reports for pods.
GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error)

// GetCustomMetric returns the average value of the given custom metrics from the
// pods picked using the namespace and selector passed as arguments.
GetCustomMetric(customMetricName string, namespace string, selector map[string]string) (*float64, time.Time, error)
}

// ResourceConsumption specifies consumption of a particular resource.
type ResourceConsumption struct {
Resource api.ResourceName
Quantity resource.Quantity
type intAndFloat struct {
intValue int64
floatValue float64
}

// Aggregates results into ResourceConsumption. Also returns number of pods included in the aggregation.
type metricAggregator func(heapster.MetricResultList) (ResourceConsumption, int, time.Time)
type metricAggregator func(heapster.MetricResultList) (intAndFloat, int, time.Time)

type metricDefinition struct {
name string
Expand All @@ -64,98 +66,111 @@ type metricDefinition struct {

// HeapsterMetricsClient is Heapster-based implementation of MetricsClient
type HeapsterMetricsClient struct {
client client.Interface
resourceDefinitions map[api.ResourceName]metricDefinition
heapsterNamespace string
heapsterScheme string
heapsterService string
heapsterPort string
client client.Interface
heapsterNamespace string
heapsterScheme string
heapsterService string
heapsterPort string
}

var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
api.ResourceCPU: {"cpu-usage",
func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) {
sum, count, timestamp := calculateSumFromLatestSample(metrics)
value := "0"
if count > 0 {
// assumes that cpu usage is in millis
value = fmt.Sprintf("%dm", sum/uint64(count))
}
return ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count, timestamp
}},
api.ResourceMemory: {"memory-usage",
func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) {
sum, count, timestamp := calculateSumFromLatestSample(metrics)
value := int64(0)
if count > 0 {
value = int64(sum) / int64(count)
}
return ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count, timestamp
}},
var averageFunction = func(metrics heapster.MetricResultList) (intAndFloat, int, time.Time) {
sum, count, timestamp := calculateSumFromLatestSample(metrics)
result := intAndFloat{0, 0}
if count > 0 {
result.intValue = sum.intValue / int64(count)
result.floatValue = sum.floatValue / float64(count)
}
return result, count, timestamp
}

var heapsterCpuUsageMetricDefinition = metricDefinition{"cpu-usage", averageFunction}

func getHeapsterCustomMetricDefinition(metricName string) metricDefinition {
return metricDefinition{"CM:" + metricName, averageFunction}
}

// NewHeapsterMetricsClient returns a new instance of Heapster-based implementation of MetricsClient interface.
func NewHeapsterMetricsClient(client client.Interface, namespace, scheme, service, port string) *HeapsterMetricsClient {
return &HeapsterMetricsClient{
client: client,
resourceDefinitions: heapsterMetricDefinitions,
heapsterNamespace: namespace,
heapsterScheme: scheme,
heapsterService: service,
heapsterPort: port,
client: client,
heapsterNamespace: namespace,
heapsterScheme: scheme,
heapsterService: service,
heapsterPort: port,
}
}

func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error) {
consumption, request, timestamp, err := h.GetResourceConsumptionAndRequest(api.ResourceCPU, namespace, selector)
avgConsumption, avgRequest, timestamp, err := h.GetCpuConsumptionAndRequestInMillis(namespace, selector)
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err)
}
utilization := new(int)
*utilization = int(float64(consumption.Quantity.MilliValue()) / float64(request.MilliValue()) * 100)
return utilization, timestamp, nil
utilization := int((avgConsumption * 100) / avgRequest)
return &utilization, timestamp, nil
}

func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName api.ResourceName, namespace string, selector map[string]string) (consumption *ResourceConsumption, request *resource.Quantity, timestamp time.Time, err error) {
func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace string, selector map[string]string) (avgConsumption int64,
avgRequest int64, timestamp time.Time, err error) {

labelSelector := labels.SelectorFromSet(labels.Set(selector))
podList, err := h.client.Pods(namespace).
List(api.ListOptions{LabelSelector: labelSelector})

if err != nil {
return nil, nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
return 0, 0, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
}
podNames := []string{}
sum := resource.MustParse("0")
requestSum := int64(0)
missing := false
for _, pod := range podList.Items {
podNames = append(podNames, pod.Name)
for _, container := range pod.Spec.Containers {
containerRequest := container.Resources.Requests[resourceName]
containerRequest := container.Resources.Requests[api.ResourceCPU]
if containerRequest.Amount != nil {
sum.Add(containerRequest)
requestSum += containerRequest.MilliValue()
} else {
missing = true
}
}
}
if missing || sum.Cmp(resource.MustParse("0")) == 0 {
return nil, nil, time.Time{}, fmt.Errorf("some pods do not have request for %s", resourceName)
if missing || requestSum == 0 {
return 0, 0, time.Time{}, fmt.Errorf("some pods do not have request for cpu")
}
glog.Infof("Sum of %s requested: %v", resourceName, sum)
avg := resource.MustParse(fmt.Sprintf("%dm", sum.MilliValue()/int64(len(podList.Items))))
request = &avg
consumption, timestamp, err = h.getForPods(resourceName, namespace, podNames)
glog.Infof("Sum of CPU requested: %d", requestSum)
requestAvg := requestSum / int64(len(podList.Items))
// Consumption is already averaged and in millis.
consumption, timestamp, err := h.getForPods(heapsterCpuUsageMetricDefinition, namespace, podNames)
if err != nil {
return nil, nil, time.Time{}, err
return 0, 0, time.Time{}, err
}
return consumption, request, timestamp, nil
return consumption.intValue, requestAvg, timestamp, nil
}

func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namespace string, podNames []string) (*ResourceConsumption, time.Time, error) {
metricSpec, metricDefined := h.resourceDefinitions[resourceName]
if !metricDefined {
return nil, time.Time{}, fmt.Errorf("heapster metric not defined for %v", resourceName)
// GetCustomMetric returns the average value of the given custom metric from the
// pods picked using the namespace and selector passed as arguments.
func (h *HeapsterMetricsClient) GetCustomMetric(customMetricName string, namespace string, selector map[string]string) (*float64, time.Time, error) {
metricSpec := getHeapsterCustomMetricDefinition(customMetricName)

labelSelector := labels.SelectorFromSet(labels.Set(selector))
podList, err := h.client.Pods(namespace).List(api.ListOptions{LabelSelector: labelSelector})

if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
}
podNames := []string{}
for _, pod := range podList.Items {
podNames = append(podNames, pod.Name)
}

value, timestamp, err := h.getForPods(metricSpec, namespace, podNames)
if err != nil {
return nil, time.Time{}, err
}
return &value.floatValue, timestamp, nil
}

func (h *HeapsterMetricsClient) getForPods(metricSpec metricDefinition, namespace string, podNames []string) (*intAndFloat, time.Time, error) {

now := time.Now()

startTime := now.Add(heapsterQueryStart)
Expand All @@ -180,16 +195,16 @@ func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namesp

glog.Infof("Metrics available: %s", string(resultRaw))

currentConsumption, count, timestamp := metricSpec.aggregator(metrics)
sum, count, timestamp := metricSpec.aggregator(metrics)
if count != len(podNames) {
return nil, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames))
}

return &currentConsumption, timestamp, nil
return &sum, timestamp, nil
}

func calculateSumFromLatestSample(metrics heapster.MetricResultList) (sum uint64, count int, timestamp time.Time) {
sum = uint64(0)
func calculateSumFromLatestSample(metrics heapster.MetricResultList) (sum intAndFloat, count int, timestamp time.Time) {
sum = intAndFloat{0, 0}
count = 0
timestamp = time.Time{}
var oldest *time.Time // creation time of the oldest of used samples across pods
Expand All @@ -206,7 +221,13 @@ func calculateSumFromLatestSample(metrics heapster.MetricResultList) (sum uint64
if oldest == nil || newest.Timestamp.Before(*oldest) {
oldest = &newest.Timestamp
}
sum += newest.Value
if newest.FloatValue == nil {
sum.intValue += int64(newest.Value)
sum.floatValue += float64(newest.Value)
} else {
sum.intValue += int64(*newest.FloatValue)
sum.floatValue += *newest.FloatValue
}
count++
}
}
Expand Down