-
Notifications
You must be signed in to change notification settings - Fork 395
/
peer_endpoint_lease.go
364 lines (320 loc) · 12.2 KB
/
peer_endpoint_lease.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconcilers
import (
"context"
"fmt"
"net"
"net/http"
"path"
"strconv"
"sync"
"sync/atomic"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
)
// APIServerIdentityLabel is the label key under which every peer endpoint lease
// records the identity (serverId) of the kube-apiserver that owns it.
const APIServerIdentityLabel = "apiserverIdentity"
// PeerAdvertiseAddress is the address, kept as strings, that a kube-apiserver
// advertises to its peers (presumably taken from the --peer-advertise-ip and
// --peer-advertise-port flags — confirm against callers).
type PeerAdvertiseAddress struct {
	// PeerAdvertiseIP is the advertised IP address.
	PeerAdvertiseIP string

	// PeerAdvertisePort is the advertised port.
	PeerAdvertisePort string
}
// peerEndpointLeases groups the storage backend that holds peer endpoint leases
// with the settings used to read and write them.
type peerEndpointLeases struct {
	// storage is the backend that lease objects are read from and written to.
	storage storage.Interface

	// destroyFn releases the storage backend.
	destroyFn func()

	// baseKey is the storage prefix under which leases are stored; each lease
	// is written at baseKey/<serverId>.
	baseKey string

	// leaseTime is the TTL applied to a lease each time it is refreshed.
	leaseTime time.Duration
}
// PeerEndpointLeaseReconciler maintains the peer endpoint leases through which
// kube-apiservers look up each other's peer address.
type PeerEndpointLeaseReconciler interface {
	// GetEndpoint returns the peer address ("host:port") recorded for the
	// apiserver identified by serverId.
	GetEndpoint(serverId string) (string, error)
	// UpdateLease creates or refreshes the lease for serverId so that it
	// advertises ip and endpointPorts.
	UpdateLease(serverId string, ip string, endpointPorts []corev1.EndpointPort) error
	// RemoveLease removes the peer endpoint lease of the apiserver identified
	// by serverId.
	RemoveLease(serverId string) error
	// Destroy releases the reconciler's resources on shutdown.
	Destroy()
	// StopReconciling turns every later UpdateLease call into a no-op.
	StopReconciling()
}
// peerEndpointLeaseReconciler is the storage-backed implementation of
// PeerEndpointLeaseReconciler.
type peerEndpointLeaseReconciler struct {
	// serverLeases holds the lease storage together with its key prefix and TTL.
	serverLeases *peerEndpointLeases

	// stopReconcilingCalled is set by StopReconciling; once true, UpdateLease
	// returns without touching storage.
	stopReconcilingCalled atomic.Bool
}
// NewPeerEndpointLeaseReconciler creates a PeerEndpointLeaseReconciler that
// stores leases under baseKey in the storage backend described by config and
// gives every lease a TTL of leaseTime.
//
// The returned reconciler owns the storage backend; call Destroy to release it.
// An error is returned if config is nil or the storage backend cannot be created.
func NewPeerEndpointLeaseReconciler(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (PeerEndpointLeaseReconciler, error) {
	if config == nil {
		// Dereferencing a nil config below would panic; report it instead.
		return nil, fmt.Errorf("error creating storage factory: nil storage config")
	}
	leaseStorage, destroyFn, err := storagefactory.Create(*config, nil)
	if err != nil {
		// %w keeps the underlying storage error inspectable with errors.Is/As.
		return nil, fmt.Errorf("error creating storage factory: %w", err)
	}
	// Guard destroyFn so that calling Destroy more than once is safe.
	var once sync.Once
	return &peerEndpointLeaseReconciler{
		serverLeases: &peerEndpointLeases{
			storage:   leaseStorage,
			destroyFn: func() { once.Do(destroyFn) },
			baseKey:   baseKey,
			leaseTime: leaseTime,
		},
	}, nil
}
// PeerEndpointLeaseController keeps this kube-apiserver's peer endpoint lease up
// to date. It runs its own reconciliation loop, independent of other endpoint
// reconcilers, so that peer kube-apiservers can fetch the current address of this
// apiserver when they need to proxy a request to it because they cannot serve
// the request themselves (for example due to a version mismatch).
type PeerEndpointLeaseController struct {
	// reconciler reads and writes the peer endpoint leases.
	reconciler PeerEndpointLeaseReconciler

	// endpointInterval is the period between lease refreshes; Stop also waits
	// at most twice this long for lease removal.
	endpointInterval time.Duration

	// serverId is this apiserver's identity, used to key and label its lease.
	serverId string

	// peeraddress stores the IP and port of this kube-apiserver. Used by peer kube-apiservers to
	// route request to this apiserver in case of a version skew.
	peeraddress string

	// client is used to poll this apiserver's /readyz endpoint before the
	// refresh loop starts.
	client kubernetes.Interface

	// lock serializes lease refreshes (UpdatePeerEndpointLeases) with Stop.
	lock sync.Mutex

	stopCh chan struct{} // closed by Stop()
}
// New returns a PeerEndpointLeaseController for the apiserver identified by
// serverId. peeraddress is this apiserver's "host:port" as advertised to peers,
// reconciler performs the lease operations, endpointInterval is the refresh
// period, and client is used for the /readyz check in Run. Nothing runs until
// Start (or Run) is called.
func New(serverId string, peeraddress string,
	reconciler PeerEndpointLeaseReconciler, endpointInterval time.Duration, client kubernetes.Interface) *PeerEndpointLeaseController {
	c := &PeerEndpointLeaseController{}
	c.serverId = serverId
	c.peeraddress = peeraddress
	c.reconciler = reconciler
	c.endpointInterval = endpointInterval
	c.client = client
	c.stopCh = make(chan struct{})
	return c
}
// Start launches the peer endpoint lease loop (Run) in the background and
// returns immediately. The loop ends when either the caller's stopCh or the
// controller's own stop channel, closed by Stop, is closed.
func (c *PeerEndpointLeaseController) Start(stopCh <-chan struct{}) {
	runStopCh := make(chan struct{})
	// Merge the two stop signals into the single channel Run understands.
	go func() {
		defer close(runStopCh)
		select {
		case <-c.stopCh: // closed by Stop
		case <-stopCh: // closed by the caller
		}
	}()
	go c.Run(runStopCh)
}
// Run blocks until this apiserver's /readyz endpoint answers with HTTP 200
// (polling every 100ms) or stopCh is closed. It then calls
// UpdatePeerEndpointLeases every endpointInterval until stopCh is closed.
// A failed refresh is reported through runtime.HandleError and does not stop
// the loop.
func (c *PeerEndpointLeaseController) Run(stopCh <-chan struct{}) {
	// Wait until this apiserver is ready. Request errors are deliberately not
	// inspected: any status other than 200 just means "poll again".
	wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
		var code int
		c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
		return code == http.StatusOK, nil
	}, stopCh)
	wait.NonSlidingUntil(func() {
		if err := c.UpdatePeerEndpointLeases(); err != nil {
			runtime.HandleError(fmt.Errorf("unable to update peer endpoint leases: %v", err))
		}
	}, c.endpointInterval, stopCh)
}
// Stop shuts the controller down and removes this apiserver's peer endpoint
// lease. Only the first call does any work; later calls return immediately
// because c.stopCh is already closed.
//
// Closing c.stopCh ends the loop launched by Start. Cleanup then runs in a
// separate goroutine that calls the reconciler's StopReconciling, RemoveLease
// (for c.serverId) and Destroy, in that order. Stop waits for that goroutine for
// at most 2*endpointInterval so that an unreachable storage backend cannot block
// server shutdown forever; on timeout it logs a warning and returns while the
// goroutine keeps running.
//
// c.lock is held for the whole call, including the wait, so Stop cannot
// interleave with UpdatePeerEndpointLeases, which takes the same lock.
func (c *PeerEndpointLeaseController) Stop() {
	c.lock.Lock()
	defer c.lock.Unlock()
	select {
	case <-c.stopCh:
		return // only close once
	default:
		close(c.stopCh)
	}
	finishedReconciling := make(chan struct{})
	go func() {
		defer close(finishedReconciling)
		klog.Infof("Shutting down peer endpoint lease reconciler")
		// Stop reconciliation first so that a later UpdateLease call cannot
		// re-create the lease after it is removed below.
		c.reconciler.StopReconciling()
		if err := c.reconciler.RemoveLease(c.serverId); err != nil {
			klog.Errorf("Unable to remove peer endpoint leases: %v", err)
		}
		// Release the reconciler's resources last, once the lease is gone.
		c.reconciler.Destroy()
	}()
	select {
	case <-finishedReconciling:
		// done
	case <-time.After(2 * c.endpointInterval):
		// don't block server shutdown forever if we can't reach etcd to remove ourselves
		klog.Warning("peer_endpoint_controller's RemoveEndpoints() timed out")
	}
}
// UpdatePeerEndpointLeases creates or refreshes this apiserver's peer endpoint
// lease so that it advertises c.peeraddress, a "host:port" string, with the port
// named "https".
//
// It returns an error if peeraddress cannot be split into host and port, if the
// port is not a number in the range 1-65535, or if the reconciler fails to write
// the lease.
func (c *PeerEndpointLeaseController) UpdatePeerEndpointLeases() error {
	host, port, err := net.SplitHostPort(c.peeraddress)
	if err != nil {
		return fmt.Errorf("invalid peer address %q: %w", c.peeraddress, err)
	}
	// Parse with a 16-bit limit: strconv.Atoi would also accept values such as
	// "-1" or "70000", which are not valid TCP ports.
	p, err := strconv.ParseUint(port, 10, 16)
	if err != nil {
		return fmt.Errorf("invalid port in peer address %q: %w", c.peeraddress, err)
	}
	if p == 0 {
		return fmt.Errorf("invalid port in peer address %q: port must be between 1 and 65535", c.peeraddress)
	}
	endpointPorts := createEndpointPortSpec(int(p), "https")
	// Hold the lock so this refresh cannot interleave with Stop, which removes
	// the lease.
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.reconciler.UpdateLease(c.serverId, host, endpointPorts)
}
// UpdateLease creates or refreshes the peer endpoint lease for serverId, stored at
// baseKey/serverId. The lease is a corev1.Endpoints object whose single subset
// advertises ip and endpointPorts. It is labelled APIServerIdentityLabel=serverId
// so that peers can look it up by apiserver identity. Every call resets the
// lease's TTL to leaseTime, truncated to whole seconds.
//
// Because the key is derived from serverId alone, the lease does not enforce that
// different apiservers use different ip/port pairs.
//
// After StopReconciling has been called, UpdateLease does nothing and returns nil.
func (r *peerEndpointLeaseReconciler) UpdateLease(serverId string, ip string, endpointPorts []corev1.EndpointPort) error {
	// reconcile endpoints only if apiserver was not shutdown
	if r.stopReconcilingCalled.Load() {
		return nil
	}
	// we use the serverID as the key to avoid using the server IP, port as the key.
	// note: this means that this lease doesn't enforce mutual exclusion of ip/port usage between apiserver.
	key := path.Join(r.serverLeases.baseKey, serverId)
	// The TTL is expressed in seconds. It is computed once here rather than inside
	// the update function, which GuaranteedUpdate may call repeatedly on conflicts.
	leaseTime := uint64(r.serverLeases.leaseTime / time.Second)
	return r.serverLeases.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, _ storage.ResponseMeta) (kruntime.Object, *uint64, error) {
		existing := input.(*corev1.Endpoints)
		existing.Subsets = []corev1.EndpointSubset{{
			Addresses: []corev1.EndpointAddress{{IP: ip}},
			Ports:     endpointPorts,
		}}
		// Store this server's identity (serverId) as a label. Peers use it to
		// find this server's address when they cannot serve a request
		// themselves due to version skew.
		if existing.Labels == nil {
			existing.Labels = map[string]string{}
		}
		existing.Labels[APIServerIdentityLabel] = serverId
		// NB: GuaranteedUpdate does not perform the store operation unless
		// something changed between load and store (not including resource
		// version), meaning we can't refresh the TTL without actually
		// changing a field.
		existing.Generation++
		klog.V(6).Infof("Resetting TTL on server IP %q listed in storage to %v", ip, leaseTime)
		return existing, &leaseTime, nil
	}, nil)
}
// ListLeases returns, for every peer endpoint lease stored under baseKey, the
// first IP of the lease's first subset. Leases with no subset, no address or an
// empty IP contribute nothing.
func (r *peerEndpointLeaseReconciler) ListLeases() ([]string, error) {
	leases, err := r.getIpInfoList(storage.ListOptions{
		ResourceVersion:      "0",
		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
		Predicate:            storage.Everything,
		Recursive:            true,
	})
	if err != nil {
		return nil, err
	}
	ips := make([]string, 0, len(leases.Items))
	for i := range leases.Items {
		subsets := leases.Items[i].Subsets
		if len(subsets) == 0 || len(subsets[0].Addresses) == 0 {
			continue
		}
		if addr := subsets[0].Addresses[0].IP; addr != "" {
			ips = append(ips, addr)
		}
	}
	klog.V(6).Infof("Current server IPs listed in storage are %v", ips)
	return ips, nil
}
// GetLease returns the peer address, formatted as "host:port", stored in the
// lease labelled APIServerIdentityLabel=serverId. The host and port come from
// the first address and first port of that lease's first subset. Matching leases
// without any subset are skipped.
//
// It returns an error if serverId is empty or listing leases fails. It also
// returns an error if the matching lease has no IP or no port, rather than
// handing back a malformed address such as ":" that net.SplitHostPort would
// still accept. If no lease matches, it returns "" and a nil error.
func (r *peerEndpointLeaseReconciler) GetLease(serverId string) (string, error) {
	if serverId == "" {
		return "", fmt.Errorf("error getting endpoint for serverId: empty serverId")
	}
	storageOpts := storage.ListOptions{
		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
		Predicate:            storage.Everything,
		Recursive:            true,
	}
	ipInfoList, err := r.getIpInfoList(storageOpts)
	if err != nil {
		return "", err
	}
	var fullAddr string
	for i := range ipInfoList.Items {
		lease := &ipInfoList.Items[i]
		if lease.Labels[APIServerIdentityLabel] != serverId || len(lease.Subsets) == 0 {
			continue
		}
		subset := &lease.Subsets[0]
		var ip string
		if len(subset.Addresses) > 0 {
			ip = subset.Addresses[0].IP
		}
		if ip == "" || len(subset.Ports) == 0 {
			return "", fmt.Errorf("peer endpoint lease for serverId %q has no IP or port", serverId)
		}
		fullAddr = net.JoinHostPort(ip, strconv.Itoa(int(subset.Ports[0].Port)))
		break
	}
	klog.V(6).Infof("Fetched this server IP for the specified apiserverId %v, %v", serverId, fullAddr)
	return fullAddr, nil
}
// StopReconciling marks the reconciler as stopped; every UpdateLease call made
// afterwards returns nil without writing to storage.
func (r *peerEndpointLeaseReconciler) StopReconciling() {
	stopped := &r.stopReconcilingCalled
	stopped.Store(true)
}
// RemoveLease deletes the peer endpoint lease stored at baseKey/serverId.
//
// A lease that is already absent (for example because its TTL expired or it was
// never written) is not treated as an error, so removal is idempotent. Any other
// storage error is returned unchanged.
func (r *peerEndpointLeaseReconciler) RemoveLease(serverId string) error {
	key := path.Join(r.serverLeases.baseKey, serverId)
	err := r.serverLeases.storage.Delete(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil)
	if storage.IsNotFound(err) {
		// Nothing to remove: the desired end state is already reached.
		return nil
	}
	return err
}
// Destroy releases the lease storage backend by invoking the stored destroy
// function. For reconcilers built by NewPeerEndpointLeaseReconciler that
// function is wrapped in a sync.Once, so repeated calls are harmless.
func (r *peerEndpointLeaseReconciler) Destroy() {
	destroy := r.serverLeases.destroyFn
	destroy()
}
// GetEndpoint implements PeerEndpointLeaseReconciler by delegating to GetLease.
func (r *peerEndpointLeaseReconciler) GetEndpoint(serverId string) (string, error) {
	endpoint, err := r.GetLease(serverId)
	return endpoint, err
}
// getIpInfoList lists every lease object stored under baseKey, applying
// storageOpts. On error it returns a nil list.
func (r *peerEndpointLeaseReconciler) getIpInfoList(storageOpts storage.ListOptions) (*corev1.EndpointsList, error) {
	var list corev1.EndpointsList
	err := r.serverLeases.storage.GetList(apirequest.NewDefaultContext(), r.serverLeases.baseKey, storageOpts, &list)
	if err != nil {
		return nil, err
	}
	return &list, nil
}
// createEndpointPortSpec returns a one-element slice that describes a TCP port
// named endpointPortName with number endpointPort. The number is converted to
// int32 without any range check.
func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.EndpointPort {
	port := corev1.EndpointPort{
		Name:     endpointPortName,
		Port:     int32(endpointPort),
		Protocol: corev1.ProtocolTCP,
	}
	return []corev1.EndpointPort{port}
}