forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 4
/
endpoints.go
467 lines (400 loc) · 17.8 KB
/
endpoints.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"net"
"strconv"
"sync"
"time"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"github.com/sanposhiho/kubernetes/pkg/proxy/metrics"
proxyutil "github.com/sanposhiho/kubernetes/pkg/proxy/util"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
)
var supportedEndpointSliceAddressTypes = sets.New[string](
string(discovery.AddressTypeIPv4),
string(discovery.AddressTypeIPv6),
)
// BaseEndpointInfo contains base information that defines an endpoint.
// This could be used directly by proxier while processing endpoints,
// or can be used for constructing a more specific EndpointInfo struct
// defined by the proxier if needed.
type BaseEndpointInfo struct {
Endpoint string // TODO: should be an endpointString type
// IsLocal indicates whether the endpoint is running in same host as kube-proxy.
IsLocal bool
// ZoneHints represent the zone hints for the endpoint. This is based on
// endpoint.hints.forZones[*].name in the EndpointSlice API.
ZoneHints sets.Set[string]
// Ready indicates whether this endpoint is ready and NOT terminating.
// For pods, this is true if a pod has a ready status and a nil deletion timestamp.
// This is only set when watching EndpointSlices. If using Endpoints, this is always
// true since only ready endpoints are read from Endpoints.
// TODO: Ready can be inferred from Serving and Terminating below when enabled by default.
Ready bool
// Serving indiciates whether this endpoint is ready regardless of its terminating state.
// For pods this is true if it has a ready status regardless of its deletion timestamp.
// This is only set when watching EndpointSlices. If using Endpoints, this is always
// true since only ready endpoints are read from Endpoints.
Serving bool
// Terminating indicates whether this endpoint is terminating.
// For pods this is true if it has a non-nil deletion timestamp.
// This is only set when watching EndpointSlices. If using Endpoints, this is always
// false since terminating endpoints are always excluded from Endpoints.
Terminating bool
// NodeName is the name of the node this endpoint belongs to
NodeName string
// Zone is the name of the zone this endpoint belongs to
Zone string
}
var _ Endpoint = &BaseEndpointInfo{}
// String is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) String() string {
return info.Endpoint
}
// GetIsLocal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) GetIsLocal() bool {
return info.IsLocal
}
// IsReady returns true if an endpoint is ready and not terminating.
func (info *BaseEndpointInfo) IsReady() bool {
return info.Ready
}
// IsServing returns true if an endpoint is ready, regardless of if the
// endpoint is terminating.
func (info *BaseEndpointInfo) IsServing() bool {
return info.Serving
}
// IsTerminating retruns true if an endpoint is terminating. For pods,
// that is any pod with a deletion timestamp.
func (info *BaseEndpointInfo) IsTerminating() bool {
return info.Terminating
}
// GetZoneHints returns the zone hint for the endpoint.
func (info *BaseEndpointInfo) GetZoneHints() sets.Set[string] {
return info.ZoneHints
}
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IP() string {
return proxyutil.IPPart(info.Endpoint)
}
// Port returns just the Port part of the endpoint.
func (info *BaseEndpointInfo) Port() (int, error) {
return proxyutil.PortPart(info.Endpoint)
}
// Equal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
return info.String() == other.String() &&
info.GetIsLocal() == other.GetIsLocal() &&
info.IsReady() == other.IsReady()
}
// GetNodeName returns the NodeName for this endpoint.
func (info *BaseEndpointInfo) GetNodeName() string {
return info.NodeName
}
// GetZone returns the Zone for this endpoint.
func (info *BaseEndpointInfo) GetZone() string {
return info.Zone
}
func newBaseEndpointInfo(IP, nodeName, zone string, port int, isLocal bool,
ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo {
return &BaseEndpointInfo{
Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)),
IsLocal: isLocal,
Ready: ready,
Serving: serving,
Terminating: terminating,
ZoneHints: zoneHints,
NodeName: nodeName,
Zone: zone,
}
}
type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint
// This handler is invoked by the apply function on every change. This function should not modify the
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
// EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name.
type EndpointChangeTracker struct {
// lock protects lastChangeTriggerTimes
lock sync.Mutex
processEndpointsMapChange processEndpointsMapChangeFunc
// endpointSliceCache holds a simplified version of endpoint slices.
endpointSliceCache *EndpointSliceCache
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
// object to change. Used to calculate the network-programming-latency.
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
// record the time when the endpointChangeTracker was created so we can ignore the endpoints
// that were generated before, because we can't estimate the network-programming-latency on those.
// This is specially problematic on restarts, because we process all the endpoints that may have been
// created hours or days before.
trackerStartTime time.Time
}
// NewEndpointChangeTracker initializes an EndpointsChangeMap
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
return &EndpointChangeTracker{
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: time.Now(),
processEndpointsMapChange: processEndpointsMapChange,
endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo),
}
}
// EndpointSliceUpdate updates given service's endpoints change map based on the <previous, current> endpoints pair.
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap.
// If removeSlice is true, slice will be removed, otherwise it will be added or updated.
func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) {
klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
return false
}
// This should never happen
if endpointSlice == nil {
klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate")
return false
}
namespacedName, _, err := endpointSliceCacheKeys(endpointSlice)
if err != nil {
klog.InfoS("Error getting endpoint slice cache keys", "err", err)
return false
}
metrics.EndpointChangesTotal.Inc()
ect.lock.Lock()
defer ect.lock.Unlock()
changeNeeded := ect.endpointSliceCache.updatePending(endpointSlice, removeSlice)
if changeNeeded {
metrics.EndpointChangesPending.Inc()
// In case of Endpoints deletion, the LastChangeTriggerTime annotation is
// by-definition coming from the time of last update, which is not what
// we want to measure. So we simply ignore it in this cases.
// TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion
// when other EndpointSlice for that service still exist.
if removeSlice {
delete(ect.lastChangeTriggerTimes, namespacedName)
} else if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && t.After(ect.trackerStartTime) {
ect.lastChangeTriggerTimes[namespacedName] =
append(ect.lastChangeTriggerTimes[namespacedName], t)
}
}
return changeNeeded
}
// PendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointChangeTracker) PendingChanges() sets.Set[string] {
return ect.endpointSliceCache.pendingChanges()
}
// checkoutChanges returns a list of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
metrics.EndpointChangesPending.Set(0)
return ect.endpointSliceCache.checkoutChanges()
}
// checkoutTriggerTimes applies the locally cached trigger times to a map of
// trigger times that have been passed in and empties the local cache.
func (ect *EndpointChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
ect.lock.Lock()
defer ect.lock.Unlock()
for k, v := range ect.lastChangeTriggerTimes {
prev, ok := (*lastChangeTriggerTimes)[k]
if !ok {
(*lastChangeTriggerTimes)[k] = v
} else {
(*lastChangeTriggerTimes)[k] = append(prev, v...)
}
}
ect.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
}
// getLastChangeTriggerTime returns the time.Time value of the
// EndpointsLastChangeTriggerTime annotation stored in the given endpoints
// object or the "zero" time if the annotation wasn't set or was set
// incorrectly.
func getLastChangeTriggerTime(annotations map[string]string) time.Time {
// TODO(#81360): ignore case when Endpoint is deleted.
if _, ok := annotations[v1.EndpointsLastChangeTriggerTime]; !ok {
// It's possible that the Endpoints object won't have the
// EndpointsLastChangeTriggerTime annotation set. In that case return
// the 'zero value', which is ignored in the upstream code.
return time.Time{}
}
val, err := time.Parse(time.RFC3339Nano, annotations[v1.EndpointsLastChangeTriggerTime])
if err != nil {
klog.ErrorS(err, "Error while parsing EndpointsLastChangeTriggerTimeAnnotation",
"value", annotations[v1.EndpointsLastChangeTriggerTime])
// In case of error val = time.Zero, which is ignored in the upstream code.
}
return val
}
// endpointsChange contains all changes to endpoints that happened since proxy
// rules were synced. For a single object, changes are accumulated, i.e.
// previous is state from before applying the changes, current is state after
// applying the changes.
type endpointsChange struct {
previous EndpointsMap
current EndpointsMap
}
// UpdateEndpointMapResult is the updated results after applying endpoints changes.
type UpdateEndpointMapResult struct {
// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
// Existing conntrack NAT entries pointing to these endpoints must be deleted to
// ensure that no further traffic for the Service gets delivered to them.
DeletedUDPEndpoints []ServiceEndpoint
// NewlyActiveUDPServices identifies UDP Services that have just gone from 0 to
// non-0 endpoints. Existing conntrack entries caching the fact that these
// services are black holes must be deleted to ensure that traffic can immediately
// begin flowing to the new endpoints.
NewlyActiveUDPServices []ServicePortName
// List of the trigger times for all endpoints objects that changed. It's used to export the
// network programming latency.
// NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue.
LastChangeTriggerTimes map[types.NamespacedName][]time.Time
}
// Update updates endpointsMap base on the given changes.
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0)
result.NewlyActiveUDPServices = make([]ServicePortName, 0)
result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
em.apply(changes, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices, &result.LastChangeTriggerTimes)
return result
}
// EndpointsMap maps a service name to a list of all its Endpoints.
type EndpointsMap map[ServicePortName][]Endpoint
// apply the changes to EndpointsMap, update the passed-in stale-conntrack-entry arrays,
// and clear the changes map. In addition it returns (via argument) and resets the
// lastChangeTriggerTimes for all endpoints that were changed and will result in syncing
// the proxy rules. apply triggers processEndpointsMapChange on every change.
func (em EndpointsMap) apply(ect *EndpointChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint,
newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
if ect == nil {
return
}
changes := ect.checkoutChanges()
for _, change := range changes {
if ect.processEndpointsMapChange != nil {
ect.processEndpointsMapChange(change.previous, change.current)
}
em.unmerge(change.previous)
em.merge(change.current)
detectStaleConntrackEntries(change.previous, change.current, deletedUDPEndpoints, newlyActiveUDPServices)
}
ect.checkoutTriggerTimes(lastChangeTriggerTimes)
}
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
func (em EndpointsMap) merge(other EndpointsMap) {
for svcPortName := range other {
em[svcPortName] = other[svcPortName]
}
}
// Unmerge removes the <service, endpoints> pairs from the current EndpointsMap which are contained in the EndpointsMap passed in.
func (em EndpointsMap) unmerge(other EndpointsMap) {
for svcPortName := range other {
delete(em, svcPortName)
}
}
// getLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.Set[string] {
localIPs := make(map[types.NamespacedName]sets.Set[string])
for svcPortName, epList := range em {
for _, ep := range epList {
// Only add ready endpoints for health checking. Terminating endpoints may still serve traffic
// but the health check signal should fail if there are only terminating endpoints on a node.
if !ep.IsReady() {
continue
}
if ep.GetIsLocal() {
nsn := svcPortName.NamespacedName
if localIPs[nsn] == nil {
localIPs[nsn] = sets.New[string]()
}
localIPs[nsn].Insert(ep.IP())
}
}
}
return localIPs
}
// LocalReadyEndpoints returns a map of Service names to the number of local ready
// endpoints for that service.
func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to endpointsMap.
// (Note that we need to call getLocalEndpointIPs first to squash the data by IP,
// because the EndpointsMap is sorted by IP+port, not just IP, and we want to
// consider a Service pointing to 10.0.0.1:80 and 10.0.0.1:443 to have 1 endpoint,
// not 2.)
eps := make(map[types.NamespacedName]int)
localIPs := em.getLocalReadyEndpointIPs()
for nsn, ips := range localIPs {
eps[nsn] = len(ips)
}
return eps
}
// detectStaleConntrackEntries detects services that may be associated with stale conntrack entries.
// (See UpdateEndpointMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) {
// Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but
// are no longer sending to newEndpointsMap. The proxier should make sure that
// conntrack does not accidentally route any new connections to them.
for svcPortName, epList := range oldEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}
for _, ep := range epList {
// If the old endpoint wasn't Ready then there can't be stale
// conntrack entries since there was no traffic sent to it.
if !ep.IsReady() {
continue
}
deleted := true
// Check if the endpoint has changed, including if it went from
// ready to not ready. If it did change stale entries for the old
// endpoint have to be cleared.
for i := range newEndpointsMap[svcPortName] {
if newEndpointsMap[svcPortName][i].Equal(ep) {
deleted = false
break
}
}
if deleted {
klog.V(4).InfoS("Deleted endpoint may have stale conntrack entries", "portName", svcPortName, "endpoint", ep)
*deletedUDPEndpoints = append(*deletedUDPEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
}
}
}
// Detect services that have gone from 0 to non-0 ready endpoints. If there were
// previously 0 endpoints, but someone tried to connect to it, then a conntrack
// entry may have been created blackholing traffic to that IP, which should be
// deleted now.
for svcPortName, epList := range newEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}
epReady := 0
for _, ep := range epList {
if ep.IsReady() {
epReady++
}
}
oldEpReady := 0
for _, ep := range oldEndpointsMap[svcPortName] {
if ep.IsReady() {
oldEpReady++
}
}
if epReady > 0 && oldEpReady == 0 {
*newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName)
}
}
}