Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OCPBUGS-17157: pkg/controller: use a metadata watch for CRDs #3014

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ ifeq (, $(wildcard $(KUBEBUILDER_ASSETS)/kube-apiserver))
endif

cover.out:
go test $(MOD_FLAGS) -tags "json1" -v -race -coverprofile=cover.out -covermode=atomic \
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary but the DX of watching 10k lines of output scroll by on success makes it impossible to find test failures and should be an anti-pattern.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1000

go test $(MOD_FLAGS) -tags "json1" -race -coverprofile=cover.out -covermode=atomic \
-coverpkg ./pkg/controller/... ./pkg/...

coverage: cover.out
Expand Down
27 changes: 22 additions & 5 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -34,6 +33,9 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/metadata/metadatalister"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -144,6 +146,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

metadataClient, err := metadata.NewForConfig(config)
if err != nil {
return nil, err
}

// Create a new queueinformer-based operator.
opClient, err := operatorclient.NewClientFromRestConfig(config)
if err != nil {
Expand Down Expand Up @@ -443,13 +450,23 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

// Register CustomResourceDefinition QueueInformer
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), resyncPeriod()).Apiextensions().V1().CustomResourceDefinitions()
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
// Register CustomResourceDefinition QueueInformer. Object metadata requests are used
// by this informer in order to reduce cached size.
gvr := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
crdInformer := metadatainformer.NewFilteredMetadataInformer(
metadataClient,
gvr,
metav1.NamespaceAll,
resyncPeriod(),
cache.Indexers{},
nil,
).Informer()
crdLister := metadatalister.New(crdInformer.GetIndexer(), gvr)
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdLister)
crdQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(crdInformer.Informer()),
queueinformer.WithInformer(crdInformer),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)),
)
if err != nil {
Expand Down
22 changes: 16 additions & 6 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -541,14 +540,25 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

// Register CustomResourceDefinition QueueInformer
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), config.resyncPeriod()).Apiextensions().V1().CustomResourceDefinitions()
// Register CustomResourceDefinition QueueInformer. Object metadata requests are used
// by this informer in order to reduce cached size.
gvr := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
crdInformer := metadatainformer.NewFilteredMetadataInformer(
config.metadataClient,
gvr,
metav1.NamespaceAll,
config.resyncPeriod(),
cache.Indexers{},
nil,
).Informer()
crdLister := metadatalister.New(crdInformer.GetIndexer(), gvr)
informersByNamespace[metav1.NamespaceAll].CRDInformer = crdInformer
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
informersByNamespace[metav1.NamespaceAll].CRDLister = crdLister
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdLister)
crdQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(crdInformer.Informer()),
queueinformer.WithInformer(crdInformer),
queueinformer.WithSyncer(k8sSyncer),
)
if err != nil {
Expand Down Expand Up @@ -1183,7 +1193,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
}

for i, crdName := range desc.ConversionCRDs {
crd, err := a.lister.APIExtensionsV1().CustomResourceDefinitionLister().Get(crdName)
crd, err := a.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crdName, metav1.GetOptions{})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the crux of the trade-off. In the rare case that a CSV is deleted, we incur server cost and latency to do a live GET for the data we need. Better than holding onto 20MiB of every CRD ever when we would only ever be concerned with a tiny subset, and with the spec of that tiny subset in an even smaller set of cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's an improvement...

if err != nil {
logger.Errorf("error getting CRD %v which was defined in CSVs spec.WebhookDefinition[%d]: %v\n", crdName, i, err)
continue
Expand Down
43 changes: 30 additions & 13 deletions pkg/controller/operators/olm/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,14 @@ func NewFakeOperator(ctx context.Context, options ...fakeOperatorOption) (*Opera
*config.actionLog = append(*config.actionLog, action)
return false, nil, nil
}))
config.operatorClient = operatorclient.NewClient(k8sClientFake, apiextensionsfake.NewSimpleClientset(config.extObjs...), apiregistrationfake.NewSimpleClientset(config.regObjs...))
apiextensionsFake := apiextensionsfake.NewSimpleClientset(config.extObjs...)
config.operatorClient = operatorclient.NewClient(k8sClientFake, apiextensionsFake, apiregistrationfake.NewSimpleClientset(config.regObjs...))
config.configClient = configfake.NewSimpleClientset()
metadataFake := metadatafake.NewSimpleMetadataClient(scheme, config.partialMetadata...)
config.metadataClient = metadataFake
// It's a travesty that we need to do this, but the fakes leave us no other option. In the API server, of course
// changes to objects are transparently exposed in the metadata client. In fake-land, we need to enforce that ourselves.
externalFake.PrependReactor("*", "*", func(action clienttesting.Action) (bool, runtime.Object, error) {
propagate := func(action clienttesting.Action) (bool, runtime.Object, error) {
var err error
switch action.GetVerb() {
case "create":
Expand All @@ -320,7 +321,9 @@ func NewFakeOperator(ctx context.Context, options ...fakeOperatorOption) (*Opera
err = metadataFake.Resource(action.GetResource()).Delete(context.TODO(), a.GetName(), metav1.DeleteOptions{})
}
return false, nil, err
})
}
externalFake.PrependReactor("*", "*", propagate)
apiextensionsFake.PrependReactor("*", "*", propagate)

for _, ns := range config.namespaces {
_, err := config.operatorClient.KubernetesInterface().CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{})
Expand Down Expand Up @@ -4397,7 +4400,7 @@ func TestSyncOperatorGroups(t *testing.T) {
operatorGroup *operatorsv1.OperatorGroup
csvs []*v1alpha1.ClusterServiceVersion
clientObjs []runtime.Object
crds []runtime.Object
crds []*apiextensionsv1.CustomResourceDefinition
k8sObjs []runtime.Object
apis []runtime.Object
}
Expand Down Expand Up @@ -4474,7 +4477,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{},
final: final{objects: map[string][]runtime.Object{
Expand Down Expand Up @@ -4553,7 +4556,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{
Namespaces: []string{operatorNamespace, targetNamespace},
Expand Down Expand Up @@ -4656,7 +4659,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{
Namespaces: []string{operatorNamespace, targetNamespace},
Expand Down Expand Up @@ -4762,7 +4765,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{
Namespaces: []string{corev1.NamespaceAll},
Expand Down Expand Up @@ -4925,7 +4928,7 @@ func TestSyncOperatorGroups(t *testing.T) {
role,
roleBinding,
},
crds: []runtime.Object{crd},
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
},
expectedStatus: operatorsv1.OperatorGroupStatus{
Namespaces: []string{corev1.NamespaceAll},
Expand Down Expand Up @@ -4982,7 +4985,7 @@ func TestSyncOperatorGroups(t *testing.T) {
operatorGroup = tt.initial.operatorGroup.DeepCopy()
clientObjs = copyObjs(append(tt.initial.clientObjs, operatorGroup))
k8sObjs = copyObjs(tt.initial.k8sObjs)
extObjs = copyObjs(tt.initial.crds)
extObjs []runtime.Object
regObjs = copyObjs(tt.initial.apis)
)

Expand All @@ -4992,11 +4995,25 @@ func TestSyncOperatorGroups(t *testing.T) {

var partials []runtime.Object
for _, csv := range tt.initial.csvs {
clientObjs = append(clientObjs, csv)
clientObjs = append(clientObjs, csv.DeepCopy())
partials = append(partials, &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "ClusterServiceVersion",
APIVersion: v1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: csv.ObjectMeta,
})
}
for _, crd := range tt.initial.crds {
extObjs = append(extObjs, crd.DeepCopy())
partials = append(partials, &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "CustomResourceDefinition",
APIVersion: apiextensionsv1.SchemeGroupVersion.String(),
},
ObjectMeta: crd.ObjectMeta,
})
}
l := logrus.New()
l.SetLevel(logrus.DebugLevel)
l = l.WithField("test", tt.name).Logger
Expand Down Expand Up @@ -5094,12 +5111,12 @@ func TestSyncOperatorGroups(t *testing.T) {

t.Log("op.syncClusterServiceVersion")
if err := op.syncClusterServiceVersion(&csv); err != nil {
return false, err
return false, fmt.Errorf("failed to syncClusterServiceVersion: %w", err)
}

t.Log("op.syncCopyCSV")
if err := op.syncCopyCSV(&csv); err != nil && !tt.ignoreCopyError {
return false, err
return false, fmt.Errorf("failed to syncCopyCSV: %w", err)
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/operators/olm/operatorgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -422,6 +423,7 @@ func (a *Operator) ensureClusterRolesForCSV(csv *v1alpha1.ClusterServiceVersion)
if err != nil {
return fmt.Errorf("crd %q not found: %s", owned.Name, err.Error())
}
crd.SetGroupVersionKind(apiextensionsv1.SchemeGroupVersion.WithKind("customresourcedefinition"))
nameGroupPair := strings.SplitN(owned.Name, ".", 2) // -> etcdclusters etcd.database.coreos.com
if len(nameGroupPair) != 2 {
return fmt.Errorf("invalid parsing of name '%v', got %v", owned.Name, nameGroupPair)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/operators/olm/plugins/operator_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
"github.com/sirupsen/logrus"
extensionsv1informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
appsv1informers "k8s.io/client-go/informers/apps/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
rbacv1informers "k8s.io/client-go/informers/rbac/v1"
Expand Down Expand Up @@ -46,7 +45,8 @@ type Informers struct {
ClusterRoleBindingInformer rbacv1informers.ClusterRoleBindingInformer
NamespaceInformer corev1informers.NamespaceInformer
APIServiceInformer apiregistrationv1informers.APIServiceInformer
CRDInformer extensionsv1informers.CustomResourceDefinitionInformer
CRDInformer cache.SharedIndexInformer
CRDLister metadatalister.Lister
}

// OperatorConfig gives access to required configuration from the host operator
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/operators/olm/requirements.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package olm

import (
"context"
"encoding/json"
"fmt"
"strings"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (a *Operator) requirementStatus(strategyDetailsDeployment *v1alpha1.Strateg
}

// check if CRD exists - this verifies group, version, and kind, so no need for GVK check via discovery
crd, err := a.lister.APIExtensionsV1().CustomResourceDefinitionLister().Get(r.Name)
crd, err := a.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), r.Name, metav1.GetOptions{})
if err != nil {
status.Status = v1alpha1.RequirementStatusReasonNotPresent
status.Message = "CRD is not present"
Expand Down
28 changes: 19 additions & 9 deletions pkg/lib/operatorlister/customresourcedefinition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,30 @@ import (
"fmt"
"sync"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
aextv1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/metadata/metadatalister"
)

// UnionCustomResourceDefinitionLister is a custom implementation of an CustomResourceDefinition lister that allows a new
// Lister to be registered on the fly. This Lister lists both v1 and v1beta1 APIVersion (at the newer version) CRDs.
// Lister to be registered on the fly.
type UnionCustomResourceDefinitionLister struct {
CustomResourceDefinitionLister aextv1.CustomResourceDefinitionLister
CustomResourceDefinitionLister metadatalister.Lister
CustomResourceDefinitionLock sync.RWMutex
}

func (ucl *UnionCustomResourceDefinitionLister) Namespace(namespace string) metadatalister.NamespaceLister {
ucl.CustomResourceDefinitionLock.RLock()
defer ucl.CustomResourceDefinitionLock.RUnlock()

if ucl.CustomResourceDefinitionLister == nil {
panic(fmt.Errorf("no CustomResourceDefinition lister registered"))
}
return ucl.CustomResourceDefinitionLister.Namespace(namespace)
}

// List lists all CustomResourceDefinitions in the indexer.
func (ucl *UnionCustomResourceDefinitionLister) List(selector labels.Selector) (ret []*apiextensionsv1.CustomResourceDefinition, err error) {
func (ucl *UnionCustomResourceDefinitionLister) List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) {
ucl.CustomResourceDefinitionLock.RLock()
defer ucl.CustomResourceDefinitionLock.RUnlock()

Expand All @@ -28,7 +38,7 @@ func (ucl *UnionCustomResourceDefinitionLister) List(selector labels.Selector) (
}

// Get retrieves the CustomResourceDefinition with the given name
func (ucl *UnionCustomResourceDefinitionLister) Get(name string) (*apiextensionsv1.CustomResourceDefinition, error) {
func (ucl *UnionCustomResourceDefinitionLister) Get(name string) (*metav1.PartialObjectMetadata, error) {
ucl.CustomResourceDefinitionLock.RLock()
defer ucl.CustomResourceDefinitionLock.RUnlock()

Expand All @@ -39,17 +49,17 @@ func (ucl *UnionCustomResourceDefinitionLister) Get(name string) (*apiextensions
}

// RegisterCustomResourceDefinitionLister registers a new CustomResourceDefinitionLister
func (ucl *UnionCustomResourceDefinitionLister) RegisterCustomResourceDefinitionLister(lister aextv1.CustomResourceDefinitionLister) {
func (ucl *UnionCustomResourceDefinitionLister) RegisterCustomResourceDefinitionLister(lister metadatalister.Lister) {
ucl.CustomResourceDefinitionLock.Lock()
defer ucl.CustomResourceDefinitionLock.Unlock()

ucl.CustomResourceDefinitionLister = lister
}

func (l *apiExtensionsV1Lister) RegisterCustomResourceDefinitionLister(lister aextv1.CustomResourceDefinitionLister) {
func (l *apiExtensionsV1Lister) RegisterCustomResourceDefinitionLister(lister metadatalister.Lister) {
l.customResourceDefinitionLister.RegisterCustomResourceDefinitionLister(lister)
}

func (l *apiExtensionsV1Lister) CustomResourceDefinitionLister() aextv1.CustomResourceDefinitionLister {
func (l *apiExtensionsV1Lister) CustomResourceDefinitionLister() metadatalister.Lister {
return l.customResourceDefinitionLister
}
6 changes: 3 additions & 3 deletions pkg/lib/operatorlister/lister.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package operatorlister

import (
aextv1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
appsv1 "k8s.io/client-go/listers/apps/v1"
corev1 "k8s.io/client-go/listers/core/v1"
rbacv1 "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/metadata/metadatalister"
aregv1 "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"

v1 "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
Expand Down Expand Up @@ -88,8 +88,8 @@ type APIRegistrationV1Lister interface {

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . APIExtensionsV1Lister
type APIExtensionsV1Lister interface {
RegisterCustomResourceDefinitionLister(lister aextv1.CustomResourceDefinitionLister)
CustomResourceDefinitionLister() aextv1.CustomResourceDefinitionLister
RegisterCustomResourceDefinitionLister(lister metadatalister.Lister)
CustomResourceDefinitionLister() metadatalister.Lister
}

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . OperatorsV1alpha1Lister
Expand Down