forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eqivalence.go
357 lines (323 loc) · 11.6 KB
/
eqivalence.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package equivalence defines Pod equivalence classes and the equivalence class
// cache.
package equivalence
import (
"fmt"
"hash/fnv"
"sync"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
// Cache is a thread safe map saves and reuses the output of predicate functions,
// it uses node name as key to access those cached results.
//
// Internally, results are keyed by predicate name, and "equivalence
// class". (Equivalence class is defined in the `Class` type.) Saved results
// will be reused until an appropriate invalidation function is called.
type Cache struct {
// i.e. map[string]*NodeCache
sync.Map
}
// NewCache create an empty equiv class cache.
func NewCache() *Cache {
return new(Cache)
}
// NodeCache saves and reuses the output of predicate functions. Use RunPredicate to
// get or update the cached results. An appropriate Invalidate* function should
// be called when some predicate results are no longer valid.
//
// Internally, results are keyed by predicate name, and "equivalence
// class". (Equivalence class is defined in the `Class` type.) Saved results
// will be reused until an appropriate invalidation function is called.
//
// NodeCache objects are thread safe within the context of NodeCache,
type NodeCache struct {
mu sync.RWMutex
cache predicateMap
}
// newNodeCache returns an empty NodeCache.
func newNodeCache() *NodeCache {
return &NodeCache{
cache: make(predicateMap),
}
}
// GetNodeCache returns the existing NodeCache for given node if present. Otherwise,
// it creates the NodeCache and returns it.
// The boolean flag is true if the value was loaded, false if created.
func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
v, exists := c.LoadOrStore(name, newNodeCache())
nodeCache = v.(*NodeCache)
return
}
// InvalidatePredicates clears all cached results for the given predicates.
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
c.Range(func(k, v interface{}) bool {
n := v.(*NodeCache)
n.invalidatePreds(predicateKeys)
return true
})
glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
}
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
if v, ok := c.Load(nodeName); ok {
n := v.(*NodeCache)
n.invalidatePreds(predicateKeys)
}
glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
}
// InvalidateAllPredicatesOnNode clears all cached results for one node.
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
c.Delete(nodeName)
glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
}
// InvalidateCachedPredicateItemForPodAdd is a wrapper of
// InvalidateCachedPredicateItem for pod add case
// TODO: This does not belong with the equivalence cache implementation.
func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
// will not break the existing inter pod affinity. So we does not need to
// invalidate MatchInterPodAffinity when pod added.
//
// But when a pod is deleted, existing inter pod affinity may become invalid.
// (e.g. this pod was preferred by some else, or vice versa)
//
// NOTE: assumptions above will not stand when we implemented features like
// RequiredDuringSchedulingRequiredDuringExecutioc.
// NoDiskConflict: the newly scheduled pod fits to existing pods on this node,
// it will also fits to equivalence class of existing pods
// GeneralPredicates: will always be affected by adding a new pod
invalidPredicates := sets.NewString(predicates.GeneralPred)
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decisioc.
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
invalidPredicates.Insert(
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
} else {
// We do not consider CSI volumes here because CSI
// volumes can not be used inline.
if vol.AWSElasticBlockStore != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
}
if vol.GCEPersistentDisk != nil {
invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
}
if vol.AzureDisk != nil {
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
}
}
}
c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
}
// Class represents a set of pods which are equivalent from the perspective of
// the scheduler. i.e. the scheduler would make the same decision for any pod
// from the same class.
type Class struct {
// Equivalence hash
hash uint64
}
// NewClass returns the equivalence class for a given Pod. The returned Class
// objects will be equal for two Pods in the same class. nil values should not
// be considered equal to each other.
//
// NOTE: Make sure to compare types of Class and not *Class.
// TODO(misterikkit): Return error instead of nil *Class.
func NewClass(pod *v1.Pod) *Class {
equivalencePod := getEquivalencePod(pod)
if equivalencePod != nil {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, equivalencePod)
return &Class{
hash: uint64(hash.Sum32()),
}
}
return nil
}
// predicateMap stores resultMaps with predicate name as the key.
type predicateMap map[string]resultMap
// resultMap stores PredicateResult with pod equivalence hash as the key.
type resultMap map[uint64]predicateResult
// predicateResult stores the output of a FitPredicate.
type predicateResult struct {
Fit bool
FailReasons []algorithm.PredicateFailureReason
}
// RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be
// run and its results cached for the next call.
//
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
func (n *NodeCache) RunPredicate(
pred algorithm.FitPredicate,
predicateKey string,
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
equivClass *Class,
cache schedulercache.Cache,
) (bool, []algorithm.PredicateFailureReason, error) {
if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests.
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
}
result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
if ok {
return result.Fit, result.FailReasons, nil
}
fit, reasons, err := pred(pod, meta, nodeInfo)
if err != nil {
return fit, reasons, err
}
if cache != nil {
n.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
}
return fit, reasons, nil
}
// updateResult updates the cached result of a predicate.
func (n *NodeCache) updateResult(
podName, predicateKey string,
fit bool,
reasons []algorithm.PredicateFailureReason,
equivalenceHash uint64,
cache schedulercache.Cache,
nodeInfo *schedulercache.NodeInfo,
) {
if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests.
metrics.EquivalenceCacheWrites.WithLabelValues("discarded_bad_node").Inc()
return
}
// Skip update if NodeInfo is stale.
if !cache.IsUpToDate(nodeInfo) {
metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc()
return
}
predicateItem := predicateResult{
Fit: fit,
FailReasons: reasons,
}
n.mu.Lock()
defer n.mu.Unlock()
// If cached predicate map already exists, just update the predicate by key
if predicates, ok := n.cache[predicateKey]; ok {
// maps in golang are references, no need to add them back
predicates[equivalenceHash] = predicateItem
} else {
n.cache[predicateKey] =
resultMap{
equivalenceHash: predicateItem,
}
}
glog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v",
nodeInfo.Node().Name, predicateKey, podName, predicateItem)
}
// lookupResult returns cached predicate results and a bool saying whether a
// cache entry was found.
func (n *NodeCache) lookupResult(
podName, nodeName, predicateKey string,
equivalenceHash uint64,
) (value predicateResult, ok bool) {
n.mu.RLock()
defer n.mu.RUnlock()
value, ok = n.cache[predicateKey][equivalenceHash]
if ok {
metrics.EquivalenceCacheHits.Inc()
} else {
metrics.EquivalenceCacheMisses.Inc()
}
return value, ok
}
// invalidatePreds deletes cached predicates by given keys.
func (n *NodeCache) invalidatePreds(predicateKeys sets.String) {
n.mu.Lock()
defer n.mu.Unlock()
for predicateKey := range predicateKeys {
delete(n.cache, predicateKey)
}
}
// equivalencePod is the set of pod attributes which must match for two pods to
// be considered equivalent for scheduling purposes. For correctness, this must
// include any Pod field which is used by a FitPredicate.
//
// NOTE: For equivalence hash to be formally correct, lists and maps in the
// equivalencePod should be normalized. (e.g. by sorting them) However, the vast
// majority of equivalent pod classes are expected to be created from a single
// pod template, so they will all have the same ordering.
type equivalencePod struct {
Namespace *string
Labels map[string]string
Affinity *v1.Affinity
Containers []v1.Container // See note about ordering
InitContainers []v1.Container // See note about ordering
NodeName *string
NodeSelector map[string]string
Tolerations []v1.Toleration
Volumes []v1.Volume // See note about ordering
}
// getEquivalencePod returns a normalized representation of a pod so that two
// "equivalent" pods will hash to the same value.
func getEquivalencePod(pod *v1.Pod) *equivalencePod {
ep := &equivalencePod{
Namespace: &pod.Namespace,
Labels: pod.Labels,
Affinity: pod.Spec.Affinity,
Containers: pod.Spec.Containers,
InitContainers: pod.Spec.InitContainers,
NodeName: &pod.Spec.NodeName,
NodeSelector: pod.Spec.NodeSelector,
Tolerations: pod.Spec.Tolerations,
Volumes: pod.Spec.Volumes,
}
// DeepHashObject considers nil and empty slices to be different. Normalize them.
if len(ep.Containers) == 0 {
ep.Containers = nil
}
if len(ep.InitContainers) == 0 {
ep.InitContainers = nil
}
if len(ep.Tolerations) == 0 {
ep.Tolerations = nil
}
if len(ep.Volumes) == 0 {
ep.Volumes = nil
}
// Normalize empty maps also.
if len(ep.Labels) == 0 {
ep.Labels = nil
}
if len(ep.NodeSelector) == 0 {
ep.NodeSelector = nil
}
// TODO(misterikkit): Also normalize nested maps and slices.
return ep
}