OCPBUGS-17157: *: filter informers when preconditions are met #3021

Merged
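
This change teaches the catalog operator to filter its shared informers down to OLM-managed objects once it can prove that is safe. At startup, the new labeller.Validate lists every watched resource through the metadata client and checks whether each object selected by the existing labelling filters already carries the OLM-managed label (install.OLMManagedLabelKey). If everything conforms, the informer factory is constructed with a label-selector tweak and the labelling workqueues are skipped; otherwise the labelling controllers keep running as before. The per-resource predicates move into pkg/controller/operators/labeller/filters.go so the informer wiring and Validate share one source of truth.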
2 changes: 1 addition & 1 deletion go.mod
@@ -37,6 +37,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.3
golang.org/x/net v0.10.0
golang.org/x/sync v0.2.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.54.0
gopkg.in/yaml.v2 v2.4.0
@@ -208,7 +209,6 @@ require (
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
78 changes: 39 additions & 39 deletions pkg/controller/operators/catalog/operator.go
@@ -11,7 +11,6 @@ import (
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
errorwrap "github.com/pkg/errors"
@@ -187,6 +186,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

canFilter, err := labeller.Validate(ctx, logger, metadataClient)
if err != nil {
return nil, err
}

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
@@ -363,7 +367,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}

// Wire k8s sharedIndexInformers
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod())
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod(), func() []informers.SharedInformerOption {
if !canFilter {
return nil
}
return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
})}
}()...)
sharedIndexInformers := []cache.SharedIndexInformer{}

// Wire Roles
@@ -372,6 +383,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
if canFilter {
return nil
}
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: gvr.String(),
})
@@ -392,8 +406,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.Filter(rolesgvk),
rbacv1applyconfigurations.Role,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
@@ -407,8 +422,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.Filter(rolebindingsgvk),
rbacv1applyconfigurations.RoleBinding,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
@@ -422,10 +438,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
},
serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, labeller.Filter(serviceaccountsgvk),
corev1applyconfigurations.ServiceAccount,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
@@ -439,8 +454,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.Filter(servicesgvk),
corev1applyconfigurations.Service,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
@@ -463,11 +479,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
return ok
},
podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, labeller.Filter(podsgvk),
corev1applyconfigurations.Pod,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
@@ -500,19 +514,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
jobInformer := k8sInformerFactory.Batch().V1().Jobs()
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())

if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
for _, ownerRef := range object.GetOwnerReferences() {
if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
cm, err := configMapInformer.Lister().ConfigMaps(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
return false
}
return labeller.HasOLMOwnerRef(cm)
}
}
return false
},
jobsgvk := batchv1.SchemeGroupVersion.WithResource("jobs")
if err := labelObjects(jobsgvk, jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, labeller.JobFilter(func(namespace, name string) (metav1.Object, error) {
return configMapInformer.Lister().ConfigMaps(namespace).Get(name)
}),
batchv1applyconfigurations.Job,
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
@@ -585,15 +591,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler(
ctx, op.logger, func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler(
ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
)); err != nil {
return nil, err
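
The recurring change in operator.go reduces to one pattern: construct the informer factory with a server-side label selector only when the labelling precondition holds. A self-contained sketch of that pattern (package and function names are illustrative; "olm.managed"/"true" stand in for install.OLMManagedLabelKey and install.OLMManagedLabelValue):

package olmwiring

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// newFilteredFactory mirrors the wiring above: when canFilter is true, every
// informer created from this factory lists and watches only objects carrying
// the OLM-managed label, so the API server does the filtering.
func newFilteredFactory(client kubernetes.Interface, canFilter bool, resync time.Duration) informers.SharedInformerFactory {
	var opts []informers.SharedInformerOption
	if canFilter {
		opts = append(opts, informers.WithTweakListOptions(func(o *metav1.ListOptions) {
			// Assumed values of install.OLMManagedLabelKey / install.OLMManagedLabelValue.
			o.LabelSelector = labels.SelectorFromSet(labels.Set{"olm.managed": "true"}).String()
		}))
	}
	return informers.NewSharedInformerFactoryWithOptions(client, resync, opts...)
}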
112 changes: 112 additions & 0 deletions pkg/controller/operators/labeller/filters.go
@@ -0,0 +1,112 @@
package labeller
Collaborator: should we add some unit tests around this package?

Member (author): What would you like to see? I held off since most of the logic in here is a very thin implementation on top of client calls and seemed like it could be validated by inspection.
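
For reference, the kind of thin table-driven test being discussed might look like the sketch below (not part of this PR; it exercises only behavior visible in filters.go):

package labeller

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"

	"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
)

func TestFilter(t *testing.T) {
	for name, tc := range map[string]struct {
		gvr      schema.GroupVersionResource
		object   metav1.Object
		expected bool
	}{
		"pod carrying the catalog source label matches": {
			gvr: corev1.SchemeGroupVersion.WithResource("pods"),
			object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{reconciler.CatalogSourceLabelKey: "my-catalog"},
			}},
			expected: true,
		},
		"unlabeled pod does not match": {
			gvr:      corev1.SchemeGroupVersion.WithResource("pods"),
			object:   &metav1.PartialObjectMetadata{},
			expected: false,
		},
		"unhandled resource never matches": {
			gvr:      corev1.SchemeGroupVersion.WithResource("configmaps"),
			object:   &metav1.PartialObjectMetadata{},
			expected: false,
		},
	} {
		t.Run(name, func(t *testing.T) {
			if actual := Filter(tc.gvr)(tc.object); actual != tc.expected {
				t.Errorf("Filter(%s) returned %v, expected %v", tc.gvr, actual, tc.expected)
			}
		})
	}
}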


import (
"context"
"fmt"
"strings"
"sync"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
)

func Filter(gvr schema.GroupVersionResource) func(metav1.Object) bool {
if f, ok := filters[gvr]; ok {
return f
}
return func(object metav1.Object) bool {
return false
}
}

func JobFilter(getConfigMap func(namespace, name string) (metav1.Object, error)) func(object metav1.Object) bool {
return func(object metav1.Object) bool {
for _, ownerRef := range object.GetOwnerReferences() {
if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
cm, err := getConfigMap(object.GetNamespace(), ownerRef.Name)
if err != nil {
return false
}
return HasOLMOwnerRef(cm)
}
}
return false
}
}

var filters = map[schema.GroupVersionResource]func(metav1.Object) bool{
corev1.SchemeGroupVersion.WithResource("services"): HasOLMOwnerRef,
corev1.SchemeGroupVersion.WithResource("pods"): func(object metav1.Object) bool {
_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
return ok
},
corev1.SchemeGroupVersion.WithResource("serviceaccounts"): func(object metav1.Object) bool {
return HasOLMOwnerRef(object) || HasOLMLabel(object)
},
appsv1.SchemeGroupVersion.WithResource("deployments"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("roles"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("rolebindings"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("clusterroles"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): HasOLMOwnerRef,
apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"): func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
}

func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadata.Interface) (bool, error) {
Member: Will it be safe to remove this check at some point, e.g. after we know everything has been labeled?

Member (author): Yes. Once we are at a version where all upgrade paths have gone through some state that includes the labeling controller, we can remove this precondition, the label controllers, etc., and just filter by default.

Member: Would it be useful to place a sentinel ConfigMap somewhere that indicates 100% labeling, and check for that, so we don't have to list all the things each time we start up?

Member (author): There's no capital-C Consistent way to ensure that is always correct, since storage can be sharded by resource. We're only doing partial object metadata lists here, so I think it should be OK?
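
(A metadata-only list returns PartialObjectMetadata — labels, annotations, ownerReferences, and the rest of the object's metadata, but no spec — so these startup checks stay cheap even on large clusters.)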

Member (author): Actually ... for Jobs we need the spec 🤦

Member (author): Found a workaround for jobs that does not need the spec.
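
(The workaround is JobFilter above: rather than reading the Job's spec, it follows the Job's ConfigMap ownerReference and applies HasOLMOwnerRef to that ConfigMap's metadata, which the metadata-only client can serve.)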

okLock := sync.Mutex{}
// ok must start true: each goroutine below can only AND it with its own result.
ok := true
g, ctx := errgroup.WithContext(ctx)
allFilters := map[schema.GroupVersionResource]func(metav1.Object) bool{}
for gvr, filter := range filters {
allFilters[gvr] = filter
}
allFilters[batchv1.SchemeGroupVersion.WithResource("jobs")] = JobFilter(func(namespace, name string) (metav1.Object, error) {
return metadataClient.Resource(corev1.SchemeGroupVersion.WithResource("configmaps")).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
})
for gvr, filter := range allFilters {
gvr, filter := gvr, filter
g.Go(func() error {
list, err := metadataClient.Resource(gvr).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list %s: %w", gvr.String(), err)
}
var count int
for _, item := range list.Items {
if filter(&item) && !hasLabel(&item) {
count++
}
}
if count > 0 {
logger.WithFields(logrus.Fields{
"gvr": gvr.String(),
"nonconforming": count,
}).Info("found nonconforming items")
}
okLock.Lock()
ok = ok && count == 0
okLock.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return false, err
}
return ok, nil
}
5 changes: 2 additions & 3 deletions pkg/controller/operators/labeller/labels.go
@@ -7,11 +7,8 @@ import (
"strings"

jsonpatch "github.com/evanphx/json-patch"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
"github.com/sirupsen/logrus"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@@ -22,6 +19,8 @@ import (

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/decorators"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
)

type ApplyConfig[T any] interface {