diff --git a/pkg/api/apis/operators/v1alpha1/catalogsource_types.go b/pkg/api/apis/operators/v1alpha1/catalogsource_types.go
index e99806f225..aff7a27557 100644
--- a/pkg/api/apis/operators/v1alpha1/catalogsource_types.go
+++ b/pkg/api/apis/operators/v1alpha1/catalogsource_types.go
@@ -101,7 +101,10 @@ func (c *CatalogSource) Address() string {
     if c.Spec.Address != "" {
         return c.Spec.Address
     }
-    return c.Status.RegistryServiceStatus.Address()
+    if c.Status.RegistryServiceStatus != nil {
+        return c.Status.RegistryServiceStatus.Address()
+    }
+    return ""
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/api/apis/operators/v1alpha1/subscription_types.go b/pkg/api/apis/operators/v1alpha1/subscription_types.go
index c5ba686e39..9199494435 100644
--- a/pkg/api/apis/operators/v1alpha1/subscription_types.go
+++ b/pkg/api/apis/operators/v1alpha1/subscription_types.go
@@ -63,6 +63,11 @@ type SubscriptionStatus struct {
     // +optional
     InstallPlanRef *corev1.ObjectReference `json:"installPlanRef,omitempty"`
 
+    // CatalogStatus contains the Subscription's view of its relevant CatalogSources' status.
+    // It is used to determine SubscriptionStatusConditions related to CatalogSources.
+    // +optional
+    CatalogStatus []SubscriptionCatalogStatus `json:"catalogStatus,omitempty"`
+
     // LastUpdated represents the last time that the Subscription status was updated.
     LastUpdated metav1.Time `json:"lastUpdated"`
 }
@@ -74,6 +79,52 @@ type InstallPlanReference struct {
     UID  types.UID `json:"uuid"`
 }
 
+// NewInstallPlanReference returns an InstallPlanReference for the given ObjectReference.
+func NewInstallPlanReference(ref *corev1.ObjectReference) *InstallPlanReference {
+    return &InstallPlanReference{
+        APIVersion: ref.APIVersion,
+        Kind:       ref.Kind,
+        Name:       ref.Name,
+        UID:        ref.UID,
+    }
+}
+
+// SubscriptionCatalogStatus describes a Subscription's view of a CatalogSource's status.
+type SubscriptionCatalogStatus struct {
+    // CatalogSourceRef is a reference to a CatalogSource.
+    CatalogSourceRef *corev1.ObjectReference `json:"catalogSourceRef"`
+
+    // LastUpdated represents the last time that the CatalogSource's health changed.
+    LastUpdated metav1.Time `json:"lastUpdated"`
+
+    // Healthy is true if the CatalogSource is healthy; false otherwise.
+    Healthy bool `json:"healthy"`
+}
+
+// SetSubscriptionCatalogStatus sets the SubscriptionStatus' CatalogStatus field to the given slice if it differs
+// from the stored value. Returns true if a change was made, false otherwise.
+func (status *SubscriptionStatus) SetSubscriptionCatalogStatus(catalogStatus []SubscriptionCatalogStatus) bool {
+    if len(status.CatalogStatus) != len(catalogStatus) {
+        status.CatalogStatus = catalogStatus
+        return true
+    }
+
+    // TODO: dedupe catalogStatus?
+
+    set := map[SubscriptionCatalogStatus]struct{}{}
+    for _, cs := range status.CatalogStatus {
+        set[cs] = struct{}{}
+    }
+    for _, cs := range catalogStatus {
+        if _, ok := set[cs]; !ok {
+            status.CatalogStatus = catalogStatus
+            return true
+        }
+    }
+
+    return false
+}
+
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
 // +genclient
 type Subscription struct {
@@ -99,13 +150,3 @@ func (s *Subscription) GetInstallPlanApproval() Approval {
     }
     return ApprovalAutomatic
 }
-
-// NewInstallPlanReference returns an InstallPlanReference for the given ObjectReference.
-func NewInstallPlanReference(ref *corev1.ObjectReference) *InstallPlanReference {
-    return &InstallPlanReference{
-        APIVersion: ref.APIVersion,
-        Kind:       ref.Kind,
-        Name:       ref.Name,
-        UID:        ref.UID,
-    }
-}
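A note on the new setter: SetSubscriptionCatalogStatus reports a change only when the incoming slice differs in length or contains an element not already stored. A minimal standalone sketch of those semantics (not part of this patch; it only uses the types added above):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
)

func main() {
	ref := &corev1.ObjectReference{Namespace: "olm", Name: "community"}
	status := &v1alpha1.SubscriptionStatus{}

	next := []v1alpha1.SubscriptionCatalogStatus{
		{CatalogSourceRef: ref, LastUpdated: metav1.Now(), Healthy: true},
	}
	fmt.Println(status.SetSubscriptionCatalogStatus(next)) // true: lengths differ (0 vs 1)

	// Re-submitting the stored slice is a no-op. Worth noting: the element is used
	// as a map key, so CatalogSourceRef is compared by pointer; a deep-equal copy
	// of the reference would still be reported as a change.
	fmt.Println(status.SetSubscriptionCatalogStatus(status.CatalogStatus)) // false
}
```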
diff --git a/pkg/api/apis/operators/v1alpha1/zz_generated.deepcopy.go b/pkg/api/apis/operators/v1alpha1/zz_generated.deepcopy.go
index 22f3d810bc..57b256be7c 100644
--- a/pkg/api/apis/operators/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/api/apis/operators/v1alpha1/zz_generated.deepcopy.go
@@ -968,6 +968,28 @@ func (in *Subscription) DeepCopyObject() runtime.Object {
     return nil
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *SubscriptionCatalogStatus) DeepCopyInto(out *SubscriptionCatalogStatus) {
+    *out = *in
+    if in.CatalogSourceRef != nil {
+        in, out := &in.CatalogSourceRef, &out.CatalogSourceRef
+        *out = new(corev1.ObjectReference)
+        **out = **in
+    }
+    in.LastUpdated.DeepCopyInto(&out.LastUpdated)
+    return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionCatalogStatus.
+func (in *SubscriptionCatalogStatus) DeepCopy() *SubscriptionCatalogStatus {
+    if in == nil {
+        return nil
+    }
+    out := new(SubscriptionCatalogStatus)
+    in.DeepCopyInto(out)
+    return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *SubscriptionList) DeepCopyInto(out *SubscriptionList) {
     *out = *in
@@ -1030,6 +1052,13 @@ func (in *SubscriptionStatus) DeepCopyInto(out *SubscriptionStatus) {
         *out = new(corev1.ObjectReference)
         **out = **in
     }
+    if in.CatalogStatus != nil {
+        in, out := &in.CatalogStatus, &out.CatalogStatus
+        *out = make([]SubscriptionCatalogStatus, len(*in))
+        for i := range *in {
+            (*in)[i].DeepCopyInto(&(*out)[i])
+        }
+    }
     in.LastUpdated.DeepCopyInto(&out.LastUpdated)
     return
 }
diff --git a/pkg/controller/operators/catalog/metaview.go b/pkg/controller/operators/catalog/metaview.go
new file mode 100644
index 0000000000..068b085585
--- /dev/null
+++ b/pkg/controller/operators/catalog/metaview.go
@@ -0,0 +1,60 @@
+package catalog
+
+import (
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/tools/cache"
+
+    opcache "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/cache"
+)
+
+// metaViewer is a transparent viewer that can be used as a basis for more complex viewers.
+// It holds a set of commonly used utilities.
+// TODO: we probably don't want to embed Operator.
+type metaViewer struct {
+    *Operator
+    now       func() metav1.Time
+    namespace string
+}
+
+var _ opcache.Viewer = &metaViewer{}
+
+func (viewer *metaViewer) Key(obj interface{}) (key string, err error) {
+    // Use the most common key func (namespace/name)
+    // TODO: could we use metaViewer to store anything that implements meta.Interface by default?
+    return cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+}
+
+func (viewer *metaViewer) KeyByView(view interface{}) (key string, err error) {
+    // Use the most common key func (namespace/name)
+    // TODO: could we use metaViewer to store anything that implements meta.Interface by default?
+    return cache.DeletionHandlingMetaNamespaceKeyFunc(view)
+}
+
+func (viewer *metaViewer) View(obj interface{}) (view interface{}, err error) {
+    // Passthrough
+    view = obj
+    return
+}
+
+type metaViewerOption func(*metaViewer)
+
+func withNamespace(namespace string) metaViewerOption {
+    return func(viewer *metaViewer) {
+        viewer.namespace = namespace
+    }
+}
+
+// newMetaViewer returns a new metaViewer.
+func newMetaViewer(op *Operator, options ...metaViewerOption) *metaViewer {
+    viewer := &metaViewer{
+        Operator:  op,
+        now:       timeNow,
+        namespace: op.namespace,
+    }
+
+    for _, option := range options {
+        option(viewer)
+    }
+
+    return viewer
+}
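The passthrough viewer keys everything with client-go's standard namespace/name key function, so views produced by metaViewer-based viewers share the keyspace of ordinary cache entries. A quick standalone illustration of what that key func yields (client-go API only; the Pod and Namespace here are arbitrary):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// The "most common key func (namespace/name)" referenced in metaview.go.
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "my-ns", Name: "my-pod"}}
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod)
	fmt.Println(key, err) // my-ns/my-pod <nil>

	// Cluster-scoped objects key to just their name.
	ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "my-ns"}}
	key, err = cache.MetaNamespaceKeyFunc(ns)
	fmt.Println(key, err) // my-ns <nil>
}
```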
diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go
index 7f12205ea6..d75afa1e21 100644
--- a/pkg/controller/operators/catalog/operator.go
+++ b/pkg/controller/operators/catalog/operator.go
@@ -33,6 +33,7 @@ import (
     olmerrors "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/errors"
     "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
     "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
+    opcache "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/cache"
     "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
     "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
     "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
@@ -61,17 +62,20 @@ type Operator struct {
     client                versioned.Interface
     lister                operatorlister.OperatorLister
     namespace             string
+    watchedNamespaces     []string
     sources               map[resolver.CatalogKey]resolver.SourceRef
     sourcesLock           sync.RWMutex
     sourcesLastUpdate     metav1.Time
     resolver              resolver.Resolver
-    subQueue              workqueue.RateLimitingInterface
-    catSrcQueueSet        *queueinformer.ResourceQueueSet
+    catsrcQueueSet        *queueinformer.ResourceQueueSet
+    subQueueSet           *queueinformer.ResourceQueueSet
    namespaceResolveQueue workqueue.RateLimitingInterface
     reconciler            reconciler.RegistryReconcilerFactory
+    viewIndexerSet        *opcache.ViewIndexerSet
+    subIndexerSet         *opcache.IndexerSet
 }
 
-// NewOperator creates a new Catalog Operator.
+// NewOperator returns a configured, unstarted Operator.
 func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval time.Duration, configmapRegistryImage, operatorNamespace string, watchedNamespaces ...string) (*Operator, error) {
     // Default to watching all namespaces.
     if watchedNamespaces == nil {
@@ -84,99 +88,90 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
         return nil, err
     }
 
-    // Create an OperatorLister
-    lister := operatorlister.NewLister()
-
-    // Create an informer for each watched namespace.
-    ipSharedIndexInformers := []cache.SharedIndexInformer{}
-    subSharedIndexInformers := []cache.SharedIndexInformer{}
-    csvSharedIndexInformers := []cache.SharedIndexInformer{}
-    for _, namespace := range watchedNamespaces {
-        nsInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
-        ipSharedIndexInformers = append(ipSharedIndexInformers, nsInformerFactory.Operators().V1alpha1().InstallPlans().Informer())
-        subSharedIndexInformers = append(subSharedIndexInformers, nsInformerFactory.Operators().V1alpha1().Subscriptions().Informer())
-        csvSharedIndexInformers = append(csvSharedIndexInformers, nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions().Informer())
-
-        // resolver needs subscription and csv listers
-        lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, nsInformerFactory.Operators().V1alpha1().Subscriptions().Lister())
-        lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister())
-        lister.OperatorsV1alpha1().RegisterInstallPlanLister(namespace, nsInformerFactory.Operators().V1alpha1().InstallPlans().Lister())
-    }
-
     // Create a new queueinformer-based operator.
     queueOperator, err := queueinformer.NewOperator(kubeconfigPath, logger)
     if err != nil {
         return nil, err
     }
 
-    // Allocate the new instance of an Operator.
+    lister := operatorlister.NewLister()
     op := &Operator{
-        Operator:       queueOperator,
-        catSrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
-        client:         crClient,
-        lister:         lister,
-        namespace:      operatorNamespace,
-        sources:        make(map[resolver.CatalogKey]resolver.SourceRef),
-        resolver:       resolver.NewOperatorsV1alpha1Resolver(lister),
-    }
-
-    // Create an informer for each catalog namespace
-    deleteCatSrc := &cache.ResourceEventHandlerFuncs{
-        DeleteFunc: op.handleCatSrcDeletion,
-    }
-    for _, namespace := range watchedNamespaces {
-        nsInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
-        catsrcInformer := nsInformerFactory.Operators().V1alpha1().CatalogSources()
-
-        // Register queue and QueueInformer
-        var queueName string
-        if namespace == corev1.NamespaceAll {
-            queueName = "catsrc"
-        } else {
-            queueName = fmt.Sprintf("%s/catsrc", namespace)
-        }
-        catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName)
-        op.RegisterQueueInformer(queueinformer.NewInformer(catsrcQueue, catsrcInformer.Informer(), op.syncCatalogSources, deleteCatSrc, queueName, metrics.NewMetricsCatalogSource(op.client), logger))
-        op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(namespace, catsrcInformer.Lister())
-        op.catSrcQueueSet.Set(namespace, catsrcQueue)
-    }
-
-    // Register InstallPlan informers.
-    ipQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "installplans")
-    ipQueueInformers := queueinformer.New(
-        ipQueue,
-        ipSharedIndexInformers,
-        op.syncInstallPlans,
+        Operator:          queueOperator,
+        catsrcQueueSet:    queueinformer.NewEmptyResourceQueueSet(),
+        subQueueSet:       queueinformer.NewEmptyResourceQueueSet(),
+        client:            crClient,
+        lister:            lister,
+        namespace:         operatorNamespace,
+        watchedNamespaces: watchedNamespaces,
+        sources:           make(map[resolver.CatalogKey]resolver.SourceRef),
+        resolver:          resolver.NewOperatorsV1alpha1Resolver(lister),
+        reconciler:        reconciler.NewRegistryReconcilerFactory(lister, queueOperator.OpClient, configmapRegistryImage),
+        viewIndexerSet:    opcache.NewViewIndexerSet(),
+        subIndexerSet:     opcache.NewIndexerSet(),
+    }
+
+    // Set up global components
+    namespaceInformer := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval).Core().V1().Namespaces()
+    resolvingNamespaceQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
+    namespaceQueueInformer := queueinformer.NewInformer(
+        resolvingNamespaceQueue,
+        namespaceInformer.Informer(),
+        // Namespace sync for resolving subscriptions
+        op.syncResolvingNamespace,
         nil,
-        "installplan",
-        metrics.NewMetricsInstallPlan(op.client),
+        "resolver",
+        metrics.NewMetricsNil(),
         logger,
     )
-    for _, informer := range ipQueueInformers {
-        op.RegisterQueueInformer(informer)
-    }
-
-    // Register Subscription informers.
-    subscriptionQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "subscriptions")
-    subscriptionQueueInformers := queueinformer.New(
-        subscriptionQueue,
-        subSharedIndexInformers,
-        op.syncSubscriptions,
-        nil,
-        "subscription",
-        metrics.NewMetricsSubscription(op.client),
-        logger,
-    )
-    op.subQueue = subscriptionQueue
-    for _, informer := range subscriptionQueueInformers {
-        op.RegisterQueueInformer(informer)
-    }
+    op.RegisterQueueInformer(namespaceQueueInformer)
+    op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
+    op.namespaceResolveQueue = resolvingNamespaceQueue
 
-    handleDelete := &cache.ResourceEventHandlerFuncs{
-        DeleteFunc: op.handleDeletion,
-    }
-    // Set up informers for requeuing catalogs
+    // Set up namespaced components
     for _, namespace := range watchedNamespaces {
+        viewIndexer := opcache.NewViewIndexer()
+        op.viewIndexerSet.Set(namespace, viewIndexer)
+
+        // Create CR informers
+        crInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
+        catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
+        subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
+        ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
+        csvInformer := crInformerFactory.Operators().V1alpha1().ClusterServiceVersions()
+
+        // Create k8s informers
+        k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.OpClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace))
+        roleInformer := k8sInformerFactory.Rbac().V1().Roles()
+        roleBindingInformer := k8sInformerFactory.Rbac().V1().RoleBindings()
+        serviceAccountInformer := k8sInformerFactory.Core().V1().ServiceAccounts()
+        serviceInformer := k8sInformerFactory.Core().V1().Services()
+        podInformer := k8sInformerFactory.Core().V1().Pods()
+        configMapInformer := k8sInformerFactory.Core().V1().ConfigMaps()
+
+        // Register CR listers
+        lister.OperatorsV1alpha1().RegisterCatalogSourceLister(namespace, catsrcInformer.Lister())
+        lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister())
+        lister.OperatorsV1alpha1().RegisterInstallPlanLister(namespace, ipInformer.Lister())
+        lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
+
+        // Register k8s listers
+        op.lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister())
+        op.lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister())
+        op.lister.CoreV1().RegisterServiceAccountLister(namespace, serviceAccountInformer.Lister())
+        op.lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister())
+        op.lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister())
+        op.lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister())
+
+        // Create all queues
+        catsrcQueueName := "catalogsources"
+        subQueueName := "subscriptions"
+        subCatQueueName := "subcat"
+        ipQueueName := "installplans"
+        catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), catsrcQueueName)
+        subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), subQueueName)
+        subCatQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), subCatQueueName)
+        ipQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ipQueueName)
         roleQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "role")
         roleBindingQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "rolebinding")
         serviceAccountQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount")
@@ -184,15 +179,69 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
         podQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod")
         configmapQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "configmap")
 
-        informerFactory := informers.NewSharedInformerFactoryWithOptions(op.OpClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace))
-        roleInformer := informerFactory.Rbac().V1().Roles()
-        roleBindingInformer := informerFactory.Rbac().V1().RoleBindings()
-        serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts()
-        serviceInformer := informerFactory.Core().V1().Services()
-        podInformer := informerFactory.Core().V1().Pods()
-        configMapInformer := informerFactory.Core().V1().ConfigMaps()
+        // Create queue sets
+        op.catsrcQueueSet.Set(namespace, catsrcQueue)
+        op.subQueueSet.Set(namespace, subQueue)
+
+        // Set up viewers
+        mv := newMetaViewer(op, withNamespace(namespace))
+        scv := newSubCatViewer(mv)
+        scvKey := opcache.NewViewerKey(subCatIndexKey, new(v1alpha1.CatalogSource), new(v1alpha1.SubscriptionCatalogStatus))
+        viewIndexer.AddViewer(scvKey, scv)
+
+        // Set up indexing and index enqueuing
+        if err := viewIndexer.AddIndex(subCatIndexKey, opcache.BuildEnqueuingIndex(scv.subCatViewIndex, subCatQueue)); err != nil {
+            logrus.WithError(err).Warn("failed to setup subscription catalog status view indexer")
+            return nil, err
+        }
+
+        // TODO: Create and register queue indexers
+        subIndexers := map[string]cache.Indexer{namespace: subInformer.Informer().GetIndexer()}
+        op.subIndexerSet.Set(namespace, subInformer.Informer().GetIndexer())
+        subCatQueueIndexerName := "subcat-" + namespace
+        subCatQueueIndexer := queueinformer.NewQueueIndexer(subCatQueue, subIndexers, scv.setSubCatStatus, subCatQueueIndexerName, logger, metrics.NewMetricsNil())
+        op.RegisterQueueIndexer(subCatQueueIndexer)
+
+        // Define extra event handlers
+        handleCatsrcDelete := &cache.ResourceEventHandlerFuncs{
+            DeleteFunc: op.handleCatSrcDeletion,
+        }
+        handleDelete := &cache.ResourceEventHandlerFuncs{
+            DeleteFunc: op.handleDeletion,
+        }
+
+        // Create and register queue informers
         queueInformers := []*queueinformer.QueueInformer{
+            // CR informers
+            queueinformer.NewInformer(
+                catsrcQueue,
+                catsrcInformer.Informer(),
+                op.syncCatalogSources,
+                handleCatsrcDelete,
+                catsrcQueueName,
+                metrics.NewMetricsCatalogSource(op.client),
+                logger,
+            ).WithViewIndexer(viewIndexer),
+            queueinformer.NewInformer(
+                ipQueue,
+                ipInformer.Informer(),
+                op.syncInstallPlans,
+                nil,
+                ipQueueName,
+                metrics.NewMetricsInstallPlan(op.client),
+                logger,
+            ),
+            queueinformer.NewInformer(
+                subQueue,
+                subInformer.Informer(),
+                op.syncSubscriptions,
+                nil,
+                subQueueName,
+                metrics.NewMetricsSubscription(op.client),
+                logger,
+            ).AdditionallyDistribute(subCatQueue),
+
+            // k8s informers
             queueinformer.NewInformer(roleQueue, roleInformer.Informer(), op.syncObject, handleDelete, "role", metrics.NewMetricsNil(), logger),
             queueinformer.NewInformer(roleBindingQueue, roleBindingInformer.Informer(), op.syncObject, handleDelete, "rolebinding", metrics.NewMetricsNil(), logger),
             queueinformer.NewInformer(serviceAccountQueue, serviceAccountInformer.Informer(), op.syncObject, handleDelete, "serviceaccount", metrics.NewMetricsNil(), logger),
@@ -204,36 +253,35 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
             op.RegisterQueueInformer(q)
         }
 
-        op.lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister())
-        op.lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister())
-        op.lister.CoreV1().RegisterServiceAccountLister(namespace, serviceAccountInformer.Lister())
-        op.lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister())
-        op.lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister())
-        op.lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister())
     }
 
-    op.reconciler = reconciler.NewRegistryReconcilerFactory(op.lister, op.OpClient, configmapRegistryImage)
-
-    // Namespace sync for resolving subscriptions
-    namespaceInformer := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval).Core().V1().Namespaces()
-    resolvingNamespaceQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
-    namespaceQueueInformer := queueinformer.NewInformer(
-        resolvingNamespaceQueue,
-        namespaceInformer.Informer(),
-        op.syncResolvingNamespace,
-        nil,
-        "resolver",
-        metrics.NewMetricsNil(),
-        logger,
-    )
-
-    op.RegisterQueueInformer(namespaceQueueInformer)
-    op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
-    op.namespaceResolveQueue = resolvingNamespaceQueue
-
-    // Register CSV informers to fill cache
-    for _, informer := range csvSharedIndexInformers {
-        op.RegisterInformer(informer)
-    }
+    // // Register the QueueInformer for SubscriptionStatus.CatalogStatus
+    // subCatsrcStatusQueueName := fmt.Sprintf("%s/subscription/status", catsrcQueueName)
+    // subCatsrcStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), subCatsrcStatusQueueName)
+    // catsrcQueueInformers = append(catsrcQueueInformers, queueinformer.New(
+    //     subCatsrcStatusQueue,
+    //     catsrcSharedIndexInformers,
+    //     op.syncSubscriptionCatalogStatus,
+    //     nil,
+    //     subCatsrcStatusQueueName,
+    //     metrics.NewMetricsCatalogSource(op.client),
+    //     logger,
+    // )...)
+
+    // TODO: add subscription queueindexer for updating subscription status
+
+    // Register the QueueInformer for SubscriptionStatus.CatalogStatus
+    // subCleanupQueueName := fmt.Sprintf("%s/catalogstatus/cleanup", subQueueName)
+    // subCleanupQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), subCleanupQueueName)
+    // subQueueInformers = append(subQueueInformers, queueinformer.New(
+    //     subCleanupQueue,
+    //     subSharedIndexInformers,
+    //     op.cleanupSubscriptionCatalogStatus,
+    //     nil,
+    //     subCleanupQueueName,
+    //     metrics.NewMetricsSubscription(op.client),
+    //     logger,
+    // )...)
 
     return op, nil
 }
@@ -273,7 +321,7 @@ func (o *Operator) syncObject(obj interface{}) (syncError error) {
         defer o.sourcesLock.RUnlock()
         if _, ok := o.sources[sourceKey]; ok {
             logger.Debug("requeueing owner CatalogSource")
-            if err := o.catSrcQueueSet.Requeue(owner.Name, metaObj.GetNamespace()); err != nil {
+            if err := o.catsrcQueueSet.Requeue(owner.Name, metaObj.GetNamespace()); err != nil {
                 logger.Warn(err.Error())
             }
         }
@@ -324,7 +372,7 @@ func (o *Operator) handleDeletion(obj interface{}) {
     }
 
     // Requeue CatalogSource
-    if err := o.catSrcQueueSet.Requeue(catsrc.GetName(), catsrc.GetNamespace()); err != nil {
+    if err := o.catsrcQueueSet.Requeue(catsrc.GetName(), catsrc.GetNamespace()); err != nil {
         logger.WithError(err).Warn("error requeuing owner catalogsource")
     }
 }
@@ -354,7 +402,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
     }()
     o.Log.WithField("source", sourceKey).Info("removed client for deleted catalogsource")
 
-    if err := o.catSrcQueueSet.Remove(sourceKey.Name, sourceKey.Namespace); err != nil {
+    if err := o.catsrcQueueSet.Remove(sourceKey.Name, sourceKey.Namespace); err != nil {
         o.Log.WithError(err)
     }
 }
@@ -496,7 +544,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
             if !client.Conn.WaitForStateChange(ctx, connectivity.TransientFailure) {
                 logger.Debug("state didn't change, trigger reconnect. this may happen when cached dns is wrong.")
                 delete(o.sources, sourceKey)
-                if err := o.catSrcQueueSet.Requeue(sourceKey.Name, sourceKey.Namespace); err != nil {
+                if err := o.catsrcQueueSet.Requeue(sourceKey.Name, sourceKey.Namespace); err != nil {
                     logger.WithError(err).Debug("error requeueing")
                 }
                 return
@@ -549,7 +597,9 @@ func (o *Operator) syncDependentSubscriptions(logger *logrus.Entry, catalogSourc
         }
         if sub.Spec.CatalogSource == catalogSource && catalogNamespace == catalogSourceNamespace {
             logger.Debug("requeueing subscription because catalog changed")
-            o.requeueSubscription(sub.GetName(), sub.GetNamespace())
+            if err := o.subQueueSet.RequeueRateLimited(sub.GetName(), sub.GetNamespace()); err != nil {
+                logger.WithError(err).Warn("failed to requeue subscription")
+            }
         }
     }
 }
@@ -681,7 +731,7 @@ func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string)
         if ref.LastHealthy.IsZero() {
             logger = logger.WithField("source", k)
             logger.Debug("omitting source, hasn't yet become healthy")
-            if err := o.catSrcQueueSet.Requeue(k.Name, k.Namespace); err != nil {
+            if err := o.catsrcQueueSet.Requeue(k.Name, k.Namespace); err != nil {
                 logger.Warn("error requeueing")
             }
             continue
@@ -907,13 +957,6 @@ func (o *Operator) createInstallPlan(namespace string, subs []*v1alpha1.Subscrip
     return operators.GetReference(res)
 }
 
-func (o *Operator) requeueSubscription(name, namespace string) {
-    // we can build the key directly, will need to change if queue uses different key scheme
-    key := fmt.Sprintf("%s/%s", namespace, name)
-    o.subQueue.Add(key)
-    return
-}
-
 func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
     plan, ok := obj.(*v1alpha1.InstallPlan)
     if !ok {
@@ -950,7 +993,9 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
     if ownerutil.IsOwnedByKind(outInstallPlan, v1alpha1.SubscriptionKind) {
         oref := ownerutil.GetOwnerByKind(outInstallPlan, v1alpha1.SubscriptionKind)
         logger.WithField("owner", oref).Debug("requeueing installplan owner")
-        o.requeueSubscription(oref.Name, outInstallPlan.GetNamespace())
+        if err := o.subQueueSet.RequeueRateLimited(oref.Name, outInstallPlan.GetNamespace()); err != nil {
+            logger.WithError(err).Warn("failed to requeue installplan owner subscription")
+        }
     }
 
     // Update InstallPlan with status of transition. Log errors if we can't write them to the status.
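One subtlety in the wiring above: opcache.NewViewerKey(subCatIndexKey, new(v1alpha1.CatalogSource), new(v1alpha1.SubscriptionCatalogStatus)) registers the viewer under the pointer types, and the view cache dispatches on reflect.TypeOf of the objects it receives. Since informers hand out pointers, the two line up. A standalone sketch of that matching rule (local stand-in type, not the real one):

```go
package main

import (
	"fmt"
	"reflect"
)

type CatalogSource struct{}

func main() {
	// NewViewerKey stores reflect.TypeOf(new(T)), i.e. *T.
	transforms := reflect.TypeOf(new(CatalogSource))

	// Objects delivered by informers are pointers, so they match...
	fmt.Println(reflect.TypeOf(&CatalogSource{}) == transforms) // true

	// ...but a value of the same type would not, and its viewers would never fire.
	fmt.Println(reflect.TypeOf(CatalogSource{}) == transforms) // false
}
```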
diff --git a/pkg/controller/operators/catalog/subscriptions.go b/pkg/controller/operators/catalog/subscriptions.go
index 8cea6b8be0..af4b32fe3c 100644
--- a/pkg/controller/operators/catalog/subscriptions.go
+++ b/pkg/controller/operators/catalog/subscriptions.go
@@ -2,8 +2,17 @@ package catalog
 
 import (
     "errors"
+    "fmt"
 
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/client-go/tools/cache"
+
+    "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators"
     "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
 )
 
 var (
@@ -26,7 +35,7 @@ func labelsForSubscription(sub *v1alpha1.Subscription) map[string]string {
     }
 }
 
-// TODO remove this once UI no longer needs them
+// TODO: remove this once UI no longer needs them
 func legacyLabelsForSubscription(sub *v1alpha1.Subscription) map[string]string {
     return map[string]string{
         "alm-package": sub.Spec.Package,
@@ -49,3 +58,211 @@ func ensureLabels(sub *v1alpha1.Subscription) *v1alpha1.Subscription {
     sub.SetLabels(labels)
     return sub
 }
+
+// ---------------------------------------------------------------------------------------------------------
+// Views needed by Subscriptions
+
+// subCatViewer implements the SubscriptionCatalogStatus view over CatalogSources.
+type subCatViewer struct {
+    *metaViewer
+    getReference func(obj runtime.Object) (*corev1.ObjectReference, error)
+    namespace    string
+}
+
+type subCatViewerOption func(*subCatViewer)
+
+func withMetaViewerOptions(options ...metaViewerOption) subCatViewerOption {
+    return func(viewer *subCatViewer) {
+        for _, option := range options {
+            option(viewer.metaViewer)
+        }
+    }
+}
+
+// newSubCatViewer returns a Viewer that produces SubscriptionCatalogStatuses from CatalogSources.
+func newSubCatViewer(mv *metaViewer, options ...subCatViewerOption) *subCatViewer {
+    // Set defaults
+    viewer := &subCatViewer{
+        metaViewer:   mv,
+        getReference: operators.GetReference,
+    }
+
+    for _, option := range options {
+        option(viewer)
+    }
+
+    return viewer
+}
+
+const (
+    subCatPrefix      string = "subcatalogstatus"
+    subCatKeyTemplate string = subCatPrefix + "/%s/%s"
+)
+
+func (viewer *subCatViewer) Key(obj interface{}) (key string, err error) {
+    catalog, ok := obj.(*v1alpha1.CatalogSource)
+    if !ok {
+        err = fmt.Errorf("unexpected object value type %T, expected %T", obj, new(v1alpha1.CatalogSource))
+        return
+    }
+
+    key = fmt.Sprintf(subCatKeyTemplate, catalog.GetNamespace(), catalog.GetName())
+    return
+}
+
+func (viewer *subCatViewer) KeyByView(view interface{}) (key string, err error) {
+    scs, ok := view.(*v1alpha1.SubscriptionCatalogStatus)
+    if !ok {
+        err = fmt.Errorf("unexpected view value type %T, expected %T", view, new(v1alpha1.SubscriptionCatalogStatus))
+        return
+    }
+
+    key = fmt.Sprintf(subCatKeyTemplate, scs.CatalogSourceRef.Namespace, scs.CatalogSourceRef.Name)
+    return
+}
+
+func (viewer *subCatViewer) View(obj interface{}) (view interface{}, err error) {
+    catalog, ok := obj.(*v1alpha1.CatalogSource)
+    if !ok {
+        return nil, fmt.Errorf("unexpected object type, expected %T and got %T", new(v1alpha1.CatalogSource), obj)
+    }
+
+    // Check the registry server health
+    healthy, err := viewer.reconciler.ReconcilerForSource(catalog).CheckRegistryServer(catalog)
+    if err != nil {
+        return
+    }
+
+    // Create the view
+    ref, err := viewer.getReference(catalog)
+    if err != nil {
+        return
+    }
+
+    view = &v1alpha1.SubscriptionCatalogStatus{
+        CatalogSourceRef: ref,
+        LastUpdated:      viewer.now(),
+        Healthy:          healthy,
+    }
+
+    return
+}
+
+const (
+    subCatIndexKey string = "subcatalog"
+)
+
+// subCatViewIndex returns a set of indices for a given SubscriptionCatalogStatus.
+func (viewer *subCatViewer) subCatViewIndex(view interface{}) ([]string, error) {
+    scs, ok := view.(*v1alpha1.SubscriptionCatalogStatus)
+    if !ok {
+        // Can't build indices for this type. Fail silently.
+        return []string{}, nil
+    }
+
+    if scs.CatalogSourceRef.Namespace == viewer.Operator.namespace {
+        // The CatalogSource is global, get keys for subscriptions in all watched namespaces
+        namespaces := viewer.watchedNamespaces
+        if len(namespaces) == 1 && namespaces[0] == metav1.NamespaceAll {
+            // Need to get all namespace names
+            nsList, err := viewer.lister.CoreV1().NamespaceLister().List(labels.Everything())
+            if err != nil {
+                return nil, err
+            }
+            namespaces = make([]string, len(nsList))
+            for i, ns := range nsList {
+                namespaces[i] = ns.GetName()
+            }
+        }
+
+        keySet := sets.String{}
+        for _, namespace := range namespaces {
+            // TODO: namespace is probably metav1.NamespaceAll, in which case I don't think it will be indexed properly in other indexers.
+            indexer := viewer.subIndexerSet.Get(namespace)
+            if indexer == nil {
+                return nil, fmt.Errorf("no subscription indexer found for namespace %s", scs.CatalogSourceRef.Namespace)
+            }
+
+            keys, err := indexer.IndexKeys(cache.NamespaceIndex, namespace)
+            if err != nil {
+                return nil, err
+            }
+            viewer.Log.WithField("subscription-namespace", namespace).Debugf("keys: %v", keys)
+
+            keySet.Insert(keys...)
+        }
+
+        return keySet.List(), nil
+    }
+
+    indexer := viewer.subIndexerSet.Get(scs.CatalogSourceRef.Namespace)
+    if indexer == nil {
+        return nil, fmt.Errorf("no subscription indexer found for namespace %s", scs.CatalogSourceRef.Namespace)
+    }
+
+    return indexer.IndexKeys(cache.NamespaceIndex, scs.CatalogSourceRef.Namespace)
+}
+
+// setSubCatStatus sets the CatalogStatus field on the given Subscription's status using the stored SubscriptionCatalogStatus views.
+func (viewer *subCatViewer) setSubCatStatus(obj interface{}) error {
+    sub, ok := obj.(*v1alpha1.Subscription)
+    if !ok {
+        return fmt.Errorf("casting Subscription failed")
+    }
+
+    // Get views from catalogs in the global namespace
+    viewIndexer := viewer.viewIndexerSet.Get(viewer.Operator.namespace)
+    if viewIndexer == nil {
+        // TODO: panic here?
+        return fmt.Errorf("global namespace indexer nil")
+    }
+
+    // Generate the Subscription key
+    indexKey := fmt.Sprintf("%s/%s", sub.GetNamespace(), sub.GetName())
+
+    var subCatStatus []v1alpha1.SubscriptionCatalogStatus
+    objs, err := viewIndexer.ByIndex(subCatIndexKey, indexKey)
+    if err != nil {
+        return err
+    }
+    for _, o := range objs {
+        if scs, ok := o.(*v1alpha1.SubscriptionCatalogStatus); ok {
+            subCatStatus = append(subCatStatus, *scs)
+        } else {
+            panic(fmt.Sprintf("obj %v not of type %T", o, new(v1alpha1.SubscriptionCatalogStatus)))
+        }
+    }
+
+    if sub.GetNamespace() != viewer.Operator.namespace || viewer.watchedNamespaces[0] != metav1.NamespaceAll {
+        // Get views from the Subscription namespace
+        viewIndexer = viewer.viewIndexerSet.Get(sub.GetNamespace())
+        if viewIndexer == nil {
+            // TODO: panic here?
+            return fmt.Errorf("%s indexer nil", sub.GetNamespace())
+        }
+
+        objs, err = viewIndexer.ByIndex(subCatIndexKey, indexKey)
+        if err != nil {
+            return err
+        }
+        for _, o := range objs {
+            if scs, ok := o.(*v1alpha1.SubscriptionCatalogStatus); ok {
+                subCatStatus = append(subCatStatus, *scs)
+            } else {
+                panic(fmt.Sprintf("obj %v not of type %T", o, new(v1alpha1.SubscriptionCatalogStatus)))
+            }
+        }
+    }
+
+    // Update the catalog status if a change has been made
+    if sub.Status.SetSubscriptionCatalogStatus(subCatStatus) {
+        sub.Status.LastUpdated = viewer.now()
+        _, err := viewer.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).UpdateStatus(sub)
+        if err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
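subCatViewIndex leans on k8s.io/apimachinery/pkg/util/sets to dedupe subscription keys gathered across namespaces, and List() also sorts, which keeps the index output deterministic. The behavior in isolation:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	keySet := sets.String{}
	keySet.Insert("ns-b/sub-2", "ns-a/sub-1")
	keySet.Insert("ns-a/sub-1") // duplicate; ignored

	fmt.Println(keySet.List()) // [ns-a/sub-1 ns-b/sub-2]
}
```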
diff --git a/pkg/lib/cache/viewer.go b/pkg/lib/cache/viewer.go
new file mode 100644
index 0000000000..7e645a3fc5
--- /dev/null
+++ b/pkg/lib/cache/viewer.go
@@ -0,0 +1,305 @@
+package cache
+
+import (
+    "fmt"
+    "reflect"
+    "sync"
+
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    utilerrors "k8s.io/apimachinery/pkg/util/errors"
+    "k8s.io/client-go/tools/cache"
+    "k8s.io/client-go/util/workqueue"
+)
+
+// Viewer is a type that knows how to create and key an alternate view of a given resource.
+type Viewer interface {
+    // Key returns the resulting view key for the given object.
+    Key(obj interface{}) (key string, err error)
+
+    // KeyByView returns the view key for the given view object.
+    KeyByView(view interface{}) (key string, err error)
+
+    // View returns the view for the given object.
+    View(obj interface{}) (value interface{}, err error)
+}
+
+// ViewerKey relates a Viewer name to the type of resource it transforms.
+type ViewerKey struct {
+    // Name is the Viewer's name.
+    Name string
+
+    // Transforms is the type of resource the Viewer can transform.
+    Transforms reflect.Type
+
+    // Into is the view type the Viewer produces.
+    Into reflect.Type
+}
+
+// NewViewerKey returns a ViewerKey with the given name and the reflect.Types of the resource it transforms and the view it produces.
+func NewViewerKey(name string, transforms interface{}, into interface{}) ViewerKey {
+    return ViewerKey{
+        Name:       name,
+        Transforms: reflect.TypeOf(transforms),
+        Into:       reflect.TypeOf(into),
+    }
+}
+
+// Viewers maps ViewerKeys to Viewers.
+type Viewers map[ViewerKey]Viewer
+
+// ViewIndexer is a storage interface that supports building and indexing alternate views of given data.
+type ViewIndexer interface {
+    cache.Indexer
+
+    // View returns the stored view of the given object generated by the viewer of the given key if one exists.
+    View(viewerKey ViewerKey, obj interface{}) (value interface{}, err error)
+
+    // AddViewer adds a Viewer to the ViewIndexer with the given key. The results of calling this after data has been added to the indexer are undefined.
+    AddViewer(viewerKey ViewerKey, viewer Viewer) error
+
+    // AddViewers adds the given set of Viewers to the ViewIndexer. If you call this after you already have data in the indexer, the results are undefined.
+    AddViewers(viewers Viewers) error
+
+    // AddIndex adds an index func to the indexer with the given name.
+    AddIndex(name string, indexFunc cache.IndexFunc) error
+
+    // TODO: Include Add, Update, Delete methods that can specify a single view to use?
+}
+
+// ViewIndexerSet provides thread-safe methods for storing and retrieving ViewIndexers.
+type ViewIndexerSet struct {
+    lock           sync.RWMutex
+    viewIndexerSet map[string]ViewIndexer
+}
+
+// Get returns the ViewIndexer associated with the given namespace.
+func (v *ViewIndexerSet) Get(namespace string) ViewIndexer {
+    v.lock.RLock()
+    defer v.lock.RUnlock()
+
+    if viewIndexer, ok := v.viewIndexerSet[metav1.NamespaceAll]; ok {
+        return viewIndexer
+    }
+
+    return v.viewIndexerSet[namespace]
+}
+
+// Set sets the given ViewIndexer at the given namespace.
+func (v *ViewIndexerSet) Set(namespace string, viewIndexer ViewIndexer) {
+    v.lock.Lock()
+    defer v.lock.Unlock()
+
+    v.viewIndexerSet[namespace] = viewIndexer
+}
+
+// NewViewIndexerSet returns a newly initialized ViewIndexerSet.
+func NewViewIndexerSet() *ViewIndexerSet {
+    return &ViewIndexerSet{viewIndexerSet: map[string]ViewIndexer{}}
+}
+
+// IndexerSet provides thread-safe methods for storing and retrieving Indexers.
+type IndexerSet struct {
+    lock       sync.RWMutex
+    indexerSet map[string]cache.Indexer
+}
+
+// Get returns the Indexer associated with the given namespace.
+func (i *IndexerSet) Get(namespace string) cache.Indexer {
+    i.lock.RLock()
+    defer i.lock.RUnlock()
+
+    if indexer, ok := i.indexerSet[metav1.NamespaceAll]; ok {
+        return indexer
+    }
+
+    return i.indexerSet[namespace]
+}
+
+// Set sets the given Indexer at the given namespace.
+func (i *IndexerSet) Set(namespace string, indexer cache.Indexer) {
+    i.lock.Lock()
+    defer i.lock.Unlock()
+
+    i.indexerSet[namespace] = indexer
+}
+
+// NewIndexerSet returns a newly initialized IndexerSet.
+func NewIndexerSet() *IndexerSet {
+    return &IndexerSet{indexerSet: map[string]cache.Indexer{}}
+}
+
+// ------------------------------------------------------------------------------------------------------------------
+
+type viewCache struct {
+    cache.Indexer
+
+    lock    sync.RWMutex
+    viewers Viewers
+
+    // TODO: use key pointers internally if it will reduce overhead.
+
+    // viewerTypeToKey maps viewer types to existing ViewerKeys.
+    // Used to prevent the same viewer from being registered multiple times.
+    viewerTypeToKey map[reflect.Type]ViewerKey
+
+    // transformsToKeys maps transform types to sets of ViewerKeys.
+    // Used to quickly look up the set of applicable views for an added resource.
+    transformsToKeys map[reflect.Type][]ViewerKey
+
+    // intoToKey maps a view type to the key of the viewer that produces it.
+    intoToKey map[reflect.Type]ViewerKey
+}
+
+var _ ViewIndexer = &viewCache{}
+
+// NewViewIndexer returns a zeroed ViewIndexer.
+func NewViewIndexer() ViewIndexer {
+    c := &viewCache{
+        viewers:          Viewers{},
+        viewerTypeToKey:  map[reflect.Type]ViewerKey{},
+        transformsToKeys: map[reflect.Type][]ViewerKey{},
+        intoToKey:        map[reflect.Type]ViewerKey{},
+    }
+    c.Indexer = cache.NewIndexer(c.sharedViewKey, cache.Indexers{})
+    return c
+}
+
+func (c *viewCache) View(viewerKey ViewerKey, obj interface{}) (value interface{}, err error) {
+    c.lock.RLock()
+    defer c.lock.RUnlock()
+
+    viewer, ok := c.viewers[viewerKey]
+    if !ok {
+        return nil, fmt.Errorf("view %v not found", viewerKey)
+    }
+
+    key, err := viewer.Key(obj)
+    if err != nil {
+        return nil, err
+    }
+
+    stored, _, err := c.GetByKey(key)
+
+    return stored, err
+}
+
+func (c *viewCache) AddViewer(viewerKey ViewerKey, viewer Viewer) error {
+    return c.AddViewers(Viewers{viewerKey: viewer})
+}
+
+func (c *viewCache) AddViewers(viewers Viewers) error {
+    c.lock.Lock()
+    defer c.lock.Unlock()
+
+    if len(c.viewers) > 0 {
+        return fmt.Errorf("cannot add views to running viewer")
+    }
+
+    for vk, viewer := range viewers {
+        vt := reflect.TypeOf(viewer)
+        if vk, ok := c.viewerTypeToKey[vt]; ok {
+            return fmt.Errorf("viewer of type %T already registered with key %v", viewer, vk)
+        }
+
+        c.viewers[vk] = viewer
+        c.viewerTypeToKey[vt] = vk
+        c.intoToKey[vk.Into] = vk
+        c.transformsToKeys[vk.Transforms] = append(c.transformsToKeys[vk.Transforms], vk)
+    }
+
+    return nil
+}
+
+func (c *viewCache) AddIndex(name string, indexFunc cache.IndexFunc) error {
+    return c.Indexer.AddIndexers(cache.Indexers{name: indexFunc})
+}
+
+// sharedViewKey invokes the key function matching the viewer type of the given view.
+// This allows a store expecting a single key function to support multiple types.
+func (c *viewCache) sharedViewKey(view interface{}) (string, error) {
+    // Find a viewer that matches the dynamic type
+    if vk, ok := c.intoToKey[reflect.TypeOf(view)]; ok {
+        if viewer, ok := c.viewers[vk]; ok {
+            return viewer.KeyByView(view)
+        }
+    }
+
+    // TODO: have a default key function if a view doesn't exist? This could be useful for storing runtime.Objects in the same cache.
+    return "", fmt.Errorf("no viewer of type %T registered", view)
+}
+
+type modifierFunc func(obj interface{}) error
+
+func (c *viewCache) modifyViews(obj interface{}, modify modifierFunc) error {
+    c.lock.RLock()
+    defer c.lock.RUnlock()
+
+    errs := []error{}
+    for _, vk := range c.transformsToKeys[reflect.TypeOf(obj)] {
+        viewer, ok := c.viewers[vk]
+        if !ok {
+            return fmt.Errorf("no view found for key: %v", vk)
+        }
+
+        view, err := viewer.View(obj)
+        if err != nil {
+            errs = append(errs, err)
+            continue
+        }
+
+        if err := modify(view); err != nil {
+            errs = append(errs, err)
+        }
+    }
+
+    return utilerrors.NewAggregate(errs)
+}
+
+// Add sets an item in the cache.
+func (c *viewCache) Add(obj interface{}) error {
+    return c.modifyViews(obj, c.Indexer.Add)
+}
+
+// Update sets an item in the cache to its updated state.
+func (c *viewCache) Update(obj interface{}) error {
+    return c.modifyViews(obj, c.Indexer.Update)
+}
+
+// Delete removes an item from the cache.
+func (c *viewCache) Delete(obj interface{}) error {
+    return c.modifyViews(obj, c.Indexer.Delete)
+}
+
+// ------------------------------------------------------------------------------------------------------------------
+
+// EnqueuingIndexBuilder builds index funcs that enqueue their indices on a set of queues as a side effect.
+type EnqueuingIndexBuilder interface {
+    // Build builds a new function from the given index function that enqueues its indices on the given queues as a side effect.
+    Build(cache.IndexFunc, ...workqueue.RateLimitingInterface) cache.IndexFunc
+}
+
+// EnqueuingIndexBuilderFunc implements EnqueuingIndexBuilder.
+type EnqueuingIndexBuilderFunc func(cache.IndexFunc, ...workqueue.RateLimitingInterface) cache.IndexFunc
+
+// Build builds a new function from the given index function that enqueues its indices on the given queues as a side effect.
+func (f EnqueuingIndexBuilderFunc) Build(index cache.IndexFunc, queues ...workqueue.RateLimitingInterface) cache.IndexFunc {
+    return f(index, queues...)
+}
+
+// BuildEnqueuingIndex builds a new function from the given index function that enqueues its indices on the given queues synchronously as a side effect.
+func BuildEnqueuingIndex(index cache.IndexFunc, queues ...workqueue.RateLimitingInterface) cache.IndexFunc {
+    return func(obj interface{}) ([]string, error) {
+        indices, err := index(obj)
+        if err != nil {
+            return nil, err
+        }
+
+        // Enqueue on every queue before returning
+        for _, queue := range queues {
+            for _, idx := range indices {
+                queue.Add(idx)
+            }
+        }
+
+        return indices, nil
+    }
+}
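The enqueue-on-index side effect is the hinge of the whole view mechanism: indexing a view both records the index and wakes the workers responsible for the indexed keys. A minimal usage sketch against the package as added here (the toy index func simply returns its input):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"

	opcache "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/cache"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")
	defer queue.ShutDown()

	index := opcache.BuildEnqueuingIndex(func(obj interface{}) ([]string, error) {
		return []string{obj.(string)}, nil // toy index: the object is its own index key
	}, queue)

	indices, err := index("my-ns/my-sub")
	fmt.Println(indices, err, queue.Len()) // [my-ns/my-sub] <nil> 1
}
```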
diff --git a/pkg/lib/index/label.go b/pkg/lib/index/label.go
index 52ccd29bea..5d56f3cff2 100644
--- a/pkg/lib/index/label.go
+++ b/pkg/lib/index/label.go
@@ -13,19 +13,19 @@ const (
     MetaLabelIndexFuncKey string = "metalabelindexfunc"
 )
 
-// MetaLabelIndexFunc returns indicies from the labels of the given object.
+// MetaLabelIndexFunc returns indices from the labels of the given object.
 func MetaLabelIndexFunc(obj interface{}) ([]string, error) {
-    indicies := []string{}
+    indices := []string{}
 
     m, err := meta.Accessor(obj)
     if err != nil {
-        return indicies, fmt.Errorf("object has no meta: %v", err)
+        return indices, fmt.Errorf("object has no meta: %v", err)
     }
 
     for k, v := range m.GetLabels() {
-        indicies = append(indicies, fmt.Sprintf("%s=%s", k, v))
+        indices = append(indices, fmt.Sprintf("%s=%s", k, v))
     }
 
-    return indicies, nil
+    return indices, nil
 }
 
 // LabelIndexKeys returns the union of indexed cache keys in the given indexers matching the same labels as the given selector
diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go
index 97f5c2dc34..f79a47ff9d 100644
--- a/pkg/lib/queueinformer/queueinformer.go
+++ b/pkg/lib/queueinformer/queueinformer.go
@@ -1,10 +1,12 @@
 package queueinformer
 
 import (
-    "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
     "github.com/sirupsen/logrus"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/workqueue"
+
+    opcache "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/cache"
+    "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
 )
 
 // SyncHandler is the function that reconciles the controlled object when seen
@@ -25,70 +27,77 @@ type QueueInformer struct {
 
 // enqueue adds a key to the queue. If obj is a key already it gets added directly.
 // Otherwise, the key is extracted via keyFunc.
-func (q *QueueInformer) enqueue(obj interface{}) {
+func enqueue(obj interface{}, key keyFunc, queue workqueue.RateLimitingInterface) {
     if obj == nil {
         return
     }
 
-    key, ok := obj.(string)
+    k, ok := obj.(string)
     if !ok {
-        key, ok = q.keyFunc(obj)
+        k, ok = key(obj)
         if !ok {
             return
         }
     }
 
-    q.queue.Add(key)
+    queue.Add(k)
 }
 
 // keyFunc turns an object into a key for the queue. In the future will use a (name, namespace) struct as key
-func (q *QueueInformer) keyFunc(obj interface{}) (string, bool) {
+type keyFunc func(obj interface{}) (string, bool)
+
+// key turns an object into a key for the queue. In the future will use a (name, namespace) struct as key
+func (q *QueueInformer) key(obj interface{}) (string, bool) {
     k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
     if err != nil {
-        q.log.Infof("creating key failed: %s", err)
+        q.log.WithError(err).Debugf("creating key failed for: %v", obj)
         return k, false
     }
 
     return k, true
 }
 
-func (q *QueueInformer) defaultAddFunc(obj interface{}) {
-    key, ok := q.keyFunc(obj)
-    if !ok {
-        q.log.Warnf("couldn't add %v, couldn't create key", obj)
-        return
-    }
+func (q *QueueInformer) defaultAddFunc(queue workqueue.RateLimitingInterface) func(obj interface{}) {
+    return func(obj interface{}) {
+        if _, ok := q.key(obj); !ok {
+            q.log.Warnf("couldn't add %v, couldn't create key", obj)
+            return
+        }
 
-    q.enqueue(key)
+        enqueue(obj, q.key, queue)
+    }
 }
 
-func (q *QueueInformer) defaultDeleteFunc(obj interface{}) {
-    key, ok := q.keyFunc(obj)
-    if !ok {
-        q.log.Warnf("couldn't delete %v, couldn't create key", obj)
-        return
-    }
+func (q *QueueInformer) defaultDeleteFunc(queue workqueue.RateLimitingInterface) func(obj interface{}) {
+    return func(obj interface{}) {
+        k, ok := q.key(obj)
+        if !ok {
+            q.log.Warnf("couldn't delete %v, couldn't create key", obj)
+            return
+        }
 
-    q.queue.Forget(key)
+        queue.Forget(k)
+    }
 }
 
-func (q *QueueInformer) defaultUpdateFunc(oldObj, newObj interface{}) {
-    key, ok := q.keyFunc(newObj)
-    if !ok {
-        q.log.Warnf("couldn't update %v, couldn't create key", newObj)
-        return
-    }
+func (q *QueueInformer) defaultUpdateFunc(queue workqueue.RateLimitingInterface) func(oldObj, newObj interface{}) {
+    return func(oldObj, newObj interface{}) {
+        if _, ok := q.key(newObj); !ok {
+            q.log.Warnf("couldn't update %v, couldn't create key", newObj)
+            return
+        }
 
-    q.enqueue(key)
+        enqueue(newObj, q.key, queue)
+    }
 }
 
 // defaultResourceEventhandlerFuncs provides the default implementation for responding to events
 // these simply Log the event and add the object's key to the queue for later processing
-func (q *QueueInformer) defaultResourceEventHandlerFuncs() *cache.ResourceEventHandlerFuncs {
+func (q *QueueInformer) defaultResourceEventHandlerFuncs(queue workqueue.RateLimitingInterface) *cache.ResourceEventHandlerFuncs {
     return &cache.ResourceEventHandlerFuncs{
-        AddFunc:    q.defaultAddFunc,
-        DeleteFunc: q.defaultDeleteFunc,
-        UpdateFunc: q.defaultUpdateFunc,
+        AddFunc:    q.defaultAddFunc(queue),
+        DeleteFunc: q.defaultDeleteFunc(queue),
+        UpdateFunc: q.defaultUpdateFunc(queue),
     }
 }
 
@@ -113,7 +122,7 @@ func NewInformer(queue workqueue.RateLimitingInterface, informer cache.SharedInd
         MetricsProvider: metrics,
         log:             logger,
     }
-    queueInformer.resourceEventHandlerFuncs = queueInformer.defaultResourceEventHandlerFuncs()
+    queueInformer.resourceEventHandlerFuncs = queueInformer.defaultResourceEventHandlerFuncs(queue)
     if funcs != nil {
         if funcs.AddFunc != nil {
             queueInformer.resourceEventHandlerFuncs.AddFunc = funcs.AddFunc
@@ -128,3 +137,83 @@ func NewInformer(queue workqueue.RateLimitingInterface, informer cache.SharedInd
     queueInformer.informer.AddEventHandler(queueInformer.resourceEventHandlerFuncs)
     return queueInformer
 }
+
+// WithViewIndexer adds EventHandler funcs that inform the given ViewIndexer of changes.
+// Returns the queueinformer to support chaining.
+// TODO: Make this a method of a type that embeds QueueInformer instead.
+func (q *QueueInformer) WithViewIndexer(viewIndexer opcache.ViewIndexer) *QueueInformer {
+    logger := q.log.WithField("eventhandler", "viewindexer")
+    handler := &cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj interface{}) {
+            // TODO: Check for resource existing and update instead if it exists
+            logger := logger.WithField("operation", "add")
+            logger.Debugf("%v", obj)
+            if err := viewIndexer.Add(obj); err != nil {
+                logger.WithError(err).Warnf("could not add object of type %T to ViewIndexer", obj)
+            }
+        },
+        UpdateFunc: func(oldObj, newObj interface{}) {
+            // TODO: Check for resource not existing before update and add instead if it doesn't exist
+            logger := logger.WithField("operation", "update")
+            logger.Debugf("%v", newObj)
+            if err := viewIndexer.Update(newObj); err != nil {
+                logger.WithError(err).Warnf("could not update object of type %T in ViewIndexer", newObj)
+            }
+        },
+        DeleteFunc: func(obj interface{}) {
+            // TODO: Check for resource existence before deleting
+            logger := logger.WithField("operation", "delete")
+            if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+                obj = tombstone.Obj
+                logger.Debug("unpacked tombstone")
+            }
+
+            logger.Debugf("%v", obj)
+            if err := viewIndexer.Delete(obj); err != nil {
+                logger.WithError(err).Warnf("could not delete object of type %T in ViewIndexer", obj)
+            }
+        },
+    }
+
+    q.informer.AddEventHandler(handler)
+
+    return q
+}
+
+// AdditionallyDistribute adds EventHandler funcs that distribute events to each queue in the given list.
+// Returns the queueinformer to support chaining.
+// TODO: No ordering is guaranteed between event handlers.
+func (q *QueueInformer) AdditionallyDistribute(queues ...workqueue.RateLimitingInterface) *QueueInformer {
+    // Build handler funcs for each queue
+    numQueues := len(queues)
+    add := make([]func(obj interface{}), numQueues)
+    update := make([]func(oldObj, newObj interface{}), numQueues)
+    del := make([]func(obj interface{}), numQueues)
+    for i, queue := range queues {
+        add[i] = q.defaultAddFunc(queue)
+        update[i] = q.defaultUpdateFunc(queue)
+        del[i] = q.defaultDeleteFunc(queue)
+    }
+
+    handler := &cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj interface{}) {
+            for _, f := range add {
+                f(obj)
+            }
+        },
+        UpdateFunc: func(oldObj, newObj interface{}) {
+            for _, f := range update {
+                f(oldObj, newObj)
+            }
+        },
+        DeleteFunc: func(obj interface{}) {
+            for _, f := range del {
+                f(obj)
+            }
+        },
+    }
+
+    q.informer.AddEventHandler(handler)
+
+    return q
+}
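The handler refactor above is what makes AdditionallyDistribute possible: defaultAddFunc and friends now return closures bound to a specific queue, so one informer event can be replayed into any number of queues. The shape of that factory pattern, reduced to plain Go (hypothetical names, buffered channels standing in for workqueues):

```go
package main

import "fmt"

// addFuncFor mirrors the patched defaultAddFunc: a factory returning a handler
// bound to one queue.
func addFuncFor(queue chan<- string) func(key string) {
	return func(key string) { queue <- key }
}

func main() {
	primary := make(chan string, 1)
	extra := make(chan string, 1)

	// One event fans out to every bound handler, as in AdditionallyDistribute.
	for _, handle := range []func(string){addFuncFor(primary), addFuncFor(extra)} {
		handle("my-ns/my-sub")
	}

	fmt.Println(<-primary, <-extra) // my-ns/my-sub my-ns/my-sub
}
```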
diff --git a/pkg/lib/queueinformer/resourcequeue.go b/pkg/lib/queueinformer/resourcequeue.go
index 08f4ea4424..d53c7ef547 100644
--- a/pkg/lib/queueinformer/resourcequeue.go
+++ b/pkg/lib/queueinformer/resourcequeue.go
@@ -53,6 +53,27 @@ func (r *ResourceQueueSet) Requeue(name, namespace string) error {
     return fmt.Errorf("couldn't find queue for resource")
 }
 
+// RequeueRateLimited performs a rate-limited requeue of the resource in the set with the given name and namespace.
+func (r *ResourceQueueSet) RequeueRateLimited(name, namespace string) error {
+    r.mutex.RLock()
+    defer r.mutex.RUnlock()
+
+    // We can build the key directly; this will need to change if the queue uses a different key scheme
+    key := fmt.Sprintf("%s/%s", namespace, name)
+
+    if queue, ok := r.queueSet[metav1.NamespaceAll]; len(r.queueSet) == 1 && ok {
+        queue.AddRateLimited(key)
+        return nil
+    }
+
+    if queue, ok := r.queueSet[namespace]; ok {
+        queue.AddRateLimited(key)
+        return nil
+    }
+
+    return fmt.Errorf("couldn't find queue for resource")
+}
+
 // RequeueByKey adds the given key to the resource queue that should contain it
 func (r *ResourceQueueSet) RequeueByKey(key string) error {
     r.mutex.RLock()
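For callers, the replacement for the deleted requeueSubscription looks like the sketch below (against the queueinformer package as patched; note the key is built as namespace/name, and AddRateLimited delays delivery according to the queue's rate limiter):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"

	"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "subscriptions")
	defer queue.ShutDown()

	queueSet := queueinformer.NewEmptyResourceQueueSet()
	queueSet.Set("my-ns", queue)

	// Enqueues "my-ns/my-sub" on the my-ns queue after the rate limiter's backoff.
	if err := queueSet.RequeueRateLimited("my-sub", "my-ns"); err != nil {
		fmt.Println("requeue failed:", err)
	}
}
```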