-
Notifications
You must be signed in to change notification settings - Fork 39.4k
/
controller_utils.go
335 lines (288 loc) · 10.9 KB
/
controller_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"crypto/md5"
"encoding/hex"
"fmt"
"reflect"
"sort"
"sync"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/hash"
)
// ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527)
type ServiceSelectorCache struct {
lock sync.RWMutex
cache map[string]labels.Selector
}
// NewServiceSelectorCache init ServiceSelectorCache for both endpoint controller and endpointSlice controller.
func NewServiceSelectorCache() *ServiceSelectorCache {
return &ServiceSelectorCache{
cache: map[string]labels.Selector{},
}
}
// Get return selector and existence in ServiceSelectorCache by key.
func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) {
sc.lock.RLock()
selector, ok := sc.cache[key]
// fine-grained lock improves GetPodServiceMemberships performance(16.5%) than defer measured by BenchmarkGetPodServiceMemberships
sc.lock.RUnlock()
return selector, ok
}
// Update can update or add a selector in ServiceSelectorCache while service's selector changed.
func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector {
sc.lock.Lock()
defer sc.lock.Unlock()
selector := labels.Set(rawSelector).AsSelectorPreValidated()
sc.cache[key] = selector
return selector
}
// Delete can delete selector which exist in ServiceSelectorCache.
func (sc *ServiceSelectorCache) Delete(key string) {
sc.lock.Lock()
defer sc.lock.Unlock()
delete(sc.cache, key)
}
// GetPodServiceMemberships returns a set of Service keys for Services that have
// a selector matching the given pod.
func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
set := sets.String{}
services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())
if err != nil {
return set, err
}
var selector labels.Selector
for _, service := range services {
if service.Spec.Selector == nil {
// if the service has a nil selector this means selectors match nothing, not everything.
continue
}
key, err := controller.KeyFunc(service)
if err != nil {
return nil, err
}
if v, ok := sc.Get(key); ok {
selector = v
} else {
selector = sc.Update(key, service.Spec.Selector)
}
if selector.Matches(labels.Set(pod.Labels)) {
set.Insert(key)
}
}
return set, nil
}
// PortMapKey is used to uniquely identify groups of endpoint ports.
type PortMapKey string
// NewPortMapKey generates a PortMapKey from endpoint ports.
func NewPortMapKey(endpointPorts []discovery.EndpointPort) PortMapKey {
sort.Sort(portsInOrder(endpointPorts))
return PortMapKey(DeepHashObjectToString(endpointPorts))
}
// DeepHashObjectToString creates a unique hash string from a go object.
func DeepHashObjectToString(objectToWrite interface{}) string {
hasher := md5.New()
hash.DeepHashObject(hasher, objectToWrite)
return hex.EncodeToString(hasher.Sum(nil)[0:])
}
// ShouldPodBeInEndpointSlice returns true if a specified pod should be in an EndpointSlice object.
// Terminating pods are only included if includeTerminating is true
func ShouldPodBeInEndpointSlice(pod *v1.Pod, includeTerminating bool) bool {
if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
return false
}
if !includeTerminating && pod.DeletionTimestamp != nil {
return false
}
if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
}
if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
return pod.Status.Phase != v1.PodSucceeded
}
return true
}
// ShouldSetHostname returns true if the Hostname attribute should be set on an
// Endpoints Address or EndpointSlice Endpoint.
func ShouldSetHostname(pod *v1.Pod, svc *v1.Service) bool {
return len(pod.Spec.Hostname) > 0 && pod.Spec.Subdomain == svc.Name && svc.Namespace == pod.Namespace
}
// podEndpointsChanged returns two boolean values. The first is true if the pod has
// changed in a way that may change existing endpoints. The second value is true if the
// pod has changed in a way that may affect which Services it matches.
func podEndpointsChanged(oldPod, newPod *v1.Pod) (bool, bool) {
// Check if the pod labels have changed, indicating a possible
// change in the service membership
labelsChanged := false
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
!hostNameAndDomainAreEqual(newPod, oldPod) {
labelsChanged = true
}
// If the pod's deletion timestamp is set, remove endpoint from ready address.
if newPod.DeletionTimestamp != oldPod.DeletionTimestamp {
return true, labelsChanged
}
// If the pod's readiness has changed, the associated endpoint address
// will move from the unready endpoints set to the ready endpoints.
// So for the purposes of an endpoint, a readiness change on a pod
// means we have a changed pod.
if podutil.IsPodReady(oldPod) != podutil.IsPodReady(newPod) {
return true, labelsChanged
}
// Check if the pod IPs have changed
if len(oldPod.Status.PodIPs) != len(newPod.Status.PodIPs) {
return true, labelsChanged
}
for i := range oldPod.Status.PodIPs {
if oldPod.Status.PodIPs[i].IP != newPod.Status.PodIPs[i].IP {
return true, labelsChanged
}
}
// Endpoints may also reference a pod's Name, Namespace, UID, and NodeName, but
// the first three are immutable, and NodeName is immutable once initially set,
// which happens before the pod gets an IP.
return false, labelsChanged
}
// GetServicesToUpdateOnPodChange returns a set of Service keys for Services
// that have potentially been affected by a change to this pod.
func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}) sets.String {
newPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if newPod.ResourceVersion == oldPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs
return sets.String{}
}
podChanged, labelsChanged := podEndpointsChanged(oldPod, newPod)
// If both the pod and labels are unchanged, no update is needed
if !podChanged && !labelsChanged {
return sets.String{}
}
services, err := selectorCache.GetPodServiceMemberships(serviceLister, newPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
return sets.String{}
}
if labelsChanged {
oldServices, err := selectorCache.GetPodServiceMemberships(serviceLister, oldPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
}
services = determineNeededServiceUpdates(oldServices, services, podChanged)
}
return services
}
// GetPodFromDeleteAction returns a pointer to a pod if one can be derived from
// obj (could be a *v1.Pod, or a DeletionFinalStateUnknown marker item).
func GetPodFromDeleteAction(obj interface{}) *v1.Pod {
if pod, ok := obj.(*v1.Pod); ok {
// Enqueue all the services that the pod used to be a member of.
// This is the same thing we do when we add a pod.
return pod
}
// If we reached here it means the pod was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return nil
}
pod, ok := tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
return nil
}
return pod
}
func hostNameAndDomainAreEqual(pod1, pod2 *v1.Pod) bool {
return pod1.Spec.Hostname == pod2.Spec.Hostname &&
pod1.Spec.Subdomain == pod2.Spec.Subdomain
}
func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
if podChanged {
// if the labels and pod changed, all services need to be updated
services = services.Union(oldServices)
} else {
// if only the labels changed, services not common to both the new
// and old service set (the disjuntive union) need to be updated
services = services.Difference(oldServices).Union(oldServices.Difference(services))
}
return services
}
// portsInOrder helps sort endpoint ports in a consistent way for hashing.
type portsInOrder []discovery.EndpointPort
func (sl portsInOrder) Len() int { return len(sl) }
func (sl portsInOrder) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
func (sl portsInOrder) Less(i, j int) bool {
h1 := DeepHashObjectToString(sl[i])
h2 := DeepHashObjectToString(sl[j])
return h1 < h2
}
// endpointsEqualBeyondHash returns true if endpoints have equal attributes
// but excludes equality checks that would have already been covered with
// endpoint hashing (see hashEndpoint func for more info).
func EndpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
if stringPtrChanged(ep1.NodeName, ep1.NodeName) {
return false
}
if stringPtrChanged(ep1.Zone, ep1.Zone) {
return false
}
if boolPtrChanged(ep1.Conditions.Ready, ep2.Conditions.Ready) {
return false
}
if objectRefPtrChanged(ep1.TargetRef, ep2.TargetRef) {
return false
}
return true
}
// boolPtrChanged returns true if a set of bool pointers have different values.
func boolPtrChanged(ptr1, ptr2 *bool) bool {
if (ptr1 == nil) != (ptr2 == nil) {
return true
}
if ptr1 != nil && ptr2 != nil && *ptr1 != *ptr2 {
return true
}
return false
}
// objectRefPtrChanged returns true if a set of object ref pointers have
// different values.
func objectRefPtrChanged(ref1, ref2 *v1.ObjectReference) bool {
if (ref1 == nil) != (ref2 == nil) {
return true
}
if ref1 != nil && ref2 != nil && !apiequality.Semantic.DeepEqual(*ref1, *ref2) {
return true
}
return false
}
// stringPtrChanged returns true if a set of string pointers have different values.
func stringPtrChanged(ptr1, ptr2 *string) bool {
if (ptr1 == nil) != (ptr2 == nil) {
return true
}
if ptr1 != nil && ptr2 != nil && *ptr1 != *ptr2 {
return true
}
return false
}