Skip to content

Commit

Permalink
resource offer operator improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
aleoli committed Jun 24, 2021
1 parent 068bb8d commit ff236dc
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 88 deletions.
Expand Up @@ -24,13 +24,12 @@ import (
v1 "k8s.io/api/apps/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
Expand Down Expand Up @@ -61,11 +60,11 @@ type ResourceOfferReconciler struct {

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
klog.V(4).Infof("reconciling ResourceOffer %v", req.NamespacedName)
// get resource offer
var resourceOffer sharingv1alpha1.ResourceOffer
if err := r.Get(ctx, req.NamespacedName, &resourceOffer); err != nil {
if err = r.Get(ctx, req.NamespacedName, &resourceOffer); err != nil {
if kerrors.IsNotFound(err) {
// reconcile was triggered by a delete request
klog.Infof("ResourceRequest %v deleted", req.NamespacedName)
Expand All @@ -77,76 +76,77 @@ func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// we do that on ResourceOffer creation
if result, err := r.setOwnerReference(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
} else if result != controllerutil.OperationResultNone {
if metav1.GetControllerOf(&resourceOffer) == nil {
if err = r.setControllerReference(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
}
if err = r.Client.Update(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
}
// we always return after a metadata or spec update to have a clean resource where to work
return ctrl.Result{}, nil
}

result = ctrl.Result{RequeueAfter: r.resyncPeriod}

// defer the status update function
defer func() {
if newErr := r.Client.Status().Update(ctx, &resourceOffer); newErr != nil {
klog.Error(newErr)
err = newErr
}
}()

// filter resource offers and create a virtual-kubelet only for the good ones
if result, err := r.setResourceOfferPhase(ctx, &resourceOffer); err != nil {
if err = r.setResourceOfferPhase(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
} else if result != controllerutil.OperationResultNone {
return ctrl.Result{}, nil
}

// check the virtual kubelet deployment
if result, err := r.checkVirtualKubeletDeployment(ctx, &resourceOffer); err != nil {
if err = r.checkVirtualKubeletDeployment(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
} else if result != controllerutil.OperationResultNone {
return ctrl.Result{}, nil
}

// delete the ClusterRoleBinding if the VirtualKubelet Deployment is not up
if resourceOffer.Status.VirtualKubeletStatus == sharingv1alpha1.VirtualKubeletStatusNone {
if err := r.deleteClusterRoleBinding(ctx, &resourceOffer); err != nil {
if err = r.deleteClusterRoleBinding(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
}
}

if !isAccepted(&resourceOffer) || !resourceOffer.DeletionTimestamp.IsZero() {
// delete virtual kubelet deployment
if result, err := r.deleteVirtualKubeletDeployment(ctx, &resourceOffer); err != nil {
if err = r.deleteVirtualKubeletDeployment(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
} else if result != controllerutil.OperationResultNone {
return ctrl.Result{}, nil
}
return ctrl.Result{RequeueAfter: r.resyncPeriod}, nil
return result, nil
}

// create the virtual kubelet deployment
if result, err := r.createVirtualKubeletDeployment(ctx, &resourceOffer); err != nil {
if err = r.createVirtualKubeletDeployment(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
} else if result != controllerutil.OperationResultNone {
return ctrl.Result{}, nil
}

return ctrl.Result{RequeueAfter: r.resyncPeriod}, nil
return result, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ResourceOfferReconciler) SetupWithManager(mgr ctrl.Manager) error {
selector, err := metav1.LabelSelectorAsSelector(&crdreplicator.ReplicatedResourcesLabelSelector)
p, err := predicate.LabelSelectorPredicate(crdreplicator.ReplicatedResourcesLabelSelector)
if err != nil {
klog.Error(err)
return err
}

p := predicate.NewPredicateFuncs(func(object client.Object) bool {
matches := selector.Matches(labels.Set(object.GetLabels()))
_, isResourceOffer := object.(*sharingv1alpha1.ResourceOffer)
return matches || !isResourceOffer
})

return ctrl.NewControllerManagedBy(mgr).
For(&sharingv1alpha1.ResourceOffer{}).
For(&sharingv1alpha1.ResourceOffer{}, builder.WithPredicates(p)).
Owns(&v1.Deployment{}).
WithEventFilter(p).
Complete(r)
}
Expand Up @@ -6,7 +6,7 @@ import (
"reflect"
"sync"

v1 "k8s.io/api/apps/v1"
appsv1 "k8s.io/api/apps/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -49,69 +49,65 @@ func (r *ResourceOfferReconciler) setConfig(config *configv1alpha1.ClusterConfig
}
}

// setOwnerReference sets owner reference to the related ForeignCluster.
func (r *ResourceOfferReconciler) setOwnerReference(
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) {
// setControllerReference sets owner reference to the related ForeignCluster.
func (r *ResourceOfferReconciler) setControllerReference(
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error {
// get the foreign cluster by clusterID label
foreignCluster, err := foreigncluster.GetForeignClusterByID(ctx, r.Client, resourceOffer.Spec.ClusterId)
if err != nil {
klog.Error(err)
return controllerutil.OperationResultNone, err
}
return controllerutil.CreateOrUpdate(ctx, r.Client, resourceOffer, func() error {
// add owner reference, if it is not already set
if err := controllerutil.SetControllerReference(foreignCluster, resourceOffer, r.Scheme); err != nil {
klog.Error(err)
return err
}
return nil
})
return err
}

// add controller reference, if it is not already set
if err := controllerutil.SetControllerReference(foreignCluster, resourceOffer, r.Scheme); err != nil {
klog.Error(err)
return err
}

return nil
}

// setResourceOfferPhase checks if the resource request can be accepted and set its phase accordingly.
func (r *ResourceOfferReconciler) setResourceOfferPhase(
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) {
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error {
// we want only to care about resource offers with a pending status
if resourceOffer.Status.Phase != "" && resourceOffer.Status.Phase != sharingv1alpha1.ResourceOfferPending {
return controllerutil.OperationResultNone, nil
return nil
}

return controllerutil.CreateOrPatch(ctx, r.Client, resourceOffer, func() error {
switch r.getConfig().Spec.AdvertisementConfig.IngoingConfig.AcceptPolicy {
case configv1alpha1.AutoAcceptMax:
resourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferAccepted
case configv1alpha1.ManualAccept:
// require a manual accept/refuse
resourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferManualActionRequired
}
return nil
})
switch r.getConfig().Spec.AdvertisementConfig.IngoingConfig.AcceptPolicy {
case configv1alpha1.AutoAcceptMax:
resourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferAccepted
case configv1alpha1.ManualAccept:
// require a manual accept/refuse
resourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferManualActionRequired
}
return nil
}

// checkVirtualKubeletDeployment checks the existence of the VirtualKubelet Deployment
// and sets its status in the ResourceOffer accordingly.
func (r *ResourceOfferReconciler) checkVirtualKubeletDeployment(
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) {
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error {
virtualKubeletDeployment, err := r.getVirtualKubeletDeployment(ctx, resourceOffer)
if err != nil {
klog.Error(err)
return controllerutil.OperationResultNone, err
return err
}

return controllerutil.CreateOrPatch(ctx, r.Client, resourceOffer, func() error {
if virtualKubeletDeployment == nil {
resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusNone
} else if resourceOffer.Status.VirtualKubeletStatus != sharingv1alpha1.VirtualKubeletStatusDeleting {
// there is a virtual kubelet deployment and the phase is not deleting
resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated
}
return nil
})
if virtualKubeletDeployment == nil {
resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusNone
} else if resourceOffer.Status.VirtualKubeletStatus != sharingv1alpha1.VirtualKubeletStatusDeleting {
// there is a virtual kubelet deployment and the phase is not deleting
resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated
}
return nil
}

// createVirtualKubeletDeployment creates the VirtualKubelet Deployment.
func (r *ResourceOfferReconciler) createVirtualKubeletDeployment(
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) {
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error {
name := virtualKubelet.VirtualKubeletPrefix + resourceOffer.Spec.ClusterId
nodeName := virtualKubelet.VirtualNodePrefix + resourceOffer.Spec.ClusterId

Expand All @@ -125,7 +121,7 @@ func (r *ResourceOfferReconciler) createVirtualKubeletDeployment(
})
if err != nil {
klog.Error(err)
return op, err
return err
}
klog.V(5).Infof("[%v] ServiceAccount %s/%s reconciled: %s", remoteClusterID, vkServiceAccount.Namespace, vkServiceAccount.Name, op)

Expand All @@ -135,7 +131,7 @@ func (r *ResourceOfferReconciler) createVirtualKubeletDeployment(
})
if err != nil {
klog.Error(err)
return op, err
return err
}
klog.V(5).Infof("[%v] ClusterRoleBinding %s reconciled: %s", remoteClusterID, vkClusterRoleBinding.Name, op)

Expand All @@ -145,15 +141,15 @@ func (r *ResourceOfferReconciler) createVirtualKubeletDeployment(
r.initVirtualKubeletImage, nodeName, r.clusterID.GetClusterID())
if err != nil {
klog.Error(err)
return controllerutil.OperationResultNone, err
return err
}

op, err = controllerutil.CreateOrUpdate(context.TODO(), r.Client, vkDeployment, func() error {
return controllerutil.SetControllerReference(resourceOffer, vkDeployment, r.Scheme)
})
if err != nil {
klog.Error(err)
return op, err
return err
}
klog.V(5).Infof("[%v] Deployment %s/%s reconciled: %s", remoteClusterID, vkDeployment.Namespace, vkDeployment.Name, op)

Expand All @@ -163,37 +159,33 @@ func (r *ResourceOfferReconciler) createVirtualKubeletDeployment(
r.eventsRecorder.Event(resourceOffer, "Normal", "VkCreated", msg)
}

return controllerutil.CreateOrPatch(ctx, r.Client, resourceOffer, func() error {
resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated
return nil
})
resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated
return nil
}

// deleteVirtualKubeletDeployment deletes the VirtualKubelet Deployment.
func (r *ResourceOfferReconciler) deleteVirtualKubeletDeployment(
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (controllerutil.OperationResult, error) {
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) error {
virtualKubeletDeployment, err := r.getVirtualKubeletDeployment(ctx, resourceOffer)
if err != nil {
klog.Error(err)
return controllerutil.OperationResultNone, err
return err
}
if virtualKubeletDeployment == nil || !virtualKubeletDeployment.DeletionTimestamp.IsZero() {
return controllerutil.OperationResultNone, nil
return nil
}

if err := r.Client.Delete(ctx, virtualKubeletDeployment); err != nil {
klog.Error(err)
return controllerutil.OperationResultNone, err
return err
}

msg := fmt.Sprintf("[%v] Deleting virtual-kubelet in namespace %v", resourceOffer.Spec.ClusterId, resourceOffer.Namespace)
klog.Info(msg)
r.eventsRecorder.Event(resourceOffer, "Normal", "VkDeleted", msg)

return controllerutil.CreateOrPatch(ctx, r.Client, resourceOffer, func() error {
resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusDeleting
return nil
})
resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusDeleting
return nil
}

// deleteClusterRoleBinding deletes the ClusterRoleBinding related to a VirtualKubelet if the deployment does not exist.
Expand All @@ -210,8 +202,8 @@ func (r *ResourceOfferReconciler) deleteClusterRoleBinding(

// getVirtualKubeletDeployment returns the VirtualKubelet Deployment given a ResourceOffer.
func (r *ResourceOfferReconciler) getVirtualKubeletDeployment(
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (*v1.Deployment, error) {
var deployList v1.DeploymentList
ctx context.Context, resourceOffer *sharingv1alpha1.ResourceOffer) (*appsv1.Deployment, error) {
var deployList appsv1.DeploymentList
labels := forge.VirtualKubeletLabels(resourceOffer.Spec.ClusterId)
if err := r.Client.List(ctx, &deployList, client.MatchingLabels(labels)); err != nil {
klog.Error(err)
Expand Down

0 comments on commit ff236dc

Please sign in to comment.