diff --git a/controllers/crd_controller.go b/controllers/crd_controller.go deleted file mode 100644 index bfa0ab4ed..000000000 --- a/controllers/crd_controller.go +++ /dev/null @@ -1,101 +0,0 @@ -package controllers - -import ( - "context" - "sync" - - extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" - - "kubevirt.io/ssp-operator/controllers/finishable" -) - -// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch - -func CreateCrdController(mgr controllerruntime.Manager, requiredCrds []string) (finishable.Controller, error) { - crds := make(map[string]bool, len(requiredCrds)) - for _, crd := range requiredCrds { - crds[crd] = false - } - - reconciler := &waitForCrds{ - client: mgr.GetClient(), - crds: crds, - } - - initCtrl, err := finishable.NewController("init-controller", mgr, reconciler) - if err != nil { - return nil, err - } - - err = initCtrl.Watch(&source.Kind{Type: &extv1.CustomResourceDefinition{}}, &handler.EnqueueRequestForObject{}) - if err != nil { - return nil, err - } - - return initCtrl, nil -} - -type waitForCrds struct { - client client.Client - - lock sync.RWMutex - crds map[string]bool -} - -var _ finishable.Reconciler = &waitForCrds{} - -func (w *waitForCrds) Reconcile(ctx context.Context, request reconcile.Request) (finishable.Result, error) { - crdExists := true - crd := &extv1.CustomResourceDefinition{} - err := w.client.Get(ctx, request.NamespacedName, crd) - if err != nil { - if !errors.IsNotFound(err) { - return finishable.Result{}, err - } - crdExists = false - } - - // If CRD is being deleted, we treat it as not existing. - if !crd.GetDeletionTimestamp().IsZero() { - crdExists = false - } - - key := request.NamespacedName.Name - if w.isCrdRequired(key) { - w.setCrdExists(key, crdExists) - } - - return finishable.Result{Finished: w.allCrdsExist()}, nil -} - -func (w *waitForCrds) isCrdRequired(key string) bool { - w.lock.RLock() - defer w.lock.RUnlock() - - _, exists := w.crds[key] - return exists -} - -func (w *waitForCrds) setCrdExists(key string, val bool) { - w.lock.Lock() - defer w.lock.Unlock() - - w.crds[key] = val -} - -func (w *waitForCrds) allCrdsExist() bool { - w.lock.RLock() - defer w.lock.RUnlock() - - allExist := true - for _, exists := range w.crds { - allExist = allExist && exists - } - return allExist -} diff --git a/controllers/finishable/finishable.go b/controllers/finishable/finishable.go deleted file mode 100644 index 2d18a9381..000000000 --- a/controllers/finishable/finishable.go +++ /dev/null @@ -1,84 +0,0 @@ -package finishable - -import ( - "context" - - ctrl "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" -) - -// Reconciler interface extends reconcile.Reconciler interface -// with the option to stop the reconciliation. 
-type Reconciler interface { - Reconcile(context.Context, reconcile.Request) (Result, error) -} - -type Controller interface { - manager.Runnable - - Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error -} - -type Result struct { - reconcile.Result - Finished bool -} - -type wrapper struct { - reconciler Reconciler - stopFunc func() -} - -var _ reconcile.Reconciler = &wrapper{} - -func (w *wrapper) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - res, err := w.reconciler.Reconcile(ctx, request) - if res.Finished { - w.stopFunc() - } - return res.Result, err -} - -type controller struct { - reconcilerWrapper *wrapper - innerController ctrl.Controller -} - -var _ Controller = &controller{} - -func (c *controller) Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error { - return c.innerController.Watch(src, eventhandler, predicates...) -} - -func (c *controller) Start(ctx context.Context) error { - innerCtx, innerCtxCancel := context.WithCancel(ctx) - // Using defer here in case the fllowing innerController.Start() - // will end with an error. In that case the context would not be closed. - // Closing a context multiple times is supported. - defer innerCtxCancel() - - c.reconcilerWrapper.stopFunc = innerCtxCancel - - return c.innerController.Start(innerCtx) -} - -// NewController returns a controller that can be stopped from the Reconciler implementation. -func NewController(name string, mgr manager.Manager, reconciler Reconciler) (Controller, error) { - wrap := &wrapper{reconciler: reconciler} - - innerController, err := ctrl.NewUnmanaged(name, mgr, ctrl.Options{ - Reconciler: wrap, - }) - if err != nil { - return nil, err - } - - return &controller{ - reconcilerWrapper: wrap, - innerController: innerController, - }, nil -} diff --git a/controllers/services_controller.go b/controllers/services_controller.go index 90f6102b1..de869356a 100644 --- a/controllers/services_controller.go +++ b/controllers/services_controller.go @@ -61,8 +61,8 @@ func ServiceObject(namespace string) *v1.Service { // Annotation to generate RBAC roles to read and modify services // +kubebuilder:rbac:groups="",resources=services,verbs=get;watch;list;create;update;delete -func CreateServiceController(mgr ctrl.Manager) (*serviceReconciler, error) { - return newServiceReconciler(mgr) +func CreateServiceController(ctx context.Context, mgr ctrl.Manager) (*serviceReconciler, error) { + return newServiceReconciler(ctx, mgr) } func (r *serviceReconciler) Start(ctx context.Context, mgr ctrl.Manager) error { @@ -105,24 +105,24 @@ type serviceReconciler struct { deployment *apps.Deployment } -func getOperatorDeployment(namespace string, apiReader client.Reader) (*apps.Deployment, error) { +func getOperatorDeployment(ctx context.Context, namespace string, apiReader client.Reader) (*apps.Deployment, error) { objKey := client.ObjectKey{Namespace: namespace, Name: OperatorName} var deployment apps.Deployment - err := apiReader.Get(context.TODO(), objKey, &deployment) + err := apiReader.Get(ctx, objKey, &deployment) if err != nil { return nil, fmt.Errorf("getOperatorDeployment, get deployment: %w", err) } return &deployment, nil } -func newServiceReconciler(mgr ctrl.Manager) (*serviceReconciler, error) { +func newServiceReconciler(ctx context.Context, mgr ctrl.Manager) (*serviceReconciler, error) { logger := ctrl.Log.WithName("controllers").WithName("Resources") namespace, err := 
common.GetOperatorNamespace(logger)
 	if err != nil {
 		return nil, fmt.Errorf("in newServiceReconciler: %w", err)
 	}
 
-	deployment, err := getOperatorDeployment(namespace, mgr.GetAPIReader())
+	deployment, err := getOperatorDeployment(ctx, namespace, mgr.GetAPIReader())
 	if err != nil {
 		return nil, fmt.Errorf("in newServiceReconciler: %w", err)
 	}
diff --git a/controllers/setup.go b/controllers/setup.go
index 4a4109ebd..e4d892ac8 100644
--- a/controllers/setup.go
+++ b/controllers/setup.go
@@ -5,9 +5,8 @@ import (
 	"fmt"
 	"path/filepath"
 
-	extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
-
 	"kubevirt.io/ssp-operator/internal/common"
+	crd_watch "kubevirt.io/ssp-operator/internal/crd-watch"
 	"kubevirt.io/ssp-operator/internal/operands"
 	common_templates "kubevirt.io/ssp-operator/internal/operands/common-templates"
 	data_sources "kubevirt.io/ssp-operator/internal/operands/data-sources"
@@ -19,11 +18,14 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 )
 
-func CreateAndSetupReconciler(mgr controllerruntime.Manager) error {
+// Need to watch CRDs
+// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch
+
+func CreateAndStartReconciler(ctx context.Context, mgr controllerruntime.Manager) error {
 	templatesFile := filepath.Join(templateBundleDir, "common-templates-"+common_templates.Version+".yaml")
 	templatesBundle, err := template_bundle.ReadBundle(templatesFile)
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to read template bundle: %w", err)
 	}
 
 	sspOperands := []operands.Operand{
@@ -39,27 +41,44 @@ func CreateAndSetupReconciler(mgr controllerruntime.Manager) error {
 		requiredCrds = append(requiredCrds, sspOperands[i].RequiredCrds()...)
 	}
 
-	// Check if all needed CRDs exist
-	crdList := &extv1.CustomResourceDefinitionList{}
-	err = mgr.GetAPIReader().List(context.TODO(), crdList)
+	mgrCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	crdWatch := crd_watch.New(requiredCrds...)
+	// Cleanly stops the manager and exits. The pod will be restarted.
+	crdWatch.AllCrdsAddedHandler = cancel
+	crdWatch.SomeCrdRemovedHandler = cancel
+
+	err = crdWatch.Init(mgrCtx, mgr.GetAPIReader())
 	if err != nil {
 		return err
 	}
 
-	infrastructureTopology, err := common.GetInfrastructureTopology(mgr.GetAPIReader())
+	if missingCrds := crdWatch.MissingCrds(); len(missingCrds) > 0 {
+		mgr.GetLogger().Error(nil, "Some required CRDs are missing. 
The operator will not create any new resources.", + "missingCrds", missingCrds, + ) + } + + err = mgr.Add(crdWatch) if err != nil { return err } - serviceController, err := CreateServiceController(mgr) + infrastructureTopology, err := common.GetInfrastructureTopology(mgrCtx, mgr.GetAPIReader()) if err != nil { - return err + return fmt.Errorf("failed to get infrastructure topology: %w", err) + } + + serviceController, err := CreateServiceController(mgrCtx, mgr) + if err != nil { + return fmt.Errorf("failed to create service controller: %w", err) } err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { err := serviceController.Start(ctx, mgr) if err != nil { - return fmt.Errorf("error adding serviceController: %w", err) + return fmt.Errorf("error starting serviceController: %w", err) } mgr.GetLogger().Info("Services Controller started") @@ -67,51 +86,20 @@ func CreateAndSetupReconciler(mgr controllerruntime.Manager) error { return nil })) if err != nil { - return err + return fmt.Errorf("error adding service controller: %w", err) } - reconciler := NewSspReconciler(mgr.GetClient(), mgr.GetAPIReader(), infrastructureTopology, sspOperands) + reconciler := NewSspReconciler(mgr.GetClient(), mgr.GetAPIReader(), infrastructureTopology, sspOperands, crdWatch) - if requiredCrdsExist(requiredCrds, crdList.Items) { - // No need to start CRD controller - return reconciler.setupController(mgr) - } - - mgr.GetLogger().Info("Required CRDs do not exist. Waiting until they are installed.", - "required_crds", requiredCrds, - ) - - crdController, err := CreateCrdController(mgr, requiredCrds) + err = reconciler.setupController(mgr) if err != nil { return err } - return mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { - // First start the CRD controller - err := crdController.Start(ctx) - if err != nil { - return err - } - - mgr.GetLogger().Info("Required CRDs were installed, starting SSP operator.") - - // Clear variable, so it can be garbage collected - crdController = nil - - // After it is finished, add the SSP controller to the manager - return reconciler.setupController(mgr) - })) -} - -func requiredCrdsExist(required []string, foundCrds []extv1.CustomResourceDefinition) bool { -OuterLoop: - for i := range required { - for j := range foundCrds { - if required[i] == foundCrds[j].Name { - continue OuterLoop - } - } - return false + mgr.GetLogger().Info("starting manager") + if err := mgr.Start(mgrCtx); err != nil { + mgr.GetLogger().Error(err, "problem running manager") + return err } - return true + return nil } diff --git a/controllers/ssp_controller.go b/controllers/ssp_controller.go index cb3686fe6..b73142365 100644 --- a/controllers/ssp_controller.go +++ b/controllers/ssp_controller.go @@ -22,8 +22,10 @@ import ( "os" "reflect" "strconv" + "strings" "github.com/go-logr/logr" + osconfv1 "github.com/openshift/api/config/v1" conditionsv1 "github.com/openshift/custom-resource-status/conditions/v1" libhandler "github.com/operator-framework/operator-lib/handler" v1 "k8s.io/api/core/v1" @@ -43,11 +45,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" - osconfv1 "github.com/openshift/api/config/v1" ssp "kubevirt.io/ssp-operator/api/v1beta1" "kubevirt.io/ssp-operator/internal/common" handler_hook "kubevirt.io/ssp-operator/internal/controller/handler-hook" "kubevirt.io/ssp-operator/internal/controller/predicates" + crd_watch "kubevirt.io/ssp-operator/internal/crd-watch" "kubevirt.io/ssp-operator/internal/operands" ) @@ -75,9 
+77,11 @@ type sspReconciler struct {
 	lastSspSpec      ssp.SSPSpec
 	subresourceCache common.VersionCache
 	topologyMode     osconfv1.TopologyMode
+	crdWatch         *crd_watch.CrdWatch
+	areCrdsMissing   bool
 }
 
-func NewSspReconciler(client client.Client, uncachedReader client.Reader, infrastructureTopology osconfv1.TopologyMode, operands []operands.Operand) *sspReconciler {
+func NewSspReconciler(client client.Client, uncachedReader client.Reader, infrastructureTopology osconfv1.TopologyMode, operands []operands.Operand, crdWatch *crd_watch.CrdWatch) *sspReconciler {
 	return &sspReconciler{
 		client:         client,
 		uncachedReader: uncachedReader,
@@ -85,6 +89,7 @@ func NewSspReconciler(client client.Client, uncachedReader client.Reader, infras
 		operands:         operands,
 		subresourceCache: common.VersionCache{},
 		topologyMode:     infrastructureTopology,
+		crdWatch:         crdWatch,
 	}
 }
 
@@ -111,8 +116,13 @@ func (r *sspReconciler) setupController(mgr ctrl.Manager) error {
 	builder := ctrl.NewControllerManagedBy(mgr)
 	watchSspResource(builder)
-	watchClusterResources(builder, r.operands, eventHandlerHook)
-	watchNamespacedResources(builder, r.operands, eventHandlerHook)
+
+	r.areCrdsMissing = len(r.crdWatch.MissingCrds()) > 0
+
+	// Register watches for created objects only if all required CRDs exist
+	watchClusterResources(builder, r.crdWatch, r.operands, eventHandlerHook)
+	watchNamespacedResources(builder, r.crdWatch, r.operands, eventHandlerHook)
+
 	return builder.Complete(r)
 }
 
@@ -156,6 +166,7 @@ func (r *sspReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ct
 		Logger:       reqLogger,
 		VersionCache: r.subresourceCache,
 		TopologyMode: r.topologyMode,
+		CrdWatch:     r.crdWatch,
 	}
 
 	if !isInitialized(sspRequest.Instance) {
@@ -190,6 +201,11 @@ func (r *sspReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ct
 		return ctrl.Result{}, err
 	}
 
+	if r.areCrdsMissing {
+		err := updateStatusMissingCrds(sspRequest, r.crdWatch.MissingCrds())
+		return ctrl.Result{}, err
+	}
+
 	sspRequest.Logger.V(1).Info("Updating CR status prior to operand reconciliation...")
 	err = preUpdateStatus(sspRequest)
 	if err != nil {
@@ -592,6 +608,34 @@ func updateStatus(request *common.Request, reconcileResults []common.ReconcileRe
 	return request.Client.Status().Update(request.Context, request.Instance)
 }
 
+func updateStatusMissingCrds(request *common.Request, missingCrds []string) error {
+	sspStatus := &request.Instance.Status
+
+	message := fmt.Sprintf("Required CRDs are missing: %s", strings.Join(missingCrds, ", "))
+	conditionsv1.SetStatusCondition(&sspStatus.Conditions, conditionsv1.Condition{
+		Type:    conditionsv1.ConditionAvailable,
+		Status:  v1.ConditionFalse,
+		Reason:  "Available",
+		Message: message,
+	})
+
+	conditionsv1.SetStatusCondition(&sspStatus.Conditions, conditionsv1.Condition{
+		Type:    conditionsv1.ConditionProgressing,
+		Status:  v1.ConditionTrue,
+		Reason:  "Progressing",
+		Message: message,
+	})
+
+	conditionsv1.SetStatusCondition(&sspStatus.Conditions, conditionsv1.Condition{
+		Type:    conditionsv1.ConditionDegraded,
+		Status:  v1.ConditionTrue,
+		Reason:  "Degraded",
+		Message: message,
+	})
+
+	return request.Client.Status().Update(request.Context, request.Instance)
+}
+
 func prefixResourceTypeAndName(message string, resource client.Object) string {
 	return fmt.Sprintf("%s %s/%s: %s",
 		resource.GetObjectKind().GroupVersionKind().Kind,
@@ -663,8 +707,9 @@ func watchSspResource(bldr *ctrl.Builder) {
 	bldr.For(&ssp.SSP{}, builder.WithPredicates(pred))
 }
 
-func watchNamespacedResources(builder *ctrl.Builder, sspOperands []operands.Operand, 
eventHandlerHook handler_hook.HookFunc) { +func watchNamespacedResources(builder *ctrl.Builder, crdWatch *crd_watch.CrdWatch, sspOperands []operands.Operand, eventHandlerHook handler_hook.HookFunc) { watchResources(builder, + crdWatch, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &ssp.SSP{}, @@ -675,8 +720,9 @@ func watchNamespacedResources(builder *ctrl.Builder, sspOperands []operands.Oper ) } -func watchClusterResources(builder *ctrl.Builder, sspOperands []operands.Operand, eventHandlerHook handler_hook.HookFunc) { +func watchClusterResources(builder *ctrl.Builder, crdWatch *crd_watch.CrdWatch, sspOperands []operands.Operand, eventHandlerHook handler_hook.HookFunc) { watchResources(builder, + crdWatch, &libhandler.EnqueueRequestForAnnotation{ Type: schema.GroupKind{ Group: ssp.GroupVersion.Group, @@ -689,7 +735,7 @@ func watchClusterResources(builder *ctrl.Builder, sspOperands []operands.Operand ) } -func watchResources(ctrlBuilder *ctrl.Builder, handler handler.EventHandler, sspOperands []operands.Operand, watchTypesFunc func(operands.Operand) []operands.WatchType, hookFunc handler_hook.HookFunc) { +func watchResources(ctrlBuilder *ctrl.Builder, crdWatch *crd_watch.CrdWatch, handler handler.EventHandler, sspOperands []operands.Operand, watchTypesFunc func(operands.Operand) []operands.WatchType, hookFunc handler_hook.HookFunc) { // Deduplicate watches watchedTypes := make(map[reflect.Type]operands.WatchType) for _, operand := range sspOperands { @@ -705,6 +751,11 @@ func watchResources(ctrlBuilder *ctrl.Builder, handler handler.EventHandler, ssp } for _, watchType := range watchedTypes { + if watchType.Crd != "" && !crdWatch.CrdExists(watchType.Crd) { + // Do not watch resources without CRD + continue + } + var predicates []predicate.Predicate if !watchType.WatchFullObject { predicates = []predicate.Predicate{relevantChangesPredicate()} diff --git a/internal/common/crypto_policy.go b/internal/common/crypto_policy.go index 434c10b16..5a5a601c1 100644 --- a/internal/common/crypto_policy.go +++ b/internal/common/crypto_policy.go @@ -68,7 +68,7 @@ func (s *SSPTLSOptions) CipherIDs() (cipherSuites []uint16) { return } -func GetSspTlsOptions() (*SSPTLSOptions, error) { +func GetSspTlsOptions(ctx context.Context) (*SSPTLSOptions, error) { setupLog := ctrl.Log.WithName("setup") restConfig := ctrl.GetConfigOrDie() apiReader, err := client.New(restConfig, client.Options{Scheme: Scheme}) @@ -82,7 +82,7 @@ func GetSspTlsOptions() (*SSPTLSOptions, error) { } var sspList v1beta1.SSPList - if err := apiReader.List(context.TODO(), &sspList, &client.ListOptions{Namespace: namespace}); err != nil { + if err := apiReader.List(ctx, &sspList, &client.ListOptions{Namespace: namespace}); err != nil { return nil, err } diff --git a/internal/common/environment.go b/internal/common/environment.go index d811fdb85..6dbdd3ebf 100644 --- a/internal/common/environment.go +++ b/internal/common/environment.go @@ -33,9 +33,9 @@ func GetOperatorVersion() string { return EnvOrDefault(OperatorVersionKey, defaultOperatorVersion) } -func GetInfrastructureTopology(c client.Reader) (osconfv1.TopologyMode, error) { +func GetInfrastructureTopology(ctx context.Context, c client.Reader) (osconfv1.TopologyMode, error) { infraConfig := &osconfv1.Infrastructure{} - if err := c.Get(context.TODO(), types.NamespacedName{Name: "cluster"}, infraConfig); err != nil { + if err := c.Get(ctx, types.NamespacedName{Name: "cluster"}, infraConfig); err != nil { return "", err } diff --git a/internal/common/request.go 
b/internal/common/request.go index f096d3535..549bb2f60 100644 --- a/internal/common/request.go +++ b/internal/common/request.go @@ -9,6 +9,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ssp "kubevirt.io/ssp-operator/api/v1beta1" + crd_watch "kubevirt.io/ssp-operator/internal/crd-watch" ) type Request struct { @@ -20,6 +21,8 @@ type Request struct { Logger logr.Logger VersionCache VersionCache TopologyMode osconfv1.TopologyMode + + CrdWatch *crd_watch.CrdWatch } func (r *Request) IsSingleReplicaTopologyMode() bool { diff --git a/internal/common/scheme.go b/internal/common/scheme.go index e7e66d344..a7302c07f 100644 --- a/internal/common/scheme.go +++ b/internal/common/scheme.go @@ -3,7 +3,7 @@ package common import ( osconfv1 "github.com/openshift/api/config/v1" extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - + internalmeta "k8s.io/apimachinery/pkg/apis/meta/internalversion" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -18,6 +18,7 @@ var ( func init() { utilruntime.Must(clientgoscheme.AddToScheme(Scheme)) utilruntime.Must(extv1.AddToScheme(Scheme)) + utilruntime.Must(internalmeta.AddToScheme(Scheme)) utilruntime.Must(sspv1beta1.AddToScheme(Scheme)) utilruntime.Must(osconfv1.Install(Scheme)) } diff --git a/internal/crd-watch/crd-watch.go b/internal/crd-watch/crd-watch.go new file mode 100644 index 000000000..ef0a48095 --- /dev/null +++ b/internal/crd-watch/crd-watch.go @@ -0,0 +1,211 @@ +package crd_watch + +import ( + "context" + "fmt" + "sync" + + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/runtime/inject" +) + +type CrdWatch struct { + AllCrdsAddedHandler func() + SomeCrdRemovedHandler func() + + lock sync.Mutex + requiredCrds map[string]struct{} + existingCrds map[string]struct{} + missingCrds map[string]struct{} + + initialized bool + cache ctrlcache.Cache +} + +func New(requiredCrds ...string) *CrdWatch { + requiredCrdsMap := make(map[string]struct{}, len(requiredCrds)) + missingCrds := make(map[string]struct{}, len(requiredCrds)) + for _, crdName := range requiredCrds { + requiredCrdsMap[crdName] = struct{}{} + missingCrds[crdName] = struct{}{} + } + + return &CrdWatch{ + requiredCrds: requiredCrdsMap, + existingCrds: map[string]struct{}{}, + missingCrds: missingCrds, + } +} + +func (c *CrdWatch) Init(ctx context.Context, reader client.Reader) error { + if err := c.sync(ctx, reader); err != nil { + return err + } + + c.initialized = true + return nil +} + +func (c *CrdWatch) CrdExists(crdName string) bool { + c.lock.Lock() + defer c.lock.Unlock() + + if !c.initialized { + panic("crd watch not initialized") + } + + _, exists := c.existingCrds[crdName] + return exists +} + +func (c *CrdWatch) MissingCrds() []string { + c.lock.Lock() + defer c.lock.Unlock() + + if !c.initialized { + panic("crd watch not initialized") + } + + names := make([]string, 0, len(c.missingCrds)) + for crdName := range c.missingCrds { + names = append(names, crdName) + } + return names +} + +var _ manager.Runnable = &CrdWatch{} + +func (c *CrdWatch) Start(ctx context.Context) error { + if !c.initialized { + err := c.Init(ctx, c.cache) + if err != nil { + return fmt.Errorf("failed to initialize crd watch: %w", 
err) + } + } + + informer, err := c.cache.GetInformer(ctx, &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + APIVersion: apiextensions.GroupName + "/v1", + Kind: "CustomResourceDefinition", + }, + }) + if err != nil { + return fmt.Errorf("failed to get informer: %w", err) + } + + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + c.crdAdded(obj.(*metav1.PartialObjectMetadata).GetName()) + }, + DeleteFunc: func(obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + c.crdDeleted(obj.(*metav1.PartialObjectMetadata).GetName()) + }, + }) + + if err := c.sync(ctx, c.cache); err != nil { + return err + } + + // This function has to block, because that is what manager.Runnable expects. + <-ctx.Done() + return nil +} + +func (c *CrdWatch) sync(ctx context.Context, reader client.Reader) error { + crdMetaList := &metav1.PartialObjectMetadataList{ + TypeMeta: metav1.TypeMeta{ + APIVersion: apiextensions.SchemeGroupVersion.String(), + Kind: "CustomResourceDefinitionList", + }, + } + + err := reader.List(ctx, crdMetaList) + if err != nil { + return fmt.Errorf("failed to list CRDs: %w", err) + } + + newCrds := make(map[string]struct{}, len(crdMetaList.Items)) + for i := range crdMetaList.Items { + name := crdMetaList.Items[i].Name + newCrds[name] = struct{}{} + } + + c.lock.Lock() + defer c.lock.Unlock() + + // Collecting added and deleted CRDs to slices, + // because the c.crdAdded() and c.crdDeleted() + // modify the c.existingCrds map + addedCrds := make([]string, 0, len(crdMetaList.Items)) + deletedCrds := make([]string, 0, len(c.existingCrds)) + + for name := range newCrds { + if _, exists := c.existingCrds[name]; !exists { + addedCrds = append(addedCrds, name) + } + } + for name := range c.existingCrds { + if _, exists := newCrds[name]; !exists { + deletedCrds = append(deletedCrds, name) + } + } + + for _, name := range addedCrds { + c.crdAdded(name) + } + for _, name := range deletedCrds { + c.crdDeleted(name) + } + + return nil +} + +func (c *CrdWatch) crdAdded(crdName string) { + c.existingCrds[crdName] = struct{}{} + missingCountOld := len(c.missingCrds) + delete(c.missingCrds, crdName) + + if !c.initialized || c.AllCrdsAddedHandler == nil { + return + } + + // Trigger the handler when the last required CRD is added + if missingCountOld == 1 && len(c.missingCrds) == 0 { + c.AllCrdsAddedHandler() + } +} + +func (c *CrdWatch) crdDeleted(crdName string) { + delete(c.existingCrds, crdName) + if _, isRequired := c.requiredCrds[crdName]; !isRequired { + return + } + + missingCountOld := len(c.missingCrds) + c.missingCrds[crdName] = struct{}{} + + if !c.initialized || c.SomeCrdRemovedHandler == nil { + return + } + + // Trigger the handler when all crds exist and then one is removed. + if missingCountOld == 0 { + c.SomeCrdRemovedHandler() + } +} + +var _ inject.Cache = &CrdWatch{} + +func (c *CrdWatch) InjectCache(cache ctrlcache.Cache) error { + c.cache = cache + return nil +} diff --git a/internal/crd-watch/crd-watch_test.go b/internal/crd-watch/crd-watch_test.go new file mode 100644 index 000000000..cf4d0d942 --- /dev/null +++ b/internal/crd-watch/crd-watch_test.go @@ -0,0 +1,216 @@ +package crd_watch + +import ( + "context" + goruntime "runtime" + "sync/atomic" + "time" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" + internalmeta "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +var _ = Describe("CRD watch", func() { + const ( + crd1 = "required-crd-1" + crd2 = "required-crd-2" + crd3 = "required-crd-3" + ) + + var ( + fakeInformers *informertest.FakeInformers + + crdWatch *CrdWatch + cancel context.CancelFunc + ) + + BeforeEach(func() { + Expect(internalmeta.AddToScheme(scheme.Scheme)).To(Succeed()) + Expect(apiextensions.AddToScheme(scheme.Scheme)).To(Succeed()) + + // For unit tests, we to manually add conversion functions from CRD to PartialObjectMetadata + Expect(addConversionFunctions(scheme.Scheme)).To(Succeed()) + + crdObj := &apiextensions.CustomResourceDefinition{ + TypeMeta: metav1.TypeMeta{ + APIVersion: apiextensions.SchemeGroupVersion.String(), + Kind: "CustomResourceDefinition", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: crd1, + }, + } + + fakeClient := fake.NewClientBuilder().Build() + Expect(fakeClient.Create(context.Background(), crdObj)).To(Succeed()) + + fakeInformers = &informertest.FakeInformers{} + + crdWatch = New(crd1, crd2, crd3) + Expect(crdWatch.Init(context.Background(), fakeClient)).To(Succeed()) + Expect(crdWatch.InjectCache(fakeInformers)).To(Succeed()) + + var ctx context.Context + ctx, cancel = context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + Expect(crdWatch.Start(ctx)).To(Succeed()) + }() + // Yield the current goroutine and allow the above one to start. + // This is a hack, because there is no way for tests to wait until crdWatch registers informers. 
+		goruntime.Gosched()
+	})
+
+	AfterEach(func() {
+		if cancel != nil {
+			cancel()
+		}
+	})
+
+	It("should check for existing CRD", func() {
+		const crdName = "test-crd"
+		addCrdToFakeInformers(crdName, fakeInformers)
+		Expect(crdWatch.CrdExists(crdName)).To(BeTrue(), "Crd should exist")
+	})
+
+	It("should check for non-existing CRD", func() {
+		addCrdToFakeInformers("test-crd", fakeInformers)
+		Expect(crdWatch.CrdExists("nonexistent-crd")).To(BeFalse(), "Crd should not exist")
+	})
+
+	It("should return no missing CRD", func() {
+		addCrdToFakeInformers(crd2, fakeInformers)
+		addCrdToFakeInformers(crd3, fakeInformers)
+
+		Expect(crdWatch.MissingCrds()).To(BeEmpty())
+	})
+
+	It("should return CRDs that are missing", func() {
+		addCrdToFakeInformers(crd2, fakeInformers)
+
+		missingCrds := crdWatch.MissingCrds()
+		Expect(missingCrds).To(HaveLen(1))
+		Expect(missingCrds).To(ContainElement(crd3))
+	})
+
+	Context("AllCrdsAddedHandler", func() {
+		It("should call handler when all requested CRDs are added", func() {
+			handlerCalled := make(chan struct{})
+			crdWatch.AllCrdsAddedHandler = func() {
+				close(handlerCalled)
+			}
+
+			addCrdToFakeInformers(crd2, fakeInformers)
+			addCrdToFakeInformers(crd3, fakeInformers)
+
+			Eventually(handlerCalled, 50*time.Millisecond).Should(BeClosed())
+		})
+
+		It("should not call the handler multiple times", func() {
+			var callCount int32
+			crdWatch.AllCrdsAddedHandler = func() {
+				atomic.AddInt32(&callCount, 1)
+			}
+
+			// Adding all needed CRDs to call handler for the first time
+			addCrdToFakeInformers(crd2, fakeInformers)
+			addCrdToFakeInformers(crd3, fakeInformers)
+
+			Eventually(func() int32 {
+				return atomic.LoadInt32(&callCount)
+			}, 50*time.Millisecond).Should(BeNumerically(">", 0))
+
+			// Add another CRD and verify that the handler will not be called again
+			addCrdToFakeInformers("not-required-crd", fakeInformers)
+
+			Consistently(func() int32 {
+				return atomic.LoadInt32(&callCount)
+			}, 100*time.Millisecond).Should(Equal(int32(1)))
+		})
+
+		It("should not call the handler when some CRDs are missing", func() {
+			handlerCalled := make(chan struct{})
+			crdWatch.AllCrdsAddedHandler = func() {
+				close(handlerCalled)
+			}
+
+			// crd3 is intentionally never added, so the handler must not fire.
+			addCrdToFakeInformers(crd2, fakeInformers)
+
+			Consistently(handlerCalled, 100*time.Millisecond).ShouldNot(BeClosed())
+		})
+	})
+
+	Context("SomeCrdRemovedHandler", func() {
+		It("should execute SomeCrdRemovedHandler when some required CRDs are removed", func() {
+			addCrdToFakeInformers(crd2, fakeInformers)
+			addCrdToFakeInformers(crd3, fakeInformers)
+
+			handlerCalled := make(chan struct{})
+			crdWatch.SomeCrdRemovedHandler = func() {
+				close(handlerCalled)
+			}
+
+			removeCrdFromFakeInformers(crd1, fakeInformers)
+
+			Eventually(handlerCalled, 50*time.Millisecond).Should(BeClosed())
+		})
+	})
+})
+
+func addConversionFunctions(s *runtime.Scheme) error {
+	err := s.AddConversionFunc((*apiextensions.CustomResourceDefinition)(nil), (*metav1.PartialObjectMetadata)(nil), func(a, b interface{}, scope conversion.Scope) error {
+		crd := a.(*apiextensions.CustomResourceDefinition)
+		partialMeta := b.(*metav1.PartialObjectMetadata)
+
+		partialMeta.TypeMeta = crd.TypeMeta
+		partialMeta.ObjectMeta = crd.ObjectMeta
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	return s.AddConversionFunc((*apiextensions.CustomResourceDefinitionList)(nil), (*metav1.PartialObjectMetadataList)(nil), func(a, b interface{}, scope conversion.Scope) error {
+		crdList := a.(*apiextensions.CustomResourceDefinitionList)
+		partialMetaList := b.(*metav1.PartialObjectMetadataList)
+
+		partialMetaList.TypeMeta = crdList.TypeMeta
+		partialMetaList.ListMeta = crdList.ListMeta
+
+		partialMetaList.Items = make([]metav1.PartialObjectMetadata, len(crdList.Items))
+		for i := range crdList.Items {
+			if err := 
scope.Convert(&crdList.Items[i], &partialMetaList.Items[i]); err != nil { + return err + } + } + return nil + }) +} + +func addCrdToFakeInformers(crdName string, fakeInformers *informertest.FakeInformers) { + fakeInformer, err := fakeInformers.FakeInformerFor(&metav1.PartialObjectMetadata{}) + ExpectWithOffset(1, err).ToNot(HaveOccurred()) + fakeInformer.Add(crdPartialMetadata(crdName)) +} + +func removeCrdFromFakeInformers(crdName string, fakeInformers *informertest.FakeInformers) { + fakeInformer, err := fakeInformers.FakeInformerFor(&metav1.PartialObjectMetadata{}) + ExpectWithOffset(1, err).ToNot(HaveOccurred()) + fakeInformer.Delete(crdPartialMetadata(crdName)) +} + +func crdPartialMetadata(crdName string) *metav1.PartialObjectMetadata { + return &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + APIVersion: apiextensions.SchemeGroupVersion.String(), + Kind: "CustomResourceDefinition", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: crdName, + }, + } +} diff --git a/internal/operands/data-sources/reconcile.go b/internal/operands/data-sources/reconcile.go index 5d86261b3..dd312fd17 100644 --- a/internal/operands/data-sources/reconcile.go +++ b/internal/operands/data-sources/reconcile.go @@ -35,6 +35,12 @@ const ( operandComponent = common.AppComponentTemplating ) +const ( + dataVolumeCrd = "datavolumes.cdi.kubevirt.io" + dataSourceCrd = "datasources.cdi.kubevirt.io" + dataImportCronCrd = "dataimportcrons.cdi.kubevirt.io" +) + func init() { utilruntime.Must(cdiv1beta1.AddToScheme(common.Scheme)) } @@ -46,8 +52,8 @@ func WatchClusterTypes() []operands.WatchType { {Object: &rbac.RoleBinding{}}, {Object: &core.Namespace{}}, // Need to watch status of DataSource to notice if referenced PVC was deleted. - {Object: &cdiv1beta1.DataSource{}, WatchFullObject: true}, - {Object: &cdiv1beta1.DataImportCron{}}, + {Object: &cdiv1beta1.DataSource{}, Crd: dataSourceCrd, WatchFullObject: true}, + {Object: &cdiv1beta1.DataImportCron{}, Crd: dataImportCronCrd}, } } @@ -77,9 +83,9 @@ func (d *dataSources) WatchClusterTypes() []operands.WatchType { func (d *dataSources) RequiredCrds() []string { return []string{ - "datavolumes.cdi.kubevirt.io", - "datasources.cdi.kubevirt.io", - "dataimportcrons.cdi.kubevirt.io", + dataVolumeCrd, + dataSourceCrd, + dataImportCronCrd, } } @@ -135,34 +141,38 @@ func (d *dataSources) Reconcile(request *common.Request) ([]common.ReconcileResu } func (d *dataSources) Cleanup(request *common.Request) ([]common.CleanupResult, error) { - ownedCrons, err := listAllOwnedDataImportCrons(request) - if err != nil { - return nil, err - } - - var results []common.CleanupResult - allDataImportCronsDeleted := true - for i := range ownedCrons { - result, err := common.Cleanup(request, &ownedCrons[i]) + if request.CrdWatch.CrdExists(dataImportCronCrd) { + ownedCrons, err := listAllOwnedDataImportCrons(request) if err != nil { return nil, err } - results = append(results, result) - if !result.Deleted { - allDataImportCronsDeleted = false + + var results []common.CleanupResult + allDataImportCronsDeleted := true + for i := range ownedCrons { + result, err := common.Cleanup(request, &ownedCrons[i]) + if err != nil { + return nil, err + } + results = append(results, result) + if !result.Deleted { + allDataImportCronsDeleted = false + } } - } - // The rest of the objects will be deleted when all DataImportCrons are deleted. - if !allDataImportCronsDeleted { - return results, nil + // The rest of the objects will be deleted when all DataImportCrons are deleted. 
+ if !allDataImportCronsDeleted { + return results, nil + } } var objects []client.Object - for i := range d.sources { - ds := d.sources[i] - ds.Namespace = internal.GoldenImagesNamespace - objects = append(objects, &ds) + if request.CrdWatch.CrdExists(dataSourceCrd) { + for i := range d.sources { + ds := d.sources[i] + ds.Namespace = internal.GoldenImagesNamespace + objects = append(objects, &ds) + } } objects = append(objects, diff --git a/internal/operands/metrics/reconcile.go b/internal/operands/metrics/reconcile.go index a39bf1aa8..007e44d8b 100644 --- a/internal/operands/metrics/reconcile.go +++ b/internal/operands/metrics/reconcile.go @@ -14,6 +14,8 @@ import ( // +kubebuilder:rbac:groups=rbac.authorization.k8s.io/v1,resources=role;rolebinding;serviceaccount,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=pods;endpoints,verbs=get;list;watch +const prometheusRulesCrd = "prometheusrules.monitoring.coreos.com" + func init() { utilruntime.Must(promv1.AddToScheme(common.Scheme)) } @@ -27,8 +29,8 @@ func WatchTypes() []operands.WatchType { func WatchClusterTypes() []operands.WatchType { return []operands.WatchType{ - {Object: &rbac.ClusterRole{}}, - {Object: &rbac.ClusterRoleBinding{}}, + {Object: &rbac.ClusterRole{}, Crd: prometheusRulesCrd}, + {Object: &rbac.ClusterRoleBinding{}, Crd: prometheusRulesCrd}, } } @@ -47,7 +49,7 @@ func (m *metrics) WatchClusterTypes() []operands.WatchType { } func (m *metrics) RequiredCrds() []string { - return []string{"prometheusrules.monitoring.coreos.com"} + return []string{prometheusRulesCrd} } func (m *metrics) Reconcile(request *common.Request) ([]common.ReconcileResult, error) { diff --git a/internal/operands/operand.go b/internal/operands/operand.go index f3f0015d1..5749d33a9 100644 --- a/internal/operands/operand.go +++ b/internal/operands/operand.go @@ -30,6 +30,9 @@ type Operand interface { type WatchType struct { Object client.Object + // Crd name that defines the object + Crd string + // WatchFullObject specifies if the operator should watch for any changes in the full object. // Otherwise, only these changes in spec, labels, and annotations. // If an object does not have spec field, the full object is watched by default. 
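To make the new WatchType.Crd field above concrete, here is a small illustrative sketch, not part of this patch: an operand ties a CRD-backed type to the CRD that defines it, and watchResources in ssp_controller.go then skips the watch while that CRD is absent. The exampleOperand type is hypothetical and the promv1 import path is an assumption; only the PrometheusRule CRD name mirrors the metrics operand in this patch.

package example

import (
	promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" // import path is an assumption
	rbac "k8s.io/api/rbac/v1"

	"kubevirt.io/ssp-operator/internal/operands"
)

// exampleOperand is hypothetical and exists only to illustrate the Crd field.
type exampleOperand struct{}

const prometheusRuleCrd = "prometheusrules.monitoring.coreos.com"

func (o *exampleOperand) WatchClusterTypes() []operands.WatchType {
	return []operands.WatchType{
		// Built-in API type: always safe to watch.
		{Object: &rbac.ClusterRole{}},
		// CRD-backed type: watchResources registers this watch only when
		// CrdWatch reports that the CRD currently exists.
		{Object: &promv1.PrometheusRule{}, Crd: prometheusRuleCrd},
	}
}

func (o *exampleOperand) RequiredCrds() []string {
	return []string{prometheusRuleCrd}
}

Because missing CRDs now only suppress individual watches (and are reported via the SSP status conditions), the operator can start even when some optional integrations are not installed.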
diff --git a/main.go b/main.go index 51a7f9931..92b087afc 100644 --- a/main.go +++ b/main.go @@ -136,7 +136,9 @@ func main() { os.Exit(1) } - tlsOptions, err := common.GetSspTlsOptions() + ctx := ctrl.SetupSignalHandler() + + tlsOptions, err := common.GetSspTlsOptions(ctx) if err != nil { setupLog.Error(err, "Error while getting tls profile") os.Exit(1) @@ -163,10 +165,6 @@ func main() { os.Exit(1) } - if err = controllers.CreateAndSetupReconciler(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "SSP") - os.Exit(1) - } if os.Getenv("ENABLE_WEBHOOKS") != "false" { if err = webhooks.Setup(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "SSP") @@ -183,10 +181,8 @@ func main() { } // +kubebuilder:scaffold:builder - - setupLog.Info("starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - setupLog.Error(err, "problem running manager") + if err = controllers.CreateAndStartReconciler(ctx, mgr); err != nil { + setupLog.Error(err, "unable to create or start controller", "controller", "SSP") os.Exit(1) } } diff --git a/vendor/github.com/openshift/api/config/v1/0000_03_config-operator_01_operatorhub.crd.yaml b/vendor/github.com/openshift/api/config/v1/0000_03_config-operator_01_operatorhub.crd.yaml new file mode 100644 index 000000000..4ba6c01cf --- /dev/null +++ b/vendor/github.com/openshift/api/config/v1/0000_03_config-operator_01_operatorhub.crd.yaml @@ -0,0 +1,83 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + api-approved.openshift.io: https://github.com/openshift/api/pull/470 + include.release.openshift.io/ibm-cloud-managed: "true" + include.release.openshift.io/self-managed-high-availability: "true" + include.release.openshift.io/single-node-developer: "true" + name: operatorhubs.config.openshift.io +spec: + group: config.openshift.io + names: + kind: OperatorHub + listKind: OperatorHubList + plural: operatorhubs + singular: operatorhub + scope: Cluster + versions: + - name: v1 + schema: + openAPIV3Schema: + description: "OperatorHub is the Schema for the operatorhubs API. It can be used to change the state of the default hub sources for OperatorHub on the cluster from enabled to disabled and vice versa. \n Compatibility level 1: Stable within a major release for a minimum of 12 months or 3 minor releases (whichever is longer)." + type: object + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: OperatorHubSpec defines the desired state of OperatorHub + type: object + properties: + disableAllDefaultSources: + description: disableAllDefaultSources allows you to disable all the default hub sources. If this is true, a specific entry in sources can be used to enable a default source. If this is false, a specific entry in sources can be used to disable or enable a default source. 
+ type: boolean + sources: + description: sources is the list of default hub sources and their configuration. If the list is empty, it implies that the default hub sources are enabled on the cluster unless disableAllDefaultSources is true. If disableAllDefaultSources is true and sources is not empty, the configuration present in sources will take precedence. The list of default hub sources and their current state will always be reflected in the status block. + type: array + items: + description: HubSource is used to specify the hub source and its configuration + type: object + properties: + disabled: + description: disabled is used to disable a default hub source on cluster + type: boolean + name: + description: name is the name of one of the default hub sources + type: string + maxLength: 253 + minLength: 1 + status: + description: OperatorHubStatus defines the observed state of OperatorHub. The current state of the default hub sources will always be reflected here. + type: object + properties: + sources: + description: sources encapsulates the result of applying the configuration for each hub source + type: array + items: + description: HubSourceStatus is used to reflect the current state of applying the configuration to a default source + type: object + properties: + disabled: + description: disabled is used to disable a default hub source on cluster + type: boolean + message: + description: message provides more information regarding failures + type: string + name: + description: name is the name of one of the default hub sources + type: string + maxLength: 253 + minLength: 1 + status: + description: status indicates success or failure in applying the configuration + type: string + served: true + storage: true + subresources: + status: {} diff --git a/vendor/modules.txt b/vendor/modules.txt index 12cd87be6..330318a30 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -699,6 +699,7 @@ kubevirt.io/ssp-operator/api/v1beta1 sigs.k8s.io/controller-runtime sigs.k8s.io/controller-runtime/pkg/builder sigs.k8s.io/controller-runtime/pkg/cache +sigs.k8s.io/controller-runtime/pkg/cache/informertest sigs.k8s.io/controller-runtime/pkg/cache/internal sigs.k8s.io/controller-runtime/pkg/certwatcher sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics @@ -710,6 +711,7 @@ sigs.k8s.io/controller-runtime/pkg/cluster sigs.k8s.io/controller-runtime/pkg/config sigs.k8s.io/controller-runtime/pkg/config/v1alpha1 sigs.k8s.io/controller-runtime/pkg/controller +sigs.k8s.io/controller-runtime/pkg/controller/controllertest sigs.k8s.io/controller-runtime/pkg/controller/controllerutil sigs.k8s.io/controller-runtime/pkg/conversion sigs.k8s.io/controller-runtime/pkg/event diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/informertest/fake_cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/informertest/fake_cache.go new file mode 100644 index 000000000..e115d380f --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/informertest/fake_cache.go @@ -0,0 +1,141 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informertest + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" + toolscache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" +) + +var _ cache.Cache = &FakeInformers{} + +// FakeInformers is a fake implementation of Informers. +type FakeInformers struct { + InformersByGVK map[schema.GroupVersionKind]toolscache.SharedIndexInformer + Scheme *runtime.Scheme + Error error + Synced *bool +} + +// GetInformerForKind implements Informers. +func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) { + if c.Scheme == nil { + c.Scheme = scheme.Scheme + } + obj, err := c.Scheme.New(gvk) + if err != nil { + return nil, err + } + return c.informerFor(gvk, obj) +} + +// FakeInformerForKind implements Informers. +func (c *FakeInformers) FakeInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (*controllertest.FakeInformer, error) { + if c.Scheme == nil { + c.Scheme = scheme.Scheme + } + obj, err := c.Scheme.New(gvk) + if err != nil { + return nil, err + } + i, err := c.informerFor(gvk, obj) + if err != nil { + return nil, err + } + return i.(*controllertest.FakeInformer), nil +} + +// GetInformer implements Informers. +func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) { + if c.Scheme == nil { + c.Scheme = scheme.Scheme + } + gvks, _, err := c.Scheme.ObjectKinds(obj) + if err != nil { + return nil, err + } + gvk := gvks[0] + return c.informerFor(gvk, obj) +} + +// WaitForCacheSync implements Informers. +func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool { + if c.Synced == nil { + return true + } + return *c.Synced +} + +// FakeInformerFor implements Informers. +func (c *FakeInformers) FakeInformerFor(obj runtime.Object) (*controllertest.FakeInformer, error) { + if c.Scheme == nil { + c.Scheme = scheme.Scheme + } + gvks, _, err := c.Scheme.ObjectKinds(obj) + if err != nil { + return nil, err + } + gvk := gvks[0] + i, err := c.informerFor(gvk, obj) + if err != nil { + return nil, err + } + return i.(*controllertest.FakeInformer), nil +} + +func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Object) (toolscache.SharedIndexInformer, error) { + if c.Error != nil { + return nil, c.Error + } + if c.InformersByGVK == nil { + c.InformersByGVK = map[schema.GroupVersionKind]toolscache.SharedIndexInformer{} + } + informer, ok := c.InformersByGVK[gvk] + if ok { + return informer, nil + } + + c.InformersByGVK[gvk] = &controllertest.FakeInformer{} + return c.InformersByGVK[gvk], nil +} + +// Start implements Informers. +func (c *FakeInformers) Start(ctx context.Context) error { + return c.Error +} + +// IndexField implements Cache. +func (c *FakeInformers) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error { + return nil +} + +// Get implements Cache. +func (c *FakeInformers) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error { + return nil +} + +// List implements Cache. 
+func (c *FakeInformers) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + return nil +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/doc.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/doc.go new file mode 100644 index 000000000..91c5a3e35 --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package controllertest contains fake informers for testing controllers +// When in doubt, it's almost always better to test against a real API server +// using envtest.Environment. +package controllertest diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/testing.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/testing.go new file mode 100644 index 000000000..b9f97d528 --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/testing.go @@ -0,0 +1,62 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllertest + +import ( + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/util/workqueue" +) + +var _ runtime.Object = &ErrorType{} + +// ErrorType implements runtime.Object but isn't registered in any scheme and should cause errors in tests as a result. +type ErrorType struct{} + +// GetObjectKind implements runtime.Object. +func (ErrorType) GetObjectKind() schema.ObjectKind { return nil } + +// DeepCopyObject implements runtime.Object. +func (ErrorType) DeepCopyObject() runtime.Object { return nil } + +var _ workqueue.RateLimitingInterface = Queue{} + +// Queue implements a RateLimiting queue as a non-ratelimited queue for testing. +// This helps testing by having functions that use a RateLimiting queue synchronously add items to the queue. +type Queue struct { + workqueue.Interface +} + +// AddAfter implements RateLimitingInterface. +func (q Queue) AddAfter(item interface{}, duration time.Duration) { + q.Add(item) +} + +// AddRateLimited implements RateLimitingInterface. TODO(community): Implement this. +func (q Queue) AddRateLimited(item interface{}) { + q.Add(item) +} + +// Forget implements RateLimitingInterface. TODO(community): Implement this. +func (q Queue) Forget(item interface{}) {} + +// NumRequeues implements RateLimitingInterface. TODO(community): Implement this. 
+func (q Queue) NumRequeues(item interface{}) int { + return 0 +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/unconventionallisttypecrd.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/unconventionallisttypecrd.go new file mode 100644 index 000000000..d0f501715 --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/unconventionallisttypecrd.go @@ -0,0 +1,76 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllertest + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +var _ runtime.Object = &UnconventionalListType{} +var _ runtime.Object = &UnconventionalListTypeList{} + +// UnconventionalListType is used to test CRDs with List types that +// have a slice of pointers rather than a slice of literals. +type UnconventionalListType struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + Spec string `json:"spec,omitempty"` +} + +// DeepCopyObject implements runtime.Object +// Handwritten for simplicity. +func (u *UnconventionalListType) DeepCopyObject() runtime.Object { + return u.DeepCopy() +} + +// DeepCopy implements *UnconventionalListType +// Handwritten for simplicity. +func (u *UnconventionalListType) DeepCopy() *UnconventionalListType { + return &UnconventionalListType{ + TypeMeta: u.TypeMeta, + ObjectMeta: *u.ObjectMeta.DeepCopy(), + Spec: u.Spec, + } +} + +// UnconventionalListTypeList is used to test CRDs with List types that +// have a slice of pointers rather than a slice of literals. +type UnconventionalListTypeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []*UnconventionalListType `json:"items"` +} + +// DeepCopyObject implements runtime.Object +// Handwritten for simplicity. +func (u *UnconventionalListTypeList) DeepCopyObject() runtime.Object { + return u.DeepCopy() +} + +// DeepCopy implements *UnconventionalListTypeListt +// Handwritten for simplicity. +func (u *UnconventionalListTypeList) DeepCopy() *UnconventionalListTypeList { + out := &UnconventionalListTypeList{ + TypeMeta: u.TypeMeta, + ListMeta: *u.ListMeta.DeepCopy(), + } + for _, item := range u.Items { + out.Items = append(out.Items, item.DeepCopy()) + } + return out +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/util.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/util.go new file mode 100644 index 000000000..b638b4976 --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllertest/util.go @@ -0,0 +1,118 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllertest + +import ( + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" +) + +var _ cache.SharedIndexInformer = &FakeInformer{} + +// FakeInformer provides fake Informer functionality for testing. +type FakeInformer struct { + // Synced is returned by the HasSynced functions to implement the Informer interface + Synced bool + + // RunCount is incremented each time RunInformersAndControllers is called + RunCount int + + handlers []cache.ResourceEventHandler +} + +// AddIndexers does nothing. TODO(community): Implement this. +func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error { + return nil +} + +// GetIndexer does nothing. TODO(community): Implement this. +func (f *FakeInformer) GetIndexer() cache.Indexer { + return nil +} + +// Informer returns the fake Informer. +func (f *FakeInformer) Informer() cache.SharedIndexInformer { + return f +} + +// HasSynced implements the Informer interface. Returns f.Synced. +func (f *FakeInformer) HasSynced() bool { + return f.Synced +} + +// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. +func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) { + f.handlers = append(f.handlers, handler) +} + +// Run implements the Informer interface. Increments f.RunCount. +func (f *FakeInformer) Run(<-chan struct{}) { + f.RunCount++ +} + +// Add fakes an Add event for obj. +func (f *FakeInformer) Add(obj metav1.Object) { + for _, h := range f.handlers { + h.OnAdd(obj) + } +} + +// Update fakes an Update event for obj. +func (f *FakeInformer) Update(oldObj, newObj metav1.Object) { + for _, h := range f.handlers { + h.OnUpdate(oldObj, newObj) + } +} + +// Delete fakes an Delete event for obj. +func (f *FakeInformer) Delete(obj metav1.Object) { + for _, h := range f.handlers { + h.OnDelete(obj) + } +} + +// AddEventHandlerWithResyncPeriod does nothing. TODO(community): Implement this. +func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { + +} + +// GetStore does nothing. TODO(community): Implement this. +func (f *FakeInformer) GetStore() cache.Store { + return nil +} + +// GetController does nothing. TODO(community): Implement this. +func (f *FakeInformer) GetController() cache.Controller { + return nil +} + +// LastSyncResourceVersion does nothing. TODO(community): Implement this. +func (f *FakeInformer) LastSyncResourceVersion() string { + return "" +} + +// SetWatchErrorHandler does nothing. TODO(community): Implement this. +func (f *FakeInformer) SetWatchErrorHandler(cache.WatchErrorHandler) error { + return nil +} + +// SetTransform does nothing. TODO(community): Implement this. +func (f *FakeInformer) SetTransform(t cache.TransformFunc) error { + return nil +}
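Taken as a whole, the patch replaces the old "wait for CRDs, then start" init controller with a CrdWatch that is primed once from the API server and then kept up to date by the manager. The following condensed sketch is illustrative only; the function name is made up, error wrapping is trimmed, and the operand and reconciler wiring from setup.go are omitted.

package example

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"

	crd_watch "kubevirt.io/ssp-operator/internal/crd-watch"
)

// startWithCrdWatch condenses the CreateAndStartReconciler flow from setup.go.
func startWithCrdWatch(ctx context.Context, mgr ctrl.Manager, requiredCrds []string) error {
	mgrCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	crdWatch := crd_watch.New(requiredCrds...)
	// Cancelling the manager context stops the operator cleanly; the pod is
	// restarted and re-evaluates which watches can be registered.
	crdWatch.AllCrdsAddedHandler = cancel
	crdWatch.SomeCrdRemovedHandler = cancel

	// Prime the CRD state from the API server before the manager starts...
	if err := crdWatch.Init(mgrCtx, mgr.GetAPIReader()); err != nil {
		return err
	}
	// ...then run the watch as a manager Runnable so it stays up to date.
	if err := mgr.Add(crdWatch); err != nil {
		return err
	}

	// Controllers consult the watch before registering CRD-backed watches,
	// e.g. crdWatch.CrdExists("datasources.cdi.kubevirt.io").

	return mgr.Start(mgrCtx)
}

Because both handlers simply cancel the manager context, a change in the required CRD set results in a clean pod restart rather than an in-process re-registration of watches.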