From 7b1f831874b47367e40204c9e2a4e99a39b840f7 Mon Sep 17 00:00:00 2001 From: Alessandro Olivero Date: Wed, 23 Jun 2021 16:36:37 +0000 Subject: [PATCH] resource offer operator improvements --- .../resourceoffer_controller.go | 64 +++++------ .../resourceoffer_controller_methods.go | 104 ++++++++---------- 2 files changed, 80 insertions(+), 88 deletions(-) diff --git a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go index 6c79953c14..7022d7359a 100644 --- a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go +++ b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go @@ -24,13 +24,12 @@ import ( v1 "k8s.io/api/apps/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1" @@ -61,11 +60,11 @@ type ResourceOfferReconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. -func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { klog.V(4).Infof("reconciling ResourceOffer %v", req.NamespacedName) // get resource offer var resourceOffer sharingv1alpha1.ResourceOffer - if err := r.Get(ctx, req.NamespacedName, &resourceOffer); err != nil { + if err = r.Get(ctx, req.NamespacedName, &resourceOffer); err != nil { if kerrors.IsNotFound(err) { // reconcile was triggered by a delete request klog.Infof("ResourceRequest %v deleted", req.NamespacedName) @@ -77,32 +76,44 @@ func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Reques } // we do that on ResourceOffer creation - if result, err := r.setOwnerReference(ctx, &resourceOffer); err != nil { - klog.Error(err) - return ctrl.Result{}, err - } else if result != controllerutil.OperationResultNone { + if metav1.GetControllerOf(&resourceOffer) == nil { + if err = r.setControllerReference(ctx, &resourceOffer); err != nil { + klog.Error(err) + return ctrl.Result{}, err + } + if err = r.Client.Update(ctx, &resourceOffer); err != nil { + klog.Error(err) + return ctrl.Result{}, err + } + // we always return after a metadata or spec update to have a clean resource where to work return ctrl.Result{}, nil } + result = ctrl.Result{RequeueAfter: r.resyncPeriod} + + // defer the status update function + defer func() { + if newErr := r.Client.Status().Update(ctx, &resourceOffer); newErr != nil { + klog.Error(newErr) + err = newErr + } + }() + // filter resource offers and create a virtual-kubelet only for the good ones - if result, err := r.setResourceOfferPhase(ctx, &resourceOffer); err != nil { + if err = r.setResourceOfferPhase(ctx, &resourceOffer); err != nil { klog.Error(err) return ctrl.Result{}, err - } else if result != controllerutil.OperationResultNone { - return ctrl.Result{}, nil } // check the virtual kubelet deployment - if result, err := r.checkVirtualKubeletDeployment(ctx, &resourceOffer); err != nil { + if err = r.checkVirtualKubeletDeployment(ctx, &resourceOffer); err != nil { klog.Error(err) return ctrl.Result{}, err - } else if result != controllerutil.OperationResultNone { - return ctrl.Result{}, nil } // delete the ClusterRoleBinding if the VirtualKubelet Deployment is not up if resourceOffer.Status.VirtualKubeletStatus == sharingv1alpha1.VirtualKubeletStatusNone { - if err := r.deleteClusterRoleBinding(ctx, &resourceOffer); err != nil { + if err = r.deleteClusterRoleBinding(ctx, &resourceOffer); err != nil { klog.Error(err) return ctrl.Result{}, err } @@ -110,43 +121,32 @@ func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Reques if !isAccepted(&resourceOffer) || !resourceOffer.DeletionTimestamp.IsZero() { // delete virtual kubelet deployment - if result, err := r.deleteVirtualKubeletDeployment(ctx, &resourceOffer); err != nil { + if err = r.deleteVirtualKubeletDeployment(ctx, &resourceOffer); err != nil { klog.Error(err) return ctrl.Result{}, err - } else if result != controllerutil.OperationResultNone { - return ctrl.Result{}, nil } - return ctrl.Result{RequeueAfter: r.resyncPeriod}, nil + return result, nil } // create the virtual kubelet deployment - if result, err := r.createVirtualKubeletDeployment(ctx, &resourceOffer); err != nil { + if err = r.createVirtualKubeletDeployment(ctx, &resourceOffer); err != nil { klog.Error(err) return ctrl.Result{}, err - } else if result != controllerutil.OperationResultNone { - return ctrl.Result{}, nil } - return ctrl.Result{RequeueAfter: r.resyncPeriod}, nil + return result, nil } // SetupWithManager sets up the controller with the Manager. func (r *ResourceOfferReconciler) SetupWithManager(mgr ctrl.Manager) error { - selector, err := metav1.LabelSelectorAsSelector(&crdreplicator.ReplicatedResourcesLabelSelector) + p, err := predicate.LabelSelectorPredicate(crdreplicator.ReplicatedResourcesLabelSelector) if err != nil { klog.Error(err) return err } - p := predicate.NewPredicateFuncs(func(object client.Object) bool { - matches := selector.Matches(labels.Set(object.GetLabels())) - _, isResourceOffer := object.(*sharingv1alpha1.ResourceOffer) - return matches || !isResourceOffer - }) - return ctrl.NewControllerManagedBy(mgr). - For(&sharingv1alpha1.ResourceOffer{}). + For(&sharingv1alpha1.ResourceOffer{}, builder.WithPredicates(p)). Owns(&v1.Deployment{}). - WithEventFilter(p). Complete(r) } diff --git a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go index 04e31081f5..6e918e7566 100644 --- a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go +++ b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go @@ -6,7 +6,7 @@ import ( "reflect" "sync" - v1 "k8s.io/api/apps/v1" + appsv1 "k8s.io/api/apps/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,69 +49,65 @@ func (r *ResourceOfferReconciler) setConfig(config *configv1alpha1.ClusterConfig } } -// setOwnerReference sets owner reference to the related ForeignCluster. -func (r *ResourceOfferReconciler) setOwnerReference( - ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) { +// setControllerReference sets owner reference to the related ForeignCluster. +func (r *ResourceOfferReconciler) setControllerReference( + ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error { // get the foreign cluster by clusterID label foreignCluster, err := foreigncluster.GetForeignClusterByID(ctx, r.Client, resourceOffer.Spec.ClusterId) if err != nil { klog.Error(err) - return controllerutil.OperationResultNone, err - } - return controllerutil.CreateOrUpdate(ctx, r.Client, resourceOffer, func() error { - // add owner reference, if it is not already set - if err := controllerutil.SetControllerReference(foreignCluster, resourceOffer, r.Scheme); err != nil { - klog.Error(err) - return err - } - return nil - }) + return err + } + + // add controller reference, if it is not already set + if err := controllerutil.SetControllerReference(foreignCluster, resourceOffer, r.Scheme); err != nil { + klog.Error(err) + return err + } + + return nil } // setResourceOfferPhase checks if the resource request can be accepted and set its phase accordingly. func (r *ResourceOfferReconciler) setResourceOfferPhase( - ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) { + ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error { // we want only to care about resource offers with a pending status if resourceOffer.Status.Phase != "" && resourceOffer.Status.Phase != sharingv1alpha1.ResourceOfferPending { - return controllerutil.OperationResultNone, nil + return nil } - return controllerutil.CreateOrPatch(ctx, r.Client, resourceOffer, func() error { - switch r.getConfig().Spec.AdvertisementConfig.IngoingConfig.AcceptPolicy { - case configv1alpha1.AutoAcceptMax: - resourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferAccepted - case configv1alpha1.ManualAccept: - // require a manual accept/refuse - resourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferManualActionRequired - } - return nil - }) + switch r.getConfig().Spec.AdvertisementConfig.IngoingConfig.AcceptPolicy { + case configv1alpha1.AutoAcceptMax: + resourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferAccepted + case configv1alpha1.ManualAccept: + // require a manual accept/refuse + resourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferManualActionRequired + } + return nil } // checkVirtualKubeletDeployment checks the existence of the VirtualKubelet Deployment // and sets its status in the ResourceOffer accordingly. func (r *ResourceOfferReconciler) checkVirtualKubeletDeployment( - ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) { + ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error { virtualKubeletDeployment, err := r.getVirtualKubeletDeployment(ctx, resourceOffer) if err != nil { klog.Error(err) - return controllerutil.OperationResultNone, err + return err } - return controllerutil.CreateOrPatch(ctx, r.Client, resourceOffer, func() error { - if virtualKubeletDeployment == nil { - resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusNone - } else if resourceOffer.Status.VirtualKubeletStatus != sharingv1alpha1.VirtualKubeletStatusDeleting { - // there is a virtual kubelet deployment and the phase is not deleting - resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated - } - return nil - }) + if virtualKubeletDeployment == nil { + resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusNone + } else if resourceOffer.Status.VirtualKubeletStatus != sharingv1alpha1.VirtualKubeletStatusDeleting { + // there is a virtual kubelet deployment and the phase is not deleting + resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated + } + return nil } // createVirtualKubeletDeployment creates the VirtualKubelet Deployment. func (r *ResourceOfferReconciler) createVirtualKubeletDeployment( - ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) { + ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error { name := virtualKubelet.VirtualKubeletPrefix + resourceOffer.Spec.ClusterId nodeName := virtualKubelet.VirtualNodePrefix + resourceOffer.Spec.ClusterId @@ -125,7 +121,7 @@ func (r *ResourceOfferReconciler) createVirtualKubeletDeployment( }) if err != nil { klog.Error(err) - return op, err + return err } klog.V(5).Infof("[%v] ServiceAccount %s/%s reconciled: %s", remoteClusterID, vkServiceAccount.Namespace, vkServiceAccount.Name, op) @@ -135,7 +131,7 @@ func (r *ResourceOfferReconciler) createVirtualKubeletDeployment( }) if err != nil { klog.Error(err) - return op, err + return err } klog.V(5).Infof("[%v] ClusterRoleBinding %s reconciled: %s", remoteClusterID, vkClusterRoleBinding.Name, op) @@ -145,7 +141,7 @@ func (r *ResourceOfferReconciler) createVirtualKubeletDeployment( r.initVirtualKubeletImage, nodeName, r.clusterID.GetClusterID()) if err != nil { klog.Error(err) - return controllerutil.OperationResultNone, err + return err } op, err = controllerutil.CreateOrUpdate(context.TODO(), r.Client, vkDeployment, func() error { @@ -153,7 +149,7 @@ func (r *ResourceOfferReconciler) createVirtualKubeletDeployment( }) if err != nil { klog.Error(err) - return op, err + return err } klog.V(5).Infof("[%v] Deployment %s/%s reconciled: %s", remoteClusterID, vkDeployment.Namespace, vkDeployment.Name, op) @@ -163,37 +159,33 @@ func (r *ResourceOfferReconciler) createVirtualKubeletDeployment( r.eventsRecorder.Event(resourceOffer, "Normal", "VkCreated", msg) } - return controllerutil.CreateOrPatch(ctx, r.Client, resourceOffer, func() error { - resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated - return nil - }) + resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated + return nil } // deleteVirtualKubeletDeployment deletes the VirtualKubelet Deployment. func (r *ResourceOfferReconciler) deleteVirtualKubeletDeployment( - ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) { + ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error { virtualKubeletDeployment, err := r.getVirtualKubeletDeployment(ctx, resourceOffer) if err != nil { klog.Error(err) - return controllerutil.OperationResultNone, err + return err } if virtualKubeletDeployment == nil || !virtualKubeletDeployment.DeletionTimestamp.IsZero() { - return controllerutil.OperationResultNone, nil + return nil } if err := r.Client.Delete(ctx, virtualKubeletDeployment); err != nil { klog.Error(err) - return controllerutil.OperationResultNone, err + return err } msg := fmt.Sprintf("[%v] Deleting virtual-kubelet in namespace %v", resourceOffer.Spec.ClusterId, resourceOffer.Namespace) klog.Info(msg) r.eventsRecorder.Event(resourceOffer, "Normal", "VkDeleted", msg) - return controllerutil.CreateOrPatch(ctx, r.Client, resourceOffer, func() error { - resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusDeleting - return nil - }) + resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusDeleting + return nil } // deleteClusterRoleBinding deletes the ClusterRoleBinding related to a VirtualKubelet if the deployment does not exist. @@ -210,8 +202,8 @@ func (r *ResourceOfferReconciler) deleteClusterRoleBinding( // getVirtualKubeletDeployment returns the VirtualKubelet Deployment given a ResourceOffer. func (r *ResourceOfferReconciler) getVirtualKubeletDeployment( - ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (*v1.Deployment, error) { - var deployList v1.DeploymentList + ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (*appsv1.Deployment, error) { + var deployList appsv1.DeploymentList labels := forge.VirtualKubeletLabels(resourceOffer.Spec.ClusterId) if err := r.Client.List(ctx, &deployList, client.MatchingLabels(labels)); err != nil { klog.Error(err)