@@ -430,21 +430,23 @@ Reflector 的 `ListAndWatch()` 因 Watch 超时而周期调用 Replace() 方法
430
430
431
431
前文我们提到 DeltaFIFO 的使用场景之一是:** “你想周期处理所有的对象”** ,但对象一旦从 DeltaFIFO 中弹出,如果没有产生新的 Watch 事件,就不会对它再调用注册的回调函数。
432
432
433
- Reflector 的 ` ListAndWatch() ` 方法周期执行 DeltaFIFO 的 Resync() 方法,目的就是** 为对象产生新的 Sync 事件** ,从而有机会再次调用注册的 ` OnUpdate() ` 处理函数。因此 Resync 时,如果对象已经在 f.items,则后续由机会被弹出 ,所以不需要为它生成 Sync 事件。
433
+ Reflector 的 ` ListAndWatch() ` 方法周期执行 DeltaFIFO 的 Resync() 方法,目的就是** 为对象产生新的 Sync 事件** ,从而有机会再次调用注册的 ` OnUpdate() ` 处理函数。因此 Resync 时,如果对象已经在 f.items,则后续因有机会被弹出 ,所以不需要为它生成 Sync 事件。
434
434
435
- ** Replace() 和 Rsync() 方法会会生成 Sync 事件** 。
435
+ ** 只有 Replace() 和 Rsync() 方法会产生 Sync 事件** 。
436
436
437
437
### Pop() 方法
438
438
439
439
Pop(process PopProcessFunc)
440
440
441
- 1 . 如果 f.queue 为空,则阻塞等待;
442
- 2 . 每次返回 f.queue 第 0 个对象 id 对应的 f.items[ id] ,即队列中该对象当前所有的操作事件列表 Deltas,然后将该对象从队列删除(f.items);
443
- 3 . 返回的 Deltas 会传给 PopProcessFunc 执行,如果函数执行失败,应该再调用 AddIfNotPresent() 加会对象事件(Controler 里已实现);
441
+ 1 . 如果弹出队列 f.queue 为空,则阻塞等待;
442
+ 2 . 每次弹出队列头部的对象对应的事件列表(Deltas 类型),然后将该对象从事件列表缓存(f.items)中删除;
443
+ 3 . 调用配置的回调函数 PopProcessFunc,传入事件列表 Deltas;
444
+
445
+ 如果函数 PopProcessFunc 执行失败,应该再调用 ` AddIfNotPresent() ` 方法将 Deltas 重新加回 DeltaFIFO,这样后续可以再次被弹出处理,防止丢事件。(controler 已实现该逻辑)
444
446
445
447
### HasSyncd() 方法
446
448
447
- 创建 FIFO/DealtaFIFO 后,如果首先调用的是 ` Replace() ` 方法,则 ` f.populated ` 被设置为 ` true ` ,` f.initialPopulationCount ` 被设置为传入的对象数量。当这一批对象都被弹出完毕时( 包含弹出前被删除的对象),` HasSynced() ` 方法返回 ` true ` :
449
+ 创建 FIFO/DealtaFIFO 后,如果首先调用的是 ` Replace() ` 方法,则 ` f.populated ` 被设置为 ` true ` ,` f.initialPopulationCount ` 被设置为传入的对象数量。当这一批对象都被弹出完毕时(包含弹出前被删除的对象),` HasSynced() ` 方法返回 ` true ` :
448
450
449
451
``` go
450
452
// 来源于 k8s.io/client-go/tools/cache/fifo.go
@@ -459,17 +461,21 @@ func (f *DeltaFIFO) HasSynced() bool {
459
461
460
462
### DeltaFIFO 和 knownObjects 对象缓存的同步
461
463
462
- 1 . Reflector 从 etcd List 出对象后,调用 DeltaFIFO 的 Replace() 方法为传入的对象生成 Sync 事件,此时 knownObjects 为空 ;
463
- 2 . controller 从 DeltaFIFO 弹出对象的事件列表 Deltas,遍历 Deltas,根据 Delta 中的事件类型更新 knownObjects,从而实现 DeltaFIFO 和 knownObjects 缓存中的对象一致:
464
+ 1 . Reflector 从 etcd List 出对象后,调用 DeltaFIFO 的 Replace() 方法为各对象生成 Sync 事件,此时 knownObjects 对象缓存为空 ;
465
+ 2 . controller 从 DeltaFIFO 弹出对象事件列表 Deltas,遍历 Deltas,根据 Delta 中的事件类型更新 knownObjects,从而实现 DeltaFIFO 和 knownObjects 缓存中的对象一致:
464
466
465
- controller 每次** 启动** 时,因为 knownObjects 为空且事件类型为 Sync,所以对从 etcd 同步来的所有对象,都先调用一次 clientState 的 ** Add() 方法和注册的 OnAdd() 回调函数** 。
467
+ controller 每次** 启动** 时,因为 knownObjects 为空且事件类型为 Sync,所以会为同步来的所有对象:
468
+
469
+ 1 . 调用 knownObjects 的 ** Add() 方法** ,将它们加入到对象缓存;
470
+ 2 . 调用注册的 OnAdd() 回调函数。所以** 第一次对象同步时, controller 也会调用用户注册的 OnAdd() 回调函数** 。
466
471
467
472
``` go
468
473
// 来源于:k8s.io/client-go/tools/cache/controller.go
469
474
for _ , d := range obj.(Deltas) {
470
475
switch d.Type {
471
476
// Replace() 方法生成的 Sync 事件涉及到的对象,
472
477
case Sync, Added, Updated:
478
+ // clientState 即为 knownObjects 对象缓存
473
479
if old , exists , err := clientState.Get (d.Object ); err == nil && exists {
474
480
if err := clientState.Update (d.Object ); err != nil {
475
481
return err
@@ -490,9 +496,9 @@ func (f *DeltaFIFO) HasSynced() bool {
490
496
}
491
497
```
492
498
493
- 3 . 但是,Reflector 的 Watch 可能会出现**丢失事件**的情况(如 ListAndWatch 出错返回后,Reflector 会 Sleep 一段时间再执行它,期间 etcd 的对象变化事件丢失),这样再次 List 到的对象集合与 knownObjects 缓存中的对象集合不一致。
499
+ 3 . 但是,Reflector 的 Watch 可能会出现**丢失事件**的情况(如 ListAndWatch 出错返回后,Reflector 会 Sleep 一段时间再执行它,期间 etcd 的对象变化事件丢失),这样再次 List 到的对象集合与 knownObjects 缓存中的对象集合不一致。如何解决这个问题呢?
494
500
495
- Replace () 方法会为 knownObjects 缓存中多余的对象生成 Deleted 类型的 ` DeletedFinalStateUnknown` 事件,这样后续 controller 弹出该事件后,将对象从 knownObjects 缓存删除,从而达到两个缓存一致的目的;
501
+ 答案在于, Replace () 方法会为 knownObjects 中多余对象生成 Deleted 类型的 ` DeletedFinalStateUnknown` 事件,这样后续 controller 弹出该事件后,将对象从 knownObjects 缓存删除,从而达到两个缓存一致的目的;
496
502
497
503
4 . ListAndWatch () 方法起一个 goroutine,周期的调用 Resync () 方法将 knownObjects 中的对象更新到 DeltaFIFO ,为何要这么做呢?
498
504
0 commit comments