Avoid deadlock in gc resync if available resources change during sync #64235

Merged
merged 2 commits on Jun 5, 2018
Changes from all commits
113 changes: 82 additions & 31 deletions pkg/controller/garbagecollector/garbagecollector.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/dynamic"
@@ -170,10 +171,8 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 		newResources := GetDeletableResources(discoveryClient)
 
 		// This can occur if there is an internal error in GetDeletableResources.
-		// If the gc attempts to sync with 0 resources it will block forever.
-		// TODO: Implement a more complete solution for the garbage collector hanging.
 		if len(newResources) == 0 {
-			glog.V(5).Infof("no resources reported by discovery, skipping garbage collector sync")
+			glog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
 			return
 		}
 
@@ -183,39 +182,61 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 			return
 		}
 
-		// Something has changed, time to sync.
-		glog.V(2).Infof("syncing garbage collector with updated resources from discovery: %v", newResources)
-
 		// Ensure workers are paused to avoid processing events before informers
 		// have resynced.
 		gc.workerLock.Lock()
 		defer gc.workerLock.Unlock()
 
-		// Resetting the REST mapper will also invalidate the underlying discovery
-		// client. This is a leaky abstraction and assumes behavior about the REST
-		// mapper, but we'll deal with it for now.
-		gc.restMapper.Reset()
-
-		// Perform the monitor resync and wait for controllers to report cache sync.
-		//
-		// NOTE: It's possible that newResources will diverge from the resources
-		// discovered by restMapper during the call to Reset, since they are
-		// distinct discovery clients invalidated at different times. For example,
-		// newResources may contain resources not returned in the restMapper's
-		// discovery call if the resources appeared in-between the calls. In that
-		// case, the restMapper will fail to map some of newResources until the next
-		// sync period.
-		if err := gc.resyncMonitors(newResources); err != nil {
-			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
-			return
-		}
-		// TODO: WaitForCacheSync can block forever during normal operation. Could
-		// pass a timeout channel, but we have to consider the implications of
-		// un-pausing the GC with a partially synced graph builder.
-		if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
-			utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
-			return
-		}
+		// Once we get here, we should not unpause workers until we've successfully synced
+		attempt := 0
+		wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
+			attempt++
+
+			// On a reattempt, check if available resources have changed
+			if attempt > 1 {
+				newResources = GetDeletableResources(discoveryClient)
+				if len(newResources) == 0 {
+					glog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
+					return false, nil
+				}
+			}
+
+			glog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))
+
+			// Resetting the REST mapper will also invalidate the underlying discovery
+			// client. This is a leaky abstraction and assumes behavior about the REST
+			// mapper, but we'll deal with it for now.
+			gc.restMapper.Reset()
+			glog.V(4).Infof("reset restmapper")
+
+			// Perform the monitor resync and wait for controllers to report cache sync.
+			//
+			// NOTE: It's possible that newResources will diverge from the resources
+			// discovered by restMapper during the call to Reset, since they are
+			// distinct discovery clients invalidated at different times. For example,
+			// newResources may contain resources not returned in the restMapper's
+			// discovery call if the resources appeared in-between the calls. In that
+			// case, the restMapper will fail to map some of newResources until the next
+			// attempt.
+			if err := gc.resyncMonitors(newResources); err != nil {
+				utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
+				return false, nil
+			}
+			glog.V(4).Infof("resynced monitors")
+
+			// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
+			// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
+			// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
+			// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
+			// note that workers stay paused until we successfully resync.
+			if !controller.WaitForCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {

Inline review comment (Member) on the WaitForCacheSync line above:
Timeout when waiting for cache sync seems to be useful in general, consider adding a WaitForCacheSyncUntil to client-go.
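
A minimal sketch of what such a helper could look like, reusing the waitForStopOrTimeout idea from this PR. The name WaitForCacheSyncUntil, its placement, and its signature (mirroring the existing controller.WaitForCacheSync) are assumptions, not something this PR adds:

	// WaitForCacheSyncUntil is a hypothetical helper: WaitForCacheSync bounded
	// by a timeout in addition to the caller's stop channel.
	func WaitForCacheSyncUntil(controllerName string, stopCh <-chan struct{}, timeout time.Duration, cacheSyncs ...cache.InformerSynced) bool {
		// Derive a stop channel that closes when the caller stops or the timeout fires.
		stopChWithTimeout := make(chan struct{})
		go func() {
			select {
			case <-stopCh:
			case <-time.After(timeout):
			}
			close(stopChWithTimeout)
		}()
		return controller.WaitForCacheSync(controllerName, stopChWithTimeout, cacheSyncs...)
	}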

utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
return false, nil
}

// success, break out of the loop
return true, nil
}, stopCh)

// Finally, keep track of our new state. Do this after all preceding steps
// have succeeded to ensure we'll retry on subsequent syncs if an error
@@ -225,6 +246,36 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
 	}, period, stopCh)
 }

+// printDiff returns a human-readable summary of what resources were added and removed
+func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
+	removed := sets.NewString()
+	for oldResource := range oldResources {
+		if _, ok := newResources[oldResource]; !ok {
+			removed.Insert(fmt.Sprintf("%+v", oldResource))
+		}
+	}
+	added := sets.NewString()
+	for newResource := range newResources {
+		if _, ok := oldResources[newResource]; !ok {
+			added.Insert(fmt.Sprintf("%+v", newResource))
+		}
+	}
+	return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
+}
+
+// waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
+func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
+	stopChWithTimeout := make(chan struct{})
+	go func() {
+		select {
+		case <-stopCh:
+		case <-time.After(timeout):
+		}
+		close(stopChWithTimeout)
+	}()
+	return stopChWithTimeout
+}

 func (gc *GarbageCollector) IsSynced() bool {
 	return gc.dependencyGraphBuilder.IsSynced()
 }
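For illustration, here is roughly what printDiff produces for two resource sets; the GVRs below are made up for the example, and the exact rendering follows fmt's %+v formatting of schema.GroupVersionResource:

	oldRes := map[schema.GroupVersionResource]struct{}{
		{Version: "v1", Resource: "pods"}:                   {},
		{Version: "v1", Resource: "replicationcontrollers"}: {},
	}
	newRes := map[schema.GroupVersionResource]struct{}{
		{Version: "v1", Resource: "pods"}:    {},
		{Version: "v1", Resource: "secrets"}: {},
	}
	fmt.Println(printDiff(oldRes, newRes))
	// added: [{Group: Version:v1 Resource:secrets}], removed: [{Group: Version:v1 Resource:replicationcontrollers}]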
49 changes: 45 additions & 4 deletions pkg/controller/garbagecollector/garbagecollector_test.go
@@ -800,6 +800,15 @@ func TestGarbageCollectorSync(t *testing.T) {
 			},
 		},
 	}
+	unsyncableServerResources := []*metav1.APIResourceList{
+		{
+			GroupVersion: "v1",
+			APIResources: []metav1.APIResource{
+				{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
+				{Name: "secrets", Namespaced: true, Kind: "Secret", Verbs: metav1.Verbs{"delete", "list", "watch"}},
+			},
+		},
+	}
 	fakeDiscoveryClient := &fakeServerResources{
 		PreferredResources: serverResources,
 		Error:              nil,
@@ -813,6 +822,10 @@ func TestGarbageCollectorSync(t *testing.T) {
 				200,
 				[]byte("{}"),
 			},
+			"GET" + "/api/v1/secrets": {
+				404,
+				[]byte("{}"),
+			},
 		},
 	}
 	srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
@@ -849,7 +862,7 @@ func TestGarbageCollectorSync(t *testing.T) {
 	fmt.Printf("Test output")
 	time.Sleep(1 * time.Second)
 
-	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
 	}
@@ -865,21 +878,49 @@ func TestGarbageCollectorSync(t *testing.T) {
 	fakeDiscoveryClient.setPreferredResources(serverResources)
 	fakeDiscoveryClient.setError(nil)
 
-	err = expectSyncNotBlocked(fakeDiscoveryClient)
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
 	if err != nil {
 		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
 	}
+
+	// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
+	fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
+	fakeDiscoveryClient.setError(nil)
+
+	// Wait until sync discovers the change
+	time.Sleep(1 * time.Second)
+
+	// Put the resources back to normal and ensure garbage collector sync recovers
+	fakeDiscoveryClient.setPreferredResources(serverResources)
+	fakeDiscoveryClient.setError(nil)
+
+	err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
+	if err != nil {
+		t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
+	}
 }
 
-func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
+func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
 	before := fakeDiscoveryClient.getInterfaceUsedCount()
 	t := 1 * time.Second
 	time.Sleep(t)
 	after := fakeDiscoveryClient.getInterfaceUsedCount()
 	if before == after {
 		return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
 	}
-	return nil
+
+	workerLockAcquired := make(chan struct{})
+	go func() {
+		workerLock.Lock()
+		workerLock.Unlock()
+		close(workerLockAcquired)
+	}()
+	select {
+	case <-workerLockAcquired:
+		return nil
+	case <-time.After(t):
+		return fmt.Errorf("workerLock blocked for at least %v", t)
+	}
 }
 
 type fakeServerResources struct {
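The worker-lock probe added to expectSyncNotBlocked is a general pattern for asserting that a mutex is acquirable within a deadline. A self-contained sketch of the same pattern, with illustrative names not taken from the PR:

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// lockAcquirableWithin reports whether mu can be locked before timeout elapses.
	// On timeout the probe goroutine leaks until the lock eventually frees, the
	// same trade-off the test helper above accepts.
	func lockAcquirableWithin(mu *sync.RWMutex, timeout time.Duration) bool {
		acquired := make(chan struct{})
		go func() {
			mu.Lock()
			mu.Unlock()
			close(acquired)
		}()
		select {
		case <-acquired:
			return true
		case <-time.After(timeout):
			return false
		}
	}

	func main() {
		var mu sync.RWMutex
		fmt.Println(lockAcquirableWithin(&mu, time.Second)) // true: the lock is free
	}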
4 changes: 3 additions & 1 deletion pkg/controller/garbagecollector/graph_builder.go
@@ -288,11 +288,13 @@ func (gb *GraphBuilder) IsSynced() bool {
 	defer gb.monitorLock.Unlock()
 
 	if len(gb.monitors) == 0 {
+		glog.V(4).Info("garbage controller monitor not synced: no monitors")
 		return false
 	}
 
-	for _, monitor := range gb.monitors {
+	for resource, monitor := range gb.monitors {
 		if !monitor.controller.HasSynced() {
+			glog.V(4).Infof("garbage controller monitor not yet synced: %+v", resource)
 			return false
 		}
 	}
22 changes: 21 additions & 1 deletion staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go
@@ -284,12 +284,32 @@ func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) erro
 // PollUntil tries a condition func until it returns true, an error or stopCh is
 // closed.
 //
-// PolUntil always waits interval before the first run of 'condition'.
+// PollUntil always waits interval before the first run of 'condition'.
 // 'condition' will always be invoked at least once.
 func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
 	return WaitFor(poller(interval, 0), condition, stopCh)
 }
 
+// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
+//
+// PollImmediateUntil runs the 'condition' before waiting for the interval.
+// 'condition' will always be invoked at least once.
+func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
+	done, err := condition()
+	if err != nil {
+		return err
+	}
+	if done {
+		return nil
+	}
+	select {
+	case <-stopCh:
+		return ErrWaitTimeout
+	default:
+		return PollUntil(interval, condition, stopCh)
+	}
+}
+
 // WaitFunc creates a channel that receives an item every time a test
 // should be executed and is closed when the last test should be invoked.
 type WaitFunc func(done <-chan struct{}) <-chan struct{}
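A short usage sketch for the new PollImmediateUntil, shaped like the GC sync loop above; the interval and the three-attempt condition are illustrative:

	package main

	import (
		"fmt"
		"time"

		"k8s.io/apimachinery/pkg/util/wait"
	)

	func main() {
		stopCh := make(chan struct{})
		attempt := 0
		// The condition runs once immediately, then once per interval, until
		// it returns true, returns an error, or stopCh is closed.
		wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
			attempt++
			return attempt >= 3, nil // succeed on the third invocation
		}, stopCh)
		fmt.Printf("condition succeeded after %d attempts\n", attempt)
	}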