From 1f90e609c3363228c97e69e7b0ef4695a4222944 Mon Sep 17 00:00:00 2001 From: Alessandro Olivero Date: Mon, 28 Jun 2021 12:37:13 +0000 Subject: [PATCH] resources phase signaling --- .../v1alpha1/resourcerequest_types.go | 11 ++- apis/sharing/v1alpha1/resourceoffer_types.go | 6 ++ .../discovery.liqo.io_resourcerequests.yaml | 16 ++++ .../crds/sharing.liqo.io_resourceoffers.yaml | 8 ++ .../foreign-cluster-controller.go | 50 ++++++++++-- .../foreign-cluster-operator_test.go | 45 ++++++++++- .../resourceRequest.go | 7 +- .../resourceRequest_controller.go | 33 ++++++-- .../resourceRequest_operator_test.go | 35 +++++++++ internal/resource-request-operator/utils.go | 54 ++++++++++++- pkg/consts/resourcePhase.go | 15 ++++ .../resourceoffer_controller.go | 14 +++- .../resourceoffer_controller_methods.go | 19 ++++- .../resourceoffer_controller_test.go | 77 +++++++++++++++---- .../liqoNodeProvider/reconciler.go | 14 +++- 15 files changed, 364 insertions(+), 40 deletions(-) create mode 100644 pkg/consts/resourcePhase.go diff --git a/apis/discovery/v1alpha1/resourcerequest_types.go b/apis/discovery/v1alpha1/resourcerequest_types.go index f3649876c9..f0786eb02f 100644 --- a/apis/discovery/v1alpha1/resourcerequest_types.go +++ b/apis/discovery/v1alpha1/resourcerequest_types.go @@ -6,6 +6,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" advtypes "github.com/liqotech/liqo/apis/sharing/v1alpha1" + "github.com/liqotech/liqo/pkg/consts" crdclient "github.com/liqotech/liqo/pkg/crdClient" object_references "github.com/liqotech/liqo/pkg/object-references" ) @@ -22,15 +23,24 @@ type ResourceRequestSpec struct { ClusterIdentity ClusterIdentity `json:"clusterIdentity"` // Local auth service address AuthURL string `json:"authUrl"` + // OfferPhase indicate the desired state for the ResourceOffer + // +kubebuilder:validation:Enum="Create";"Delete" + // +kubebuilder:default="Create" + OfferPhase consts.ResourceDesiredPhase `json:"offerPhase,omitempty"` } // ResourceRequestStatus defines the observed state of ResourceRequest. type ResourceRequestStatus struct { BroadcasterRef *object_references.DeploymentReference `json:"broadcasterRef,omitempty"` AdvertisementStatus advtypes.AdvPhase `json:"advertisementStatus,omitempty"` + // OfferPhase indicate the observed state for the ResourceOffer + // +kubebuilder:validation:Enum="Created";"Deleted" + // +kubebuilder:default="Deleted" + OfferPhase consts.ResourceObservedPhase `json:"offerPhase,omitempty"` } // +kubebuilder:object:root=true +// +kubebuilder:subresource:status // ResourceRequest is the Schema for the ResourceRequests API. type ResourceRequest struct { @@ -42,7 +52,6 @@ type ResourceRequest struct { } // +kubebuilder:object:root=true -// +kubebuilder:subresource:status // ResourceRequestList contains a list of ResourceRequest. type ResourceRequestList struct { diff --git a/apis/sharing/v1alpha1/resourceoffer_types.go b/apis/sharing/v1alpha1/resourceoffer_types.go index d5f9c6fa01..63dd4d873d 100644 --- a/apis/sharing/v1alpha1/resourceoffer_types.go +++ b/apis/sharing/v1alpha1/resourceoffer_types.go @@ -3,6 +3,8 @@ package v1alpha1 import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/liqotech/liqo/pkg/consts" ) // ResourceOfferSpec defines the desired state of ResourceOffer. @@ -23,6 +25,10 @@ type ResourceOfferSpec struct { // TimeToLive is the time instant until this ResourceOffer will be valid. // If not refreshed, an ResourceOffer will expire after 30 minutes. TimeToLive metav1.Time `json:"timeToLive"` + // VirtualNodePhase indicate the desired state for the virtual node + // +kubebuilder:validation:Enum="Create";"Delete" + // +kubebuilder:default="Create" + VirtualNodePhase consts.ResourceDesiredPhase `json:"virtualKubeletPhase,omitempty"` } // OfferPhase describes the phase of the ResourceOffer. diff --git a/deployments/liqo/crds/discovery.liqo.io_resourcerequests.yaml b/deployments/liqo/crds/discovery.liqo.io_resourcerequests.yaml index 9271c39b70..d24019b73f 100644 --- a/deployments/liqo/crds/discovery.liqo.io_resourcerequests.yaml +++ b/deployments/liqo/crds/discovery.liqo.io_resourcerequests.yaml @@ -52,6 +52,13 @@ spec: required: - clusterID type: object + offerPhase: + default: Create + description: OfferPhase indicate the desired state for the ResourceOffer + enum: + - Create + - Delete + type: string required: - authUrl - clusterIdentity @@ -75,10 +82,19 @@ spec: name must be unique. type: string type: object + offerPhase: + default: Deleted + description: OfferPhase indicate the observed state for the ResourceOffer + enum: + - Created + - Deleted + type: string type: object type: object served: true storage: true + subresources: + status: {} status: acceptedNames: kind: "" diff --git a/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml b/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml index 671dd6a96f..8a976a1fb9 100644 --- a/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml +++ b/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml @@ -166,6 +166,14 @@ spec: was created. format: date-time type: string + virtualKubeletPhase: + default: Create + description: VirtualNodePhase indicate the desired state for the virtual + node + enum: + - Create + - Delete + type: string required: - clusterId - timeToLive diff --git a/internal/discovery/foreign-cluster-operator/foreign-cluster-controller.go b/internal/discovery/foreign-cluster-operator/foreign-cluster-controller.go index 2ceef5e2b9..9e7fa10e34 100644 --- a/internal/discovery/foreign-cluster-operator/foreign-cluster-controller.go +++ b/internal/discovery/foreign-cluster-operator/foreign-cluster-controller.go @@ -167,7 +167,8 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque // ------ (3) peering/unpeering logic ------ // read the ForeignCluster status and ensure the peering state - switch r.getDesiredOutgoingPeeringState(&foreignCluster) { + phase := r.getDesiredOutgoingPeeringState(&foreignCluster) + switch phase { case desiredPeeringPhasePeering: if err = r.peerNamespaced(ctx, &foreignCluster); err != nil { klog.Error(err) @@ -178,6 +179,10 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque klog.Error(err) return ctrl.Result{}, err } + default: + err := fmt.Errorf("unknown phase %v", phase) + klog.Error(err) + return ctrl.Result{}, err } // ------ (4) update peering conditions ------ @@ -337,14 +342,41 @@ func (r *ForeignClusterReconciler) Unpeer(fc *discoveryv1alpha1.ForeignCluster, // unpeerNamespaced disables the peering deleting the resources in the correct TenantControlNamespace. func (r *ForeignClusterReconciler) unpeerNamespaced(ctx context.Context, foreignCluster *discoveryv1alpha1.ForeignCluster) error { - // resource request has to be removed - err := r.deleteResourceRequest(ctx, foreignCluster) - if err != nil && !errors.IsNotFound(err) { + var resourceRequest discoveryv1alpha1.ResourceRequest + err := r.Client.Get(ctx, types.NamespacedName{ + Namespace: foreignCluster.Status.TenantControlNamespace.Local, + Name: r.clusterID.GetClusterID(), + }, &resourceRequest) + if errors.IsNotFound(err) { + foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseNone + return nil + } + if err != nil { klog.Error(err) return err } - foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseDisconnecting + switch resourceRequest.Status.OfferPhase { + case liqoconst.ResourceObservedPhaseCreated: + resourceRequest.Spec.OfferPhase = liqoconst.ResourceDesiredPhaseDelete + err = r.Client.Update(ctx, &resourceRequest) + if err != nil && !errors.IsNotFound(err) { + klog.Error(err) + return err + } + foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseDisconnecting + case liqoconst.ResourceObservedPhaseDeleted: + err = r.deleteResourceRequest(ctx, foreignCluster) + if err != nil && !errors.IsNotFound(err) { + klog.Error(err) + return err + } + foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseNone + default: + err := fmt.Errorf("unknown phase %v", resourceRequest.Status.OfferPhase) + klog.Error(err) + return err + } return nil } @@ -400,6 +432,14 @@ func getPeeringPhase(resourceRequestList *discoveryv1alpha1.ResourceRequestList) case 0: return discoveryv1alpha1.PeeringPhaseNone, nil case 1: + resourceRequest := &resourceRequestList.Items[0] + desired := resourceRequest.Spec.OfferPhase + observed := resourceRequest.Status.OfferPhase + if desired == liqoconst.ResourceDesiredPhaseCreate && observed == liqoconst.ResourceObservedPhaseDeleted { + return discoveryv1alpha1.PeeringPhasePending, nil + } else if desired == liqoconst.ResourceDesiredPhaseDelete && observed == liqoconst.ResourceObservedPhaseCreated { + return discoveryv1alpha1.PeeringPhaseDisconnecting, nil + } return discoveryv1alpha1.PeeringPhaseEstablished, nil default: err := fmt.Errorf("more than one resource request found") diff --git a/internal/discovery/foreign-cluster-operator/foreign-cluster-operator_test.go b/internal/discovery/foreign-cluster-operator/foreign-cluster-operator_test.go index b34b057fe8..f1acaf4553 100644 --- a/internal/discovery/foreign-cluster-operator/foreign-cluster-operator_test.go +++ b/internal/discovery/foreign-cluster-operator/foreign-cluster-operator_test.go @@ -19,6 +19,7 @@ import ( discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" crdreplicator "github.com/liqotech/liqo/internal/crdReplicator" "github.com/liqotech/liqo/pkg/clusterid/test" + "github.com/liqotech/liqo/pkg/consts" "github.com/liqotech/liqo/pkg/discovery" identitymanager "github.com/liqotech/liqo/pkg/identityManager" tenantcontrolnamespace "github.com/liqotech/liqo/pkg/tenantControlNamespace" @@ -422,6 +423,7 @@ var _ = Describe("ForeignClusterOperator", func() { // populate the resourcerequest CR c.rr.Name = controller.clusterID.GetClusterID() c.rr.Spec.ClusterIdentity.ClusterID = c.fc.Spec.ClusterIdentity.ClusterID + c.rr.Spec.OfferPhase = consts.ResourceDesiredPhaseCreate c.rr.Labels = resourceRequestLabels(c.fc.Spec.ClusterIdentity.ClusterID) // create the foreigncluster CR @@ -451,6 +453,16 @@ var _ = Describe("ForeignClusterOperator", func() { Expect(ok).To(BeTrue()) Expect(rr).NotTo(BeNil()) + // set the ResourceRequest status to created + rr.Status.OfferPhase = consts.ResourceObservedPhaseCreated + obj, err = controller.crdClient.Resource("resourcerequests").Namespace(tenantNamespace.Name).UpdateStatus(rr.Name, rr, &metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Expect(obj).NotTo(BeNil()) + + rr, ok = obj.(*discoveryv1alpha1.ResourceRequest) + Expect(ok).To(BeTrue()) + Expect(rr).NotTo(BeNil()) + // disable the peering for that foreigncluster err = controller.unpeerNamespaced(ctx, fc) Expect(err).To(BeNil()) @@ -469,8 +481,37 @@ var _ = Describe("ForeignClusterOperator", func() { Expect(rrs).NotTo(BeNil()) // check that the length of the resource request list is the expected one, - // and the resource request has been deleted in the correct namespace + // and the resource request has been set for deletion in the correct namespace Expect(len(rrs.Items)).To(c.expectedPeeringLength) + if len(rrs.Items) > 0 { + Expect(rrs.Items[0].Spec.OfferPhase).To(Equal(consts.ResourceDesiredPhaseDelete)) + rr = &rrs.Items[0] + } + + // set the ResourceRequest status to deleted + rr.Status.OfferPhase = consts.ResourceObservedPhaseDeleted + obj, err = controller.crdClient.Resource("resourcerequests").Namespace(tenantNamespace.Name).UpdateStatus(rr.Name, rr, &metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Expect(obj).NotTo(BeNil()) + + rr, ok = obj.(*discoveryv1alpha1.ResourceRequest) + Expect(ok).To(BeTrue()) + Expect(rr).NotTo(BeNil()) + + // call for the second time the unpeer function to delete the ResourceRequest + err = controller.unpeerNamespaced(ctx, fc) + Expect(err).To(BeNil()) + + // get the resource requests in the local tenant namespace + obj, err = controller.crdClient.Resource("resourcerequests").Namespace(tenantNamespace.Name).List(&metav1.ListOptions{}) + Expect(err).To(BeNil()) + Expect(obj).NotTo(BeNil()) + + rrs, ok = obj.(*discoveryv1alpha1.ResourceRequestList) + Expect(ok).To(BeTrue()) + Expect(rrs).NotTo(BeNil()) + + Expect(len(rrs.Items)).To(BeNumerically("==", 0)) }, Entry("unpeer", unpeerTestcase{ @@ -512,7 +553,7 @@ var _ = Describe("ForeignClusterOperator", func() { AuthURL: "", }, }, - expectedPeeringLength: Equal(0), + expectedPeeringLength: Equal(1), expectedOutgoing: Equal(discoveryv1alpha1.Outgoing{ PeeringPhase: discoveryv1alpha1.PeeringPhaseDisconnecting, }), diff --git a/internal/discovery/foreign-cluster-operator/resourceRequest.go b/internal/discovery/foreign-cluster-operator/resourceRequest.go index 7ace057ab4..79a575140e 100644 --- a/internal/discovery/foreign-cluster-operator/resourceRequest.go +++ b/internal/discovery/foreign-cluster-operator/resourceRequest.go @@ -10,6 +10,7 @@ import ( discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" crdreplicator "github.com/liqotech/liqo/internal/crdReplicator" + "github.com/liqotech/liqo/pkg/consts" ) // createResourceRequest creates a resource request to be sent to the specified ForeignCluster. @@ -55,7 +56,8 @@ func (r *ForeignClusterReconciler) createResourceRequest(ctx context.Context, ClusterID: localClusterID, ClusterName: r.ConfigProvider.GetConfig().ClusterName, }, - AuthURL: authURL, + AuthURL: authURL, + OfferPhase: consts.ResourceDesiredPhaseCreate, } return controllerutil.SetControllerReference(foreignCluster, resourceRequest, r.Scheme) @@ -64,7 +66,8 @@ func (r *ForeignClusterReconciler) createResourceRequest(ctx context.Context, klog.Error(err) return controllerutil.OperationResultNone, err } - klog.Infof("[%v] ensured the existence of ResourceRequest (with %v operation)", remoteClusterID, result) + klog.Infof("[%v] ensured the existence of ResourceRequest with phase %v (with %v operation)", + remoteClusterID, result, resourceRequest.Spec.OfferPhase) return result, nil } diff --git a/internal/resource-request-operator/resourceRequest_controller.go b/internal/resource-request-operator/resourceRequest_controller.go index 9c3e3cbcc7..cceab4db50 100644 --- a/internal/resource-request-operator/resourceRequest_controller.go +++ b/internal/resource-request-operator/resourceRequest_controller.go @@ -2,6 +2,7 @@ package resourcerequestoperator import ( "context" + "fmt" "time" "k8s.io/apimachinery/pkg/runtime" @@ -14,6 +15,7 @@ import ( discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1" crdreplicator "github.com/liqotech/liqo/internal/crdReplicator" + "github.com/liqotech/liqo/pkg/consts" ) // ResourceRequestReconciler reconciles a ResourceRequest object. @@ -37,9 +39,9 @@ const ( // +kubebuilder:rbac:groups=capsule.clastix.io,resources=tenants,verbs=get;list;watch;create;update;patch;delete; // Reconcile is the main function of the controller which reconciles ResourceRequest resources. -func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { var resourceRequest discoveryv1alpha1.ResourceRequest - err := r.Get(ctx, req.NamespacedName, &resourceRequest) + err = r.Get(ctx, req.NamespacedName, &resourceRequest) if err != nil { klog.Errorf("unable to get resourceRequest %s: %s", req.NamespacedName, err) return ctrl.Result{}, nil @@ -80,9 +82,30 @@ func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, err } - err = r.generateResourceOffer(ctx, &resourceRequest) - if err != nil { - klog.Errorf("%s -> Error generating resourceOffer: %s", remoteClusterID, err) + defer func() { + newErr := r.Client.Status().Update(ctx, &resourceRequest) + if newErr != nil { + klog.Error(newErr) + err = newErr + } + }() + + switch resourceRequest.Spec.OfferPhase { + case consts.ResourceDesiredPhaseCreate: + err = r.generateResourceOffer(ctx, &resourceRequest) + if err != nil { + klog.Errorf("%s -> Error generating resourceOffer: %s", remoteClusterID, err) + return ctrl.Result{}, err + } + case consts.ResourceDesiredPhaseDelete: + err = r.invalidateResourceOffer(ctx, &resourceRequest) + if err != nil { + klog.Errorf("%s -> Error invalidating resourceOffer: %s", remoteClusterID, err) + return ctrl.Result{}, err + } + default: + err = fmt.Errorf("unknown phase %v", resourceRequest.Spec.OfferPhase) + klog.Error(err) return ctrl.Result{}, err } diff --git a/internal/resource-request-operator/resourceRequest_operator_test.go b/internal/resource-request-operator/resourceRequest_operator_test.go index 5a55bbd928..a72893c734 100644 --- a/internal/resource-request-operator/resourceRequest_operator_test.go +++ b/internal/resource-request-operator/resourceRequest_operator_test.go @@ -10,15 +10,18 @@ import ( . "github.com/onsi/gomega/gstruct" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" resourcehelper "k8s.io/kubectl/pkg/util/resource" "sigs.k8s.io/controller-runtime/pkg/client" discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1" crdreplicator "github.com/liqotech/liqo/internal/crdReplicator" + "github.com/liqotech/liqo/pkg/consts" "github.com/liqotech/liqo/pkg/discovery" "github.com/liqotech/liqo/pkg/virtualKubelet/forge" ) @@ -159,6 +162,7 @@ func createResourceRequest() *discoveryv1alpha1.ResourceRequest { ClusterIdentity: discoveryv1alpha1.ClusterIdentity{ ClusterID: homeClusterID, }, + OfferPhase: consts.ResourceDesiredPhaseCreate, }, } Expect(k8sClient.Create(ctx, resourceRequest)).Should(Succeed()) @@ -209,6 +213,8 @@ var _ = Describe("ResourceRequest Operator", func() { return resourceRequest.Finalizers }, timeout, interval).Should(ContainElement(tenantFinalizer)) + Expect(resourceRequest.Status.OfferPhase).To(Equal(consts.ResourceObservedPhaseCreated)) + By("Checking Tenant creation") var tenant capsulev1alpha1.Tenant Eventually(func() error { @@ -237,6 +243,7 @@ var _ = Describe("ResourceRequest Operator", func() { Name: offerPrefix + clusterId, Namespace: ResourcesNamespace, } + klog.Info(offerName) Eventually(func() error { return k8sClient.Get(ctx, offerName, createdResourceOffer) }, timeout, interval).ShouldNot(HaveOccurred()) @@ -263,6 +270,34 @@ var _ = Describe("ResourceRequest Operator", func() { Expect(quantity.Cmp(testValue)).Should(BeZero()) } + By("Checking ResourceOffer invalidation on request set deleting phase") + resourceRequest.Spec.OfferPhase = consts.ResourceDesiredPhaseDelete + Expect(k8sClient.Update(ctx, &resourceRequest)).ToNot(HaveOccurred()) + + // set the vk status in the ResourceOffer to created + createdResourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated + createdResourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferAccepted + Expect(k8sClient.Status().Update(ctx, createdResourceOffer)).ToNot(HaveOccurred()) + + var resourceOffer sharingv1alpha1.ResourceOffer + Eventually(func() consts.ResourceDesiredPhase { + if err := k8sClient.Get(ctx, offerName, &resourceOffer); err != nil { + return "" + } + return resourceOffer.Spec.VirtualNodePhase + }, timeout, interval).Should(Equal(consts.ResourceDesiredPhaseDelete)) + + By("Checking ResourceOffer deletion") + // the ResourceOffer should be deleted when the remote VirtualKubelet will be down + Expect(k8sClient.Get(ctx, offerName, createdResourceOffer)).ToNot(HaveOccurred()) + createdResourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusNone + Expect(k8sClient.Status().Update(ctx, createdResourceOffer)).ToNot(HaveOccurred()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, offerName, &resourceOffer) + return apierrors.IsNotFound(err) + }, timeout, interval).Should(BeTrue()) + By("Checking Tenant Deletion") err := k8sClient.Delete(ctx, &resourceRequest) Expect(err).ToNot(HaveOccurred()) diff --git a/internal/resource-request-operator/utils.go b/internal/resource-request-operator/utils.go index cb4a751511..75a8a0cec1 100644 --- a/internal/resource-request-operator/utils.go +++ b/internal/resource-request-operator/utils.go @@ -2,10 +2,13 @@ package resourcerequestoperator import ( "context" + "fmt" "time" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -13,6 +16,7 @@ import ( discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1" crdreplicator "github.com/liqotech/liqo/internal/crdReplicator" + "github.com/liqotech/liqo/pkg/consts" "github.com/liqotech/liqo/pkg/discovery" ) @@ -39,17 +43,20 @@ func (r *ResourceRequestReconciler) generateResourceOffer(ctx context.Context, r ResourceQuota: corev1.ResourceQuotaSpec{ Hard: resources, }, - Timestamp: creationTime, - TimeToLive: metav1.NewTime(creationTime.Add(timeToLive)), + Timestamp: creationTime, + TimeToLive: metav1.NewTime(creationTime.Add(timeToLive)), + VirtualNodePhase: consts.ResourceDesiredPhaseCreate, } offer.Spec = spec return controllerutil.SetControllerReference(request, offer, r.Scheme) }) if err != nil { + klog.Error(err) return err } - klog.Infof("%s -> %s Offer: %s", r.ClusterID, op, offer.ObjectMeta.Name) + request.Status.OfferPhase = consts.ResourceObservedPhaseCreated + klog.Infof("%s -> %s Offer: %s/%s (phase: %v)", r.ClusterID, op, offer.Namespace, offer.Name, offer.Spec.VirtualNodePhase) return nil } @@ -123,3 +130,44 @@ func (r *ResourceRequestReconciler) createForeignCluster(ctx context.Context, resourceRequest.Spec.ClusterIdentity.ClusterID, foreignCluster.Name) return nil } + +func (r *ResourceRequestReconciler) invalidateResourceOffer(ctx context.Context, request *discoveryv1alpha1.ResourceRequest) error { + var offer sharingv1alpha1.ResourceOffer + err := r.Client.Get(ctx, types.NamespacedName{ + Namespace: request.GetNamespace(), + Name: offerPrefix + r.ClusterID, + }, &offer) + if apierrors.IsNotFound(err) { + // ignore not found errors + return nil + } + if err != nil { + klog.Error(err) + return err + } + + switch offer.Status.VirtualKubeletStatus { + case sharingv1alpha1.VirtualKubeletStatusDeleting, sharingv1alpha1.VirtualKubeletStatusCreated: + offer.Spec.VirtualNodePhase = consts.ResourceDesiredPhaseDelete + err = client.IgnoreNotFound(r.Client.Update(ctx, &offer)) + if err != nil { + klog.Error(err) + return err + } + klog.Infof("%s -> Offer: %s/%s (phase: %v)", r.ClusterID, offer.Namespace, offer.Name, offer.Spec.VirtualNodePhase) + return nil + case sharingv1alpha1.VirtualKubeletStatusNone: + err = client.IgnoreNotFound(r.Client.Delete(ctx, &offer)) + if err != nil { + klog.Error(err) + return err + } + request.Status.OfferPhase = consts.ResourceObservedPhaseDeleted + klog.Infof("%s -> Deleted Offer: %s/%s", r.ClusterID, offer.Namespace, offer.Name) + return nil + default: + err := fmt.Errorf("unknown VirtualKubeletStatus %v", offer.Status.VirtualKubeletStatus) + klog.Error(err) + return err + } +} diff --git a/pkg/consts/resourcePhase.go b/pkg/consts/resourcePhase.go new file mode 100644 index 0000000000..8e889d4c9c --- /dev/null +++ b/pkg/consts/resourcePhase.go @@ -0,0 +1,15 @@ +package consts + +type ResourceDesiredPhase string + +type ResourceObservedPhase string + +const ( + ResourceDesiredPhaseCreate ResourceDesiredPhase = "Create" + ResourceDesiredPhaseDelete ResourceDesiredPhase = "Delete" +) + +const ( + ResourceObservedPhaseCreated ResourceObservedPhase = "Created" + ResourceObservedPhaseDeleted ResourceObservedPhase = "Deleted" +) diff --git a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go index 00b2498ed4..c6c7e504e1 100644 --- a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go +++ b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go @@ -129,13 +129,25 @@ func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Reques } } - if canDeleteVirtualKubeletDeployment(&resourceOffer) { + deletingPhase := getDeleteVirtualKubeletPhase(&resourceOffer) + switch deletingPhase { + case kubeletDeletePhaseNodeDeleted: // delete virtual kubelet deployment if err = r.deleteVirtualKubeletDeployment(ctx, &resourceOffer); err != nil { klog.Error(err) return ctrl.Result{}, err } + resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusNone return result, nil + case kubeletDeletePhaseDrainingNode: + // set virtual kubelet in deleting phase + resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusDeleting + case kubeletDeletePhaseNone: + break + default: + err = fmt.Errorf("unknown deleting phase %v", deletingPhase) + klog.Error(err) + return result, err } // create the virtual kubelet deployment diff --git a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go index 7278d77b82..ca1ad92bd1 100644 --- a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go +++ b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go @@ -225,11 +225,26 @@ func (r *ResourceOfferReconciler) getVirtualKubeletDeployment( return &deployList.Items[0], nil } -func canDeleteVirtualKubeletDeployment(resourceOffer *sharingv1alpha1.ResourceOffer) bool { +type kubeletDeletePhase string + +const ( + kubeletDeletePhaseNone kubeletDeletePhase = "None" + kubeletDeletePhaseDrainingNode kubeletDeletePhase = "DrainingNode" + kubeletDeletePhaseNodeDeleted kubeletDeletePhase = "NodeDeleted" +) + +func getDeleteVirtualKubeletPhase(resourceOffer *sharingv1alpha1.ResourceOffer) kubeletDeletePhase { notAccepted := !isAccepted(resourceOffer) deleting := !resourceOffer.DeletionTimestamp.IsZero() + desiredDelete := resourceOffer.Spec.VirtualNodePhase == consts.ResourceDesiredPhaseDelete nodeDrained := !controllerutil.ContainsFinalizer(resourceOffer, consts.NodeFinalizer) - return (notAccepted || deleting) && nodeDrained + if notAccepted || deleting || desiredDelete { + if nodeDrained { + return kubeletDeletePhaseNodeDeleted + } + return kubeletDeletePhaseDrainingNode + } + return kubeletDeletePhaseNone } // isAccepted checks if a ResourceOffer is in Accepted phase. diff --git a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go index da83cf28d8..60ad19c8dd 100644 --- a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go +++ b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go @@ -311,44 +311,50 @@ var _ = Describe("ResourceOffer Controller", func() { var _ = Describe("ResourceOffer Operator util functions", func() { - Context("canDeleteVirtualKubeletDeployment", func() { + Context("getDeleteVirtualKubeletPhase", func() { - type canDeleteVirtualKubeletDeploymentTestcase struct { + type getDeleteVirtualKubeletPhaseTestcase struct { resourceOffer *sharingv1alpha1.ResourceOffer expected OmegaMatcher } - DescribeTable("canDeleteVirtualKubeletDeployment table", + DescribeTable("getDeleteVirtualKubeletPhase table", - func(c canDeleteVirtualKubeletDeploymentTestcase) { - Expect(canDeleteVirtualKubeletDeployment(c.resourceOffer)).To(c.expected) + func(c getDeleteVirtualKubeletPhaseTestcase) { + Expect(getDeleteVirtualKubeletPhase(c.resourceOffer)).To(c.expected) }, - Entry("refused ResourceOffer", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("refused ResourceOffer", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ Finalizers: []string{}, }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + VirtualNodePhase: consts.ResourceDesiredPhaseDelete, + }, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferRefused, }, }, - expected: BeTrue(), + expected: Equal(kubeletDeletePhaseNodeDeleted), }), - Entry("accepted ResourceOffer", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("accepted ResourceOffer", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ Finalizers: []string{}, }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + VirtualNodePhase: consts.ResourceDesiredPhaseCreate, + }, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferAccepted, }, }, - expected: BeFalse(), + expected: Equal(kubeletDeletePhaseNone), }), - Entry("accepted ResourceOffer with deletion timestamp", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("accepted ResourceOffer with deletion timestamp", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &metav1.Time{ @@ -356,28 +362,34 @@ var _ = Describe("ResourceOffer Operator util functions", func() { }, Finalizers: []string{}, }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + VirtualNodePhase: consts.ResourceDesiredPhaseCreate, + }, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferAccepted, }, }, - expected: BeTrue(), + expected: Equal(kubeletDeletePhaseNodeDeleted), }), - Entry("refused ResourceOffer with finalizer", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("refused ResourceOffer with finalizer", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ Finalizers: []string{ consts.NodeFinalizer, }, }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + VirtualNodePhase: consts.ResourceDesiredPhaseCreate, + }, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferRefused, }, }, - expected: BeFalse(), + expected: Equal(kubeletDeletePhaseDrainingNode), }), - Entry("accepted ResourceOffer with deletion timestamp and finalizer", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("accepted ResourceOffer with deletion timestamp and finalizer", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &metav1.Time{ @@ -387,11 +399,46 @@ var _ = Describe("ResourceOffer Operator util functions", func() { consts.NodeFinalizer, }, }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + VirtualNodePhase: consts.ResourceDesiredPhaseCreate, + }, + Status: sharingv1alpha1.ResourceOfferStatus{ + Phase: sharingv1alpha1.ResourceOfferAccepted, + }, + }, + expected: Equal(kubeletDeletePhaseDrainingNode), + }), + + Entry("desired deletion of ResourceOffer", getDeleteVirtualKubeletPhaseTestcase{ + resourceOffer: &sharingv1alpha1.ResourceOffer{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{ + consts.NodeFinalizer, + }, + }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + VirtualNodePhase: consts.ResourceDesiredPhaseDelete, + }, + Status: sharingv1alpha1.ResourceOfferStatus{ + Phase: sharingv1alpha1.ResourceOfferAccepted, + }, + }, + expected: Equal(kubeletDeletePhaseDrainingNode), + }), + + Entry("desired deletion of ResourceOffer without finalizer", getDeleteVirtualKubeletPhaseTestcase{ + resourceOffer: &sharingv1alpha1.ResourceOffer{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{}, + }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + VirtualNodePhase: consts.ResourceDesiredPhaseDelete, + }, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferAccepted, }, }, - expected: BeFalse(), + expected: Equal(kubeletDeletePhaseNodeDeleted), }), ) diff --git a/pkg/virtualKubelet/liqoNodeProvider/reconciler.go b/pkg/virtualKubelet/liqoNodeProvider/reconciler.go index 1a6d164e43..aef6360e9d 100644 --- a/pkg/virtualKubelet/liqoNodeProvider/reconciler.go +++ b/pkg/virtualKubelet/liqoNodeProvider/reconciler.go @@ -27,6 +27,12 @@ import ( "github.com/liqotech/liqo/pkg/utils" ) +func isResourceOfferTerminating(resourceOffer *sharingv1alpha1.ResourceOffer) bool { + hasTimestamp := !resourceOffer.DeletionTimestamp.IsZero() + desiredDelete := resourceOffer.Spec.VirtualNodePhase == consts.ResourceDesiredPhaseDelete + return hasTimestamp || desiredDelete +} + // The reconciliation function; every time this function is called, // the node status is updated by means of r.updateFromResourceOffer. func (p *LiqoNodeProvider) reconcileNodeFromResourceOffer(event watch.Event) error { @@ -40,7 +46,7 @@ func (p *LiqoNodeProvider) reconcileNodeFromResourceOffer(event watch.Event) err return err } - if event.Type == watch.Deleted || !resourceOffer.DeletionTimestamp.IsZero() { + if event.Type == watch.Deleted || isResourceOfferTerminating(&resourceOffer) { p.updateMutex.Lock() defer p.updateMutex.Unlock() klog.Infof("resourceOffer %v is going to be deleted... set node status not ready", resourceOffer.Name) @@ -331,12 +337,12 @@ func (p *LiqoNodeProvider) updateNode() error { func (p *LiqoNodeProvider) handleResourceOfferDelete(resourceOffer *sharingv1alpha1.ResourceOffer) error { ctx := context.TODO() - if err := p.cordonNode(ctx); err != nil { + if err := client.IgnoreNotFound(p.cordonNode(ctx)); err != nil { klog.Errorf("error cordoning node: %v", err) return err } - if err := p.drainNode(ctx); err != nil { + if err := client.IgnoreNotFound(p.drainNode(ctx)); err != nil { klog.Errorf("error draining node: %v", err) return err } @@ -346,7 +352,7 @@ func (p *LiqoNodeProvider) handleResourceOfferDelete(resourceOffer *sharingv1alp } // delete the node - if err := p.client.CoreV1().Nodes().Delete(ctx, p.node.GetName(), metav1.DeleteOptions{}); err != nil && !kerrors.IsNotFound(err) { + if err := client.IgnoreNotFound(p.client.CoreV1().Nodes().Delete(ctx, p.node.GetName(), metav1.DeleteOptions{})); err != nil { klog.Errorf("error deleting node: %v", err) return err }