Skip to content

Commit

Permalink
pkg/controller: use a metadata watch for CRDs
Browse files Browse the repository at this point in the history
Using a full LIST+WATCH is an optimization, with trade-offs. Holding the
state of the world for CRDs in memory when we rarely, if ever, actually
need to access them is a bad use of that trade-off, especially when the
sum total size of CRDs on even the most basic cluster is O(20MiB).

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
  • Loading branch information
stevekuznetsov committed Aug 21, 2023
1 parent a827c02 commit 0886a7a
Show file tree
Hide file tree
Showing 24 changed files with 115 additions and 927 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ ifeq (, $(wildcard $(KUBEBUILDER_ASSETS)/kube-apiserver))
endif

cover.out:
go test $(MOD_FLAGS) -tags "json1" -v -race -coverprofile=cover.out -covermode=atomic \
go test $(MOD_FLAGS) -tags "json1" -race -coverprofile=cover.out -covermode=atomic \
-coverpkg ./pkg/controller/... ./pkg/...

coverage: cover.out
Expand Down
27 changes: 22 additions & 5 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -34,6 +33,9 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/metadata/metadatalister"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -144,6 +146,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

metadataClient, err := metadata.NewForConfig(config)
if err != nil {
return nil, err
}

// Create a new queueinformer-based operator.
opClient, err := operatorclient.NewClientFromRestConfig(config)
if err != nil {
Expand Down Expand Up @@ -443,13 +450,23 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

// Register CustomResourceDefinition QueueInformer
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), resyncPeriod()).Apiextensions().V1().CustomResourceDefinitions()
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
// Register CustomResourceDefinition QueueInformer. Object metadata requests are used
// by this informer in order to reduce cached size.
gvr := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
crdInformer := metadatainformer.NewFilteredMetadataInformer(
metadataClient,
gvr,
metav1.NamespaceAll,
resyncPeriod(),
cache.Indexers{},
nil,
).Informer()
crdLister := metadatalister.New(crdInformer.GetIndexer(), gvr)
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdLister)
crdQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(crdInformer.Informer()),
queueinformer.WithInformer(crdInformer),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)),
)
if err != nil {
Expand Down
22 changes: 16 additions & 6 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -541,14 +540,25 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

// Register CustomResourceDefinition QueueInformer
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), config.resyncPeriod()).Apiextensions().V1().CustomResourceDefinitions()
// Register CustomResourceDefinition QueueInformer. Object metadata requests are used
// by this informer in order to reduce cached size.
gvr := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
crdInformer := metadatainformer.NewFilteredMetadataInformer(
config.metadataClient,
gvr,
metav1.NamespaceAll,
config.resyncPeriod(),
cache.Indexers{},
nil,
).Informer()
crdLister := metadatalister.New(crdInformer.GetIndexer(), gvr)
informersByNamespace[metav1.NamespaceAll].CRDInformer = crdInformer
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
informersByNamespace[metav1.NamespaceAll].CRDLister = crdLister
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdLister)
crdQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(crdInformer.Informer()),
queueinformer.WithInformer(crdInformer),
queueinformer.WithSyncer(k8sSyncer),
)
if err != nil {
Expand Down Expand Up @@ -1183,7 +1193,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
}

for i, crdName := range desc.ConversionCRDs {
crd, err := a.lister.APIExtensionsV1().CustomResourceDefinitionLister().Get(crdName)
crd, err := a.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crdName, metav1.GetOptions{})
if err != nil {
logger.Errorf("error getting CRD %v which was defined in CSVs spec.WebhookDefinition[%d]: %v\n", crdName, i, err)
continue
Expand Down
43 changes: 30 additions & 13 deletions pkg/controller/operators/olm/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,14 @@ func NewFakeOperator(ctx context.Context, options ...fakeOperatorOption) (*Opera
*config.actionLog = append(*config.actionLog, action)
return false, nil, nil
}))
config.operatorClient = operatorclient.NewClient(k8sClientFake, apiextensionsfake.NewSimpleClientset(config.extObjs...), apiregistrationfake.NewSimpleClientset(config.regObjs...))
apiextensionsFake := apiextensionsfake.NewSimpleClientset(config.extObjs...)
config.operatorClient = operatorclient.NewClient(k8sClientFake, apiextensionsFake, apiregistrationfake.NewSimpleClientset(config.regObjs...))
config.configClient = configfake.NewSimpleClientset()
metadataFake := metadatafake.NewSimpleMetadataClient(scheme, config.partialMetadata...)
config.metadataClient = metadataFake
// It's a travesty that we need to do this, but the fakes leave us no other option. In the API server, of course
// changes to objects are transparently exposed in the metadata client. In fake-land, we need to enforce that ourselves.
externalFake.PrependReactor("*", "*", func(action clienttesting.Action) (bool, runtime.Object, error) {
propagate := func(action clienttesting.Action) (bool, runtime.Object, error) {
var err error
switch action.GetVerb() {
case "create":
Expand All @@ -320,7 +321,9 @@ func NewFakeOperator(ctx context.Context, options ...fakeOperatorOption) (*Opera
err = metadataFake.Resource(action.GetResource()).Delete(context.TODO(), a.GetName(), metav1.DeleteOptions{})
}
return false, nil, err
})
}
externalFake.PrependReactor("*", "*", propagate)
apiextensionsFake.PrependReactor("*", "*", propagate)

for _, ns := range config.namespaces {
_, err := config.operatorClient.KubernetesInterface().CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{})
Expand Down Expand Up @@ -4397,7 +4400,7 @@ func TestSyncOperatorGroups(t *testing.T) {
operatorGroup *operatorsv1.OperatorGroup
csvs []*v1alpha1.ClusterServiceVersion
clientObjs []runtime.Object
crds []runtime.Object
crds []*apiextensionsv1.CustomResourceDefinition
k8sObjs []runtime.Object
apis []runtime.Object
}
Expand Down Expand Up @@ -4474,7 +4477,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{},
final: final{objects: map[string][]runtime.Object{
Expand Down Expand Up @@ -4553,7 +4556,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{
Namespaces: []string{operatorNamespace, targetNamespace},
Expand Down Expand Up @@ -4656,7 +4659,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{
Namespaces: []string{operatorNamespace, targetNamespace},
Expand Down Expand Up @@ -4762,7 +4765,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{
Namespaces: []string{corev1.NamespaceAll},
Expand Down Expand Up @@ -4925,7 +4928,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{
Namespaces: []string{corev1.NamespaceAll},
Expand Down Expand Up @@ -4982,7 +4985,7 @@ func TestSyncOperatorGroups(t *testing.T) {
operatorGroup = tt.initial.operatorGroup.DeepCopy()
clientObjs = copyObjs(append(tt.initial.clientObjs, operatorGroup))
k8sObjs = copyObjs(tt.initial.k8sObjs)
extObjs = copyObjs(tt.initial.crds)
extObjs []runtime.Object
regObjs = copyObjs(tt.initial.apis)
)

Expand All @@ -4992,11 +4995,25 @@ func TestSyncOperatorGroups(t *testing.T) {

var partials []runtime.Object
for _, csv := range tt.initial.csvs {
clientObjs = append(clientObjs, csv)
clientObjs = append(clientObjs, csv.DeepCopy())
partials = append(partials, &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "ClusterServiceVersion",
APIVersion: v1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: csv.ObjectMeta,
})
}
for _, crd := range tt.initial.crds {
extObjs = append(extObjs, crd.DeepCopy())
partials = append(partials, &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "CustomResourceDefinition",
APIVersion: apiextensionsv1.SchemeGroupVersion.String(),
},
ObjectMeta: crd.ObjectMeta,
})
}
l := logrus.New()
l.SetLevel(logrus.DebugLevel)
l = l.WithField("test", tt.name).Logger
Expand Down Expand Up @@ -5094,12 +5111,12 @@ func TestSyncOperatorGroups(t *testing.T) {

t.Log("op.syncClusterServiceVersion")
if err := op.syncClusterServiceVersion(&csv); err != nil {
return false, err
return false, fmt.Errorf("failed to syncClusterServiceVersion: %w", err)
}

t.Log("op.syncCopyCSV")
if err := op.syncCopyCSV(&csv); err != nil && !tt.ignoreCopyError {
return false, err
return false, fmt.Errorf("failed to syncCopyCSV: %w", err)
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/operators/olm/operatorgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -422,6 +423,7 @@ func (a *Operator) ensureClusterRolesForCSV(csv *v1alpha1.ClusterServiceVersion)
if err != nil {
return fmt.Errorf("crd %q not found: %s", owned.Name, err.Error())
}
crd.SetGroupVersionKind(apiextensionsv1.SchemeGroupVersion.WithKind("customresourcedefinition"))
nameGroupPair := strings.SplitN(owned.Name, ".", 2) // -> etcdclusters etcd.database.coreos.com
if len(nameGroupPair) != 2 {
return fmt.Errorf("invalid parsing of name '%v', got %v", owned.Name, nameGroupPair)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/operators/olm/plugins/operator_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
"github.com/sirupsen/logrus"
extensionsv1informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
appsv1informers "k8s.io/client-go/informers/apps/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
rbacv1informers "k8s.io/client-go/informers/rbac/v1"
Expand Down Expand Up @@ -46,7 +45,8 @@ type Informers struct {
ClusterRoleBindingInformer rbacv1informers.ClusterRoleBindingInformer
NamespaceInformer corev1informers.NamespaceInformer
APIServiceInformer apiregistrationv1informers.APIServiceInformer
CRDInformer extensionsv1informers.CustomResourceDefinitionInformer
CRDInformer cache.SharedIndexInformer
CRDLister metadatalister.Lister
}

// OperatorConfig gives access to required configuration from the host operator
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/operators/olm/requirements.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package olm

import (
"context"
"encoding/json"
"fmt"
"strings"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (a *Operator) requirementStatus(strategyDetailsDeployment *v1alpha1.Strateg
}

// check if CRD exists - this verifies group, version, and kind, so no need for GVK check via discovery
crd, err := a.lister.APIExtensionsV1().CustomResourceDefinitionLister().Get(r.Name)
crd, err := a.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), r.Name, metav1.GetOptions{})
if err != nil {
status.Status = v1alpha1.RequirementStatusReasonNotPresent
status.Message = "CRD is not present"
Expand Down
28 changes: 19 additions & 9 deletions pkg/lib/operatorlister/customresourcedefinition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,30 @@ import (
"fmt"
"sync"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
aextv1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/metadata/metadatalister"
)

// UnionCustomResourceDefinitionLister is a custom implementation of an CustomResourceDefinition lister that allows a new
// Lister to be registered on the fly. This Lister lists both v1 and v1beta1 APIVersion (at the newer version) CRDs.
// Lister to be registered on the fly.
type UnionCustomResourceDefinitionLister struct {
CustomResourceDefinitionLister aextv1.CustomResourceDefinitionLister
CustomResourceDefinitionLister metadatalister.Lister
CustomResourceDefinitionLock sync.RWMutex
}

func (ucl *UnionCustomResourceDefinitionLister) Namespace(namespace string) metadatalister.NamespaceLister {
ucl.CustomResourceDefinitionLock.RLock()
defer ucl.CustomResourceDefinitionLock.RUnlock()

if ucl.CustomResourceDefinitionLister == nil {
panic(fmt.Errorf("no CustomResourceDefinition lister registered"))
}
return ucl.CustomResourceDefinitionLister.Namespace(namespace)
}

// List lists all CustomResourceDefinitions in the indexer.
func (ucl *UnionCustomResourceDefinitionLister) List(selector labels.Selector) (ret []*apiextensionsv1.CustomResourceDefinition, err error) {
func (ucl *UnionCustomResourceDefinitionLister) List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) {
ucl.CustomResourceDefinitionLock.RLock()
defer ucl.CustomResourceDefinitionLock.RUnlock()

Expand All @@ -28,7 +38,7 @@ func (ucl *UnionCustomResourceDefinitionLister) List(selector labels.Selector) (
}

// Get retrieves the CustomResourceDefinition with the given name
func (ucl *UnionCustomResourceDefinitionLister) Get(name string) (*apiextensionsv1.CustomResourceDefinition, error) {
func (ucl *UnionCustomResourceDefinitionLister) Get(name string) (*metav1.PartialObjectMetadata, error) {
ucl.CustomResourceDefinitionLock.RLock()
defer ucl.CustomResourceDefinitionLock.RUnlock()

Expand All @@ -39,17 +49,17 @@ func (ucl *UnionCustomResourceDefinitionLister) Get(name string) (*apiextensions
}

// RegisterCustomResourceDefinitionLister registers a new CustomResourceDefinitionLister
func (ucl *UnionCustomResourceDefinitionLister) RegisterCustomResourceDefinitionLister(lister aextv1.CustomResourceDefinitionLister) {
func (ucl *UnionCustomResourceDefinitionLister) RegisterCustomResourceDefinitionLister(lister metadatalister.Lister) {
ucl.CustomResourceDefinitionLock.Lock()
defer ucl.CustomResourceDefinitionLock.Unlock()

ucl.CustomResourceDefinitionLister = lister
}

func (l *apiExtensionsV1Lister) RegisterCustomResourceDefinitionLister(lister aextv1.CustomResourceDefinitionLister) {
func (l *apiExtensionsV1Lister) RegisterCustomResourceDefinitionLister(lister metadatalister.Lister) {
l.customResourceDefinitionLister.RegisterCustomResourceDefinitionLister(lister)
}

func (l *apiExtensionsV1Lister) CustomResourceDefinitionLister() aextv1.CustomResourceDefinitionLister {
func (l *apiExtensionsV1Lister) CustomResourceDefinitionLister() metadatalister.Lister {
return l.customResourceDefinitionLister
}
6 changes: 3 additions & 3 deletions pkg/lib/operatorlister/lister.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package operatorlister

import (
aextv1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
appsv1 "k8s.io/client-go/listers/apps/v1"
corev1 "k8s.io/client-go/listers/core/v1"
rbacv1 "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/metadata/metadatalister"
aregv1 "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"

v1 "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
Expand Down Expand Up @@ -88,8 +88,8 @@ type APIRegistrationV1Lister interface {

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . APIExtensionsV1Lister
type APIExtensionsV1Lister interface {
RegisterCustomResourceDefinitionLister(lister aextv1.CustomResourceDefinitionLister)
CustomResourceDefinitionLister() aextv1.CustomResourceDefinitionLister
RegisterCustomResourceDefinitionLister(lister metadatalister.Lister)
CustomResourceDefinitionLister() metadatalister.Lister
}

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . OperatorsV1alpha1Lister
Expand Down

0 comments on commit 0886a7a

Please sign in to comment.