Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions pkg/ansible/proxy/cache_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"strings"
"time"

"github.com/operator-framework/operator-sdk/pkg/ansible/proxy/controllermap"
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy/requestfactory"
Expand Down Expand Up @@ -213,12 +214,22 @@ func (c *cacheResponseHandler) getListFromCache(r *requestfactory.RequestInfo, r
k.Kind = k.Kind + "List"
un := unstructured.UnstructuredList{}
un.SetGroupVersionKind(k)
err := c.informerCache.List(context.Background(), lo, &un)
if err != nil {
// break here in case resource doesn't exist in cache but exists on APIserver
// This is very unlikely but provides user with expected 404
log.Info(fmt.Sprintf("cache miss: %v err-%v", k, err))
return nil, err
errChan := make(chan error, 1)
go func() {
err := c.informerCache.List(context.Background(), lo, &un)
errChan <- err
}()

select {
case watchErr := <-errChan:
if watchErr != nil {
// break here in case resource doesn't exist in cache but exists on APIserver
// This is very unlikely but provides user with expected 404
log.Info(fmt.Sprintf("cache miss: %v err-%v", k, watchErr))
return nil, watchErr
}
case <-time.After(cacheEscacheEstablishmentTimeout):
return nil, fmt.Errorf("timeout establishing watch, commonly permissions of the controller are not sufficent")
}
return &un, nil
}
Expand All @@ -227,13 +238,24 @@ func (c *cacheResponseHandler) getObjectFromCache(r *requestfactory.RequestInfo,
un := &unstructured.Unstructured{}
un.SetGroupVersionKind(k)
obj := client.ObjectKey{Namespace: r.Namespace, Name: r.Name}
err := c.informerCache.Get(context.Background(), obj, un)
if err != nil {
// break here in case resource doesn't exist in cache but exists on APIserver
// This is very unlikely but provides user with expected 404
log.Info(fmt.Sprintf("Cache miss: %v, %v", k, obj))
return nil, err
errChan := make(chan error, 1)
go func() {
err := c.informerCache.Get(context.Background(), obj, un)
errChan <- err
}()

select {
case watchErr := <-errChan:
if watchErr != nil {
// break here in case resource doesn't exist in cache but exists on APIserver
// This is very unlikely but provides user with expected 404
log.Info(fmt.Sprintf("cache miss: %v err-%v", k, watchErr))
return nil, watchErr
}
case <-time.After(cacheEstacacheEstablishmentTimeout):
return nil, fmt.Errorf("timeout establishing watch, commonly permissions of the controller are not sufficent")
}

Comment on lines +241 to +258
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than using a channel, a goroutine, and a timer, a more idiomatic approach would be to use context.WithTimeout(context.Background(), cacheEstablishmentTimeout)

And then check the error from the Get() call to see if it is a context.DeadlineExceeded error.

Ditto for the informerCache.List() call above.

// Once we get the resource, we are going to attempt to recover the dependent watches here,
// This will happen in the background, and log errors.
if c.injectOwnerRef {
Expand Down
28 changes: 26 additions & 2 deletions pkg/ansible/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/http"
"strings"
"sync"
"time"

"github.com/operator-framework/operator-sdk/pkg/ansible/proxy/controllermap"
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy/kubeconfig"
Expand All @@ -39,10 +40,16 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// This is the default timeout to wait for the cache to respond
// TODO: Eventually this should be configurable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TODO: Eventually this should be configurable
// todo(shawn-hurley): Eventually this should be configurable

const cacheEstablishmentTimeout = 6 * time.Second

// RequestLogHandler - log the requests that come through the proxy.
func RequestLogHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -225,8 +232,9 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr
owMap.Store(resource.GroupVersionKind())
log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())
// Store watch in map
err := contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u})
err := addWatch(contents.Controller, &source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u})
if err != nil {
log.Error(err, "GVK", resource.GroupVersionKind())
return err
}
case (!useOwnerRef && dataNamespaceScoped) || contents.WatchClusterScopedResources:
Expand All @@ -238,15 +246,31 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr
awMap.Store(resource.GroupVersionKind())
typeString := fmt.Sprintf("%v.%v", owner.Kind, ownerGV.Group)
log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_annotation_type", typeString)
err = contents.Controller.Watch(&source.Kind{Type: resource}, &osdkHandler.EnqueueRequestForAnnotation{Type: typeString})
err = addWatch(contents.Controller, &source.Kind{Type: resource}, &osdkHandler.EnqueueRequestForAnnotation{Type: typeString})
if err != nil {
log.Error(err, "GVK", resource.GroupVersionKind())
return err
}
}
}
return nil
}

func addWatch(c controller.Controller, s source.Source, eh handler.EventHandler, predicates ...predicate.Predicate) error {
errChan := make(chan error, 1)
go func() {
err := c.Watch(s, eh, predicates...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should work upstream to get a controller.WatchWithContext() function so that we don't need to resort to this.

The problem here is that it is not possible to cancel the c.Watch() call, so if we reach the deadline, the c.Watch call will just stay running in the background forever. This is effectively a resource leak.

errChan <- err
}()

select {
case watchErr := <-errChan:
return watchErr
case <-time.After(cacheEstablishmentTimeout):
return fmt.Errorf("timeout establishing watch, commonly permissions of the controller are not sufficent")
}
}

func removeAuthorizationHeader(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
req.Header.Del("Authorization")
Expand Down