-
Notifications
You must be signed in to change notification settings - Fork 963
/
pdb.go
153 lines (127 loc) · 4.58 KB
/
pdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/*
Copyright 2023 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pdb
import (
pdbPolicy "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
policylisters "k8s.io/client-go/listers/policy/v1"
"k8s.io/klog/v2"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/util"
)
// PluginName is the name of the pdb scheduler plugin; it is the value
// returned by the plugin's Name method.
const PluginName = "pdb"
type pdbPlugin struct {
// Arguments given for pdb plugin
pluginArguments framework.Arguments
// Lister for PodDisruptionBudget
lister policylisters.PodDisruptionBudgetLister
}
// New creates the pdb plugin from the given arguments. The PDB lister is left
// unset (nil) here; it is resolved later, when a session is opened.
func New(arguments framework.Arguments) framework.Plugin {
	plugin := new(pdbPlugin)
	plugin.pluginArguments = arguments
	return plugin
}
// Name reports the plugin's registered name, PluginName.
func (pp *pdbPlugin) Name() string {
	return PluginName
}
// OnSessionOpen wires the PDB-aware victim filter into the session.
//
// It lazily resolves the PodDisruptionBudget lister from the session's
// informer factory, then registers a single filter as the plugin's
// VictimTasksFn, ReclaimableFn and PreemptableFn. The filter returns the
// subset of the candidate tasks whose eviction would not push any matching
// PDB below zero allowed disruptions.
func (pp *pdbPlugin) OnSessionOpen(ssn *framework.Session) {
	klog.V(4).Infof("Enter pdb plugin ...")
	defer klog.V(4).Infof("Leaving pdb plugin.")

	// 0. Init the PDB lister
	if pp.lister == nil {
		pp.lister = getPDBLister(ssn.InformerFactory())
	}

	// 1. define the func to filter out tasks that violate PDB constraints
	pdbFilterFn := func(tasks []*api.TaskInfo) []*api.TaskInfo {
		var victims []*api.TaskInfo

		// (a. get all PDBs
		pdbs, err := getPodDisruptionBudgets(pp.lister)
		if err != nil {
			// Without the PDB list we cannot prove any eviction is safe,
			// so no task is offered as a victim.
			klog.Errorf("Failed to list pdbs condition: %v", err)
			return victims
		}

		// (b. init the pdbsAllowed array: a working copy of each PDB's
		// remaining budget, decremented as tasks are accepted below.
		pdbsAllowed := make([]int32, len(pdbs))
		for i, pdb := range pdbs {
			pdbsAllowed[i] = pdb.Status.DisruptionsAllowed
		}

		// (c. range every task to check if it violates the PDB constraints.
		// If task does not violate the PDB constraints, then add it to victims.
		for _, task := range tasks {
			pod := task.Pod
			pdbForPodIsViolated := false

			// A pod with no labels will not match any PDB, so it can never
			// violate one. Skip only the PDB scan; the task must still be
			// kept as a victim.
			if len(pod.Labels) != 0 {
				for i, pdb := range pdbs {
					if pdb.Namespace != pod.Namespace {
						continue
					}

					selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
					if err != nil {
						// A PDB with an invalid selector is treated as
						// matching no pod.
						klog.V(4).Infof("Skip pdb <%s/%s> with invalid selector: %v", pdb.Namespace, pdb.Name, err)
						continue
					}

					// A PDB with a nil or empty selector matches nothing.
					if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
						continue
					}

					// Existing in DisruptedPods means it has been processed in API server,
					// we don't treat it as a violating case.
					if _, exist := pdb.Status.DisruptedPods[pod.Name]; exist {
						continue
					}

					// Only decrement the matched pdb when it's not in its <DisruptedPods>;
					// otherwise we may over-decrement the budget number.
					pdbsAllowed[i]--
					if pdbsAllowed[i] < 0 {
						pdbForPodIsViolated = true
					}
				}
			}

			if pdbForPodIsViolated {
				klog.V(4).Infof("The pod <%s> of task <%s> violates the pdb constraint, so filter it from the victim list", task.Pod.Name, task.Name)
				continue
			}
			victims = append(victims, task)
		}

		return victims
	}

	// 2. wrap pdbFilterFn to meet reclaimable and preemptable interface requirements.
	// The preemptor is not consulted; only the candidate preemptees are filtered.
	wrappedPdbFilterFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {
		return pdbFilterFn(preemptees), util.Permit
	}

	// 3. register VictimTasksFns, ReclaimableFn and PreemptableFn
	victimsFns := []api.VictimTasksFn{pdbFilterFn}
	ssn.AddVictimTasksFns(pp.Name(), victimsFns)
	ssn.AddReclaimableFn(pp.Name(), wrappedPdbFilterFn)
	ssn.AddPreemptableFn(pp.Name(), wrappedPdbFilterFn)
}
// OnSessionClose is a no-op: the pdb plugin does nothing when a session is
// closed.
func (pp *pdbPlugin) OnSessionClose(ssn *framework.Session) {
}
// getPDBLister returns the policy/v1 PodDisruptionBudget lister obtained from
// the given shared informer factory.
func getPDBLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
	pdbInformer := informerFactory.Policy().V1().PodDisruptionBudgets()
	return pdbInformer.Lister()
}
// getPodDisruptionBudgets lists every PodDisruptionBudget known to pdbLister.
// A nil lister yields a nil slice and a nil error.
func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*pdbPolicy.PodDisruptionBudget, error) {
	if pdbLister == nil {
		return nil, nil
	}
	return pdbLister.List(labels.Everything())
}