From 6f36453883055a5a0df839e640430aac31e70b5f Mon Sep 17 00:00:00 2001 From: Shawn Hurley Date: Fri, 21 Jun 2019 12:20:07 -0400 Subject: [PATCH 1/3] Adding timeout on dependent watch establishment * no longer hang ansible execution because we are unable to create a watch * exposing an error message with hint on potential cause of error --- pkg/ansible/proxy/proxy.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/pkg/ansible/proxy/proxy.go b/pkg/ansible/proxy/proxy.go index 9716a271e96..8e53f912066 100644 --- a/pkg/ansible/proxy/proxy.go +++ b/pkg/ansible/proxy/proxy.go @@ -27,6 +27,7 @@ import ( "net/http" "strings" "sync" + "time" "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/controllermap" "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/kubeconfig" @@ -39,7 +40,9 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -225,8 +228,9 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr owMap.Store(resource.GroupVersionKind()) log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind()) // Store watch in map - err := contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u}) + err := addWatch(contents.Controller, &source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u}) if err != nil { + log.Error(err, "gvk", resource.GroupVersionKind()) return err } case (!useOwnerRef && dataNamespaceScoped) || contents.WatchClusterScopedResources: @@ -238,8 +242,9 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr awMap.Store(resource.GroupVersionKind()) typeString := fmt.Sprintf("%v.%v", 
owner.Kind, ownerGV.Group) log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_annotation_type", typeString) - err = contents.Controller.Watch(&source.Kind{Type: resource}, &osdkHandler.EnqueueRequestForAnnotation{Type: typeString}) + err = addWatch(contents.Controller, &source.Kind{Type: resource}, &osdkHandler.EnqueueRequestForAnnotation{Type: typeString}) if err != nil { + log.Error(err, "gvk", resource.GroupVersionKind()) return err } } @@ -247,6 +252,21 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr return nil } +func addWatch(c controller.Controller, s source.Source, eh handler.EventHandler, predicates ...predicate.Predicate) error { + errChan := make(chan error, 1) + go func() { + err := c.Watch(s, eh, predicates...) + errChan <- err + }() + + select { + case watchErr := <-errChan: + return watchErr + case <-time.After(3 * time.Second): + return fmt.Errorf("timeout establishing watch, commonly permissions of the controller are not sufficent") + } +} + func removeAuthorizationHeader(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { req.Header.Del("Authorization") From d8cde87d1ca750f93b179478f39c4b6aef234e4c Mon Sep 17 00:00:00 2001 From: Shawn Hurley Date: Fri, 5 Jul 2019 15:08:03 -0400 Subject: [PATCH 2/3] Adding timeout for list and get --- pkg/ansible/proxy/cache_response.go | 46 +++++++++++++++++++++-------- pkg/ansible/proxy/proxy.go | 4 +-- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/pkg/ansible/proxy/cache_response.go b/pkg/ansible/proxy/cache_response.go index 66397b85103..57c7729b38f 100644 --- a/pkg/ansible/proxy/cache_response.go +++ b/pkg/ansible/proxy/cache_response.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strings" + "time" "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/controllermap" "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/requestfactory" @@ -213,12 +214,22 @@ 
func (c *cacheResponseHandler) getListFromCache(r *requestfactory.RequestInfo, r k.Kind = k.Kind + "List" un := unstructured.UnstructuredList{} un.SetGroupVersionKind(k) - err := c.informerCache.List(context.Background(), lo, &un) - if err != nil { - // break here in case resource doesn't exist in cache but exists on APIserver - // This is very unlikely but provides user with expected 404 - log.Info(fmt.Sprintf("cache miss: %v err-%v", k, err)) - return nil, err + errChan := make(chan error, 1) + go func() { + err := c.informerCache.List(context.Background(), lo, &un) + errChan <- err + }() + + select { + case watchErr := <-errChan: + if watchErr != nil { + // break here in case resource doesn't exist in cache but exists on APIserver + // This is very unlikely but provides user with expected 404 + log.Info(fmt.Sprintf("cache miss: %v err-%v", k, watchErr)) + return nil, watchErr + } + case <-time.After(3 * time.Second): + return nil, fmt.Errorf("timeout establishing watch, commonly permissions of the controller are not sufficent") } return &un, nil } @@ -227,13 +238,24 @@ func (c *cacheResponseHandler) getObjectFromCache(r *requestfactory.RequestInfo, un := &unstructured.Unstructured{} un.SetGroupVersionKind(k) obj := client.ObjectKey{Namespace: r.Namespace, Name: r.Name} - err := c.informerCache.Get(context.Background(), obj, un) - if err != nil { - // break here in case resource doesn't exist in cache but exists on APIserver - // This is very unlikely but provides user with expected 404 - log.Info(fmt.Sprintf("Cache miss: %v, %v", k, obj)) - return nil, err + errChan := make(chan error, 1) + go func() { + err := c.informerCache.Get(context.Background(), obj, un) + errChan <- err + }() + + select { + case watchErr := <-errChan: + if watchErr != nil { + // break here in case resource doesn't exist in cache but exists on APIserver + // This is very unlikely but provides user with expected 404 + log.Info(fmt.Sprintf("cache miss: %v err-%v", k, watchErr)) + return 
nil, watchErr + } + case <-time.After(3 * time.Second): + return nil, fmt.Errorf("timeout establishing watch, commonly permissions of the controller are not sufficent") } + // Once we get the resource, we are going to attempt to recover the dependent watches here, // This will happen in the background, and log errors. if c.injectOwnerRef { diff --git a/pkg/ansible/proxy/proxy.go b/pkg/ansible/proxy/proxy.go index 8e53f912066..9b9e0439604 100644 --- a/pkg/ansible/proxy/proxy.go +++ b/pkg/ansible/proxy/proxy.go @@ -230,7 +230,7 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr // Store watch in map err := addWatch(contents.Controller, &source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u}) if err != nil { - log.Error(err, "gvk", resource.GroupVersionKind()) + log.Error(err, "GVK", resource.GroupVersionKind()) return err } case (!useOwnerRef && dataNamespaceScoped) || contents.WatchClusterScopedResources: @@ -244,7 +244,7 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_annotation_type", typeString) err = addWatch(contents.Controller, &source.Kind{Type: resource}, &osdkHandler.EnqueueRequestForAnnotation{Type: typeString}) if err != nil { - log.Error(err, "gvk", resource.GroupVersionKind()) + log.Error(err, "GVK", resource.GroupVersionKind()) return err } } From 3a85c3792f3647f4cf0268f937f433fbe00ba2f0 Mon Sep 17 00:00:00 2001 From: Shawn Hurley Date: Mon, 8 Jul 2019 17:55:15 -0400 Subject: [PATCH 3/3] adding constant and todo for cache timeout --- pkg/ansible/proxy/cache_response.go | 4 ++-- pkg/ansible/proxy/proxy.go | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/ansible/proxy/cache_response.go b/pkg/ansible/proxy/cache_response.go index 57c7729b38f..bd871c5d2b6 100644 --- a/pkg/ansible/proxy/cache_response.go +++ b/pkg/ansible/proxy/cache_response.go @@ -228,7 +228,7 
@@ func (c *cacheResponseHandler) getListFromCache(r *requestfactory.RequestInfo, r log.Info(fmt.Sprintf("cache miss: %v err-%v", k, watchErr)) return nil, watchErr } - case <-time.After(3 * time.Second): + case <-time.After(cacheEstablishmentTimeout): return nil, fmt.Errorf("timeout establishing watch, commonly permissions of the controller are not sufficent") } return &un, nil @@ -252,7 +252,7 @@ func (c *cacheResponseHandler) getObjectFromCache(r *requestfactory.RequestInfo, log.Info(fmt.Sprintf("cache miss: %v err-%v", k, watchErr)) return nil, watchErr } - case <-time.After(3 * time.Second): + case <-time.After(cacheEstablishmentTimeout): return nil, fmt.Errorf("timeout establishing watch, commonly permissions of the controller are not sufficent") } diff --git a/pkg/ansible/proxy/proxy.go b/pkg/ansible/proxy/proxy.go index 9b9e0439604..f7bd27ed72d 100644 --- a/pkg/ansible/proxy/proxy.go +++ b/pkg/ansible/proxy/proxy.go @@ -46,6 +46,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" ) +// This is the default timeout to wait for the cache to respond +// TODO: Eventually this should be configurable +const cacheEstablishmentTimeout = 6 * time.Second + // RequestLogHandler - log the requests that come through the proxy. func RequestLogHandler(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -262,7 +266,7 @@ func addWatch(c controller.Controller, s source.Source, eh handler.EventHandler, select { case watchErr := <-errChan: return watchErr - case <-time.After(3 * time.Second): + case <-time.After(cacheEstablishmentTimeout): return fmt.Errorf("timeout establishing watch, commonly permissions of the controller are not sufficent") } }