Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cached discovery interface #1422

Merged
merged 8 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

apiextenstionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery/cached/memory"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -116,11 +117,12 @@ func main() {
log.Println(err)
os.Exit(1)
}
cachedDiscoveryClient := memory.NewMemCacheClient(discoveryClient)

err = (&instance.Reconciler{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
Discovery: discoveryClient,
Discovery: cachedDiscoveryClient,
Recorder: mgr.GetEventRecorderFor("instance-controller"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/gosuri/uitable v0.0.4
github.com/huandu/xstrings v1.2.0 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/manifoldco/promptui v0.6.0
github.com/mattn/go-colorable v0.1.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46O
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024 h1:rBMNdlhTLzJjJSDIjNEXX1Pz3Hmwmz91v+zycvx9PJc=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a h1:FaWFmfWdAUKbSCtOU2QjDaorUexogfaMgbipgYATUMU=
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a/go.mod h1:UJSiEoRfvx3hP73CvoARgeLjaIOjybY9vj8PUPPFGeU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/instance/instance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import (
// Reconciler reconciles an Instance object.
type Reconciler struct {
client.Client
Discovery discovery.DiscoveryInterface
Discovery discovery.CachedDiscoveryInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can use the same discovery.DiscoveryInterface and change the impl

Config *rest.Config
Recorder record.EventRecorder
Scheme *runtime.Scheme
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/instance/instance_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,22 @@ import (
"testing"
"time"

"k8s.io/apimachinery/pkg/util/uuid"
"sigs.k8s.io/controller-runtime/pkg/event"

"github.com/kudobuilder/kudo/pkg/engine/task"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/kudobuilder/kudo/pkg/apis"
"github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
"github.com/kudobuilder/kudo/pkg/engine"
"github.com/kudobuilder/kudo/pkg/engine/task"
"github.com/kudobuilder/kudo/pkg/test/utils"
"github.com/kudobuilder/kudo/pkg/util/convert"
"github.com/kudobuilder/kudo/pkg/util/kudo"
Expand Down Expand Up @@ -132,10 +131,11 @@ func startTestManager(t *testing.T) (chan struct{}, *sync.WaitGroup, client.Clie

discoveryClient, err := utils.GetDiscoveryClient(mgr)
assert.NoError(t, err, "Error when creating discovery client")
cachedDiscoveryClient := memory.NewMemCacheClient(discoveryClient)

err = (&Reconciler{
Client: mgr.GetClient(),
Discovery: discoveryClient,
Discovery: cachedDiscoveryClient,
Config: mgr.GetConfig(),
Recorder: mgr.GetEventRecorderFor("instance-controller"),
Scheme: mgr.GetScheme(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/renderer/enhancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Enhancer interface {
// DefaultEnhancer is implementation of Enhancer that applies the defined conventions by directly editing runtime.Objects (Unstructured).
type DefaultEnhancer struct {
Scheme *runtime.Scheme
Discovery discovery.DiscoveryInterface
Discovery discovery.CachedDiscoveryInterface
}

// Apply accepts templates to be rendered in kubernetes and enhances them with our own KUDO conventions
Expand Down
54 changes: 44 additions & 10 deletions pkg/engine/resource/object_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package resource

import (
"fmt"
"log"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ObjectKeyFromObject method wraps client.ObjectKeyFromObject method by additionally checking if passed object is
// a cluster-scoped resource (e.g. CustomResourceDefinition, ClusterRole etc.) and removing the namespace from the
// key since cluster-scoped resources are not namespaced.
func ObjectKeyFromObject(r runtime.Object, di discovery.DiscoveryInterface) (client.ObjectKey, error) {
func ObjectKeyFromObject(r runtime.Object, di discovery.CachedDiscoveryInterface) (client.ObjectKey, error) {
key, err := client.ObjectKeyFromObject(r)
if err != nil {
return client.ObjectKey{}, fmt.Errorf("failed to get an object key from object %v: %v", r.GetObjectKind(), err)
Expand All @@ -30,33 +33,64 @@ func ObjectKeyFromObject(r runtime.Object, di discovery.DiscoveryInterface) (cli
return key, nil
}

func IsNamespacedObject(r runtime.Object, di discovery.DiscoveryInterface) (bool, error) {
func IsNamespacedObject(r runtime.Object, di discovery.CachedDiscoveryInterface) (bool, error) {
gvk := r.GetObjectKind().GroupVersionKind()
return isNamespaced(gvk, di)
}

// isNamespaced method return true if given runtime.Object is a namespaced (not cluster-scoped) resource. It uses the
// discovery client to fetch all API resources (with Groups and Versions), searches for a resource with the passed GVK
// and returns true if it's namespaced. Method returns an error if passed GVK wasn't found in the discovered resource list.
func isNamespaced(gvk schema.GroupVersionKind, di discovery.DiscoveryInterface) (bool, error) {
resList, err := di.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
func isNamespaced(gvk schema.GroupVersionKind, di discovery.CachedDiscoveryInterface) (bool, error) {
// Fetch namespaced API resources

// First try, this may return nil because of the cache
apiResource, err := getAPIResource(gvk, di)
if err != nil {
return false, fmt.Errorf("failed to fetch server resources for %s: %v", gvk.GroupVersion().String(), err)
return false, err
}
if resList == nil {
return false, fmt.Errorf("failed to find server resources for %s: %v", gvk.GroupVersion().String(), err)
if apiResource != nil {
return apiResource.Namespaced, nil
}

gv, err := schema.ParseGroupVersion(resList.GroupVersion)
// Second try, now with invalidated cache. If we still get nil, we know it's not there.
log.Printf("Failed to get APIResource for %v, retry with invalidated cache.", gvk)
di.Invalidate()
apiResource, err = getAPIResource(gvk, di)
if err != nil {
return false, err
}
if apiResource != nil {
return apiResource.Namespaced, nil
}

return false, fmt.Errorf("a resource with GVK %v seems to be missing in API resource list", gvk)
}

// getAPIResource returns a specific APIResource from the DiscoveryInterface or nil if no resource was found.
// As the CachedDiscoverInterface may contain stale data, it can return nil even if the resource actually exists, in that
// case it is advised to invalidate the DI cache and retry the query
// Additionally, this method may return false positives, i.e. an API resource that was already deleted from the api
// server. If no false positive results is required, call di.Invalidate before calling this method
func getAPIResource(gvk schema.GroupVersionKind, di discovery.CachedDiscoveryInterface) (*metav1.APIResource, error) {
resList, err := di.ServerResourcesForGroupVersion(gvk.GroupVersion().String())

if err != nil || resList == nil {
if err == memory.ErrCacheNotFound {
return nil, nil
}
return nil, err
}

gv, err := schema.ParseGroupVersion(resList.GroupVersion)
if err != nil {
return nil, err
}
for _, r := range resList.APIResources {
if gvk == gv.WithKind(r.Kind) {
return r.Namespaced, nil
return &r, nil
}
}

return false, fmt.Errorf("a resource with GVK %v seems to be missing in API resource list", gvk)
return nil, nil
}
2 changes: 1 addition & 1 deletion pkg/engine/resource/object_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Test_isNamespaced(t *testing.T) {
tests := []struct {
name string
gvk schema.GroupVersionKind
di discovery.DiscoveryInterface
di discovery.CachedDiscoveryInterface
want bool
wantErr bool
}{
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// Context is a engine.task execution context containing k8s client, templates parameters etc.
type Context struct {
Client client.Client
Discovery discovery.DiscoveryInterface
Discovery discovery.CachedDiscoveryInterface
Config *rest.Config
Enhancer renderer.Enhancer
Meta renderer.Metadata
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/workflow/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (ap *ActivePlan) taskByName(name string) (*v1beta1.Task, bool) {
//
// Furthermore, a transient ERROR during a step execution, means that the next step may be executed if the step strategy
// is "parallel". In case of a fatal error, it is returned alongside with the new plan status and published on the event bus.
func Execute(pl *ActivePlan, em *engine.Metadata, c client.Client, di discovery.DiscoveryInterface, config *rest.Config, enh renderer.Enhancer) (*v1beta1.PlanStatus, error) {
func Execute(pl *ActivePlan, em *engine.Metadata, c client.Client, di discovery.CachedDiscoveryInterface, config *rest.Config, enh renderer.Enhancer) (*v1beta1.PlanStatus, error) {
if pl.Status.IsTerminal() {
log.Printf("PlanExecution: %s/%s plan %s is terminal, nothing to do", em.InstanceNamespace, em.InstanceName, pl.Name)
return pl.PlanStatus, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/engine/workflow/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

Expand Down Expand Up @@ -622,8 +623,9 @@ func TestExecutePlan(t *testing.T) {

testClient := fake.NewFakeClientWithScheme(scheme.Scheme)
fakeDiscovery := utils.FakeDiscoveryClient()
fakeCachedDiscovery := memory.NewMemCacheClient(fakeDiscovery)
for _, tt := range tests {
newStatus, err := Execute(tt.activePlan, tt.metadata, testClient, fakeDiscovery, nil, tt.enhancer)
newStatus, err := Execute(tt.activePlan, tt.metadata, testClient, fakeCachedDiscovery, nil, tt.enhancer)
newStatus.LastUpdatedTimestamp = &metav1.Time{Time: testTime}

if !tt.wantErr && err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Case struct {
Timeout int

Client func(forceNew bool) (client.Client, error)
DiscoveryClient func() (discovery.DiscoveryInterface, error)
DiscoveryClient func() (discovery.CachedDiscoveryInterface, error)

Logger testutils.Logger
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/test/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
docker "github.com/docker/docker/client"
yaml "gopkg.in/yaml.v2"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -42,7 +43,7 @@ type Harness struct {
config *rest.Config
docker testutils.DockerClient
client client.Client
dclient discovery.DiscoveryInterface
dclient discovery.CachedDiscoveryInterface
env *envtest.Environment
kind *kind
kubeConfigPath string
Expand Down Expand Up @@ -315,7 +316,7 @@ func (h *Harness) Client(forceNew bool) (client.Client, error) {
}

// DiscoveryClient returns the current Kubernetes discovery client for the test harness.
func (h *Harness) DiscoveryClient() (discovery.DiscoveryInterface, error) {
func (h *Harness) DiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
h.clientLock.Lock()
defer h.clientLock.Unlock()

Expand All @@ -328,7 +329,8 @@ func (h *Harness) DiscoveryClient() (discovery.DiscoveryInterface, error) {
return nil, err
}

h.dclient, err = discovery.NewDiscoveryClientForConfig(cfg)
disoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
h.dclient = memory.NewMemCacheClient(disoveryClient)
return h.dclient, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/test/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Step struct {
Timeout int

Client func(forceNew bool) (client.Client, error)
DiscoveryClient func() (discovery.DiscoveryInterface, error)
DiscoveryClient func() (discovery.CachedDiscoveryInterface, error)

Logger testutils.Logger
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/test/step_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func TestCheckResourceIntegration(t *testing.T) {
step := Step{
Logger: testutils.NewTestLogger(t, ""),
Client: func(bool) (client.Client, error) { return testenv.Client, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testenv.DiscoveryClient, nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testenv.DiscoveryClient, nil },
}

errors := step.CheckResource(test.expected, namespace)
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestStepDeleteExistingLabelMatch(t *testing.T) {
},
},
Client: func(bool) (client.Client, error) { return testenv.Client, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testenv.DiscoveryClient, nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testenv.DiscoveryClient, nil },
}

assert.Nil(t, testenv.Client.Create(context.TODO(), podToKeep))
Expand Down
12 changes: 6 additions & 6 deletions pkg/test/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestStepClean(t *testing.T) {
pod.DeepCopyObject(), pod2WithDiffNamespace.DeepCopyObject(), testutils.NewPod("does-not-exist", ""),
},
Client: func(bool) (client.Client, error) { return cl, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
}

assert.Nil(t, step.Clean(testNamespace))
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestStepCreate(t *testing.T) {
pod.DeepCopyObject(), podWithNamespace.DeepCopyObject(), clusterScopedResource, updateToApply,
},
Client: func(bool) (client.Client, error) { return cl, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
}

assert.Equal(t, []error{}, step.Create(testNamespace))
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestStepDeleteExisting(t *testing.T) {
},
},
Client: func(bool) (client.Client, error) { return cl, nil },
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil },
}

assert.Nil(t, cl.Get(context.TODO(), testutils.ObjectKey(podToKeep), podToKeep))
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestCheckResource(t *testing.T) {
Client: func(bool) (client.Client, error) {
return fake.NewFakeClientWithScheme(scheme.Scheme, test.actual), nil
},
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return fakeDiscovery, nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return fakeDiscovery, nil },
}

errors := step.CheckResource(test.expected, namespace)
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestCheckResourceAbsent(t *testing.T) {
Client: func(bool) (client.Client, error) {
return fake.NewFakeClientWithScheme(scheme.Scheme, test.actual), nil
},
DiscoveryClient: func() (discovery.DiscoveryInterface, error) { return fakeDiscovery, nil },
DiscoveryClient: func() (discovery.CachedDiscoveryInterface, error) { return fakeDiscovery, nil },
}

error := step.CheckResourceAbsent(test.expected, testNamespace)
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestRun(t *testing.T) {
cl := fake.NewFakeClientWithScheme(scheme.Scheme)

test.Step.Client = func(bool) (client.Client, error) { return cl, nil }
test.Step.DiscoveryClient = func() (discovery.DiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil }
test.Step.DiscoveryClient = func() (discovery.CachedDiscoveryInterface, error) { return testutils.FakeDiscoveryClient(), nil }
test.Step.Logger = testutils.NewTestLogger(t, "")

if test.updateMethod != nil {
Expand Down
Loading