Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #61201: Prevent garbage collector from attempting to sync with 0 resources #61318

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controller/garbagecollector/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ go_test(
"//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/garbagecollector/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ type resettableRESTMapper interface {
// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
// the mapper's underlying discovery client will be unnecessarily reset during
// the course of detecting new resources.
func (gc *GarbageCollector) Sync(discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) {
func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterface, period time.Duration, stopCh <-chan struct{}) {
oldResources := make(map[schema.GroupVersionResource]struct{})
wait.Until(func() {
// Get the current resource list from discovery.
Expand All @@ -179,6 +179,14 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.DiscoveryInterface, p
return
}

// This can occur if there is an internal error in GetDeletableResources.
// If the gc attempts to sync with 0 resources it will block forever.
// TODO: Implement a more complete solution for the garbage collector hanging.
if len(newResources) == 0 {
glog.V(5).Infof("no resources reported by discovery, skipping garbage collector sync")
return
}

// Decide whether discovery has reported a change.
if reflect.DeepEqual(oldResources, newResources) {
glog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")
Expand Down
151 changes: 141 additions & 10 deletions pkg/controller/garbagecollector/garbagecollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -41,6 +42,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -142,18 +144,34 @@ type fakeActionHandler struct {

// ServeHTTP logs the action that occurred and always returns the associated status code
func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
f.lock.Lock()
defer f.lock.Unlock()
func() {
f.lock.Lock()
defer f.lock.Unlock()

f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery})
fakeResponse, ok := f.response[request.Method+request.URL.Path]
if !ok {
fakeResponse.statusCode = 200
fakeResponse.content = []byte("{\"kind\": \"List\"}")
}
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(fakeResponse.statusCode)
response.Write(fakeResponse.content)
}()

f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path, query: request.URL.RawQuery})
fakeResponse, ok := f.response[request.Method+request.URL.Path]
if !ok {
fakeResponse.statusCode = 200
fakeResponse.content = []byte("{\"kind\": \"List\"}")
// This is to allow the fakeActionHandler to simulate a watch being opened
if strings.Contains(request.URL.RawQuery, "watch=true") {
hijacker, ok := response.(http.Hijacker)
if !ok {
return
}
connection, _, err := hijacker.Hijack()
if err != nil {
return
}
defer connection.Close()
time.Sleep(30 * time.Second)
}
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(fakeResponse.statusCode)
response.Write(fakeResponse.content)
}

// testServerAndClientConfig returns a server that listens and a config that can reference it
Expand Down Expand Up @@ -759,9 +777,101 @@ func TestGetDeletableResources(t *testing.T) {
}
}

// TestGarbageCollectorSync ensures that a discovery client error
// will not cause the garbage collector to block infinitely.
func TestGarbageCollectorSync(t *testing.T) {
serverResources := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
},
},
}
fakeDiscoveryClient := &fakeServerResources{
PreferredResources: serverResources,
Error: nil,
Lock: sync.Mutex{},
InterfaceUsedCount: 0,
}

testHandler := &fakeActionHandler{
response: map[string]FakeResponse{
"GET" + "/api/v1/pods": {
200,
[]byte("{}"),
},
},
}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
clientConfig.ContentConfig.NegotiatedSerializer = nil
client, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
t.Fatal(err)
}

rm := &testRESTMapper{api.Registry.RESTMapper()}
metaOnlyClientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc)
clientPool := dynamic.NewClientPool(clientConfig, rm, dynamic.LegacyAPIPathResolverFunc)
podResource := map[schema.GroupVersionResource]struct{}{
{Group: "", Version: "v1", Resource: "pods"}: {},
}
sharedInformers := informers.NewSharedInformerFactory(client, 0)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, podResource, map[schema.GroupResource]struct{}{}, sharedInformers, alwaysStarted)
if err != nil {
t.Fatal(err)
}

stopCh := make(chan struct{})
defer close(stopCh)
go gc.Run(1, stopCh)
go gc.Sync(fakeDiscoveryClient, 10*time.Millisecond, stopCh)

// Wait until the sync discovers the initial resources
fmt.Printf("Test output")
time.Sleep(1 * time.Second)

err = expectSyncNotBlocked(fakeDiscoveryClient)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
}

// Simulate the discovery client returning an error
fakeDiscoveryClient.setPreferredResources(nil)
fakeDiscoveryClient.setError(fmt.Errorf("Error calling discoveryClient.ServerPreferredResources()"))

// Wait until sync discovers the change
time.Sleep(1 * time.Second)

// Remove the error from being returned and see if the garbage collector sync is still working
fakeDiscoveryClient.setPreferredResources(serverResources)
fakeDiscoveryClient.setError(nil)

err = expectSyncNotBlocked(fakeDiscoveryClient)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
}
}

func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
before := fakeDiscoveryClient.getInterfaceUsedCount()
t := 1 * time.Second
time.Sleep(t)
after := fakeDiscoveryClient.getInterfaceUsedCount()
if before == after {
return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
}
return nil
}

type fakeServerResources struct {
PreferredResources []*metav1.APIResourceList
Error error
Lock sync.Mutex
InterfaceUsedCount int
}

func (_ *fakeServerResources) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
Expand All @@ -773,9 +883,30 @@ func (_ *fakeServerResources) ServerResources() ([]*metav1.APIResourceList, erro
}

func (f *fakeServerResources) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.InterfaceUsedCount++
return f.PreferredResources, f.Error
}

func (f *fakeServerResources) setPreferredResources(resources []*metav1.APIResourceList) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.PreferredResources = resources
}

func (f *fakeServerResources) setError(err error) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.Error = err
}

func (f *fakeServerResources) getInterfaceUsedCount() int {
f.Lock.Lock()
defer f.Lock.Unlock()
return f.InterfaceUsedCount
}

func (_ *fakeServerResources) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
return nil, nil
}