Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch required CRDs, and restart operator if they are removed #417

Merged
merged 3 commits into from Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 0 additions & 101 deletions controllers/crd_controller.go

This file was deleted.

84 changes: 0 additions & 84 deletions controllers/finishable/finishable.go

This file was deleted.

12 changes: 6 additions & 6 deletions controllers/services_controller.go
Expand Up @@ -61,8 +61,8 @@ func ServiceObject(namespace string) *v1.Service {
// Annotation to generate RBAC roles to read and modify services
// +kubebuilder:rbac:groups="",resources=services,verbs=get;watch;list;create;update;delete

func CreateServiceController(mgr ctrl.Manager) (*serviceReconciler, error) {
return newServiceReconciler(mgr)
func CreateServiceController(ctx context.Context, mgr ctrl.Manager) (*serviceReconciler, error) {
return newServiceReconciler(ctx, mgr)
}

func (r *serviceReconciler) Start(ctx context.Context, mgr ctrl.Manager) error {
Expand Down Expand Up @@ -105,24 +105,24 @@ type serviceReconciler struct {
deployment *apps.Deployment
}

func getOperatorDeployment(namespace string, apiReader client.Reader) (*apps.Deployment, error) {
func getOperatorDeployment(ctx context.Context, namespace string, apiReader client.Reader) (*apps.Deployment, error) {
objKey := client.ObjectKey{Namespace: namespace, Name: OperatorName}
var deployment apps.Deployment
err := apiReader.Get(context.TODO(), objKey, &deployment)
err := apiReader.Get(ctx, objKey, &deployment)
if err != nil {
return nil, fmt.Errorf("getOperatorDeployment, get deployment: %w", err)
}
return &deployment, nil
}

func newServiceReconciler(mgr ctrl.Manager) (*serviceReconciler, error) {
func newServiceReconciler(ctx context.Context, mgr ctrl.Manager) (*serviceReconciler, error) {
logger := ctrl.Log.WithName("controllers").WithName("Resources")
namespace, err := common.GetOperatorNamespace(logger)
if err != nil {
return nil, fmt.Errorf("in newServiceReconciler: %w", err)
}

deployment, err := getOperatorDeployment(namespace, mgr.GetAPIReader())
deployment, err := getOperatorDeployment(ctx, namespace, mgr.GetAPIReader())
if err != nil {
return nil, fmt.Errorf("in newServiceReconciler: %w", err)
}
Expand Down
88 changes: 38 additions & 50 deletions controllers/setup.go
Expand Up @@ -5,9 +5,8 @@ import (
"fmt"
"path/filepath"

extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

"kubevirt.io/ssp-operator/internal/common"
crd_watch "kubevirt.io/ssp-operator/internal/crd-watch"
"kubevirt.io/ssp-operator/internal/operands"
common_templates "kubevirt.io/ssp-operator/internal/operands/common-templates"
data_sources "kubevirt.io/ssp-operator/internal/operands/data-sources"
Expand All @@ -19,11 +18,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
)

func CreateAndSetupReconciler(mgr controllerruntime.Manager) error {
// Need to watch CRDs
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch

func CreateAndStartReconciler(ctx context.Context, mgr controllerruntime.Manager) error {
templatesFile := filepath.Join(templateBundleDir, "common-templates-"+common_templates.Version+".yaml")
templatesBundle, err := template_bundle.ReadBundle(templatesFile)
if err != nil {
return err
return fmt.Errorf("failed to read template bundle: %w", err)
}

sspOperands := []operands.Operand{
Expand All @@ -39,79 +41,65 @@ func CreateAndSetupReconciler(mgr controllerruntime.Manager) error {
requiredCrds = append(requiredCrds, sspOperands[i].RequiredCrds()...)
}

// Check if all needed CRDs exist
crdList := &extv1.CustomResourceDefinitionList{}
err = mgr.GetAPIReader().List(context.TODO(), crdList)
mgrCtx, cancel := context.WithCancel(ctx)
defer cancel()

crdWatch := crd_watch.New(requiredCrds...)
// Cleanly stops the manager and exit. The pod will be restarted.
crdWatch.AllCrdsAddedHandler = cancel
crdWatch.SomeCrdRemovedHandler = cancel

err = crdWatch.Init(mgrCtx, mgr.GetAPIReader())
if err != nil {
return err
}

infrastructureTopology, err := common.GetInfrastructureTopology(mgr.GetAPIReader())
if missingCrds := crdWatch.MissingCrds(); len(missingCrds) > 0 {
mgr.GetLogger().Error(nil, "Some required crds are missing. The operator will not create any new resources.",
"missingCrds", missingCrds,
)
}

err = mgr.Add(crdWatch)
if err != nil {
return err
}

serviceController, err := CreateServiceController(mgr)
infrastructureTopology, err := common.GetInfrastructureTopology(mgrCtx, mgr.GetAPIReader())
if err != nil {
return err
return fmt.Errorf("failed to get infrastructure topology: %w", err)
}

serviceController, err := CreateServiceController(mgrCtx, mgr)
if err != nil {
return fmt.Errorf("failed to create service controller: %w", err)
}

err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
err := serviceController.Start(ctx, mgr)
if err != nil {
return fmt.Errorf("error adding serviceController: %w", err)
return fmt.Errorf("error starting serviceController: %w", err)
}

mgr.GetLogger().Info("Services Controller started")

return nil
}))
if err != nil {
return err
return fmt.Errorf("error adding service controller: %w", err)
}

reconciler := NewSspReconciler(mgr.GetClient(), mgr.GetAPIReader(), infrastructureTopology, sspOperands)
reconciler := NewSspReconciler(mgr.GetClient(), mgr.GetAPIReader(), infrastructureTopology, sspOperands, crdWatch)

if requiredCrdsExist(requiredCrds, crdList.Items) {
// No need to start CRD controller
return reconciler.setupController(mgr)
}

mgr.GetLogger().Info("Required CRDs do not exist. Waiting until they are installed.",
"required_crds", requiredCrds,
)

crdController, err := CreateCrdController(mgr, requiredCrds)
err = reconciler.setupController(mgr)
if err != nil {
return err
}

return mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
// First start the CRD controller
err := crdController.Start(ctx)
if err != nil {
return err
}

mgr.GetLogger().Info("Required CRDs were installed, starting SSP operator.")

// Clear variable, so it can be garbage collected
crdController = nil

// After it is finished, add the SSP controller to the manager
return reconciler.setupController(mgr)
}))
}

func requiredCrdsExist(required []string, foundCrds []extv1.CustomResourceDefinition) bool {
OuterLoop:
for i := range required {
for j := range foundCrds {
if required[i] == foundCrds[j].Name {
continue OuterLoop
}
}
return false
mgr.GetLogger().Info("starting manager")
if err := mgr.Start(mgrCtx); err != nil {
mgr.GetLogger().Error(err, "problem running manager")
return err
}
return true
return nil
}