Skip to content

Commit

Permalink
Don't prematurely close reflectors in case of slow initialization in …
Browse files Browse the repository at this point in the history
…watch based manager
  • Loading branch information
wojtek-t authored and jingxu97 committed Aug 26, 2021
1 parent cb2ee55 commit 04ff2a8
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 2 deletions.
11 changes: 9 additions & 2 deletions pkg/kubelet/util/manager/watch_based_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func (i *objectCacheItem) setImmutable() {
func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
i.lock.Lock()
defer i.lock.Unlock()
if !i.stopped && now.After(i.lastAccessTime.Add(maxIdleTime)) {
// Ensure that we don't try to stop a reflector that is not yet initialized.
// In case of an overloaded kube-apiserver, if the list request is
// already being processed, all the work would be lost and would have
// to be retried.
if !i.stopped && i.store.hasSynced() && now.After(i.lastAccessTime.Add(maxIdleTime)) {
return i.stopThreadUnsafe()
}
return false
Expand Down Expand Up @@ -287,11 +291,14 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
if !exists {
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
}
// Record the last access time regardless of whether the Get succeeds.
// This protects from premature (racy) reflector closure.
item.setLastAccessTime(c.clock.Now())

item.restartReflectorIfNeeded()
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
}
item.setLastAccessTime(c.clock.Now())
obj, exists, err := item.store.GetByKey(c.key(namespace, name))
if err != nil {
return nil, err
Expand Down
91 changes: 91 additions & 0 deletions pkg/kubelet/util/manager/watch_based_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,94 @@ func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
// Reflector should be running when the get function is called periodically.
assert.True(t, reflectorRunning())
}

// TestReflectorNotStopedOnSlowInitialization verifies that recycling idle
// watches does not stop a reflector whose initial list call has not yet
// completed, and that the reflector becomes usable once the list finishes.
//
// The fake list reactor blocks until the fake clock has advanced by 120
// seconds, while the cache is created with time.Minute as its last argument
// (presumably the max idle time - confirm against newSecretCache). The test
// therefore steps the clock past that duration while the list is still
// pending and checks that the reflector survives startRecycleIdleWatch.
func TestReflectorNotStopedOnSlowInitialization(t *testing.T) {
	secret := &v1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "name",
			Namespace:       "ns",
			ResourceVersion: "200",
		},
	}

	fakeClock := clock.NewFakeClock(time.Now())

	fakeClient := &fake.Clientset{}
	listReactor := func(a core.Action) (bool, runtime.Object, error) {
		// Simulate a slow apiserver: the list only returns after the fake
		// clock has been advanced by 120 seconds in total.
		<-fakeClock.After(120 * time.Second)

		result := &v1.SecretList{
			ListMeta: metav1.ListMeta{
				ResourceVersion: "200",
			},
			Items: []v1.Secret{*secret},
		}

		return true, result, nil
	}

	fakeClient.AddReactor("list", "secrets", listReactor)
	fakeWatch := watch.NewFake()
	fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
	store := newSecretCache(fakeClient, fakeClock, time.Minute)

	key := objectKey{namespace: "ns", name: "name"}

	// itemExists reports whether the cache holds an entry for key.
	itemExists := func() (bool, error) {
		store.lock.Lock()
		defer store.lock.Unlock()
		_, ok := store.items[key]
		return ok, nil
	}

	// reflectorRunning reports whether the item's reflector has not been
	// stopped. It requires the item to exist in the cache.
	reflectorRunning := func() bool {
		store.lock.Lock()
		defer store.lock.Unlock()
		item := store.items[key]

		item.lock.Lock()
		defer item.lock.Unlock()
		return !item.stopped
	}

	// reflectorInitialized reports whether the item's store has synced.
	// It requires the item to exist in the cache.
	reflectorInitialized := func() (bool, error) {
		store.lock.Lock()
		defer store.lock.Unlock()
		item := store.items[key]

		item.lock.Lock()
		defer item.lock.Unlock()
		return item.store.hasSynced(), nil
	}

	// AddReference should start reflector.
	store.AddReference("ns", "name")
	if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
		// Abort here: the helpers above dereference the cache item and
		// would panic on a nil pointer if it was never added.
		t.Fatalf("item wasn't added to cache: %v", err)
	}

	fakeClock.Step(90 * time.Second)
	store.startRecycleIdleWatch()

	// Reflector didn't yet initialize, so it shouldn't be stopped.
	// However, Get should still be failing.
	assert.True(t, reflectorRunning())
	// reflectorInitialized always returns a nil error.
	initialized, _ := reflectorInitialized()
	assert.False(t, initialized)
	_, err := store.Get("ns", "name")
	if err == nil || !strings.Contains(err.Error(), "failed to sync") {
		t.Errorf("Expected failed to sync error, got: %v", err)
	}

	// Initialization should successfully finish: this step brings the fake
	// clock to 120 seconds, which unblocks the list reactor.
	fakeClock.Step(30 * time.Second)
	if err := wait.Poll(10*time.Millisecond, time.Second, reflectorInitialized); err != nil {
		t.Errorf("reflector didn't initialize correctly")
	}

	// recycling shouldn't stop the reflector because it was accessed within last minute.
	store.startRecycleIdleWatch()
	assert.True(t, reflectorRunning())

	obj, err := store.Get("ns", "name")
	if err != nil {
		t.Fatalf("unexpected error getting secret after initialization: %v", err)
	}
	assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
}

0 comments on commit 04ff2a8

Please sign in to comment.