Skip to content

Commit

Permalink
Use cached discovery interface (#1422)
Browse files Browse the repository at this point in the history
* Use cached discovery interface to prevent too many requests to K8s API

Signed-off-by: Andreas Neumann <aneumann@mesosphere.com>
  • Loading branch information
ANeumann82 committed Mar 23, 2020
1 parent b1ef318 commit dc74924
Show file tree
Hide file tree
Showing 17 changed files with 143 additions and 74 deletions.
4 changes: 3 additions & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

apiextenstionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery/cached/memory"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -116,11 +117,12 @@ func main() {
log.Println(err)
os.Exit(1)
}
cachedDiscoveryClient := memory.NewMemCacheClient(discoveryClient)

err = (&instance.Reconciler{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
Discovery: discoveryClient,
Discovery: cachedDiscoveryClient,
Recorder: mgr.GetEventRecorderFor("instance-controller"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/gosuri/uitable v0.0.4
github.com/huandu/xstrings v1.2.0 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/manifoldco/promptui v0.6.0
github.com/mattn/go-colorable v0.1.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46O
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024 h1:rBMNdlhTLzJjJSDIjNEXX1Pz3Hmwmz91v+zycvx9PJc=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a h1:FaWFmfWdAUKbSCtOU2QjDaorUexogfaMgbipgYATUMU=
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a/go.mod h1:UJSiEoRfvx3hP73CvoARgeLjaIOjybY9vj8PUPPFGeU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/instance/instance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import (
// Reconciler reconciles an Instance object.
type Reconciler struct {
client.Client
Discovery discovery.DiscoveryInterface
Discovery discovery.CachedDiscoveryInterface
Config *rest.Config
Recorder record.EventRecorder
Scheme *runtime.Scheme
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/instance/instance_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,22 @@ import (
"testing"
"time"

"k8s.io/apimachinery/pkg/util/uuid"
"sigs.k8s.io/controller-runtime/pkg/event"

"github.com/kudobuilder/kudo/pkg/engine/task"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/kudobuilder/kudo/pkg/apis"
"github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
"github.com/kudobuilder/kudo/pkg/engine"
"github.com/kudobuilder/kudo/pkg/engine/task"
"github.com/kudobuilder/kudo/pkg/test/utils"
"github.com/kudobuilder/kudo/pkg/util/convert"
"github.com/kudobuilder/kudo/pkg/util/kudo"
Expand Down Expand Up @@ -132,10 +131,11 @@ func startTestManager(t *testing.T) (chan struct{}, *sync.WaitGroup, client.Clie

discoveryClient, err := utils.GetDiscoveryClient(mgr)
assert.NoError(t, err, "Error when creating discovery client")
cachedDiscoveryClient := memory.NewMemCacheClient(discoveryClient)

err = (&Reconciler{
Client: mgr.GetClient(),
Discovery: discoveryClient,
Discovery: cachedDiscoveryClient,
Config: mgr.GetConfig(),
Recorder: mgr.GetEventRecorderFor("instance-controller"),
Scheme: mgr.GetScheme(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/renderer/enhancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Enhancer interface {
// DefaultEnhancer is implementation of Enhancer that applies the defined conventions by directly editing runtime.Objects (Unstructured).
type DefaultEnhancer struct {
Scheme *runtime.Scheme
Discovery discovery.DiscoveryInterface
Discovery discovery.CachedDiscoveryInterface
}

// Apply accepts templates to be rendered in kubernetes and enhances them with our own KUDO conventions
Expand Down
54 changes: 44 additions & 10 deletions pkg/engine/resource/object_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package resource

import (
"fmt"
"log"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ObjectKeyFromObject method wraps client.ObjectKeyFromObject method by additionally checking if passed object is
// a cluster-scoped resource (e.g. CustomResourceDefinition, ClusterRole etc.) and removing the namespace from the
// key since cluster-scoped resources are not namespaced.
func ObjectKeyFromObject(r runtime.Object, di discovery.DiscoveryInterface) (client.ObjectKey, error) {
func ObjectKeyFromObject(r runtime.Object, di discovery.CachedDiscoveryInterface) (client.ObjectKey, error) {
key, err := client.ObjectKeyFromObject(r)
if err != nil {
return client.ObjectKey{}, fmt.Errorf("failed to get an object key from object %v: %v", r.GetObjectKind(), err)
Expand All @@ -30,33 +33,64 @@ func ObjectKeyFromObject(r runtime.Object, di discovery.DiscoveryInterface) (cli
return key, nil
}

func IsNamespacedObject(r runtime.Object, di discovery.DiscoveryInterface) (bool, error) {
func IsNamespacedObject(r runtime.Object, di discovery.CachedDiscoveryInterface) (bool, error) {
gvk := r.GetObjectKind().GroupVersionKind()
return isNamespaced(gvk, di)
}

// isNamespaced method return true if given runtime.Object is a namespaced (not cluster-scoped) resource. It uses the
// discovery client to fetch all API resources (with Groups and Versions), searches for a resource with the passed GVK
// and returns true if it's namespaced. Method returns an error if passed GVK wasn't found in the discovered resource list.
func isNamespaced(gvk schema.GroupVersionKind, di discovery.DiscoveryInterface) (bool, error) {
resList, err := di.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
func isNamespaced(gvk schema.GroupVersionKind, di discovery.CachedDiscoveryInterface) (bool, error) {
// Fetch namespaced API resources

// First try, this may return nil because of the cache
apiResource, err := getAPIResource(gvk, di)
if err != nil {
return false, fmt.Errorf("failed to fetch server resources for %s: %v", gvk.GroupVersion().String(), err)
return false, err
}
if resList == nil {
return false, fmt.Errorf("failed to find server resources for %s: %v", gvk.GroupVersion().String(), err)
if apiResource != nil {
return apiResource.Namespaced, nil
}

gv, err := schema.ParseGroupVersion(resList.GroupVersion)
// Second try, now with invalidated cache. If we still get nil, we know it's not there.
log.Printf("Failed to get APIResource for %v, retry with invalidated cache.", gvk)
di.Invalidate()
apiResource, err = getAPIResource(gvk, di)
if err != nil {
return false, err
}
if apiResource != nil {
return apiResource.Namespaced, nil
}

return false, fmt.Errorf("a resource with GVK %v seems to be missing in API resource list", gvk)
}

// getAPIResource returns a specific APIResource from the DiscoveryInterface or nil if no resource was found.
// As the CachedDiscoverInterface may contain stale data, it can return nil even if the resource actually exists, in that
// case it is advised to invalidate the DI cache and retry the query
// Additionally, this method may return false positives, i.e. an API resource that was already deleted from the api
// server. If no false positive results is required, call di.Invalidate before calling this method
func getAPIResource(gvk schema.GroupVersionKind, di discovery.CachedDiscoveryInterface) (*metav1.APIResource, error) {
resList, err := di.ServerResourcesForGroupVersion(gvk.GroupVersion().String())

if err != nil || resList == nil {
if err == memory.ErrCacheNotFound {
return nil, nil
}
return nil, err
}

gv, err := schema.ParseGroupVersion(resList.GroupVersion)
if err != nil {
return nil, err
}
for _, r := range resList.APIResources {
if gvk == gv.WithKind(r.Kind) {
return r.Namespaced, nil
return &r, nil
}
}

return false, fmt.Errorf("a resource with GVK %v seems to be missing in API resource list", gvk)
return nil, nil
}
2 changes: 1 addition & 1 deletion pkg/engine/resource/object_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_isNamespaced(t *testing.T) {
tests := []struct {
name string
gvk schema.GroupVersionKind
di discovery.DiscoveryInterface
di discovery.CachedDiscoveryInterface
want bool
wantErr bool
}{
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// Context is a engine.task execution context containing k8s client, templates parameters etc.
type Context struct {
Client client.Client
Discovery discovery.DiscoveryInterface
Discovery discovery.CachedDiscoveryInterface
Config *rest.Config
Enhancer renderer.Enhancer
Meta renderer.Metadata
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/workflow/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (ap *ActivePlan) taskByName(name string) (*v1beta1.Task, bool) {
//
// Furthermore, a transient ERROR during a step execution, means that the next step may be executed if the step strategy
// is "parallel". In case of a fatal error, it is returned alongside with the new plan status and published on the event bus.
func Execute(pl *ActivePlan, em *engine.Metadata, c client.Client, di discovery.DiscoveryInterface, config *rest.Config, enh renderer.Enhancer) (*v1beta1.PlanStatus, error) {
func Execute(pl *ActivePlan, em *engine.Metadata, c client.Client, di discovery.CachedDiscoveryInterface, config *rest.Config, enh renderer.Enhancer) (*v1beta1.PlanStatus, error) {
if pl.Status.IsTerminal() {
log.Printf("PlanExecution: %s/%s plan %s is terminal, nothing to do", em.InstanceNamespace, em.InstanceName, pl.Name)
return pl.PlanStatus, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/workflow/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

Expand Down Expand Up @@ -622,8 +623,9 @@ func TestExecutePlan(t *testing.T) {

testClient := fake.NewFakeClientWithScheme(scheme.Scheme)
fakeDiscovery := utils.FakeDiscoveryClient()
fakeCachedDiscovery := memory.NewMemCacheClient(fakeDiscovery)
for _, tt := range tests {
newStatus, err := Execute(tt.activePlan, tt.metadata, testClient, fakeDiscovery, nil, tt.enhancer)
newStatus, err := Execute(tt.activePlan, tt.metadata, testClient, fakeCachedDiscovery, nil, tt.enhancer)
newStatus.LastUpdatedTimestamp = &metav1.Time{Time: testTime}

if !tt.wantErr && err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Case struct {
Timeout int

Client func(forceNew bool) (client.Client, error)
DiscoveryClient func() (discovery.DiscoveryInterface, error)
DiscoveryClient func() (discovery.CachedDiscoveryInterface, error)

Logger testutils.Logger
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/test/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
docker "github.com/docker/docker/client"
yaml "gopkg.in/yaml.v2"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -42,7 +43,7 @@ type Harness struct {
config *rest.Config
docker testutils.DockerClient
client client.Client
dclient discovery.DiscoveryInterface
dclient discovery.CachedDiscoveryInterface
env *envtest.Environment
kind *kind
kubeConfigPath string
Expand Down Expand Up @@ -315,7 +316,7 @@ func (h *Harness) Client(forceNew bool) (client.Client, error) {
}

// DiscoveryClient returns the current Kubernetes discovery client for the test harness.
func (h *Harness) DiscoveryClient() (discovery.DiscoveryInterface, error) {
func (h *Harness) DiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
h.clientLock.Lock()
defer h.clientLock.Unlock()

Expand All @@ -328,7 +329,8 @@ func (h *Harness) DiscoveryClient() (discovery.DiscoveryInterface, error) {
return nil, err
}

h.dclient, err = discovery.NewDiscoveryClientForConfig(cfg)
disoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
h.dclient = memory.NewMemCacheClient(disoveryClient)
return h.dclient, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/test/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Step struct {
Timeout int

Client func(forceNew bool) (client.Client, error)
DiscoveryClient func() (discovery.DiscoveryInterface, error)
DiscoveryClient func() (discovery.CachedDiscoveryInterface, error)

Logger testutils.Logger
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/test/step_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func TestCheckResourceIntegration(t *testing.T) {
step := Step{
Logger: testutils.NewTestLogger(t, ""),
Client: func(bool) (client.Client, error) { return testenv.Client, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testenv.DiscoveryClient, nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testenv.DiscoveryClient, nil },
}

errors := step.CheckResource(test.expected, namespace)
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestStepDeleteExistingLabelMatch(t *testing.T) {
},
},
Client: func(bool) (client.Client, error) { return testenv.Client, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testenv.DiscoveryClient, nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testenv.DiscoveryClient, nil },
}

assert.Nil(t, testenv.Client.Create(context.TODO(), podToKeep))
Expand Down
12 changes: 6 additions & 6 deletions pkg/test/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestStepClean(t *testing.T) {
pod.DeepCopyObject(), pod2WithDiffNamespace.DeepCopyObject(), testutils.NewPod("does-not-exist", ""),
},
Client: func(bool) (client.Client, error) { return cl, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
}

assert.Nil(t, step.Clean(testNamespace))
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestStepCreate(t *testing.T) {
pod.DeepCopyObject(), podWithNamespace.DeepCopyObject(), clusterScopedResource, updateToApply,
},
Client: func(bool) (client.Client, error) { return cl, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
}

assert.Equal(t, []error{}, step.Create(testNamespace))
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestStepDeleteExisting(t *testing.T) {
},
},
Client: func(bool) (client.Client, error) { return cl, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
}

assert.Nil(t, cl.Get(context.TODO(), testutils.ObjectKey(podToKeep), podToKeep))
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestCheckResource(t *testing.T) {
Client: func(bool) (client.Client, error) {
return fake.NewFakeClientWithScheme(scheme.Scheme, test.actual), nil
},
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return fakeDiscovery, nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return fakeDiscovery, nil },
}

errors := step.CheckResource(test.expected, namespace)
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestCheckResourceAbsent(t *testing.T) {
Client: func(bool) (client.Client, error) {
return fake.NewFakeClientWithScheme(scheme.Scheme, test.actual), nil
},
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return fakeDiscovery, nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return fakeDiscovery, nil },
}

error := step.CheckResourceAbsent(test.expected, testNamespace)
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestRun(t *testing.T) {
cl := fake.NewFakeClientWithScheme(scheme.Scheme)

test.Step.Client = func(bool) (client.Client, error) { return cl, nil }
test.Step.DiscoveryClient = func() (discovery.DiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil }
test.Step.DiscoveryClient = func() (discovery.CachedDiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil }
test.Step.Logger = testutils.NewTestLogger(t, "")

if test.updateMethod != nil {
Expand Down
Loading

0 comments on commit dc74924

Please sign in to comment.