-
Notifications
You must be signed in to change notification settings - Fork 1
/
eval_containerscalingrule.go
120 lines (102 loc) · 3.16 KB
/
eval_containerscalingrule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package scaling
import (
"sync"
scalingpolicy "github.com/justinsb/scaler/pkg/apis/scalingpolicy/v1alpha1"
"github.com/justinsb/scaler/pkg/factors"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"
)
// containerScalingRuleEvaluator holds the evaluation state for a single
// ContainerScalingRule: one resourceScalingRuleEvaluator per resource listed
// in the rule's limits and one per resource listed in its requests.
type containerScalingRuleEvaluator struct {
// mutex is held by updatePolicy and addObservation while they touch the
// limits and requests maps.
mutex sync.Mutex
// clock is handed to every resourceScalingRuleEvaluator created for this rule.
clock clock.Clock
// rule is the rule supplied at construction; its Name labels the container
// returned by computeResources.
rule *scalingpolicy.ContainerScalingRule
// limits maps a resource name to the evaluator built from rule.Resources.Limits.
limits map[v1.ResourceName]*resourceScalingRuleEvaluator
// requests maps a resource name to the evaluator built from rule.Resources.Requests.
requests map[v1.ResourceName]*resourceScalingRuleEvaluator
}
// newContainerScalingRuleEvaluator builds an evaluator for rule, using clock
// for every per-resource evaluator it creates. The limits and requests maps
// start empty and are populated by an initial updatePolicy(rule) call before
// the evaluator is returned.
func newContainerScalingRuleEvaluator(rule *scalingpolicy.ContainerScalingRule, clock clock.Clock) *containerScalingRuleEvaluator {
	evaluator := new(containerScalingRuleEvaluator)
	evaluator.rule = rule
	evaluator.clock = clock
	evaluator.limits = map[v1.ResourceName]*resourceScalingRuleEvaluator{}
	evaluator.requests = map[v1.ResourceName]*resourceScalingRuleEvaluator{}

	// Populate the per-resource evaluators from the initial rule.
	evaluator.updatePolicy(rule)
	return evaluator
}
// updatePolicy switches the evaluator over to rule: it records the rule and
// reconciles the limits and requests evaluator maps against
// rule.Resources.Limits and rule.Resources.Requests respectively.
//
// The whole update runs with e.mutex held.
func (e *containerScalingRuleEvaluator) updatePolicy(rule *scalingpolicy.ContainerScalingRule) {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	// Keep e.rule in step with the maps below; otherwise e.rule (whose Name
	// is read by computeResources) would keep pointing at the rule passed to
	// the constructor after the policy has changed.
	e.rule = rule

	e.updateResourceMap(rule.Resources.Limits, e.limits)
	e.updateResourceMap(rule.Resources.Requests, e.requests)
}
// updateResourceMap reconciles evaluators (modified in place) with rules.
//
// For each rule, the evaluator registered under the rule's resource name is
// reused, or a new one is created with e.clock when the entry is missing or
// nil, and then given the rule via its updatePolicy. Afterwards every entry
// whose resource name does not appear in rules is removed from the map.
func (e *containerScalingRuleEvaluator) updateResourceMap(rules []scalingpolicy.ResourceScalingRule, evaluators map[v1.ResourceName]*resourceScalingRuleEvaluator) {
	seen := make(map[v1.ResourceName]struct{}, len(rules))

	for idx := 0; idx < len(rules); idx++ {
		// Take the address of the slice element itself, not of a copy.
		resourceRule := &rules[idx]
		name := resourceRule.Resource

		evaluator := evaluators[name]
		if evaluator == nil {
			evaluator = &resourceScalingRuleEvaluator{clock: e.clock}
			evaluators[name] = evaluator
		}
		evaluator.updatePolicy(resourceRule)

		seen[name] = struct{}{}
	}

	// Sweep out evaluators for resources the rules no longer mention.
	for name := range evaluators {
		if _, keep := seen[name]; keep {
			continue
		}
		delete(evaluators, name)
	}
}
// computeResources runs every limit and request evaluator of this rule against
// the current state of the container.
//
// parentPath is a path prefix; each resource evaluator is called with
// parentPath + ".limits.<resource>" or parentPath + ".requests.<resource>".
// currentParent is the container as it currently exists; nil is treated as an
// empty container, so every current quantity is the zero value.
//
// It returns a partial v1.Container that carries the rule's name and only the
// limits/requests for which an evaluator produced a value (a nil result from a
// resource evaluator leaves that resource unset). It returns (nil, nil) when
// no evaluator produced a value, and (nil, err) on the first evaluator error.
func (e *containerScalingRuleEvaluator) computeResources(parentPath string, currentParent *v1.Container) (*v1.Container, error) {
	// updatePolicy inserts into and deletes from the evaluator maps while
	// holding e.mutex, so iterating them here must hold it as well.
	e.mutex.Lock()
	defer e.mutex.Unlock()

	if currentParent == nil {
		currentParent = &v1.Container{}
	}

	// evaluate runs one group of evaluators ("limits" or "requests") and
	// collects the non-nil results. The returned list stays nil when nothing
	// was produced, so the caller never attaches an empty map.
	evaluate := func(kind string, evaluators map[v1.ResourceName]*resourceScalingRuleEvaluator, current v1.ResourceList) (v1.ResourceList, error) {
		var computed v1.ResourceList
		for name, re := range evaluators {
			// Indexing a nil ResourceList is fine and yields the zero quantity.
			quantity, err := re.computeResources(parentPath+"."+kind+"."+string(name), current[name])
			if err != nil {
				return nil, err
			}
			if quantity == nil {
				continue
			}
			if computed == nil {
				computed = make(v1.ResourceList)
			}
			computed[name] = *quantity
		}
		return computed, nil
	}

	limits, err := evaluate("limits", e.limits, currentParent.Resources.Limits)
	if err != nil {
		return nil, err
	}
	requests, err := evaluate("requests", e.requests, currentParent.Resources.Requests)
	if err != nil {
		return nil, err
	}

	if len(requests) == 0 && len(limits) == 0 {
		return nil, nil
	}

	container := &v1.Container{
		Name: e.rule.Name,
	}
	container.Resources.Limits = limits
	container.Resources.Requests = requests
	return container, nil
}
// addObservation forwards a snapshot of observed input values to every limit
// evaluator and then every request evaluator, holding e.mutex for the duration.
func (e *containerScalingRuleEvaluator) addObservation(inputs factors.Snapshot) {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	groups := []map[v1.ResourceName]*resourceScalingRuleEvaluator{e.limits, e.requests}
	for _, group := range groups {
		for _, evaluator := range group {
			evaluator.addObservation(inputs)
		}
	}
}