-
Notifications
You must be signed in to change notification settings - Fork 411
/
watcher.go
468 lines (426 loc) · 13.6 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"errors"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"sync"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
"go.etcd.io/etcd/clientv3"
"k8s.io/klog/v2"
)
const (
// incomingBufSize and outgoingBufSize are the capacities of a watchChan's
// incomingEventChan and resultChan respectively. The channels are buffered
// in order to reduce the number of context switches.
incomingBufSize = 100
outgoingBufSize = 100
)
// fatalOnDecodeError is used during testing to panic the server if the watcher encounters a decoding error.
// It is assigned in init from the KUBE_PANIC_WATCH_DECODE_ERROR environment variable, can be
// overwritten through TestOnlySetFatalOnDecodeError, and is read by decodeObj.
var fatalOnDecodeError = false
// errTestingDecode is the only panic value that testingDeferOnDecodeError recovers from;
// every other value is re-panicked.
var errTestingDecode = errors.New("sentinel error only used during testing to indicate watch decoding error")
// testingDeferOnDecodeError is meant to be deferred. It swallows a panic whose
// value is errTestingDecode and lets any other panic value keep propagating.
func testingDeferOnDecodeError() {
	// recover must be called directly from the deferred function to take effect.
	recovered := recover()
	if recovered == nil || recovered == errTestingDecode {
		return
	}
	panic(recovered)
}
func init() {
// check to see if we are running in a test environment
TestOnlySetFatalOnDecodeError(true)
fatalOnDecodeError, _ = strconv.ParseBool(os.Getenv("KUBE_PANIC_WATCH_DECODE_ERROR"))
}
// TestOnlySetFatalOnDecodeError should only be used for cases where decode errors are expected and need to be tested. e.g. conversion webhooks.
// It overwrites the package-level fatalOnDecodeError flag with b. The write is a plain,
// unsynchronized assignment.
func TestOnlySetFatalOnDecodeError(b bool) {
fatalOnDecodeError = b
}
// watcher holds the dependencies shared by every watchChan it creates.
type watcher struct {
// client is the etcd v3 client used for the initial Get and for Watch.
client *clientv3.Client
// codec decodes the bytes read from etcd into objects (see decodeObj).
codec runtime.Codec
// newFunc creates an empty object; it may be nil, in which case no bookmark
// events are produced and objectType is "<unknown>".
newFunc func() runtime.Object
// objectType is the reflect type string of newFunc's result and is passed
// to metrics.RecordEtcdBookmark.
objectType string
// versioner stamps the etcd revision onto decoded objects.
versioner storage.Versioner
// transformer converts stored values via TransformFromStorage before they are decoded.
transformer value.Transformer
}
// watchChan implements watch.Interface.
// Its goroutines are started by run and are all stopped by cancelling ctx.
type watchChan struct {
// watcher is the parent watcher supplying the etcd client, codec, versioner and transformer.
watcher *watcher
// key is the etcd key (or key prefix when recursive is set) being watched.
key string
// initialRev is the revision the watch starts after. When it is 0, sync
// replaces it with the revision of the initial Get response.
initialRev int64
// recursive selects prefix semantics for both the initial Get and the watch.
recursive bool
// progressNotify adds clientv3.WithProgressNotify to the watch options.
progressNotify bool
// internalPred filters events; createWatchChan replaces an empty predicate with storage.Everything.
internalPred storage.SelectionPredicate
// ctx and cancel bound the lifetime of every goroutine of this watchChan.
ctx context.Context
cancel context.CancelFunc
// incomingEventChan carries parsed etcd events from sendEvent to processEvent (capacity incomingBufSize).
incomingEventChan chan *event
// resultChan is the channel handed out by ResultChan (capacity outgoingBufSize); run closes it.
resultChan chan watch.Event
// errChan carries an error from sendError to run (capacity 1).
errChan chan error
}
// newWatcher builds a watcher from its dependencies. The objectType label is
// the reflect type string of the object produced by newFunc, or "<unknown>"
// when newFunc is nil.
func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
	objectType := "<unknown>"
	if newFunc != nil {
		objectType = reflect.TypeOf(newFunc()).String()
	}
	return &watcher{
		client:      client,
		codec:       codec,
		newFunc:     newFunc,
		objectType:  objectType,
		versioner:   versioner,
		transformer: transformer,
	}
}
// Watch starts watching key and returns a watch.Interface that delivers the
// relevant notifications.
//
//   - rev == 0: the existing object(s) are returned first, then watching starts
//     from the maximum revision+1 of the returned objects.
//   - rev != 0: only events that happened after the given revision are delivered.
//   - recursive == false: only the given key is watched.
//   - recursive == true: all children and directories under the key are watched,
//     excluding the root key itself; a trailing "/" is appended to key if missing.
//
// pred must be non-nil. A change is only reported if pred matches it.
// The returned error is always nil.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) {
	watchKey := key
	if recursive && !strings.HasSuffix(watchKey, "/") {
		watchKey += "/"
	}
	ch := w.createWatchChan(ctx, watchKey, rev, recursive, progressNotify, pred)
	go ch.run()
	return ch, nil
}
// createWatchChan assembles a watchChan for the given key and options without
// starting any goroutine. An empty predicate is replaced by storage.Everything,
// and the watchChan's context is a cancellable child of ctx that requires an
// etcd leader.
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
	effectivePred := pred
	if pred.Empty() {
		// The filter doesn't filter out any object.
		effectivePred = storage.Everything
	}

	// The etcd server waits until it cannot find a leader for 3 election
	// timeouts to cancel existing streams. 3 is currently a hard coded
	// constant. The election timeout defaults to 1000ms. If the cluster is
	// healthy, when the leader is stopped, the leadership transfer should be
	// smooth. (leader transfers its leadership before stopping). If leader is
	// hard killed, other servers will take an election timeout to realize
	// leader lost and start campaign.
	watchCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))

	return &watchChan{
		watcher:           w,
		key:               key,
		initialRev:        rev,
		recursive:         recursive,
		progressNotify:    progressNotify,
		internalPred:      effectivePred,
		ctx:               watchCtx,
		cancel:            cancel,
		incomingEventChan: make(chan *event, incomingBufSize),
		resultChan:        make(chan watch.Event, outgoingBufSize),
		errChan:           make(chan error, 1),
	}
}
// run drives the watchChan: it starts the startWatching and processEvent
// goroutines, blocks until an error is reported, the etcd watch ends, or the
// context is cancelled, then cancels the context, waits for processEvent to
// finish and closes resultChan.
func (wc *watchChan) run() {
// watchClosedCh is closed by startWatching when the etcd watch channel ends without an error.
watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh)
// resultChanWG tracks processEvent, the other goroutine that sends on resultChan.
var resultChanWG sync.WaitGroup
resultChanWG.Add(1)
go wc.processEvent(&resultChanWG)
select {
case err := <-wc.errChan:
// context.Canceled is not reported to the user as an error event.
if err == context.Canceled {
break
}
errResult := transformErrorToEvent(err)
if errResult != nil {
// The error result is delivered before resultChan is closed, unless the
// context is cancelled first.
select {
case wc.resultChan <- *errResult:
case <-wc.ctx.Done(): // user has given up all results
}
}
case <-watchClosedCh:
case <-wc.ctx.Done(): // user cancel
}
// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
// It's fine to double cancel.
wc.cancel()
// We must wait until processEvent no longer uses resultChan before closing it.
resultChanWG.Wait()
close(wc.resultChan)
}
// Stop cancels the watchChan's context, which makes run shut down its
// goroutines and close the result channel.
func (wc *watchChan) Stop() {
wc.cancel()
}
// ResultChan returns the receive-only channel on which watch events are
// delivered. The channel is closed by run once the watch has ended.
func (wc *watchChan) ResultChan() <-chan watch.Event {
return wc.resultChan
}
// sync fetches the data currently stored under the watched key (or prefix,
// when recursive) and forwards each key-value pair for processing.
// initialRev is set to the revision reported in the Get response.
// All events sent will have isCreated=true.
// It returns the error of the Get call, if any.
func (wc *watchChan) sync() error {
	var getOpts []clientv3.OpOption
	if wc.recursive {
		getOpts = append(getOpts, clientv3.WithPrefix())
	}

	resp, err := wc.watcher.client.Get(wc.ctx, wc.key, getOpts...)
	if err != nil {
		return err
	}

	wc.initialRev = resp.Header.Revision
	for i := range resp.Kvs {
		wc.sendEvent(parseKV(resp.Kvs[i]))
	}
	return nil
}
// logWatchChannelErr logs a watch channel error. An error caused by mvcc
// revision compaction is logged as a warning; anything else is logged as an
// error.
func logWatchChannelErr(err error) {
	if strings.Contains(err.Error(), "mvcc: required revision has been compacted") {
		klog.Warningf("watch chan error: %v", err)
		return
	}
	klog.Errorf("watch chan error: %v", err)
}
// startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process.
//
// Failures (sync error, watch response error, event parse error) are reported
// through sendError and end the goroutine without closing watchClosedCh.
// watchClosedCh is closed only when the etcd watch channel itself is closed.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.initialRev == 0 {
if err := wc.sync(); err != nil {
klog.Errorf("failed to sync with latest state: %v", err)
wc.sendError(err)
return
}
}
// Watch from the revision right after initialRev and request the previous
// key-value so that old objects can be decoded.
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
}
if wc.progressNotify {
opts = append(opts, clientv3.WithProgressNotify())
}
wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
for wres := range wch {
if wres.Err() != nil {
err := wres.Err()
// If there is an error on server (e.g. compaction), the channel will return it before closed.
logWatchChannelErr(err)
wc.sendError(err)
return
}
// Progress notifications are forwarded as synthetic events and recorded in metrics.
if wres.IsProgressNotify() {
wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
metrics.RecordEtcdBookmark(wc.watcher.objectType)
continue
}
for _, e := range wres.Events {
parsedEvent, err := parseEvent(e)
if err != nil {
logWatchChannelErr(err)
wc.sendError(err)
return
}
wc.sendEvent(parsedEvent)
}
}
// When we come to this point, it's only possible that client side ends the watch.
// e.g. cancel the context, close the client.
// If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
// We should notify the main thread that this goroutine has exited.
close(watchClosedCh)
}
// processEvent processes events from etcd watcher and sends results to resultChan.
// It loops until the watchChan's context is done and calls wg.Done on exit.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case e := <-wc.incomingEventChan:
res := wc.transform(e)
// A nil result means the event was filtered out or could not be transformed.
if res == nil {
continue
}
// Only logs that the outgoing buffer is full; the send below still blocks.
if len(wc.resultChan) == outgoingBufSize {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case wc.resultChan <- *res:
case <-wc.ctx.Done():
return
}
case <-wc.ctx.Done():
return
}
}
}
// filter reports whether obj passes the watchChan's predicate. An empty
// predicate accepts everything; a predicate evaluation error counts as
// "does not match".
func (wc *watchChan) filter(obj runtime.Object) bool {
	if !wc.internalPred.Empty() {
		ok, err := wc.internalPred.Matches(obj)
		if err != nil {
			return false
		}
		return ok
	}
	return true
}
// acceptAll reports whether the watchChan's predicate is empty, i.e. no
// object is filtered out.
func (wc *watchChan) acceptAll() bool {
return wc.internalPred.Empty()
}
// transform converts an internal event into the watch.Event delivered to the
// user, or returns nil when the event is filtered out or cannot be converted.
//
//   - progress notification: a Bookmark carrying a fresh object stamped with
//     the event revision (nil when newFunc is nil or versioning fails).
//   - deletion: Deleted with the old object, if the old object passes the filter.
//   - creation: Added with the current object, if it passes the filter.
//   - modification: Modified, Added or Deleted depending on whether the current
//     and the old object pass the filter; nil when neither does.
//
// A failure to prepare the objects is logged and reported through sendError.
func (wc *watchChan) transform(e *event) (res *watch.Event) {
	curObj, oldObj, err := wc.prepareObjs(e)
	if err != nil {
		klog.Errorf("failed to prepare current and previous objects: %v", err)
		wc.sendError(err)
		return nil
	}

	if e.isProgressNotify {
		if wc.watcher.newFunc == nil {
			return nil
		}
		bookmark := wc.watcher.newFunc()
		if versionErr := wc.watcher.versioner.UpdateObject(bookmark, uint64(e.rev)); versionErr != nil {
			klog.Errorf("failed to propagate object version: %v", versionErr)
			return nil
		}
		return &watch.Event{Type: watch.Bookmark, Object: bookmark}
	}

	if e.isDeleted {
		if wc.filter(oldObj) {
			return &watch.Event{Type: watch.Deleted, Object: oldObj}
		}
		return nil
	}

	if e.isCreated {
		if wc.filter(curObj) {
			return &watch.Event{Type: watch.Added, Object: curObj}
		}
		return nil
	}

	// Plain modification.
	if wc.acceptAll() {
		return &watch.Event{Type: watch.Modified, Object: curObj}
	}

	// Both objects are always evaluated, current first, then old.
	matchesNow := wc.filter(curObj)
	matchedBefore := wc.filter(oldObj)
	switch {
	case matchesNow && matchedBefore:
		return &watch.Event{Type: watch.Modified, Object: curObj}
	case matchesNow:
		// The object entered the filtered set.
		return &watch.Event{Type: watch.Added, Object: curObj}
	case matchedBefore:
		// The object left the filtered set.
		return &watch.Event{Type: watch.Deleted, Object: oldObj}
	default:
		return nil
	}
}
// transformErrorToEvent turns err into a watch.Error event whose object is an
// API status. The error is first passed through interpretWatchError; if the
// result does not implement apierrors.APIStatus it is wrapped as an internal
// error.
func transformErrorToEvent(err error) *watch.Event {
	interpreted := interpretWatchError(err)
	apiStatus, isStatus := interpreted.(apierrors.APIStatus)
	if !isStatus {
		apiStatus = apierrors.NewInternalError(interpreted)
	}
	status := apiStatus.Status()
	return &watch.Event{
		Type:   watch.Error,
		Object: &status,
	}
}
// sendError hands err to run through errChan. It blocks until the error is
// accepted or the watchChan's context is done, in which case err is dropped.
func (wc *watchChan) sendError(err error) {
select {
case wc.errChan <- err:
case <-wc.ctx.Done():
}
}
// sendEvent queues e on incomingEventChan for processEvent. It blocks until
// the event is accepted or the watchChan's context is done, in which case e is
// dropped. A full buffer is only logged; it does not change the behavior.
func (wc *watchChan) sendEvent(e *event) {
if len(wc.incomingEventChan) == incomingBufSize {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize)
}
select {
case wc.incomingEventChan <- e:
case <-wc.ctx.Done():
}
}
// prepareObjs decodes the current and the previous object of an event.
//
// Progress-notify events carry no objects, so both results are nil. The
// current object is decoded for every non-deletion event. The previous object
// is decoded only when a previous value exists and it is actually needed:
// for deletions, or when the filter does not accept all objects.
// On any transform or decode failure both objects are nil and the error is
// returned.
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
	if e.isProgressNotify {
		// A progressNotify event contains neither a current nor a previous object version.
		return nil, nil, nil
	}

	w := wc.watcher
	// load transforms a stored value and decodes it, stamping the event revision.
	load := func(stored []byte) (runtime.Object, error) {
		plain, _, transformErr := w.transformer.TransformFromStorage(stored, authenticatedDataString(e.key))
		if transformErr != nil {
			return nil, transformErr
		}
		return decodeObj(w.codec, w.versioner, plain, e.rev)
	}

	if !e.isDeleted {
		if curObj, err = load(e.value); err != nil {
			return nil, nil, err
		}
	}

	// The previous value is skipped when it is absent, or when this is not a
	// deletion and the filter accepts everything (the previous object would
	// pass the filter anyway and is only needed to tell whether it was
	// filtered out before).
	if len(e.prevValue) == 0 || (!e.isDeleted && wc.acceptAll()) {
		return curObj, nil, nil
	}

	// Note that this sends the *old* object with the etcd revision for the
	// time at which it gets deleted.
	if oldObj, err = load(e.prevValue); err != nil {
		return nil, nil, err
	}
	return curObj, oldObj, nil
}
// decodeObj decodes data with codec and stamps rev onto the resulting object
// as its resource version.
//
// It returns the decoded object, or a nil object and an error when decoding or
// versioning fails. When fatalOnDecodeError is set, a decode error panics
// instead; only a panic carrying errTestingDecode is recovered, in which case
// the function returns a nil object and that error through the named results.
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) {
	// err is the named result, so it is still set when the deferred recover
	// below swallows the panic.
	obj, err := runtime.Decode(codec, data)
	if err != nil {
		if fatalOnDecodeError {
			// catch watch decode error iff we caused it on
			// purpose during a unit test
			defer testingDeferOnDecodeError()
			// we are running in a test environment and thus an
			// error here is due to a coder mistake if the defer
			// does not catch it
			panic(err)
		}
		return nil, err
	}
	// ensure resource version is set on the object we load from etcd
	if err := versioner.UpdateObject(obj, uint64(rev)); err != nil {
		// %w keeps the underlying error reachable via errors.Is / errors.As.
		return nil, fmt.Errorf("failure to version api object (%d) %#v: %w", rev, obj, err)
	}
	return obj, nil
}