-
Notifications
You must be signed in to change notification settings - Fork 1k
/
kubernetes_workload_scaler.go
148 lines (125 loc) · 4.17 KB
/
kubernetes_workload_scaler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package scalers
import (
"context"
"fmt"
"strconv"
"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/client"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)
// kubernetesWorkloadScaler is a scaler whose metric is the number of pods
// matching a label selector in a namespace (see getMetricValue).
type kubernetesWorkloadScaler struct {
// metadata holds the parsed trigger configuration.
metadata *kubernetesWorkloadMetadata
// kubeClient is the client used to list pods.
kubeClient client.Client
}
const (
// kubernetesWorkloadMetricType is the Type set on the MetricSpec returned by
// GetMetricSpecForScaling.
kubernetesWorkloadMetricType = "External"
// podSelectorKey is the trigger metadata key holding the pod label selector.
podSelectorKey = "podSelector"
// valueKey is the trigger metadata key holding the target value.
valueKey = "value"
)
// phasesCountedAsTerminated lists the pod phases for which getCountValue
// returns 0, i.e. pods in these phases are left out of the pod count.
var phasesCountedAsTerminated = []corev1.PodPhase{
corev1.PodSucceeded,
corev1.PodFailed,
}
// kubernetesWorkloadMetadata is the parsed configuration of a kubernetes
// workload trigger, produced by parseWorkloadMetadata.
type kubernetesWorkloadMetadata struct {
// podSelector is the label selector used when listing pods.
podSelector labels.Selector
// namespace is the namespace in which pods are listed (taken from the scaler config).
namespace string
// value is the target used as the AverageValue of the generated metric spec.
value int64
// scalerIndex is passed to GenerateMetricNameWithIndex when naming the metric.
scalerIndex int
}
// NewKubernetesWorkloadScaler creates a new kubernetesWorkloadScaler.
//
// The trigger metadata in config is parsed and validated first; if that
// fails, a nil Scaler is returned together with an error that wraps the
// parsing error, so callers can inspect the cause with errors.Is/errors.As.
// kubeClient is stored on the scaler and used later to list pods.
func NewKubernetesWorkloadScaler(kubeClient client.Client, config *ScalerConfig) (Scaler, error) {
	meta, parseErr := parseWorkloadMetadata(config)
	if parseErr != nil {
		// %w (rather than %s) keeps the original error in the chain while
		// producing exactly the same message text.
		return nil, fmt.Errorf("error parsing kubernetes workload metadata: %w", parseErr)
	}

	return &kubernetesWorkloadScaler{
		metadata:   meta,
		kubeClient: kubeClient,
	}, nil
}
// parseWorkloadMetadata builds a kubernetesWorkloadMetadata from the scaler
// configuration.
//
// It reads the pod selector (podSelectorKey) and the target value (valueKey)
// from config.TriggerMetadata and copies the namespace and scaler index from
// config. An error is returned when the selector does not parse or is empty,
// or when the value is not an integer strictly greater than 0. On error the
// returned metadata is nil.
func parseWorkloadMetadata(config *ScalerConfig) (*kubernetesWorkloadMetadata, error) {
	podSelector, err := labels.Parse(config.TriggerMetadata[podSelectorKey])
	// A missing key yields "", which parses to an empty selector; reject it
	// so the scaler never counts every pod in the namespace by accident.
	if err != nil || podSelector.String() == "" {
		return nil, fmt.Errorf("invalid pod selector")
	}

	value, err := strconv.ParseInt(config.TriggerMetadata[valueKey], 10, 64)
	// Reject negatives as well as zero: the target must be strictly positive,
	// as the error message states.
	if err != nil || value <= 0 {
		return nil, fmt.Errorf("value must be an integer greater than 0")
	}

	return &kubernetesWorkloadMetadata{
		podSelector: podSelector,
		namespace:   config.Namespace,
		value:       value,
		scalerIndex: config.ScalerIndex,
	}, nil
}
// IsActive reports whether at least one pod is currently counted by
// getMetricValue; any error from the pod lookup is returned unchanged
// together with false.
func (s *kubernetesWorkloadScaler) IsActive(ctx context.Context) (bool, error) {
	count, err := s.getMetricValue(ctx)
	if err != nil {
		return false, err
	}

	active := count > 0
	return active, nil
}
// Close is a no-op for the kubernetes workload scaler: it always returns nil.
func (s *kubernetesWorkloadScaler) Close(_ context.Context) error {
	return nil
}
// GetMetricSpecForScaling returns the one-element list of metric specs for the
// HPA: an external metric named after the scaler index and the namespace,
// targeting the configured value as an average value.
func (s *kubernetesWorkloadScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
	baseName := kedautil.NormalizeString(fmt.Sprintf("workload-%s", s.metadata.namespace))
	metricName := GenerateMetricNameWithIndex(s.metadata.scalerIndex, baseName)

	spec := v2beta2.MetricSpec{
		Type: kubernetesWorkloadMetricType,
		External: &v2beta2.ExternalMetricSource{
			Metric: v2beta2.MetricIdentifier{Name: metricName},
			Target: v2beta2.MetricTarget{
				Type:         v2beta2.AverageValueMetricType,
				AverageValue: resource.NewQuantity(s.metadata.value, resource.DecimalSI),
			},
		},
	}
	return []v2beta2.MetricSpec{spec}
}
// GetMetrics returns the current value of the workload metric.
//
// The value is the pod count computed by getMetricValue, reported under the
// given metricName with the current time as timestamp. metricSelector is not
// used. If the pod lookup fails, an empty slice is returned together with an
// error that wraps the underlying cause.
func (s *kubernetesWorkloadScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
	pods, err := s.getMetricValue(ctx)
	if err != nil {
		// %w keeps the lookup error in the chain; the message text is unchanged.
		return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting kubernetes workload: %w", err)
	}

	metric := external_metrics.ExternalMetricValue{
		MetricName: metricName,
		Value:      *resource.NewQuantity(int64(pods), resource.DecimalSI),
		Timestamp:  metav1.Now(),
	}

	return []external_metrics.ExternalMetricValue{metric}, nil
}
// getMetricValue lists the pods in the configured namespace that match the
// configured label selector and returns the sum of getCountValue over them.
// A failure of the list call is returned as-is with a count of 0.
func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int, error) {
	var pods corev1.PodList
	filter := &client.ListOptions{
		LabelSelector: s.metadata.podSelector,
		Namespace:     s.metadata.namespace,
	}

	if err := s.kubeClient.List(ctx, &pods, filter); err != nil {
		return 0, err
	}

	total := 0
	for i := range pods.Items {
		total += getCountValue(pods.Items[i])
	}
	return total, nil
}
// getCountValue returns how much a pod contributes to the pod count: 0 when
// its phase is listed in phasesCountedAsTerminated, 1 otherwise.
func getCountValue(pod corev1.Pod) int {
	phase := pod.Status.Phase
	for i := range phasesCountedAsTerminated {
		if phasesCountedAsTerminated[i] == phase {
			return 0
		}
	}
	return 1
}