From 9ad031725afb47bdc4eadffe0d5cbdfcceac711d Mon Sep 17 00:00:00 2001 From: njhale Date: Fri, 26 Apr 2019 12:48:19 -0400 Subject: [PATCH 1/3] feat(subscriptions): add catalogstatus to subscriptionstatus --- .../operators/v1alpha1/subscription_types.go | 89 ++++++++++++++++--- 1 file changed, 79 insertions(+), 10 deletions(-) diff --git a/pkg/api/apis/operators/v1alpha1/subscription_types.go b/pkg/api/apis/operators/v1alpha1/subscription_types.go index c5ba686e39..f4cfedb869 100644 --- a/pkg/api/apis/operators/v1alpha1/subscription_types.go +++ b/pkg/api/apis/operators/v1alpha1/subscription_types.go @@ -63,6 +63,11 @@ type SubscriptionStatus struct { // +optional InstallPlanRef *corev1.ObjectReference `json:"installPlanRef,omitempty"` + // CatalogStatus contains the Subscription's view of its relevant CatalogSources' status. + // It is used to determine SubscriptionStatusConditions related to CatalogSources. + // +optional + CatalogStatus []SubscriptionCatalogStatus `json:"catalogStatus,omitempty"` + // LastUpdated represents the last time that the Subscription status was updated. LastUpdated metav1.Time `json:"lastUpdated"` } @@ -74,6 +79,80 @@ type InstallPlanReference struct { UID types.UID `json:"uuid"` } +// NewInstallPlanReference returns an InstallPlanReference for the given ObjectReference. +func NewInstallPlanReference(ref *corev1.ObjectReference) *InstallPlanReference { + return &InstallPlanReference{ + APIVersion: ref.APIVersion, + Kind: ref.Kind, + Name: ref.Name, + UID: ref.UID, + } +} + +// SubscriptionCatalogStatus describes a Subscription's view of a CatalogSource's status. +type SubscriptionCatalogStatus struct { + // CatalogSourceRef is a reference to a CatalogSource. + CatalogSourceRef *corev1.ObjectReference `json:"catalogSourceRef"` + + // LastUpdated represents the last time that the CatalogSourceHealth changed + LastUpdated metav1.Time `json:"lastUpdated"` + + // Healthy is true if the CatalogSource is healthy; false otherwise. + Healthy bool `json:"healthy"` +} + +// SetSubscriptionCatalogStatus sets the given SusbcriptionCatalogStatus in a SubscriptionStatus if it doesn't already exist +// or the status has changed and returns true if the status was set; false otherwise. +func (status *SubscriptionStatus) SetSubscriptionCatalogStatus(catalogStatus SubscriptionCatalogStatus) bool { + target := catalogStatus.CatalogSourceRef + if target == nil && target.APIVersion == SchemeGroupVersion.String() && target.Kind == SubscriptionKind { + return false + } + + // Search for status to replace + for i, cs := range status.CatalogStatus { + ref := cs.CatalogSourceRef + if ref == nil { + continue + } + + if ref.Namespace == target.Namespace && ref.Name == target.Name && ref.UID == target.UID { + if cs.Healthy != catalogStatus.Healthy { + status.CatalogStatus[i] = catalogStatus + return true + } + + return false + } + } + + status.CatalogStatus = append(status.CatalogStatus, catalogStatus) + return true +} + +// RemoveSubscriptionCatalogStatus removes the SubscriptionCatalogStatus matching the given ObjectReference from a SubscriptionStatus +// and returns true if the status was removed; false otherwise. +func (status *SubscriptionStatus) RemoveSubscriptionCatalogStatus(target *corev1.ObjectReference) bool { + if target == nil && target.APIVersion == SchemeGroupVersion.String() && target.Kind == SubscriptionKind { + return false + } + + // Search for status to remove + for i, cs := range status.CatalogStatus { + ref := cs.CatalogSourceRef + if ref == nil { + continue + } + + if ref.Namespace == target.Namespace && ref.Name == target.Name && ref.UID == target.UID { + status.CatalogStatus = append(status.CatalogStatus[:i], status.CatalogStatus[i+1:]...) + return true + } + } + + return false +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +genclient type Subscription struct { @@ -99,13 +178,3 @@ func (s *Subscription) GetInstallPlanApproval() Approval { } return ApprovalAutomatic } - -// NewInstallPlanReference returns an InstallPlanReference for the given ObjectReference. -func NewInstallPlanReference(ref *corev1.ObjectReference) *InstallPlanReference { - return &InstallPlanReference{ - APIVersion: ref.APIVersion, - Kind: ref.Kind, - Name: ref.Name, - UID: ref.UID, - } -} From 9d7e794b1827074af8c4c4fdf465e9609fd9d185 Mon Sep 17 00:00:00 2001 From: njhale Date: Fri, 26 Apr 2019 12:49:36 -0400 Subject: [PATCH 2/3] refactor(syncing): add viewer framework for reconciliation --- pkg/controller/operators/catalog/scratch.go | 485 ++++++++++++++++++++ pkg/lib/index/label.go | 10 +- 2 files changed, 490 insertions(+), 5 deletions(-) create mode 100644 pkg/controller/operators/catalog/scratch.go diff --git a/pkg/controller/operators/catalog/scratch.go b/pkg/controller/operators/catalog/scratch.go new file mode 100644 index 0000000000..af6ddc1097 --- /dev/null +++ b/pkg/controller/operators/catalog/scratch.go @@ -0,0 +1,485 @@ +package catalog + +import ( + "fmt" + "reflect" + "sync" + "time" + + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + // "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators" + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client" + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" + exv "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions" + "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer" + "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" +) + +// Q: What do we want to do? +// A: Register a set of views, relations, and actions to be triggered when an object changes. +// +// Q: What kind of objects? +// A: Arbitrary +// +// Q: What should produce object change events? +// A: A cache being updated (objects, indices) +// +// Q: How do we inform a set of views of an event? +// A: A view updates: +// - View store updates +// - Indices update (calling index funcs) +// - Related views should be enqueued (by key) in their workqueues +// - Related actions should be enqueued (by key) in their workqueues + +type View interface { + Key(value interface{}) (key string, err error) + Value(obj interface{}) (value interface{}, err error) + Indexers() cache.Indexers +} + +// Views are a set of ViewFuncs keyed by their resource type +type Views map[reflect.Type]View + +// Viewer is a storage interface that supports building alternate views of stored data. +type Viewer interface { + cache.Indexer + + // View gets the value for a view type and given object. + View(viewType reflect.Type, obj interface{}) (value interface{}, err error) + + // AddViews adds more views to this store. If you call this after you already have data + // in the store, the results are undefined. + AddViews(views ...View) error +} + +type viewCache struct { + cache.Indexer + lock sync.RWMutex + views Views + defaultKey cache.KeyFunc +} + +func (c *viewCache) View(viewType reflect.Type, obj interface{}) (value interface{}, err error) { + c.lock.RLock() + defer c.lock.RUnlock() + + view, ok := c.views[viewType] + if !ok { + return nil, fmt.Errorf("view %s not found", viewType.Name()) + } + + value, err = view.Value(obj) + if err != nil { + return nil, err + } + + key, err := view.Key(value) + if err != nil { + return nil, err + } + + stored, _, err := c.GetByKey(key) + + return stored, err +} + +func (c *viewCache) AddViews(views ...View) error { + c.lock.Lock() + defer c.lock.Unlock() + + if len(c.views) > 0 { + return fmt.Errorf("cannot add views to running viewer") + } + + if c.views == nil { + c.views = Views{} + } + + for _, view := range views { + c.views[reflect.TypeOf(view)] = view + } + + return nil +} + +// viewKey invokes the key function matching the view object type. +// This allows a store expecting a single key function to support multiple types. +func (c *viewCache) viewKey(value interface{}) (string, error) { + // Check for a view that matches the dynamic type. + if view, ok := c.views[reflect.TypeOf(value)]; ok { + return view.Key(value) + } + + // TODO: have a default key function if a view doesn't exist? This could be useful for storing runtime.Objects in the same cache. + return "", fmt.Errorf("view type %T unmanaged", value) +} + +type modifierFunc func(obj interface{}) error + +func (c *viewCache) modifyViews(obj interface{}, modify modifierFunc) error { + c.lock.RLock() + defer c.lock.RUnlock() + + errs := []error{} + for _, view := range c.views { + value, err := view.Value(obj) + if err != nil { + errs = append(errs, err) + continue + } + + if err := modify(value); err != nil { + errs = append(errs, err) + } + } + + // TODO: run indexers on unmodified object? + // TODO: modify object as is? + + return utilerrors.NewAggregate(errs) +} + +// Add sets an item in the cache. +func (c *viewCache) Add(obj interface{}) error { + return c.modifyViews(obj, c.Indexer.Add) +} + +// Update sets an item in the cache to its updated state. +func (c *viewCache) Update(obj interface{}) error { + return c.modifyViews(obj, c.Indexer.Update) +} + +// Delete removes an item from the cache. +func (c *viewCache) Delete(obj interface{}) error { + return c.modifyViews(obj, c.Indexer.Delete) +} + +// NewViewer returns a new Viewer containing all of the given views. +func NewViewer(views Views) Viewer { + c := &viewCache{views: views} + indexers := cache.Indexers{} + for _, view := range views { + for name, index := range view.Indexers() { + indexers[name] = index + } + } + c.Indexer = cache.NewIndexer(c.viewKey, indexers) + return c +} + +type viewOperator struct { + *queueinformer.Operator + logger *logrus.Logger + resyncPeriod time.Duration + kubeconfig string + namespaces []string + client versioned.Interface + checker reconciler.RegistryChecker + viewOptions []viewOption + viewer Viewer +} + +type OperatorOption func(*viewOperator) + +func NewViewOperator(configmapRegistryImage string, options ...OperatorOption) (*viewOperator, error) { + // Set defaults + op := &viewOperator{ + logger: logrus.New(), + resyncPeriod: 15 * time.Minute, + namespaces: []string{""}, + viewOptions: []viewOption{}, + } + + // Apply all options + for _, option := range options { + option(op) + } + + // Set additional defaults if not set by options + if op.client == nil { + c, err := client.NewClient("") + if err != nil { + return nil, err + } + op.client = c + } + + if op.OpClient == nil { + op.OpClient = operatorclient.NewClientFromConfig("", op.logger) + } + + if op.Operator == nil { + queueinformer.NewOperatorFromClient(op.OpClient, op.logger) + } + + if op.viewer == nil { + op.viewer = NewViewer(Views{}) + } + + // TODO: View method for adding notification recipients - mapping view to queue + // TODO: Build index functions for notification recipients. + + // TODO: Notify - Lists viewer indexes by index key and enqueues them in the appropriate workqueue. + // TODO: Act - Updates the cluster in some way using the views + + // Create an OperatorLister + lister := operatorlister.NewLister() + + // Setup Informer + namespace := metav1.NamespaceAll + crFactory := exv.NewSharedInformerFactoryWithOptions(op.client, op.resyncPeriod, exv.WithNamespace(namespace)) + catsrcInformer := crFactory.Operators().V1alpha1().CatalogSources() + subInformer := crFactory.Operators().V1alpha1().Subscriptions() + + // Setup View + scv := NewSubCatalogView(configmapRegistryImage, append(op.viewOptions, withSubIndexer(subInformer.Informer().GetIndexer()))...) + op.viewer.AddViews(scv) + + queueName := "catsrc/" + namespace + catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName) + + // Create an informer for each catalog namespace + deleteCatalog := &cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + catsrc, ok := obj.(*v1alpha1.CatalogSource) + if !ok { + logrus.Warnf("incorrect delete type %T, skipping catalogsource view removal", obj) + return + } + logger := op.logger.WithFields(logrus.Fields{ + "catsrc": catsrc.GetName(), + "namespace": catsrc.GetNamespace(), + }) + + if err := op.viewer.Delete(obj); err != nil { + logger.WithError(err).Warn("failed to remove views") + return + } + + logger.Debug("views removed") + }, + } + + // TODO: Use MetaView as a mechanism to notify CatalogSource changes + // Pod create pod queue informers + factory := informers.NewSharedInformerFactoryWithOptions(op.OpClient.KubernetesInterface(), op.resyncPeriod, informers.WithNamespace(namespace)) + podInformer := factory.Core().V1().Pods() + lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister()) + op.RegisterQueueInformer( + queueinformer.NewInformer( + catsrcQueue, + catsrcInformer.Informer(), + func(obj interface{}) (syncErr error) { + // TODO: implement (View -> Notify) -> Act stages. + // View stage: generates all views - collect aggregate error + // Notify: notify all interested parties by looking at the indexes (only if they have changed) - collect aggregate error + // Act stage: uses views to act on the cluster - collect aggregate error + // View and Notify should be synchronous + // Actions can execute in parallel to (View and Notify) and possibly to themselves + // Preferrably these stages are encapsulated by a QueueInformer wrapper... ViewInformer? + + // Generate all views + var errs []error + if err := op.viewer.Add(obj); err != nil { + errs = append(errs, err) + } + + // Act on views + // TODO: A few ways we could associate views with actions: + // * A view _has an_ action - a direct function call (e.g. view.Act()) + // * A view _informs_ an action - a key is enqueued on an action queue when a view is updated + // * How do we decide which actions to enqueue? + // * How do we order enqueuing? + // * Can some actions be orthogonal? + // * Should the relation be reversed - action relates to a set of views)? + // * Is there a pre-existing model we can use for this - petri-net? + + // What would this look like with a petri-net: + // * Views are places + // * + + syncErr = utilerrors.NewAggregate(errs) + return + }, + deleteCatalog, + queueName, + metrics.NewMetricsNil(), + op.logger)) + + return op, nil +} + +// Define + +// Define views + +// metaView is a transparent view that can be used as a basis for more complex views. +// It holds a set of commonly used utilities. +// TODO: should this just be the Operator itself? +type metaView struct { + logger *logrus.Logger + namespace string + opClient operatorclient.ClientInterface + lister operatorlister.OperatorLister + getReference func(runtime.Object) (*corev1.ObjectReference, error) + now func() metav1.Time +} + +func (view *metaView) Key(value interface{}) (key string, err error) { + // Use the most common key func (namespace/name) + // TODO: could we use metaView to store anything that implements meta.Interface by default? + return cache.DeletionHandlingMetaNamespaceKeyFunc(value) +} + +func (view *metaView) Value(obj interface{}) (value interface{}, err error) { + // Passthrough + value = obj + return +} + +func (view *metaView) Indexers() cache.Indexers { + // Use the most common indexer (namespace) + return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} +} + +type viewOption func(View) + +func applyViewOptions(view View, options ...viewOption) { + for _, option := range options { + option(view) + } +} + +// NewMetaView returns a new metaView with the given options applied. +func NewMetaView(options ...viewOption) *metaView { + // Set defaults + view := &metaView{ + lister: operatorlister.NewLister(), + getReference: operators.GetReference, + now: timeNow, + } + + applyViewOptions(view, options...) + + // Set additional defaults if not updated by options + if view.opClient == nil { + view.opClient = operatorclient.NewClientFromConfig("", view.logger) + } + + return view +} + +// subCatalogView implements the SubscriptionCatalogStatus view over CatalogSources. +type subCatalogView struct { + *metaView + subIndexer cache.Indexer + reconcilerFactory reconciler.RegistryReconcilerFactory +} + +func withSubIndexer(subIndexer cache.Indexer) viewOption { + return func(view View) { + if scv, ok := view.(*subCatalogView); ok { + scv.subIndexer = subIndexer + } + } +} + +func NewSubCatalogView(configmapRegistryImage string, options ...viewOption) *subCatalogView { + // Set defaults + view := &subCatalogView{ + metaView: NewMetaView(), + // TODO: Missing subCatalogIndexer can result in bad configuration + } + + applyViewOptions(view, options...) + + // Set additional defaults if not updated by options + if view.reconcilerFactory == nil { + view.reconcilerFactory = reconciler.NewRegistryReconcilerFactory(view.lister, view.opClient, configmapRegistryImage) + } + + return view +} + +const ( + subCatalogPrefix string = "subcatalogstatus" + subCatalogKey string = subCatalogPrefix + "/%s/%s" +) + +func (view *subCatalogView) Key(value interface{}) (key string, err error) { + scs, ok := value.(*v1alpha1.SubscriptionCatalogStatus) + if !ok { + err = fmt.Errorf("unexpected view value type %T, expected %T", value, (*v1alpha1.SubscriptionCatalogStatus)(nil)) + return + } + + key = fmt.Sprintf(subCatalogKey, scs.CatalogSourceRef.Name, scs.CatalogSourceRef.Namespace) + return +} + +func (view *subCatalogView) Value(obj interface{}) (value interface{}, err error) { + catalog, ok := obj.(*v1alpha1.CatalogSource) + if !ok { + return nil, fmt.Errorf("unexpected object type, expected %T and got %T", (*v1alpha1.CatalogSource)(nil), obj) + } + + // Check the registry server health + healthy, err := view.reconcilerFactory.ReconcilerForSource(catalog).CheckRegistryServer(catalog) + if err != nil { + return + } + + // Create the view + ref, err := view.getReference(catalog) + if err != nil { + return + } + + value = &v1alpha1.SubscriptionCatalogStatus{ + CatalogSourceRef: ref, + LastUpdated: view.now(), + Healthy: healthy, + } + + return +} + +const ( + subCatalogIndex string = "subcatalog" +) + +// subCatalogIndexer returns a set of indices for a given SubscriptionCatalogStatus +func (view *subCatalogView) subCatalogIndexer(obj interface{}) ([]string, error) { + scs, ok := obj.(*v1alpha1.SubscriptionCatalogStatus) + if !ok { + // Can't build indices for this type + return []string{}, nil + } + + if scs.CatalogSourceRef.Namespace == view.namespace { + // The CatalogSource is global, get keys for subscriptions in all watched namespaces + // TODO: No guarantees that subIndex contains cache.NamespaceIndex + return view.subIndexer.IndexKeys(cache.NamespaceIndex, scs.CatalogSourceRef.Namespace) + } + + return view.subIndexer.IndexKeys(cache.NamespaceIndex, scs.CatalogSourceRef.Namespace) +} + +func (view *subCatalogView) Indexers() cache.Indexers { + return cache.Indexers{ + subCatalogIndex: view.subCatalogIndexer, + } +} diff --git a/pkg/lib/index/label.go b/pkg/lib/index/label.go index 52ccd29bea..5d56f3cff2 100644 --- a/pkg/lib/index/label.go +++ b/pkg/lib/index/label.go @@ -13,19 +13,19 @@ const ( MetaLabelIndexFuncKey string = "metalabelindexfunc" ) -// MetaLabelIndexFunc returns indicies from the labels of the given object. +// MetaLabelIndexFunc returns indices from the labels of the given object. func MetaLabelIndexFunc(obj interface{}) ([]string, error) { - indicies := []string{} + indices := []string{} m, err := meta.Accessor(obj) if err != nil { - return indicies, fmt.Errorf("object has no meta: %v", err) + return indices, fmt.Errorf("object has no meta: %v", err) } for k, v := range m.GetLabels() { - indicies = append(indicies, fmt.Sprintf("%s=%s", k, v)) + indices = append(indices, fmt.Sprintf("%s=%s", k, v)) } - return indicies, nil + return indices, nil } // LabelIndexKeys returns the union of indexed cache keys in the given indexers matching the same labels as the given selector From 6931ef809deabe666336dd5634c3f2a8c8ade465 Mon Sep 17 00:00:00 2001 From: njhale Date: Thu, 2 May 2019 00:04:51 -0400 Subject: [PATCH 3/3] feat(substatus): user viewer to set catsrc status on subscriptions --- .../operators/v1alpha1/catalogsource_types.go | 5 +- .../operators/v1alpha1/subscription_types.go | 54 +- .../v1alpha1/zz_generated.deepcopy.go | 29 ++ pkg/controller/operators/catalog/metaview.go | 60 +++ pkg/controller/operators/catalog/operator.go | 309 ++++++----- pkg/controller/operators/catalog/scratch.go | 485 ------------------ .../operators/catalog/subscriptions.go | 219 +++++++- pkg/lib/cache/viewer.go | 305 +++++++++++ pkg/lib/queueinformer/queueinformer.go | 155 ++++-- pkg/lib/queueinformer/resourcequeue.go | 21 + 10 files changed, 949 insertions(+), 693 deletions(-) create mode 100644 pkg/controller/operators/catalog/metaview.go delete mode 100644 pkg/controller/operators/catalog/scratch.go create mode 100644 pkg/lib/cache/viewer.go diff --git a/pkg/api/apis/operators/v1alpha1/catalogsource_types.go b/pkg/api/apis/operators/v1alpha1/catalogsource_types.go index e99806f225..aff7a27557 100644 --- a/pkg/api/apis/operators/v1alpha1/catalogsource_types.go +++ b/pkg/api/apis/operators/v1alpha1/catalogsource_types.go @@ -101,7 +101,10 @@ func (c *CatalogSource) Address() string { if c.Spec.Address != "" { return c.Spec.Address } - return c.Status.RegistryServiceStatus.Address() + if c.Status.RegistryServiceStatus != nil { + return c.Status.RegistryServiceStatus.Address() + } + return "" } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/api/apis/operators/v1alpha1/subscription_types.go b/pkg/api/apis/operators/v1alpha1/subscription_types.go index f4cfedb869..9199494435 100644 --- a/pkg/api/apis/operators/v1alpha1/subscription_types.go +++ b/pkg/api/apis/operators/v1alpha1/subscription_types.go @@ -101,51 +101,23 @@ type SubscriptionCatalogStatus struct { Healthy bool `json:"healthy"` } -// SetSubscriptionCatalogStatus sets the given SusbcriptionCatalogStatus in a SubscriptionStatus if it doesn't already exist -// or the status has changed and returns true if the status was set; false otherwise. -func (status *SubscriptionStatus) SetSubscriptionCatalogStatus(catalogStatus SubscriptionCatalogStatus) bool { - target := catalogStatus.CatalogSourceRef - if target == nil && target.APIVersion == SchemeGroupVersion.String() && target.Kind == SubscriptionKind { - return false +// SetSubscriptionCatalogStatus sets the SubscriptionStatus' CatalogStatus field as the given slice if it differs +// from the stored value. Returns true if a change was made, false otherwise. +func (status *SubscriptionStatus) SetSubscriptionCatalogStatus(catalogStatus []SubscriptionCatalogStatus) bool { + if len(status.CatalogStatus) != len(catalogStatus) { + status.CatalogStatus = catalogStatus + return true } - // Search for status to replace - for i, cs := range status.CatalogStatus { - ref := cs.CatalogSourceRef - if ref == nil { - continue - } - - if ref.Namespace == target.Namespace && ref.Name == target.Name && ref.UID == target.UID { - if cs.Healthy != catalogStatus.Healthy { - status.CatalogStatus[i] = catalogStatus - return true - } - - return false - } - } + // TODO: dedupe catalogStatus? - status.CatalogStatus = append(status.CatalogStatus, catalogStatus) - return true -} - -// RemoveSubscriptionCatalogStatus removes the SubscriptionCatalogStatus matching the given ObjectReference from a SubscriptionStatus -// and returns true if the status was removed; false otherwise. -func (status *SubscriptionStatus) RemoveSubscriptionCatalogStatus(target *corev1.ObjectReference) bool { - if target == nil && target.APIVersion == SchemeGroupVersion.String() && target.Kind == SubscriptionKind { - return false + set := map[SubscriptionCatalogStatus]struct{}{} + for _, cs := range status.CatalogStatus { + set[cs] = struct{}{} } - - // Search for status to remove - for i, cs := range status.CatalogStatus { - ref := cs.CatalogSourceRef - if ref == nil { - continue - } - - if ref.Namespace == target.Namespace && ref.Name == target.Name && ref.UID == target.UID { - status.CatalogStatus = append(status.CatalogStatus[:i], status.CatalogStatus[i+1:]...) + for _, cs := range catalogStatus { + if _, ok := set[cs]; !ok { + status.CatalogStatus = catalogStatus return true } } diff --git a/pkg/api/apis/operators/v1alpha1/zz_generated.deepcopy.go b/pkg/api/apis/operators/v1alpha1/zz_generated.deepcopy.go index 22f3d810bc..57b256be7c 100644 --- a/pkg/api/apis/operators/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/api/apis/operators/v1alpha1/zz_generated.deepcopy.go @@ -968,6 +968,28 @@ func (in *Subscription) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SubscriptionCatalogStatus) DeepCopyInto(out *SubscriptionCatalogStatus) { + *out = *in + if in.CatalogSourceRef != nil { + in, out := &in.CatalogSourceRef, &out.CatalogSourceRef + *out = new(corev1.ObjectReference) + **out = **in + } + in.LastUpdated.DeepCopyInto(&out.LastUpdated) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriptionCatalogStatus. +func (in *SubscriptionCatalogStatus) DeepCopy() *SubscriptionCatalogStatus { + if in == nil { + return nil + } + out := new(SubscriptionCatalogStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SubscriptionList) DeepCopyInto(out *SubscriptionList) { *out = *in @@ -1030,6 +1052,13 @@ func (in *SubscriptionStatus) DeepCopyInto(out *SubscriptionStatus) { *out = new(corev1.ObjectReference) **out = **in } + if in.CatalogStatus != nil { + in, out := &in.CatalogStatus, &out.CatalogStatus + *out = make([]SubscriptionCatalogStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } in.LastUpdated.DeepCopyInto(&out.LastUpdated) return } diff --git a/pkg/controller/operators/catalog/metaview.go b/pkg/controller/operators/catalog/metaview.go new file mode 100644 index 0000000000..068b085585 --- /dev/null +++ b/pkg/controller/operators/catalog/metaview.go @@ -0,0 +1,60 @@ +package catalog + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + + opcache "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/cache" +) + +// metaViewer is a transparent viewer that can be used as a basis for more complex viewers. +// It holds a set of commonly used utilities. +// TODO: we probably don't want to embed Operator. +type metaViewer struct { + *Operator + now func() metav1.Time + namespace string +} + +var _ opcache.Viewer = &metaViewer{} + +func (viewer *metaViewer) Key(obj interface{}) (key string, err error) { + // Use the most common key func (namespace/name) + // TODO: could we use metaViewer to store anything that implements meta.Interface by default? + return cache.DeletionHandlingMetaNamespaceKeyFunc(obj) +} + +func (viewer *metaViewer) KeyByView(view interface{}) (key string, err error) { + // Use the most common key func (namespace/name) + // TODO: could we use metaViewer to store anything that implements meta.Interface by default? + return cache.DeletionHandlingMetaNamespaceKeyFunc(view) +} + +func (viewer *metaViewer) View(obj interface{}) (view interface{}, err error) { + // Passthrough + view = obj + return +} + +type metaViewerOption func(*metaViewer) + +func withNamespace(namespace string) metaViewerOption { + return func(viewer *metaViewer) { + viewer.namespace = namespace + } +} + +// newMetaViewer returns a new metaViewer. +func newMetaViewer(op *Operator, options ...metaViewerOption) *metaViewer { + viewer := &metaViewer{ + Operator: op, + now: timeNow, + namespace: op.namespace, + } + + for _, option := range options { + option(viewer) + } + + return viewer +} diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 7f12205ea6..d75afa1e21 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -33,6 +33,7 @@ import ( olmerrors "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/errors" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver" + opcache "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/cache" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer" @@ -61,17 +62,20 @@ type Operator struct { client versioned.Interface lister operatorlister.OperatorLister namespace string + watchedNamespaces []string sources map[resolver.CatalogKey]resolver.SourceRef sourcesLock sync.RWMutex sourcesLastUpdate metav1.Time resolver resolver.Resolver - subQueue workqueue.RateLimitingInterface - catSrcQueueSet *queueinformer.ResourceQueueSet + catsrcQueueSet *queueinformer.ResourceQueueSet + subQueueSet *queueinformer.ResourceQueueSet namespaceResolveQueue workqueue.RateLimitingInterface reconciler reconciler.RegistryReconcilerFactory + viewIndexerSet *opcache.ViewIndexerSet + subIndexerSet *opcache.IndexerSet } -// NewOperator creates a new Catalog Operator. +// NewOperator returns a configured, unstarted Operator. func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval time.Duration, configmapRegistryImage, operatorNamespace string, watchedNamespaces ...string) (*Operator, error) { // Default to watching all namespaces. if watchedNamespaces == nil { @@ -84,99 +88,90 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti return nil, err } - // Create an OperatorLister - lister := operatorlister.NewLister() - - // Create an informer for each watched namespace. - ipSharedIndexInformers := []cache.SharedIndexInformer{} - subSharedIndexInformers := []cache.SharedIndexInformer{} - csvSharedIndexInformers := []cache.SharedIndexInformer{} - for _, namespace := range watchedNamespaces { - nsInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace)) - ipSharedIndexInformers = append(ipSharedIndexInformers, nsInformerFactory.Operators().V1alpha1().InstallPlans().Informer()) - subSharedIndexInformers = append(subSharedIndexInformers, nsInformerFactory.Operators().V1alpha1().Subscriptions().Informer()) - csvSharedIndexInformers = append(csvSharedIndexInformers, nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions().Informer()) - - // resolver needs subscription and csv listers - lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, nsInformerFactory.Operators().V1alpha1().Subscriptions().Lister()) - lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister()) - lister.OperatorsV1alpha1().RegisterInstallPlanLister(namespace, nsInformerFactory.Operators().V1alpha1().InstallPlans().Lister()) - } - // Create a new queueinformer-based operator. queueOperator, err := queueinformer.NewOperator(kubeconfigPath, logger) if err != nil { return nil, err } - // Allocate the new instance of an Operator. + lister := operatorlister.NewLister() op := &Operator{ - Operator: queueOperator, - catSrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), - client: crClient, - lister: lister, - namespace: operatorNamespace, - sources: make(map[resolver.CatalogKey]resolver.SourceRef), - resolver: resolver.NewOperatorsV1alpha1Resolver(lister), - } - - // Create an informer for each catalog namespace - deleteCatSrc := &cache.ResourceEventHandlerFuncs{ - DeleteFunc: op.handleCatSrcDeletion, - } - for _, namespace := range watchedNamespaces { - nsInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace)) - catsrcInformer := nsInformerFactory.Operators().V1alpha1().CatalogSources() - - // Register queue and QueueInformer - var queueName string - if namespace == corev1.NamespaceAll { - queueName = "catsrc" - } else { - queueName = fmt.Sprintf("%s/catsrc", namespace) - } - catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName) - op.RegisterQueueInformer(queueinformer.NewInformer(catsrcQueue, catsrcInformer.Informer(), op.syncCatalogSources, deleteCatSrc, queueName, metrics.NewMetricsCatalogSource(op.client), logger)) - op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(namespace, catsrcInformer.Lister()) - op.catSrcQueueSet.Set(namespace, catsrcQueue) - } - - // Register InstallPlan informers. - ipQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "installplans") - ipQueueInformers := queueinformer.New( - ipQueue, - ipSharedIndexInformers, - op.syncInstallPlans, + Operator: queueOperator, + catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), + subQueueSet: queueinformer.NewEmptyResourceQueueSet(), + client: crClient, + lister: lister, + namespace: operatorNamespace, + watchedNamespaces: watchedNamespaces, + sources: make(map[resolver.CatalogKey]resolver.SourceRef), + resolver: resolver.NewOperatorsV1alpha1Resolver(lister), + reconciler: reconciler.NewRegistryReconcilerFactory(lister, queueOperator.OpClient, configmapRegistryImage), + viewIndexerSet: opcache.NewViewIndexerSet(), + subIndexerSet: opcache.NewIndexerSet(), + } + + // Setup global components + namespaceInformer := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval).Core().V1().Namespaces() + resolvingNamespaceQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver") + namespaceQueueInformer := queueinformer.NewInformer( + resolvingNamespaceQueue, + namespaceInformer.Informer(), + // Namespace sync for resolving subscriptions + op.syncResolvingNamespace, nil, - "installplan", - metrics.NewMetricsInstallPlan(op.client), + "resolver", + metrics.NewMetricsNil(), logger, ) - for _, informer := range ipQueueInformers { - op.RegisterQueueInformer(informer) - } - // Register Subscription informers. - subscriptionQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "subscriptions") - subscriptionQueueInformers := queueinformer.New( - subscriptionQueue, - subSharedIndexInformers, - op.syncSubscriptions, - nil, - "subscription", - metrics.NewMetricsSubscription(op.client), - logger, - ) - op.subQueue = subscriptionQueue - for _, informer := range subscriptionQueueInformers { - op.RegisterQueueInformer(informer) - } + op.RegisterQueueInformer(namespaceQueueInformer) + op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) + op.namespaceResolveQueue = resolvingNamespaceQueue - handleDelete := &cache.ResourceEventHandlerFuncs{ - DeleteFunc: op.handleDeletion, - } - // Set up informers for requeuing catalogs + // Setup namespaced components for _, namespace := range watchedNamespaces { + viewIndexer := opcache.NewViewIndexer() + op.viewIndexerSet.Set(namespace, viewIndexer) + + // Create CR informers + crInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace)) + catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() + subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions() + ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans() + csvInformer := crInformerFactory.Operators().V1alpha1().ClusterServiceVersions() + + // Create k8s informers + k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.OpClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace)) + roleInformer := k8sInformerFactory.Rbac().V1().Roles() + roleBindingInformer := k8sInformerFactory.Rbac().V1().RoleBindings() + serviceAccountInformer := k8sInformerFactory.Core().V1().ServiceAccounts() + serviceInformer := k8sInformerFactory.Core().V1().Services() + podInformer := k8sInformerFactory.Core().V1().Pods() + configMapInformer := k8sInformerFactory.Core().V1().ConfigMaps() + + // Register CR listers + lister.OperatorsV1alpha1().RegisterCatalogSourceLister(namespace, catsrcInformer.Lister()) + lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister()) + lister.OperatorsV1alpha1().RegisterInstallPlanLister(namespace, ipInformer.Lister()) + lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister()) + + // Register k8s listers + op.lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister()) + op.lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister()) + op.lister.CoreV1().RegisterServiceAccountLister(namespace, serviceAccountInformer.Lister()) + op.lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister()) + op.lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister()) + op.lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister()) + + // Create all queues + catsrcQueueName := "catalogsources" + subQueueName := "subscriptions" + subCatQueueName := "subcat" + ipQueueName := "installplans" + catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), catsrcQueueName) + subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), subQueueName) + subCatQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), subCatQueueName) + ipQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ipQueueName) roleQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "role") roleBindingQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "rolebinding") serviceAccountQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount") @@ -184,15 +179,69 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti podQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod") configmapQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "configmap") - informerFactory := informers.NewSharedInformerFactoryWithOptions(op.OpClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace)) - roleInformer := informerFactory.Rbac().V1().Roles() - roleBindingInformer := informerFactory.Rbac().V1().RoleBindings() - serviceAccountInformer := informerFactory.Core().V1().ServiceAccounts() - serviceInformer := informerFactory.Core().V1().Services() - podInformer := informerFactory.Core().V1().Pods() - configMapInformer := informerFactory.Core().V1().ConfigMaps() + // Create queue sets + op.catsrcQueueSet.Set(namespace, catsrcQueue) + op.subQueueSet.Set(namespace, subQueue) + + // Set up viewers + mv := newMetaViewer(op, withNamespace(namespace)) + scv := newSubCatViewer(mv) + scvKey := opcache.NewViewerKey(subCatIndexKey, new(v1alpha1.CatalogSource), new(v1alpha1.SubscriptionCatalogStatus)) + viewIndexer.AddViewer(scvKey, scv) + + // Set up indexing and index enqueuing + if err := viewIndexer.AddIndex(subCatIndexKey, opcache.BuildEnqueuingIndex(scv.subCatViewIndex, subCatQueue)); err != nil { + logrus.WithError(err).Warn("failed to setup subscription catalog status view indexer") + return nil, err + } + + // TODO: Create and register queue indexers + subIndexers := map[string]cache.Indexer{namespace: subInformer.Informer().GetIndexer()} + op.subIndexerSet.Set(namespace, subInformer.Informer().GetIndexer()) + subCatQueueIndexerName := "subcat-" + namespace + subCatQueueIndexer := queueinformer.NewQueueIndexer(subCatQueue, subIndexers, scv.setSubCatStatus, subCatQueueIndexerName, logger, metrics.NewMetricsNil()) + op.RegisterQueueIndexer(subCatQueueIndexer) + // Define extra event handlers + handleCatsrcDelete := &cache.ResourceEventHandlerFuncs{ + DeleteFunc: op.handleCatSrcDeletion, + } + handleDelete := &cache.ResourceEventHandlerFuncs{ + DeleteFunc: op.handleDeletion, + } + + // Create and register queue informers queueInformers := []*queueinformer.QueueInformer{ + // CR informers + queueinformer.NewInformer( + catsrcQueue, + catsrcInformer.Informer(), + op.syncCatalogSources, + handleCatsrcDelete, + catsrcQueueName, + metrics.NewMetricsCatalogSource(op.client), + logger, + ).WithViewIndexer(viewIndexer), + queueinformer.NewInformer( + ipQueue, + ipInformer.Informer(), + op.syncInstallPlans, + nil, + ipQueueName, + metrics.NewMetricsInstallPlan(op.client), + logger, + ), + queueinformer.NewInformer( + subQueue, + subInformer.Informer(), + op.syncSubscriptions, + nil, + subQueueName, + metrics.NewMetricsSubscription(op.client), + logger, + ).AdditionallyDistribute(subCatQueue), + + // k8s informers queueinformer.NewInformer(roleQueue, roleInformer.Informer(), op.syncObject, handleDelete, "role", metrics.NewMetricsNil(), logger), queueinformer.NewInformer(roleBindingQueue, roleBindingInformer.Informer(), op.syncObject, handleDelete, "rolebinding", metrics.NewMetricsNil(), logger), queueinformer.NewInformer(serviceAccountQueue, serviceAccountInformer.Informer(), op.syncObject, handleDelete, "serviceaccount", metrics.NewMetricsNil(), logger), @@ -204,36 +253,35 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti op.RegisterQueueInformer(q) } - op.lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister()) - op.lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister()) - op.lister.CoreV1().RegisterServiceAccountLister(namespace, serviceAccountInformer.Lister()) - op.lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister()) - op.lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister()) - op.lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister()) } - op.reconciler = reconciler.NewRegistryReconcilerFactory(op.lister, op.OpClient, configmapRegistryImage) - - // Namespace sync for resolving subscriptions - namespaceInformer := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval).Core().V1().Namespaces() - resolvingNamespaceQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver") - namespaceQueueInformer := queueinformer.NewInformer( - resolvingNamespaceQueue, - namespaceInformer.Informer(), - op.syncResolvingNamespace, - nil, - "resolver", - metrics.NewMetricsNil(), - logger, - ) - - op.RegisterQueueInformer(namespaceQueueInformer) - op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.namespaceResolveQueue = resolvingNamespaceQueue - // Register CSV informers to fill cache - for _, informer := range csvSharedIndexInformers { - op.RegisterInformer(informer) - } + // // Register the QueueInformer for SubscriptionStatus.CatalogStatus + // subCatsrcStatusQueueName := fmt.Sprintf("%s/subscription/status", catsrcQueueName) + // subCatsrcStatusQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), subCatsrcStatusQueueName) + // catsrcQueueInformers = append(catsrcQueueInformers, queueinformer.New( + // subCatsrcStatusQueue, + // catsrcSharedIndexInformers, + // op.syncSubscriptionCatalogStatus, + // nil, + // subCatsrcStatusQueueName, + // metrics.NewMetricsCatalogSource(op.client), + // logger, + // )...) + + // TODO: add subscription queueindexer for updating subscription status + + // Register the QueueInformer for SubscriptionStatus.CatalogStatus + // subCleanupQueueName := fmt.Sprintf("%s/catalogstatus/cleanup", subQueueName) + // subCleanupQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), subCleanupQueueName) + // subQueueInformers = append(subQueueInformers, queueinformer.New( + // subCleanupQueue, + // subSharedIndexInformers, + // op.cleanupSubscriptionCatalogStatus, + // nil, + // subCleanupQueueName, + // metrics.NewMetricsSubscription(op.client), + // logger, + // )...) return op, nil } @@ -273,7 +321,7 @@ func (o *Operator) syncObject(obj interface{}) (syncError error) { defer o.sourcesLock.RUnlock() if _, ok := o.sources[sourceKey]; ok { logger.Debug("requeueing owner CatalogSource") - if err := o.catSrcQueueSet.Requeue(owner.Name, metaObj.GetNamespace()); err != nil { + if err := o.catsrcQueueSet.Requeue(owner.Name, metaObj.GetNamespace()); err != nil { logger.Warn(err.Error()) } } @@ -324,7 +372,7 @@ func (o *Operator) handleDeletion(obj interface{}) { } // Requeue CatalogSource - if err := o.catSrcQueueSet.Requeue(catsrc.GetName(), catsrc.GetNamespace()); err != nil { + if err := o.catsrcQueueSet.Requeue(catsrc.GetName(), catsrc.GetNamespace()); err != nil { logger.WithError(err).Warn("error requeuing owner catalogsource") } } @@ -354,7 +402,7 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) { }() o.Log.WithField("source", sourceKey).Info("removed client for deleted catalogsource") - if err := o.catSrcQueueSet.Remove(sourceKey.Name, sourceKey.Namespace); err != nil { + if err := o.catsrcQueueSet.Remove(sourceKey.Name, sourceKey.Namespace); err != nil { o.Log.WithError(err) } } @@ -496,7 +544,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { if !client.Conn.WaitForStateChange(ctx, connectivity.TransientFailure) { logger.Debug("state didn't change, trigger reconnect. this may happen when cached dns is wrong.") delete(o.sources, sourceKey) - if err := o.catSrcQueueSet.Requeue(sourceKey.Name, sourceKey.Namespace); err != nil { + if err := o.catsrcQueueSet.Requeue(sourceKey.Name, sourceKey.Namespace); err != nil { logger.WithError(err).Debug("error requeueing") } return @@ -549,7 +597,9 @@ func (o *Operator) syncDependentSubscriptions(logger *logrus.Entry, catalogSourc } if sub.Spec.CatalogSource == catalogSource && catalogNamespace == catalogSourceNamespace { logger.Debug("requeueing subscription because catalog changed") - o.requeueSubscription(sub.GetName(), sub.GetNamespace()) + if err := o.subQueueSet.RequeueRateLimited(sub.GetName(), sub.GetNamespace()); err != nil { + logger.WithError(err).Warn("failed to requeue subscription") + } } } } @@ -681,7 +731,7 @@ func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string) if ref.LastHealthy.IsZero() { logger = logger.WithField("source", k) logger.Debug("omitting source, hasn't yet become healthy") - if err := o.catSrcQueueSet.Requeue(k.Name, k.Namespace); err != nil { + if err := o.catsrcQueueSet.Requeue(k.Name, k.Namespace); err != nil { logger.Warn("error requeueing") } continue @@ -907,13 +957,6 @@ func (o *Operator) createInstallPlan(namespace string, subs []*v1alpha1.Subscrip return operators.GetReference(res) } -func (o *Operator) requeueSubscription(name, namespace string) { - // we can build the key directly, will need to change if queue uses different key scheme - key := fmt.Sprintf("%s/%s", namespace, name) - o.subQueue.Add(key) - return -} - func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) { plan, ok := obj.(*v1alpha1.InstallPlan) if !ok { @@ -950,7 +993,9 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) { if ownerutil.IsOwnedByKind(outInstallPlan, v1alpha1.SubscriptionKind) { oref := ownerutil.GetOwnerByKind(outInstallPlan, v1alpha1.SubscriptionKind) logger.WithField("owner", oref).Debug("requeueing installplan owner") - o.requeueSubscription(oref.Name, outInstallPlan.GetNamespace()) + if err := o.subQueueSet.RequeueRateLimited(oref.Name, outInstallPlan.GetNamespace()); err != nil { + logger.WithError(err).Warn("failed to requeue installplan owner subscription") + } } // Update InstallPlan with status of transition. Log errors if we can't write them to the status. diff --git a/pkg/controller/operators/catalog/scratch.go b/pkg/controller/operators/catalog/scratch.go deleted file mode 100644 index af6ddc1097..0000000000 --- a/pkg/controller/operators/catalog/scratch.go +++ /dev/null @@ -1,485 +0,0 @@ -package catalog - -import ( - "fmt" - "reflect" - "sync" - "time" - - "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - // "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/client-go/informers" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" - - "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators" - "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" - "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client" - "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" - exv "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions" - "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler" - "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" - "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" - "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer" - "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" -) - -// Q: What do we want to do? -// A: Register a set of views, relations, and actions to be triggered when an object changes. -// -// Q: What kind of objects? -// A: Arbitrary -// -// Q: What should produce object change events? -// A: A cache being updated (objects, indices) -// -// Q: How do we inform a set of views of an event? -// A: A view updates: -// - View store updates -// - Indices update (calling index funcs) -// - Related views should be enqueued (by key) in their workqueues -// - Related actions should be enqueued (by key) in their workqueues - -type View interface { - Key(value interface{}) (key string, err error) - Value(obj interface{}) (value interface{}, err error) - Indexers() cache.Indexers -} - -// Views are a set of ViewFuncs keyed by their resource type -type Views map[reflect.Type]View - -// Viewer is a storage interface that supports building alternate views of stored data. -type Viewer interface { - cache.Indexer - - // View gets the value for a view type and given object. - View(viewType reflect.Type, obj interface{}) (value interface{}, err error) - - // AddViews adds more views to this store. If you call this after you already have data - // in the store, the results are undefined. - AddViews(views ...View) error -} - -type viewCache struct { - cache.Indexer - lock sync.RWMutex - views Views - defaultKey cache.KeyFunc -} - -func (c *viewCache) View(viewType reflect.Type, obj interface{}) (value interface{}, err error) { - c.lock.RLock() - defer c.lock.RUnlock() - - view, ok := c.views[viewType] - if !ok { - return nil, fmt.Errorf("view %s not found", viewType.Name()) - } - - value, err = view.Value(obj) - if err != nil { - return nil, err - } - - key, err := view.Key(value) - if err != nil { - return nil, err - } - - stored, _, err := c.GetByKey(key) - - return stored, err -} - -func (c *viewCache) AddViews(views ...View) error { - c.lock.Lock() - defer c.lock.Unlock() - - if len(c.views) > 0 { - return fmt.Errorf("cannot add views to running viewer") - } - - if c.views == nil { - c.views = Views{} - } - - for _, view := range views { - c.views[reflect.TypeOf(view)] = view - } - - return nil -} - -// viewKey invokes the key function matching the view object type. -// This allows a store expecting a single key function to support multiple types. -func (c *viewCache) viewKey(value interface{}) (string, error) { - // Check for a view that matches the dynamic type. - if view, ok := c.views[reflect.TypeOf(value)]; ok { - return view.Key(value) - } - - // TODO: have a default key function if a view doesn't exist? This could be useful for storing runtime.Objects in the same cache. - return "", fmt.Errorf("view type %T unmanaged", value) -} - -type modifierFunc func(obj interface{}) error - -func (c *viewCache) modifyViews(obj interface{}, modify modifierFunc) error { - c.lock.RLock() - defer c.lock.RUnlock() - - errs := []error{} - for _, view := range c.views { - value, err := view.Value(obj) - if err != nil { - errs = append(errs, err) - continue - } - - if err := modify(value); err != nil { - errs = append(errs, err) - } - } - - // TODO: run indexers on unmodified object? - // TODO: modify object as is? - - return utilerrors.NewAggregate(errs) -} - -// Add sets an item in the cache. -func (c *viewCache) Add(obj interface{}) error { - return c.modifyViews(obj, c.Indexer.Add) -} - -// Update sets an item in the cache to its updated state. -func (c *viewCache) Update(obj interface{}) error { - return c.modifyViews(obj, c.Indexer.Update) -} - -// Delete removes an item from the cache. -func (c *viewCache) Delete(obj interface{}) error { - return c.modifyViews(obj, c.Indexer.Delete) -} - -// NewViewer returns a new Viewer containing all of the given views. -func NewViewer(views Views) Viewer { - c := &viewCache{views: views} - indexers := cache.Indexers{} - for _, view := range views { - for name, index := range view.Indexers() { - indexers[name] = index - } - } - c.Indexer = cache.NewIndexer(c.viewKey, indexers) - return c -} - -type viewOperator struct { - *queueinformer.Operator - logger *logrus.Logger - resyncPeriod time.Duration - kubeconfig string - namespaces []string - client versioned.Interface - checker reconciler.RegistryChecker - viewOptions []viewOption - viewer Viewer -} - -type OperatorOption func(*viewOperator) - -func NewViewOperator(configmapRegistryImage string, options ...OperatorOption) (*viewOperator, error) { - // Set defaults - op := &viewOperator{ - logger: logrus.New(), - resyncPeriod: 15 * time.Minute, - namespaces: []string{""}, - viewOptions: []viewOption{}, - } - - // Apply all options - for _, option := range options { - option(op) - } - - // Set additional defaults if not set by options - if op.client == nil { - c, err := client.NewClient("") - if err != nil { - return nil, err - } - op.client = c - } - - if op.OpClient == nil { - op.OpClient = operatorclient.NewClientFromConfig("", op.logger) - } - - if op.Operator == nil { - queueinformer.NewOperatorFromClient(op.OpClient, op.logger) - } - - if op.viewer == nil { - op.viewer = NewViewer(Views{}) - } - - // TODO: View method for adding notification recipients - mapping view to queue - // TODO: Build index functions for notification recipients. - - // TODO: Notify - Lists viewer indexes by index key and enqueues them in the appropriate workqueue. - // TODO: Act - Updates the cluster in some way using the views - - // Create an OperatorLister - lister := operatorlister.NewLister() - - // Setup Informer - namespace := metav1.NamespaceAll - crFactory := exv.NewSharedInformerFactoryWithOptions(op.client, op.resyncPeriod, exv.WithNamespace(namespace)) - catsrcInformer := crFactory.Operators().V1alpha1().CatalogSources() - subInformer := crFactory.Operators().V1alpha1().Subscriptions() - - // Setup View - scv := NewSubCatalogView(configmapRegistryImage, append(op.viewOptions, withSubIndexer(subInformer.Informer().GetIndexer()))...) - op.viewer.AddViews(scv) - - queueName := "catsrc/" + namespace - catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName) - - // Create an informer for each catalog namespace - deleteCatalog := &cache.ResourceEventHandlerFuncs{ - DeleteFunc: func(obj interface{}) { - catsrc, ok := obj.(*v1alpha1.CatalogSource) - if !ok { - logrus.Warnf("incorrect delete type %T, skipping catalogsource view removal", obj) - return - } - logger := op.logger.WithFields(logrus.Fields{ - "catsrc": catsrc.GetName(), - "namespace": catsrc.GetNamespace(), - }) - - if err := op.viewer.Delete(obj); err != nil { - logger.WithError(err).Warn("failed to remove views") - return - } - - logger.Debug("views removed") - }, - } - - // TODO: Use MetaView as a mechanism to notify CatalogSource changes - // Pod create pod queue informers - factory := informers.NewSharedInformerFactoryWithOptions(op.OpClient.KubernetesInterface(), op.resyncPeriod, informers.WithNamespace(namespace)) - podInformer := factory.Core().V1().Pods() - lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister()) - op.RegisterQueueInformer( - queueinformer.NewInformer( - catsrcQueue, - catsrcInformer.Informer(), - func(obj interface{}) (syncErr error) { - // TODO: implement (View -> Notify) -> Act stages. - // View stage: generates all views - collect aggregate error - // Notify: notify all interested parties by looking at the indexes (only if they have changed) - collect aggregate error - // Act stage: uses views to act on the cluster - collect aggregate error - // View and Notify should be synchronous - // Actions can execute in parallel to (View and Notify) and possibly to themselves - // Preferrably these stages are encapsulated by a QueueInformer wrapper... ViewInformer? - - // Generate all views - var errs []error - if err := op.viewer.Add(obj); err != nil { - errs = append(errs, err) - } - - // Act on views - // TODO: A few ways we could associate views with actions: - // * A view _has an_ action - a direct function call (e.g. view.Act()) - // * A view _informs_ an action - a key is enqueued on an action queue when a view is updated - // * How do we decide which actions to enqueue? - // * How do we order enqueuing? - // * Can some actions be orthogonal? - // * Should the relation be reversed - action relates to a set of views)? - // * Is there a pre-existing model we can use for this - petri-net? - - // What would this look like with a petri-net: - // * Views are places - // * - - syncErr = utilerrors.NewAggregate(errs) - return - }, - deleteCatalog, - queueName, - metrics.NewMetricsNil(), - op.logger)) - - return op, nil -} - -// Define - -// Define views - -// metaView is a transparent view that can be used as a basis for more complex views. -// It holds a set of commonly used utilities. -// TODO: should this just be the Operator itself? -type metaView struct { - logger *logrus.Logger - namespace string - opClient operatorclient.ClientInterface - lister operatorlister.OperatorLister - getReference func(runtime.Object) (*corev1.ObjectReference, error) - now func() metav1.Time -} - -func (view *metaView) Key(value interface{}) (key string, err error) { - // Use the most common key func (namespace/name) - // TODO: could we use metaView to store anything that implements meta.Interface by default? - return cache.DeletionHandlingMetaNamespaceKeyFunc(value) -} - -func (view *metaView) Value(obj interface{}) (value interface{}, err error) { - // Passthrough - value = obj - return -} - -func (view *metaView) Indexers() cache.Indexers { - // Use the most common indexer (namespace) - return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} -} - -type viewOption func(View) - -func applyViewOptions(view View, options ...viewOption) { - for _, option := range options { - option(view) - } -} - -// NewMetaView returns a new metaView with the given options applied. -func NewMetaView(options ...viewOption) *metaView { - // Set defaults - view := &metaView{ - lister: operatorlister.NewLister(), - getReference: operators.GetReference, - now: timeNow, - } - - applyViewOptions(view, options...) - - // Set additional defaults if not updated by options - if view.opClient == nil { - view.opClient = operatorclient.NewClientFromConfig("", view.logger) - } - - return view -} - -// subCatalogView implements the SubscriptionCatalogStatus view over CatalogSources. -type subCatalogView struct { - *metaView - subIndexer cache.Indexer - reconcilerFactory reconciler.RegistryReconcilerFactory -} - -func withSubIndexer(subIndexer cache.Indexer) viewOption { - return func(view View) { - if scv, ok := view.(*subCatalogView); ok { - scv.subIndexer = subIndexer - } - } -} - -func NewSubCatalogView(configmapRegistryImage string, options ...viewOption) *subCatalogView { - // Set defaults - view := &subCatalogView{ - metaView: NewMetaView(), - // TODO: Missing subCatalogIndexer can result in bad configuration - } - - applyViewOptions(view, options...) - - // Set additional defaults if not updated by options - if view.reconcilerFactory == nil { - view.reconcilerFactory = reconciler.NewRegistryReconcilerFactory(view.lister, view.opClient, configmapRegistryImage) - } - - return view -} - -const ( - subCatalogPrefix string = "subcatalogstatus" - subCatalogKey string = subCatalogPrefix + "/%s/%s" -) - -func (view *subCatalogView) Key(value interface{}) (key string, err error) { - scs, ok := value.(*v1alpha1.SubscriptionCatalogStatus) - if !ok { - err = fmt.Errorf("unexpected view value type %T, expected %T", value, (*v1alpha1.SubscriptionCatalogStatus)(nil)) - return - } - - key = fmt.Sprintf(subCatalogKey, scs.CatalogSourceRef.Name, scs.CatalogSourceRef.Namespace) - return -} - -func (view *subCatalogView) Value(obj interface{}) (value interface{}, err error) { - catalog, ok := obj.(*v1alpha1.CatalogSource) - if !ok { - return nil, fmt.Errorf("unexpected object type, expected %T and got %T", (*v1alpha1.CatalogSource)(nil), obj) - } - - // Check the registry server health - healthy, err := view.reconcilerFactory.ReconcilerForSource(catalog).CheckRegistryServer(catalog) - if err != nil { - return - } - - // Create the view - ref, err := view.getReference(catalog) - if err != nil { - return - } - - value = &v1alpha1.SubscriptionCatalogStatus{ - CatalogSourceRef: ref, - LastUpdated: view.now(), - Healthy: healthy, - } - - return -} - -const ( - subCatalogIndex string = "subcatalog" -) - -// subCatalogIndexer returns a set of indices for a given SubscriptionCatalogStatus -func (view *subCatalogView) subCatalogIndexer(obj interface{}) ([]string, error) { - scs, ok := obj.(*v1alpha1.SubscriptionCatalogStatus) - if !ok { - // Can't build indices for this type - return []string{}, nil - } - - if scs.CatalogSourceRef.Namespace == view.namespace { - // The CatalogSource is global, get keys for subscriptions in all watched namespaces - // TODO: No guarantees that subIndex contains cache.NamespaceIndex - return view.subIndexer.IndexKeys(cache.NamespaceIndex, scs.CatalogSourceRef.Namespace) - } - - return view.subIndexer.IndexKeys(cache.NamespaceIndex, scs.CatalogSourceRef.Namespace) -} - -func (view *subCatalogView) Indexers() cache.Indexers { - return cache.Indexers{ - subCatalogIndex: view.subCatalogIndexer, - } -} diff --git a/pkg/controller/operators/catalog/subscriptions.go b/pkg/controller/operators/catalog/subscriptions.go index 8cea6b8be0..af4b32fe3c 100644 --- a/pkg/controller/operators/catalog/subscriptions.go +++ b/pkg/controller/operators/catalog/subscriptions.go @@ -2,8 +2,17 @@ package catalog import ( "errors" + "fmt" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "k8s.io/apimachinery/pkg/labels" ) var ( @@ -26,7 +35,7 @@ func labelsForSubscription(sub *v1alpha1.Subscription) map[string]string { } } -// TODO remove this once UI no longer needs them +// TODO: remove this once UI no longer needs them func legacyLabelsForSubscription(sub *v1alpha1.Subscription) map[string]string { return map[string]string{ "alm-package": sub.Spec.Package, @@ -49,3 +58,211 @@ func ensureLabels(sub *v1alpha1.Subscription) *v1alpha1.Subscription { sub.SetLabels(labels) return sub } + +// --------------------------------------------------------------------------------------------------------- +// Views needed by Subscriptions + +// subCatViewer implements the SubscriptionCatalogStatus view over CatalogSources. +type subCatViewer struct { + *metaViewer + getReference func(obj runtime.Object) (*corev1.ObjectReference, error) + namespace string +} + +type subCatViewerOption func(*subCatViewer) + +func withMetaViewerOptions(options ...metaViewerOption) subCatViewerOption { + return func(viewer *subCatViewer) { + for _, option := range options { + option(viewer.metaViewer) + } + } +} + +// newSubCatViewer returns a Viewer that produces SubscriptionCatalogStatues from CatalogSources. +func newSubCatViewer(mv *metaViewer, options ...subCatViewerOption) *subCatViewer { + // Set defaults + viewer := &subCatViewer{ + metaViewer: mv, + getReference: operators.GetReference, + } + + for _, option := range options { + option(viewer) + } + + return viewer +} + +const ( + subCatPrefix string = "subcatalogstatus" + subCatKeyTemplate string = subCatPrefix + "/%s/%s" +) + +func (viewer *subCatViewer) Key(obj interface{}) (key string, err error) { + catalog, ok := obj.(*v1alpha1.CatalogSource) + if !ok { + err = fmt.Errorf("unexpected object value type %T, expected %T", obj, new(v1alpha1.CatalogSource)) + return + } + + key = fmt.Sprintf(subCatKeyTemplate, catalog.GetNamespace(), catalog.GetName()) + return +} + +func (viewer *subCatViewer) KeyByView(view interface{}) (key string, err error) { + scs, ok := view.(*v1alpha1.SubscriptionCatalogStatus) + if !ok { + err = fmt.Errorf("unexpected view value type %T, expected %T", view, new(v1alpha1.SubscriptionCatalogStatus)) + return + } + + key = fmt.Sprintf(subCatKeyTemplate, scs.CatalogSourceRef.Namespace, scs.CatalogSourceRef.Name) + return +} + +func (viewer *subCatViewer) View(obj interface{}) (view interface{}, err error) { + catalog, ok := obj.(*v1alpha1.CatalogSource) + if !ok { + return nil, fmt.Errorf("unexpected object type, expected %T and got %T", new(v1alpha1.CatalogSource), obj) + } + + // Check the registry server health + healthy, err := viewer.reconciler.ReconcilerForSource(catalog).CheckRegistryServer(catalog) + if err != nil { + return + } + + // Create the view + ref, err := viewer.getReference(catalog) + if err != nil { + return + } + + view = &v1alpha1.SubscriptionCatalogStatus{ + CatalogSourceRef: ref, + LastUpdated: viewer.now(), + Healthy: healthy, + } + + return +} + +const ( + subCatIndexKey string = "subcatalog" +) + +// subCatViewIndex returns a set of indices for a given SubscriptionCatalogStatus. +func (viewer *subCatViewer) subCatViewIndex(view interface{}) ([]string, error) { + scs, ok := view.(*v1alpha1.SubscriptionCatalogStatus) + if !ok { + // Can't build indices for this type. Fail silently. + return []string{}, nil + } + + if scs.CatalogSourceRef.Namespace == viewer.Operator.namespace { + // The CatalogSource is global, get keys for subscriptions in all watched namespaces + namespaces := viewer.watchedNamespaces + if len(namespaces) == 1 && namespaces[0] == metav1.NamespaceAll { + // Need to get all namespace names + nsList, err := viewer.lister.CoreV1().NamespaceLister().List(labels.Everything()) + if err != nil { + return nil, err + } + namespaces = make([]string, len(nsList)) + for i, ns := range nsList { + namespaces[i] = ns.GetName() + } + } + + keySet := sets.String{} + for _, namespace := range namespaces { + // TODO: namespace is probably metav1.NamespaceAll, in which case I don't think it will be indexed propery in other indexers. + indexer := viewer.subIndexerSet.Get(namespace) + if indexer == nil { + return nil, fmt.Errorf("no subscription indexer found for namespace %s", scs.CatalogSourceRef.Namespace) + } + + keys, err := indexer.IndexKeys(cache.NamespaceIndex, namespace) + if err != nil { + return nil, err + } + viewer.Log.WithField("subscription-namespace", namespace).Debugf("keys: %v", keys) + + keySet.Insert(keys...) + } + // panic("YARP") + + return keySet.List(), nil + } + + indexer := viewer.subIndexerSet.Get(scs.CatalogSourceRef.Namespace) + if indexer == nil { + return nil, fmt.Errorf("no subscription indexer found for namespace %s", scs.CatalogSourceRef.Namespace) + } + + return indexer.IndexKeys(cache.NamespaceIndex, scs.CatalogSourceRef.Namespace) +} + +// setSubCatStatus sets the SubscriptionCatalogStatus field on the given Subscription using the SubscriptionCatalogView. +func (viewer *subCatViewer) setSubCatStatus(obj interface{}) error { + sub, ok := obj.(*v1alpha1.Subscription) + if !ok { + return fmt.Errorf("casting Subscription failed") + } + + // Get views from catalogs in the global namespace + viewIndexer := viewer.viewIndexerSet.Get(viewer.Operator.namespace) + if viewIndexer == nil { + // TODO: panic here? + return fmt.Errorf("global namespace indexer nil") + } + + // Generate the Subscription key + indexKey := fmt.Sprintf("%s/%s", sub.GetNamespace(), sub.GetName()) + + var subCatStatus []v1alpha1.SubscriptionCatalogStatus + objs, err := viewIndexer.ByIndex(subCatIndexKey, indexKey) + if err != nil { + return err + } + for _, o := range objs { + if scs, ok := o.(*v1alpha1.SubscriptionCatalogStatus); ok { + subCatStatus = append(subCatStatus, *scs) + } else { + panic(fmt.Sprintf("obj %v not of type %T", o, new(v1alpha1.SubscriptionCatalogStatus))) + } + } + + if sub.GetNamespace() != viewer.Operator.namespace || viewer.watchedNamespaces[0] != metav1.NamespaceAll { + // Get views from the Subscription namespace + viewIndexer = viewer.viewIndexerSet.Get(sub.GetNamespace()) + if viewIndexer == nil { + // TODO: panic here? + return fmt.Errorf("%s indexer nil", sub.GetNamespace()) + } + + objs, err = viewIndexer.ByIndex(subCatIndexKey, indexKey) + if err != nil { + return err + } + for _, o := range objs { + if scs, ok := o.(*v1alpha1.SubscriptionCatalogStatus); ok { + subCatStatus = append(subCatStatus, *scs) + } else { + panic(fmt.Sprintf("obj %v not of type %T", o, new(v1alpha1.SubscriptionCatalogStatus))) + } + } + } + + // Update the catalog status if a change has been made + if sub.Status.SetSubscriptionCatalogStatus(subCatStatus) { + sub.Status.LastUpdated = viewer.now() + _, err := viewer.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).UpdateStatus(sub) + if err != nil { + return err + } + } + + return nil +} diff --git a/pkg/lib/cache/viewer.go b/pkg/lib/cache/viewer.go new file mode 100644 index 0000000000..7e645a3fc5 --- /dev/null +++ b/pkg/lib/cache/viewer.go @@ -0,0 +1,305 @@ +package cache + +import ( + "fmt" + "reflect" + "sync" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +// Viewer is a type that knows how to create and key an alternate view of a given resource. +type Viewer interface { + // Key returns the resulting view key for the given object. + Key(obj interface{}) (key string, err error) + + // KeyByView returns the view key for the given view object. + KeyByView(view interface{}) (key string, err error) + + // View returns the view for the given object. + View(obj interface{}) (value interface{}, err error) +} + +// ViewerKey relates a Viewer name to the type of resource it transforms. +type ViewerKey struct { + // Name is the Viewer's name. + Name string + + // Transforms is the type of resource the Viewer can transform. + Transforms reflect.Type + + // Into is the view type the Viewer produces. + Into reflect.Type +} + +// NewViewerKey returns a ViewerKey with the given name and reflect.Type of transforms. +func NewViewerKey(name string, transforms interface{}, into interface{}) ViewerKey { + return ViewerKey{ + Name: name, + Transforms: reflect.TypeOf(transforms), + Into: reflect.TypeOf(into), + } +} + +// Viewers maps ViewerKeys to a Viewers. +type Viewers map[ViewerKey]Viewer + +// ViewIndexer is a storage interface that supports building and indexing alternate views of given data. +type ViewIndexer interface { + cache.Indexer + + // View returns the stored view of the given object generated by the viewer of the given key if one exists. + View(viewerKey ViewerKey, obj interface{}) (value interface{}, err error) + + // AddViewer adds a Viewer to the ViewIndexer with the given key. The results of calling this after data has been added to the indexer are undefined. + AddViewer(viewerKey ViewerKey, viewer Viewer) error + + // AddViewers adds the given set of Viewers to the ViewIndexer. If you call this after you already have data in the indexer, the results are undefined. + AddViewers(viewers Viewers) error + + // AddIndex adds an index func to the indexer with the given name. + AddIndex(name string, indexFunc cache.IndexFunc) error + + // TODO: Include Add, Update, Delete methods that can specify a single view to use? +} + +// ViewIndexerSet provides thread-safe methods for storing and retrieving ViewIndexers. +type ViewIndexerSet struct { + lock sync.RWMutex + viewIndexerSet map[string]ViewIndexer +} + +// Get returns the ViewIndexer associated with the given namespace. +func (v *ViewIndexerSet) Get(namespace string) ViewIndexer { + v.lock.RLock() + defer v.lock.RUnlock() + + if viewIndexer, ok := v.viewIndexerSet[metav1.NamespaceAll]; ok { + return viewIndexer + } + + return v.viewIndexerSet[namespace] +} + +// Set sets the given ViewIndexer at the given namespace. +func (v *ViewIndexerSet) Set(namespace string, viewIndexer ViewIndexer) { + v.lock.Lock() + defer v.lock.Unlock() + + v.viewIndexerSet[namespace] = viewIndexer +} + +// NewViewIndexerSet returns a newly initialized +func NewViewIndexerSet() *ViewIndexerSet { + return &ViewIndexerSet{viewIndexerSet: map[string]ViewIndexer{}} +} + +// IndexerSet provides thread-safe methods for storing and retrieving . +type IndexerSet struct { + lock sync.RWMutex + indexerSet map[string]cache.Indexer +} + +// Get returns the Indexer associated with the given namespace. +func (i *IndexerSet) Get(namespace string) cache.Indexer { + i.lock.RLock() + defer i.lock.RUnlock() + + if indexer, ok := i.indexerSet[metav1.NamespaceAll]; ok { + return indexer + } + + return i.indexerSet[namespace] +} + +// Set sets the given Indexer at the given namespace. +func (i *IndexerSet) Set(namespace string, indexer cache.Indexer) { + i.lock.Lock() + defer i.lock.Unlock() + + i.indexerSet[namespace] = indexer +} + +// NewIndexerSet returns a newly initialized +func NewIndexerSet() *IndexerSet { + return &IndexerSet{indexerSet: map[string]cache.Indexer{}} +} + +// ------------------------------------------------------------------------------------------------------------------ + +type viewCache struct { + cache.Indexer + + lock sync.RWMutex + viewers Viewers + + // TODO: use key pointers internally if it will reduce overhead. + // viewerTypeToKey maps ViewerTypes to existing ViewerKeys. + // Used to prevent the same viewer from being registered multiple times. + viewerTypeToKey map[reflect.Type]ViewerKey + + // transformsToKeys maps transform types to sets of ViewerKeys. + // Used to quickly look up the set of applicable views for an added resource. + transformsToKeys map[reflect.Type][]ViewerKey + + // intoToKey maps a view type to the key of the viewer that produces it. + intoToKey map[reflect.Type]ViewerKey +} + +var _ ViewIndexer = &viewCache{} + +// NewViewIndexer returns a zeroed ViewIndexer. +func NewViewIndexer() ViewIndexer { + c := &viewCache{ + viewers: Viewers{}, + viewerTypeToKey: map[reflect.Type]ViewerKey{}, + transformsToKeys: map[reflect.Type][]ViewerKey{}, + intoToKey: map[reflect.Type]ViewerKey{}, + } + c.Indexer = cache.NewIndexer(c.sharedViewKey, cache.Indexers{}) + return c +} + +func (c *viewCache) View(viewerKey ViewerKey, obj interface{}) (value interface{}, err error) { + c.lock.RLock() + defer c.lock.RUnlock() + + viewer, ok := c.viewers[viewerKey] + if !ok { + return nil, fmt.Errorf("view %v not found", viewerKey) + } + + key, err := viewer.Key(obj) + if err != nil { + return nil, err + } + + stored, _, err := c.GetByKey(key) + + return stored, err +} + +func (c *viewCache) AddViewer(viewerKey ViewerKey, viewer Viewer) error { + return c.AddViewers(Viewers{viewerKey: viewer}) +} + +func (c *viewCache) AddViewers(viewers Viewers) error { + c.lock.Lock() + defer c.lock.Unlock() + + if len(c.viewers) > 0 { + return fmt.Errorf("cannot add views to running viewer") + } + + for vk, viewer := range viewers { + vt := reflect.TypeOf(viewer) + if vk, ok := c.viewerTypeToKey[vt]; ok { + return fmt.Errorf("viewer of type %T already registered with key %v", viewer, vk) + } + + c.viewers[vk] = viewer + c.viewerTypeToKey[vt] = vk + c.intoToKey[vk.Into] = vk + c.transformsToKeys[vk.Transforms] = append(c.transformsToKeys[vk.Transforms], vk) + } + + return nil +} + +func (c *viewCache) AddIndex(name string, indexFunc cache.IndexFunc) error { + return c.Indexer.AddIndexers(cache.Indexers{name: indexFunc}) +} + +// sharedViewKey invokes the key function matching the viewer type of the given view. +// This allows a store expecting a single key function to support multiple types. +func (c *viewCache) sharedViewKey(view interface{}) (string, error) { + // Find a viewer that matches the dynamic type + if vk, ok := c.intoToKey[reflect.TypeOf(view)]; ok { + if viewer, ok := c.viewers[vk]; ok { + return viewer.KeyByView(view) + } + } + + // TODO: have a default key function if a view doesn't exist? This could be useful for storing runtime.Objects in the same cache. + return "", fmt.Errorf("no viewer of type %T registered", view) +} + +type modifierFunc func(obj interface{}) error + +func (c *viewCache) modifyViews(obj interface{}, modify modifierFunc) error { + c.lock.RLock() + defer c.lock.RUnlock() + + errs := []error{} + for _, vk := range c.transformsToKeys[reflect.TypeOf(obj)] { + viewer, ok := c.viewers[vk] + if !ok { + return fmt.Errorf("no view found for key: %v", vk) + } + + view, err := viewer.View(obj) + if err != nil { + errs = append(errs, err) + continue + } + + if err := modify(view); err != nil { + errs = append(errs, err) + } + } + + return utilerrors.NewAggregate(errs) +} + +// Add sets an item in the cache. +func (c *viewCache) Add(obj interface{}) error { + return c.modifyViews(obj, c.Indexer.Add) +} + +// Update sets an item in the cache to its updated state. +func (c *viewCache) Update(obj interface{}) error { + return c.modifyViews(obj, c.Indexer.Update) +} + +// Delete removes an item from the cache. +func (c *viewCache) Delete(obj interface{}) error { + return c.modifyViews(obj, c.Indexer.Delete) +} + +// ------------------------------------------------------------------------------------------------------------------ + +// EnqueuingIndexBuilder builds index funcs that enqueue their indices on a set of queues as a side effect. +type EnqueuingIndexBuilder interface { + // Build builds a new function from the given index function that enqueues its indices on the given queues as a side effect. + Build(cache.IndexFunc, ...workqueue.RateLimitingInterface) cache.IndexFunc +} + +// EnqueuingIndexBuilderFunc implements EnqueuingIndexBuilder. +type EnqueuingIndexBuilderFunc func(cache.IndexFunc, ...workqueue.RateLimitingInterface) cache.IndexFunc + +// Build builds a new function from the given index function that enqueues its indices on the given queues as a side effect. +func (f EnqueuingIndexBuilderFunc) Build(index cache.IndexFunc, queues ...workqueue.RateLimitingInterface) cache.IndexFunc { + return f(index, queues...) +} + +// BuildEnqueuingIndex builds a new function from the given index function that enqueues its indices on the given queues synchronously as a side effect. +func BuildEnqueuingIndex(index cache.IndexFunc, queues ...workqueue.RateLimitingInterface) cache.IndexFunc { + return func(obj interface{}) ([]string, error) { + indices, err := index(obj) + if err != nil { + return nil, err + } + + // Enqueue on every queue before returning + for _, queue := range queues { + for _, idx := range indices { + queue.Add(idx) + } + } + + return indices, nil + } +} diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index 97f5c2dc34..f79a47ff9d 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -1,10 +1,12 @@ package queueinformer import ( - "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" "github.com/sirupsen/logrus" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + + opcache "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/cache" + "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" ) // SyncHandler is the function that reconciles the controlled object when seen @@ -25,70 +27,77 @@ type QueueInformer struct { // enqueue adds a key to the queue. If obj is a key already it gets added directly. // Otherwise, the key is extracted via keyFunc. -func (q *QueueInformer) enqueue(obj interface{}) { +func enqueue(obj interface{}, key keyFunc, queue workqueue.RateLimitingInterface) { if obj == nil { return } - key, ok := obj.(string) + k, ok := obj.(string) if !ok { - key, ok = q.keyFunc(obj) + k, ok = key(obj) if !ok { return } } - q.queue.Add(key) + queue.Add(k) } // keyFunc turns an object into a key for the queue. In the future will use a (name, namespace) struct as key -func (q *QueueInformer) keyFunc(obj interface{}) (string, bool) { +type keyFunc func(obj interface{}) (string, bool) + +// key turns an object into a key for the queue. In the future will use a (name, namespace) struct as key +func (q *QueueInformer) key(obj interface{}) (string, bool) { k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { - q.log.Infof("creating key failed: %s", err) + q.log.WithError(err).Debugf("creating key failed for: %v", obj) return k, false } return k, true } -func (q *QueueInformer) defaultAddFunc(obj interface{}) { - key, ok := q.keyFunc(obj) - if !ok { - q.log.Warnf("couldn't add %v, couldn't create key", obj) - return - } +func (q *QueueInformer) defaultAddFunc(queue workqueue.RateLimitingInterface) func(obj interface{}) { + return func(obj interface{}) { + if _, ok := q.key(obj); !ok { + q.log.Warnf("couldn't add %v, couldn't create key", obj) + return + } - q.enqueue(key) + enqueue(obj, q.key, queue) + } } -func (q *QueueInformer) defaultDeleteFunc(obj interface{}) { - key, ok := q.keyFunc(obj) - if !ok { - q.log.Warnf("couldn't delete %v, couldn't create key", obj) - return - } +func (q *QueueInformer) defaultDeleteFunc(queue workqueue.RateLimitingInterface) func(obj interface{}) { + return func(obj interface{}) { + k, ok := q.key(obj) + if !ok { + q.log.Warnf("couldn't delete %v, couldn't create key", obj) + return + } - q.queue.Forget(key) + queue.Forget(k) + } } -func (q *QueueInformer) defaultUpdateFunc(oldObj, newObj interface{}) { - key, ok := q.keyFunc(newObj) - if !ok { - q.log.Warnf("couldn't update %v, couldn't create key", newObj) - return - } +func (q *QueueInformer) defaultUpdateFunc(queue workqueue.RateLimitingInterface) func(oldObj, newObj interface{}) { + return func(oldObj, newObj interface{}) { + if _, ok := q.key(newObj); !ok { + q.log.Warnf("couldn't update %v, couldn't create key", newObj) + return + } - q.enqueue(key) + enqueue(newObj, q.key, queue) + } } // defaultResourceEventhandlerFuncs provides the default implementation for responding to events // these simply Log the event and add the object's key to the queue for later processing -func (q *QueueInformer) defaultResourceEventHandlerFuncs() *cache.ResourceEventHandlerFuncs { +func (q *QueueInformer) defaultResourceEventHandlerFuncs(queue workqueue.RateLimitingInterface) *cache.ResourceEventHandlerFuncs { return &cache.ResourceEventHandlerFuncs{ - AddFunc: q.defaultAddFunc, - DeleteFunc: q.defaultDeleteFunc, - UpdateFunc: q.defaultUpdateFunc, + AddFunc: q.defaultAddFunc(queue), + DeleteFunc: q.defaultDeleteFunc(queue), + UpdateFunc: q.defaultUpdateFunc(queue), } } @@ -113,7 +122,7 @@ func NewInformer(queue workqueue.RateLimitingInterface, informer cache.SharedInd MetricsProvider: metrics, log: logger, } - queueInformer.resourceEventHandlerFuncs = queueInformer.defaultResourceEventHandlerFuncs() + queueInformer.resourceEventHandlerFuncs = queueInformer.defaultResourceEventHandlerFuncs(queue) if funcs != nil { if funcs.AddFunc != nil { queueInformer.resourceEventHandlerFuncs.AddFunc = funcs.AddFunc @@ -128,3 +137,83 @@ func NewInformer(queue workqueue.RateLimitingInterface, informer cache.SharedInd queueInformer.informer.AddEventHandler(queueInformer.resourceEventHandlerFuncs) return queueInformer } + +// WithViewIndexer adds EventHandler funcs that inform the given ViewIndexer of changes. +// Returns the queueinformer to support chaining. +// TODO: Make this a method of a type that embeds QueueInformer instead. +func (q *QueueInformer) WithViewIndexer(viewIndexer opcache.ViewIndexer) *QueueInformer { + logger := q.log.WithField("eventhandler", "viewindexer") + handler := &cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + // TODO: Check for resource existing and update instead if it exists + logger := logger.WithField("operation", "add") + logger.Debugf("%v", obj) + if err := viewIndexer.Add(obj); err != nil { + logger.WithError(err).Warnf("could not add object of type %T to ViewIndexer", obj) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // TODO: Check for resource not existing before update and add instead if it doesn't exist + logger := logger.WithField("operation", "update") + logger.Debugf("%v", newObj) + if err := viewIndexer.Update(newObj); err != nil { + logger.WithError(err).Warnf("could not update object of type %T in ViewIndexer", newObj) + } + }, + DeleteFunc: func(obj interface{}) { + // TODO: Check for resource existence before deleting + logger := logger.WithField("operation", "delete") + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + logger.Debug("unpacked tombstone") + } + + logger.Debugf("%v", obj) + if err := viewIndexer.Delete(obj); err != nil { + logger.WithError(err).Warnf("could not delete object of type %T in ViewIndexer", obj) + } + }, + } + + q.informer.AddEventHandler(handler) + + return q +} + +// AdditionallyDistribute adds EventHandler funcs that distribute events to each queue in the given list. +// Returns the queueinformer to support chaining. +// TODO: No ordering is guarenteed between event handlers. +func (q *QueueInformer) AdditionallyDistribute(queues ...workqueue.RateLimitingInterface) *QueueInformer { + // Build handler func for each queue + numQueues := len(queues) + add := make([]func(obj interface{}), numQueues) + update := make([]func(oldObj, newObj interface{}), numQueues) + del := make([]func(obj interface{}), numQueues) + for i, queue := range queues { + add[i] = q.defaultAddFunc(queue) + update[i] = q.defaultUpdateFunc(queue) + del[i] = q.defaultDeleteFunc(queue) + } + + handler := &cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + for _, f := range add { + f(obj) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + for _, f := range update { + f(oldObj, newObj) + } + }, + DeleteFunc: func(obj interface{}) { + for _, f := range del { + f(obj) + } + }, + } + + q.informer.AddEventHandler(handler) + + return q +} diff --git a/pkg/lib/queueinformer/resourcequeue.go b/pkg/lib/queueinformer/resourcequeue.go index 08f4ea4424..d53c7ef547 100644 --- a/pkg/lib/queueinformer/resourcequeue.go +++ b/pkg/lib/queueinformer/resourcequeue.go @@ -53,6 +53,27 @@ func (r *ResourceQueueSet) Requeue(name, namespace string) error { return fmt.Errorf("couldn't find queue for resource") } +// RequeueRateLimited performs a rate limited requeue on the resource in the set with the given name and namespace +func (r *ResourceQueueSet) RequeueRateLimited(name, namespace string) error { + r.mutex.RLock() + defer r.mutex.RUnlock() + + // We can build the key directly, will need to change if queue uses different key scheme + key := fmt.Sprintf("%s/%s", namespace, name) + + if queue, ok := r.queueSet[metav1.NamespaceAll]; len(r.queueSet) == 1 && ok { + queue.AddRateLimited(key) + return nil + } + + if queue, ok := r.queueSet[namespace]; ok { + queue.AddRateLimited(key) + return nil + } + + return fmt.Errorf("couldn't find queue for resource") +} + // RequeueByKey adds the given key to the resource queue that should contain it func (r *ResourceQueueSet) RequeueByKey(key string) error { r.mutex.RLock()