forked from kubewharf/katalyst-core
/
pressure_suppression.go
148 lines (124 loc) · 5.2 KB
/
pressure_suppression.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package strategy
import (
"context"
"fmt"
"math"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"
pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
"github.com/kubewharf/katalyst-core/pkg/util/qos"
)
// CPUPressureSuppression implements CPUPressureForceEviction: it evicts
// reclaimed-QoS pods whose cpu suppression tolerance rate is exceeded by the
// reclaim pool's current suppression rate for long enough.
type CPUPressureSuppression struct {
	conf  *config.Configuration
	state state.ReadonlyState

	// lastToleranceTime maps a pod's unique-name key to the first time its
	// tolerance rate was observed to be exceeded; entries are cleared when the
	// pod recovers or becomes inactive.
	lastToleranceTime sync.Map
}
// NewCPUPressureSuppressionEviction constructs a suppression-tolerance based
// eviction plugin. The metric emitter and meta server arguments are accepted
// for interface symmetry but unused.
func NewCPUPressureSuppressionEviction(_ metrics.MetricEmitter, _ *metaserver.MetaServer,
	conf *config.Configuration, state state.ReadonlyState) CPUPressureForceEviction {
	plugin := &CPUPressureSuppression{
		conf:  conf,
		state: state,
	}
	return plugin
}
// Start implements CPUPressureForceEviction; this plugin needs no background
// work, so it returns immediately.
func (p *CPUPressureSuppression) Start(context.Context) error {
	return nil
}
// Name returns the identifier of this eviction plugin.
func (p *CPUPressureSuppression) Name() string {
	return "pressure-suppression"
}
// GetEvictPods returns the reclaimed-QoS pods that should be evicted because
// the reclaim pool's suppression rate (total cpu request / pool size) has
// exceeded their suppression tolerance rate for longer than the configured
// minimum duration. Candidates are sorted so that pods with larger cpu
// requests and lower priority are evicted first.
func (p *CPUPressureSuppression) GetEvictPods(_ context.Context, request *pluginapi.GetEvictPodsRequest) (*pluginapi.GetEvictPodsResponse, error) {
	if request == nil {
		// fix: the message used to name the wrong RPC ("GetTopEvictionPods")
		return nil, fmt.Errorf("GetEvictPods got nil request")
	}

	dynamicConfig := p.conf.GetDynamicConfiguration()
	if !dynamicConfig.EnableSuppressionEviction {
		return &pluginapi.GetEvictPodsResponse{}, nil
	}

	// only the reclaim pool supports suppression tolerance eviction
	entries := p.state.GetPodEntries()
	poolCPUSet, err := entries.GetCPUSetForPool(state.PoolNameReclaim)
	if err != nil {
		return nil, fmt.Errorf("get reclaim pool failed: %s", err)
	}

	// skip evicting pods if the pool size is zero
	poolSize := poolCPUSet.Size()
	if poolSize == 0 {
		return &pluginapi.GetEvictPodsResponse{}, nil
	}

	filteredPods := native.FilterPods(request.ActivePods, p.conf.CheckReclaimedQoSForPod)
	if len(filteredPods) == 0 {
		return &pluginapi.GetEvictPodsResponse{}, nil
	}

	// prioritize evicting the pod whose cpu request is larger and priority is lower
	general.NewMultiSorter(
		general.ReverseCmpFunc(native.PodCPURequestCmpFunc),
		general.ReverseCmpFunc(native.PodPriorityCmpFunc),
	).Sort(native.NewPodSourceImpList(filteredPods))

	// sum up the cpu requests of all candidate pods
	totalCPURequest := resource.Quantity{}
	for _, pod := range filteredPods {
		totalCPURequest.Add(native.GetCPUQuantity(native.SumUpPodRequestResources(pod)))
	}

	now := time.Now()
	var evictPods []*v1alpha1.EvictPod
	for _, pod := range filteredPods {
		key := native.GenerateUniqObjectNameKey(pod)
		// recompute each iteration: evicting a pod below subtracts its request
		// from totalCPURequest, which may spare the remaining pods
		poolSuppressionRate := float64(totalCPURequest.Value()) / float64(poolSize)
		if podToleranceRate := p.getPodToleranceRate(pod, dynamicConfig.MaxSuppressionToleranceRate); podToleranceRate < poolSuppressionRate {
			// remember when the overrun was first observed for this pod
			last, _ := p.lastToleranceTime.LoadOrStore(key, now)
			lastDuration := now.Sub(last.(time.Time))
			// fix: reuse lastDuration instead of recomputing it, and drop the
			// misleading "secs" suffix (%s on a time.Duration already prints units)
			general.Infof("current pool suppression rate %.2f, "+
				"and it is over than suppression tolerance rate %.2f of pod %s, last duration: %s", poolSuppressionRate,
				podToleranceRate, key, lastDuration)

			// a pod will only be evicted if its cpu suppression lasts longer than minToleranceDuration
			if lastDuration > dynamicConfig.MinSuppressionToleranceDuration {
				evictPods = append(evictPods, &v1alpha1.EvictPod{
					Pod: pod,
					Reason: fmt.Sprintf("current pool suppression rate %.2f is over than the "+
						"pod suppression tolerance rate %.2f", poolSuppressionRate, podToleranceRate),
				})
				totalCPURequest.Sub(native.GetCPUQuantity(native.SumUpPodRequestResources(pod)))
			}
		} else {
			// pod is within tolerance again; reset its overrun timer
			p.lastToleranceTime.Delete(key)
		}
	}

	// clear inactive filtered pods from lastToleranceTime
	filteredPodsMap := native.GetPodNamespaceNameKeyMap(filteredPods)
	p.lastToleranceTime.Range(func(key, _ interface{}) bool {
		if _, ok := filteredPodsMap[key.(string)]; !ok {
			p.lastToleranceTime.Delete(key)
		}
		return true
	})

	return &pluginapi.GetEvictPodsResponse{EvictPods: evictPods}, nil
}
// getPodToleranceRate returns the pod's cpu suppression tolerance rate,
// capped at maxToleranceRate. If the rate cannot be resolved from the pod's
// QoS annotations, the error is logged and maxToleranceRate is used as a
// best-effort fallback.
func (p *CPUPressureSuppression) getPodToleranceRate(pod *v1.Pod, maxToleranceRate float64) float64 {
	rate, err := qos.GetPodCPUSuppressionToleranceRate(p.conf.QoSConfiguration, pod)
	if err != nil {
		general.Errorf("pod %s get cpu suppression tolerance rate failed: %s",
			native.GenerateUniqObjectNameKey(pod), err)
		return maxToleranceRate
	}
	// idiom fix: early return above removes the need for an else branch
	return math.Min(rate, maxToleranceRate)
}