Skip to content

Commit

Permalink
⚠️ Add ability for the delegating client to avoid caching objects
Browse files Browse the repository at this point in the history
A long standing issue in controller runtime, the delegating client is
the default client that the manager creates. Whenever a user calls
Get/List on a typed object, the internal cache spins up an informer and
start watching *all* objects for that group-version-kind.

This change introduces the ability for the delegating client to take
a list of objects that should always hit the live api-server, and bypass
the cache.

We also offer the ability to build a manager.NewClientFunc with a new
builder that exposes the ability to mark which objects we want to be
uncached.

Signed-off-by: Vince Prignano <vincepri@vmware.com>
  • Loading branch information
vincepri committed Nov 12, 2020
1 parent 3e4e60c commit 6089977
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 44 deletions.
12 changes: 8 additions & 4 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3065,10 +3065,11 @@ var _ = Describe("DelegatingClient", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader := client.NewDelegatingClient(client.NewDelegatingClientInput{
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
})
Expect(err).NotTo(HaveOccurred())
var actual appsv1.Deployment
key := client.ObjectKey{Namespace: "ns", Name: "name"}
Expect(dReader.Get(context.TODO(), key, &actual)).To(Succeed())
Expand All @@ -3079,10 +3080,11 @@ var _ = Describe("DelegatingClient", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader := client.NewDelegatingClient(client.NewDelegatingClientInput{
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
})
Expect(err).NotTo(HaveOccurred())
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "deployment1",
Expand Down Expand Up @@ -3123,10 +3125,11 @@ var _ = Describe("DelegatingClient", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader := client.NewDelegatingClient(client.NewDelegatingClientInput{
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
})
Expect(err).NotTo(HaveOccurred())
var actual appsv1.DeploymentList
Expect(dReader.List(context.Background(), &actual)).To(Succeed())
Expect(1).To(Equal(cachedReader.Called))
Expand All @@ -3136,10 +3139,11 @@ var _ = Describe("DelegatingClient", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader := client.NewDelegatingClient(client.NewDelegatingClientInput{
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
})
Expect(err).NotTo(HaveOccurred())

actual := &unstructured.UnstructuredList{}
actual.SetGroupVersionKind(schema.GroupVersionKind{
Expand Down
46 changes: 38 additions & 8 deletions pkg/client/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,44 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// NewDelegatingClientInput encapsulates the input parameters to create a new delegating client.
type NewDelegatingClientInput struct {
CacheReader Reader
Client Client
CacheReader Reader
Client Client
UncachedObjects []Object
}

// NewDelegatingClient creates a new delegating client.
//
// A delegating client forms a Client by composing separate reader, writer and
// statusclient interfaces. This way, you can have an Client that reads from a
// cache and writes to the API server.
func NewDelegatingClient(in NewDelegatingClientInput) Client {
func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) {
uncachedGVKs := map[schema.GroupVersionKind]struct{}{}
for _, obj := range in.UncachedObjects {
gvk, err := apiutil.GVKForObject(obj, in.Client.Scheme())
if err != nil {
return nil, err
}
uncachedGVKs[gvk] = struct{}{}
}

return &delegatingClient{
scheme: in.Client.Scheme(),
mapper: in.Client.RESTMapper(),
Reader: &delegatingReader{
CacheReader: in.CacheReader,
ClientReader: in.Client,
scheme: in.Client.Scheme(),
uncachedGVKs: uncachedGVKs,
},
Writer: in.Client,
StatusClient: in.Client,
}
}, nil
}

type delegatingClient struct {
Expand Down Expand Up @@ -75,21 +89,37 @@ func (d *delegatingClient) RESTMapper() meta.RESTMapper {
type delegatingReader struct {
CacheReader Reader
ClientReader Reader

uncachedGVKs map[schema.GroupVersionKind]struct{}
scheme *runtime.Scheme
}

func (d *delegatingReader) shouldBypassCache(obj runtime.Object) (bool, error) {
gvk, err := apiutil.GVKForObject(obj, d.scheme)
if err != nil {
return false, err
}
_, isUncached := d.uncachedGVKs[gvk]
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
return isUncached || isUnstructured || isUnstructuredList, nil
}

// Get retrieves an obj for a given object key from the Kubernetes Cluster.
func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object) error {
_, isUnstructured := obj.(*unstructured.Unstructured)
if isUnstructured {
if isUncached, err := d.shouldBypassCache(obj); err != nil {
return err
} else if isUncached {
return d.ClientReader.Get(ctx, key, obj)
}
return d.CacheReader.Get(ctx, key, obj)
}

// List retrieves list of objects for a given namespace and list options.
func (d *delegatingReader) List(ctx context.Context, list ObjectList, opts ...ListOption) error {
_, isUnstructured := list.(*unstructured.UnstructuredList)
if isUnstructured {
if isUncached, err := d.shouldBypassCache(list); err != nil {
return err
} else if isUncached {
return d.ClientReader.List(ctx, list, opts...)
}
return d.CacheReader.List(ctx, list, opts...)
Expand Down
61 changes: 61 additions & 0 deletions pkg/manager/client_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package manager

import (
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ClientBuilder builder is the interface for the client builder.
type ClientBuilder interface {
// WithUncached takes a list of runtime objects (plain or lists) that users don't want to cache
// for this client. This function can be called multiple times, it should append to an internal slice.
WithUncached(objs ...client.Object) ClientBuilder

// Build returns a new client.
Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error)
}

// NewClientBuilder returns a builder to build new clients to be passed when creating a Manager.
func NewClientBuilder() ClientBuilder {
return &newClientBuilder{}
}

type newClientBuilder struct {
uncached []client.Object
}

func (n *newClientBuilder) WithUncached(objs ...client.Object) ClientBuilder {
n.uncached = append(n.uncached, objs...)
return n
}

func (n *newClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// Create the Client for Write operations.
c, err := client.New(config, options)
if err != nil {
return nil, err
}

return client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cache,
Client: c,
UncachedObjects: n.uncached,
})
}
39 changes: 15 additions & 24 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,14 @@ type Options struct {
// by the manager. If not set this will use the default new cache function.
NewCache cache.NewCacheFunc

// NewClient will create the client to be used by the manager.
// ClientBuilder is the builder that creates the client to be used by the manager.
// If not set this will create the default DelegatingClient that will
// use the cache for reads and the client for writes.
NewClient NewClientFunc
ClientBuilder ClientBuilder

// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
// for the given objects.
ClientDisableCacheFor []client.Object

// DryRunClient specifies whether the client should be configured to enforce
// dryRun mode.
Expand Down Expand Up @@ -270,9 +274,6 @@ type Options struct {
newHealthProbeListener func(addr string) (net.Listener, error)
}

// NewClientFunc allows a user to define how to create a client
type NewClientFunc func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error)

// Runnable allows a component to be started.
// It's very important that Start blocks until
// it's done running.
Expand Down Expand Up @@ -323,12 +324,16 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, err
}

apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper})
clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}

apiReader, err := client.New(config, clientOptions)
if err != nil {
return nil, err
}

writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
writeObj, err := options.ClientBuilder.
WithUncached(options.ClientDisableCacheFor...).
Build(cache, config, clientOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -503,20 +508,6 @@ func (o Options) setLeaderElectionConfig(obj v1alpha1.ControllerManagerConfigura
return o
}

// DefaultNewClient creates the default caching client
func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// Create the Client for Write operations.
c, err := client.New(config, options)
if err != nil {
return nil, err
}

return client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cache,
Client: c,
}), nil
}

// defaultHealthProbeListener creates the default health probes listener bound to the given address
func defaultHealthProbeListener(addr string) (net.Listener, error) {
if addr == "" || addr == "0" {
Expand All @@ -543,9 +534,9 @@ func setOptionsDefaults(options Options) Options {
}
}

// Allow newClient to be mocked
if options.NewClient == nil {
options.NewClient = DefaultNewClient
// Allow the client builder to be mocked
if options.ClientBuilder == nil {
options.ClientBuilder = NewClientBuilder()
}

// Allow newCache to be mocked
Expand Down
20 changes: 14 additions & 6 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ import (
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
)

type fakeClientBuilder struct {
err error
}

func (e *fakeClientBuilder) WithUncached(objs ...client.Object) ClientBuilder {
return e
}

func (e *fakeClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
return nil, e.err
}

var _ = Describe("manger.Manager", func() {
Describe("New", func() {
It("should return an error if there is no Config", func() {
Expand All @@ -75,9 +87,7 @@ var _ = Describe("manger.Manager", func() {

It("should return an error it can't create a client.Client", func(done Done) {
m, err := New(cfg, Options{
NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
return nil, fmt.Errorf("expected error")
},
ClientBuilder: &fakeClientBuilder{err: fmt.Errorf("expected error")},
})
Expect(m).To(BeNil())
Expect(err).To(HaveOccurred())
Expand All @@ -101,9 +111,7 @@ var _ = Describe("manger.Manager", func() {

It("should create a client defined in by the new client function", func(done Done) {
m, err := New(cfg, Options{
NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
return nil, nil
},
ClientBuilder: &fakeClientBuilder{},
})
Expect(m).ToNot(BeNil())
Expect(err).ToNot(HaveOccurred())
Expand Down
6 changes: 4 additions & 2 deletions pkg/runtime/inject/inject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ var _ = Describe("runtime inject", func() {
})

It("should set client", func() {
client := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewFakeClient()})
client, err := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewFakeClient()})
Expect(err).NotTo(HaveOccurred())

By("Validating injecting client")
res, err := ClientInto(client, instance)
Expand Down Expand Up @@ -152,7 +153,8 @@ var _ = Describe("runtime inject", func() {
})

It("should set api reader", func() {
apiReader := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewFakeClient()})
apiReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{Client: fake.NewFakeClient()})
Expect(err).NotTo(HaveOccurred())

By("Validating injecting client")
res, err := APIReaderInto(apiReader, instance)
Expand Down

0 comments on commit 6089977

Please sign in to comment.