From 4a2184162c37137ef78f4e22770368cec99e8e0b Mon Sep 17 00:00:00 2001 From: Alessandro Olivero Date: Mon, 28 Jun 2021 12:37:13 +0000 Subject: [PATCH] resources phase explicit signaling allows peering-related resources (i.e. ResourceRequest and ResourceOffers) to have an explicit mechanism to indicate the desired state (Create os Delete) handled by the home cluster, and the observed state (Created od Deleted) handled by the remote cluster. In this way, we are able the handle a graceful deletion of these resources and to have a graceful peering tear down. The VirtualKublet can now have the time required for a completed clean-up of the remote cluster. --- .../v1alpha1/resourcerequest_types.go | 6 +- .../v1alpha1/zz_generated.deepcopy.go | 10 ++- apis/sharing/v1alpha1/resourceoffer_types.go | 2 + .../sharing/v1alpha1/zz_generated.deepcopy.go | 4 ++ .../discovery.liqo.io_resourcerequests.yaml | 12 ++++ .../crds/sharing.liqo.io_resourceoffers.yaml | 5 ++ .../foreign-cluster-controller.go | 46 ++++++++++-- .../foreign-cluster-operator_test.go | 44 +++++++++++- .../resourceRequest.go | 3 +- .../resourceRequest_controller.go | 28 ++++++-- .../resourceRequest_operator_test.go | 37 ++++++++++ internal/resource-request-operator/utils.go | 53 +++++++++++++- .../resourceoffer_controller.go | 14 +++- .../resourceoffer_controller_methods.go | 31 +++++++- .../resourceoffer_controller_test.go | 71 +++++++++++++++---- .../liqoNodeProvider/reconciler.go | 14 ++-- 16 files changed, 341 insertions(+), 39 deletions(-) diff --git a/apis/discovery/v1alpha1/resourcerequest_types.go b/apis/discovery/v1alpha1/resourcerequest_types.go index f3649876c9..b8cde96afc 100644 --- a/apis/discovery/v1alpha1/resourcerequest_types.go +++ b/apis/discovery/v1alpha1/resourcerequest_types.go @@ -22,15 +22,20 @@ type ResourceRequestSpec struct { ClusterIdentity ClusterIdentity `json:"clusterIdentity"` // Local auth service address AuthURL string `json:"authUrl"` + // WithdrawalTimestamp is set when a graceful deletion is requested by the user. + WithdrawalTimestamp *metav1.Time `json:"withdrawalTimestamp,omitempty"` } // ResourceRequestStatus defines the observed state of ResourceRequest. type ResourceRequestStatus struct { BroadcasterRef *object_references.DeploymentReference `json:"broadcasterRef,omitempty"` AdvertisementStatus advtypes.AdvPhase `json:"advertisementStatus,omitempty"` + // OfferWithdrawalTimestamp is the withdrawal timestamp of the child ResourceOffer resource. + OfferWithdrawalTimestamp *metav1.Time `json:"offerWithdrawalTimestamp,omitempty"` } // +kubebuilder:object:root=true +// +kubebuilder:subresource:status // ResourceRequest is the Schema for the ResourceRequests API. type ResourceRequest struct { @@ -42,7 +47,6 @@ type ResourceRequest struct { } // +kubebuilder:object:root=true -// +kubebuilder:subresource:status // ResourceRequestList contains a list of ResourceRequest. type ResourceRequestList struct { diff --git a/apis/discovery/v1alpha1/zz_generated.deepcopy.go b/apis/discovery/v1alpha1/zz_generated.deepcopy.go index 06a7356e02..6cad53cddb 100644 --- a/apis/discovery/v1alpha1/zz_generated.deepcopy.go +++ b/apis/discovery/v1alpha1/zz_generated.deepcopy.go @@ -327,7 +327,7 @@ func (in *ResourceRequest) DeepCopyInto(out *ResourceRequest) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -385,6 +385,10 @@ func (in *ResourceRequestList) DeepCopyObject() runtime.Object { func (in *ResourceRequestSpec) DeepCopyInto(out *ResourceRequestSpec) { *out = *in out.ClusterIdentity = in.ClusterIdentity + if in.WithdrawalTimestamp != nil { + in, out := &in.WithdrawalTimestamp, &out.WithdrawalTimestamp + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceRequestSpec. @@ -405,6 +409,10 @@ func (in *ResourceRequestStatus) DeepCopyInto(out *ResourceRequestStatus) { *out = new(object_references.DeploymentReference) **out = **in } + if in.OfferWithdrawalTimestamp != nil { + in, out := &in.OfferWithdrawalTimestamp, &out.OfferWithdrawalTimestamp + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceRequestStatus. diff --git a/apis/sharing/v1alpha1/resourceoffer_types.go b/apis/sharing/v1alpha1/resourceoffer_types.go index d5f9c6fa01..e365d1b8b2 100644 --- a/apis/sharing/v1alpha1/resourceoffer_types.go +++ b/apis/sharing/v1alpha1/resourceoffer_types.go @@ -23,6 +23,8 @@ type ResourceOfferSpec struct { // TimeToLive is the time instant until this ResourceOffer will be valid. // If not refreshed, an ResourceOffer will expire after 30 minutes. TimeToLive metav1.Time `json:"timeToLive"` + // WithdrawalTimestamp is set when a graceful deletion is requested by the user. + WithdrawalTimestamp *metav1.Time `json:"withdrawalTimestamp,omitempty"` } // OfferPhase describes the phase of the ResourceOffer. diff --git a/apis/sharing/v1alpha1/zz_generated.deepcopy.go b/apis/sharing/v1alpha1/zz_generated.deepcopy.go index 93747dc9c1..b440a4e136 100644 --- a/apis/sharing/v1alpha1/zz_generated.deepcopy.go +++ b/apis/sharing/v1alpha1/zz_generated.deepcopy.go @@ -252,6 +252,10 @@ func (in *ResourceOfferSpec) DeepCopyInto(out *ResourceOfferSpec) { } in.Timestamp.DeepCopyInto(&out.Timestamp) in.TimeToLive.DeepCopyInto(&out.TimeToLive) + if in.WithdrawalTimestamp != nil { + in, out := &in.WithdrawalTimestamp, &out.WithdrawalTimestamp + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceOfferSpec. diff --git a/deployments/liqo/crds/discovery.liqo.io_resourcerequests.yaml b/deployments/liqo/crds/discovery.liqo.io_resourcerequests.yaml index 9271c39b70..30b18ca5e8 100644 --- a/deployments/liqo/crds/discovery.liqo.io_resourcerequests.yaml +++ b/deployments/liqo/crds/discovery.liqo.io_resourcerequests.yaml @@ -52,6 +52,11 @@ spec: required: - clusterID type: object + withdrawalTimestamp: + description: WithdrawalTimestamp is set when a graceful deletion is + requested by the user. + format: date-time + type: string required: - authUrl - clusterIdentity @@ -75,10 +80,17 @@ spec: name must be unique. type: string type: object + offerWithdrawalTimestamp: + description: OfferWithdrawalTimestamp is the withdrawal timestamp + of the child ResourceOffer resource. + format: date-time + type: string type: object type: object served: true storage: true + subresources: + status: {} status: acceptedNames: kind: "" diff --git a/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml b/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml index 671dd6a96f..065991994e 100644 --- a/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml +++ b/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml @@ -166,6 +166,11 @@ spec: was created. format: date-time type: string + withdrawalTimestamp: + description: WithdrawalTimestamp is set when a graceful deletion is + requested by the user. + format: date-time + type: string required: - clusterId - timeToLive diff --git a/internal/discovery/foreign-cluster-operator/foreign-cluster-controller.go b/internal/discovery/foreign-cluster-operator/foreign-cluster-controller.go index 2ceef5e2b9..92c0858553 100644 --- a/internal/discovery/foreign-cluster-operator/foreign-cluster-controller.go +++ b/internal/discovery/foreign-cluster-operator/foreign-cluster-controller.go @@ -167,7 +167,8 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque // ------ (3) peering/unpeering logic ------ // read the ForeignCluster status and ensure the peering state - switch r.getDesiredOutgoingPeeringState(&foreignCluster) { + phase := r.getDesiredOutgoingPeeringState(&foreignCluster) + switch phase { case desiredPeeringPhasePeering: if err = r.peerNamespaced(ctx, &foreignCluster); err != nil { klog.Error(err) @@ -178,6 +179,10 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque klog.Error(err) return ctrl.Result{}, err } + default: + err := fmt.Errorf("unknown phase %v", phase) + klog.Error(err) + return ctrl.Result{}, err } // ------ (4) update peering conditions ------ @@ -337,14 +342,39 @@ func (r *ForeignClusterReconciler) Unpeer(fc *discoveryv1alpha1.ForeignCluster, // unpeerNamespaced disables the peering deleting the resources in the correct TenantControlNamespace. func (r *ForeignClusterReconciler) unpeerNamespaced(ctx context.Context, foreignCluster *discoveryv1alpha1.ForeignCluster) error { - // resource request has to be removed - err := r.deleteResourceRequest(ctx, foreignCluster) - if err != nil && !errors.IsNotFound(err) { + var resourceRequest discoveryv1alpha1.ResourceRequest + err := r.Client.Get(ctx, types.NamespacedName{ + Namespace: foreignCluster.Status.TenantControlNamespace.Local, + Name: r.clusterID.GetClusterID(), + }, &resourceRequest) + if errors.IsNotFound(err) { + foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseNone + return nil + } + if err != nil { klog.Error(err) return err } - foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseDisconnecting + if resourceRequest.Status.OfferWithdrawalTimestamp.IsZero() { + if resourceRequest.Spec.WithdrawalTimestamp.IsZero() { + now := metav1.Now() + resourceRequest.Spec.WithdrawalTimestamp = &now + } + err = r.Client.Update(ctx, &resourceRequest) + if err != nil && !errors.IsNotFound(err) { + klog.Error(err) + return err + } + foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseDisconnecting + } else { + err = r.deleteResourceRequest(ctx, foreignCluster) + if err != nil && !errors.IsNotFound(err) { + klog.Error(err) + return err + } + foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseNone + } return nil } @@ -400,6 +430,12 @@ func getPeeringPhase(resourceRequestList *discoveryv1alpha1.ResourceRequestList) case 0: return discoveryv1alpha1.PeeringPhaseNone, nil case 1: + resourceRequest := &resourceRequestList.Items[0] + desiredDelete := !resourceRequest.Spec.WithdrawalTimestamp.IsZero() + deleted := !resourceRequest.Status.OfferWithdrawalTimestamp.IsZero() + if desiredDelete && !deleted { + return discoveryv1alpha1.PeeringPhaseDisconnecting, nil + } return discoveryv1alpha1.PeeringPhaseEstablished, nil default: err := fmt.Errorf("more than one resource request found") diff --git a/internal/discovery/foreign-cluster-operator/foreign-cluster-operator_test.go b/internal/discovery/foreign-cluster-operator/foreign-cluster-operator_test.go index b34b057fe8..a17fbdac4e 100644 --- a/internal/discovery/foreign-cluster-operator/foreign-cluster-operator_test.go +++ b/internal/discovery/foreign-cluster-operator/foreign-cluster-operator_test.go @@ -58,6 +58,8 @@ var _ = Describe("ForeignClusterOperator", func() { mgr manager.Manager ctx context.Context cancel context.CancelFunc + + now = metav1.Now() ) BeforeEach(func() { @@ -451,6 +453,15 @@ var _ = Describe("ForeignClusterOperator", func() { Expect(ok).To(BeTrue()) Expect(rr).NotTo(BeNil()) + // set the ResourceRequest status to created + obj, err = controller.crdClient.Resource("resourcerequests").Namespace(tenantNamespace.Name).UpdateStatus(rr.Name, rr, &metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Expect(obj).NotTo(BeNil()) + + rr, ok = obj.(*discoveryv1alpha1.ResourceRequest) + Expect(ok).To(BeTrue()) + Expect(rr).NotTo(BeNil()) + // disable the peering for that foreigncluster err = controller.unpeerNamespaced(ctx, fc) Expect(err).To(BeNil()) @@ -469,8 +480,37 @@ var _ = Describe("ForeignClusterOperator", func() { Expect(rrs).NotTo(BeNil()) // check that the length of the resource request list is the expected one, - // and the resource request has been deleted in the correct namespace + // and the resource request has been set for deletion in the correct namespace Expect(len(rrs.Items)).To(c.expectedPeeringLength) + if len(rrs.Items) > 0 { + Expect(rrs.Items[0].Spec.WithdrawalTimestamp.IsZero()).To(BeFalse()) + rr = &rrs.Items[0] + } + + // set the ResourceRequest status to deleted + rr.Status.OfferWithdrawalTimestamp = &now + obj, err = controller.crdClient.Resource("resourcerequests").Namespace(tenantNamespace.Name).UpdateStatus(rr.Name, rr, &metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Expect(obj).NotTo(BeNil()) + + rr, ok = obj.(*discoveryv1alpha1.ResourceRequest) + Expect(ok).To(BeTrue()) + Expect(rr).NotTo(BeNil()) + + // call for the second time the unpeer function to delete the ResourceRequest + err = controller.unpeerNamespaced(ctx, fc) + Expect(err).To(BeNil()) + + // get the resource requests in the local tenant namespace + obj, err = controller.crdClient.Resource("resourcerequests").Namespace(tenantNamespace.Name).List(&metav1.ListOptions{}) + Expect(err).To(BeNil()) + Expect(obj).NotTo(BeNil()) + + rrs, ok = obj.(*discoveryv1alpha1.ResourceRequestList) + Expect(ok).To(BeTrue()) + Expect(rrs).NotTo(BeNil()) + + Expect(len(rrs.Items)).To(BeNumerically("==", 0)) }, Entry("unpeer", unpeerTestcase{ @@ -512,7 +552,7 @@ var _ = Describe("ForeignClusterOperator", func() { AuthURL: "", }, }, - expectedPeeringLength: Equal(0), + expectedPeeringLength: Equal(1), expectedOutgoing: Equal(discoveryv1alpha1.Outgoing{ PeeringPhase: discoveryv1alpha1.PeeringPhaseDisconnecting, }), diff --git a/internal/discovery/foreign-cluster-operator/resourceRequest.go b/internal/discovery/foreign-cluster-operator/resourceRequest.go index 7ace057ab4..cd23b60e5d 100644 --- a/internal/discovery/foreign-cluster-operator/resourceRequest.go +++ b/internal/discovery/foreign-cluster-operator/resourceRequest.go @@ -64,7 +64,8 @@ func (r *ForeignClusterReconciler) createResourceRequest(ctx context.Context, klog.Error(err) return controllerutil.OperationResultNone, err } - klog.Infof("[%v] ensured the existence of ResourceRequest (with %v operation)", remoteClusterID, result) + klog.Infof("[%v] ensured the existence of ResourceRequest (with %v operation)", + remoteClusterID, result) return result, nil } diff --git a/internal/resource-request-operator/resourceRequest_controller.go b/internal/resource-request-operator/resourceRequest_controller.go index 9c3e3cbcc7..8c71e47437 100644 --- a/internal/resource-request-operator/resourceRequest_controller.go +++ b/internal/resource-request-operator/resourceRequest_controller.go @@ -37,9 +37,9 @@ const ( // +kubebuilder:rbac:groups=capsule.clastix.io,resources=tenants,verbs=get;list;watch;create;update;patch;delete; // Reconcile is the main function of the controller which reconciles ResourceRequest resources. -func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { var resourceRequest discoveryv1alpha1.ResourceRequest - err := r.Get(ctx, req.NamespacedName, &resourceRequest) + err = r.Get(ctx, req.NamespacedName, &resourceRequest) if err != nil { klog.Errorf("unable to get resourceRequest %s: %s", req.NamespacedName, err) return ctrl.Result{}, nil @@ -80,10 +80,26 @@ func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, err } - err = r.generateResourceOffer(ctx, &resourceRequest) - if err != nil { - klog.Errorf("%s -> Error generating resourceOffer: %s", remoteClusterID, err) - return ctrl.Result{}, err + defer func() { + newErr := r.Client.Status().Update(ctx, &resourceRequest) + if newErr != nil { + klog.Error(newErr) + err = newErr + } + }() + + if resourceRequest.Spec.WithdrawalTimestamp.IsZero() { + err = r.generateResourceOffer(ctx, &resourceRequest) + if err != nil { + klog.Errorf("%s -> Error generating resourceOffer: %s", remoteClusterID, err) + return ctrl.Result{}, err + } + } else { + err = r.invalidateResourceOffer(ctx, &resourceRequest) + if err != nil { + klog.Errorf("%s -> Error invalidating resourceOffer: %s", remoteClusterID, err) + return ctrl.Result{}, err + } } return ctrl.Result{}, nil diff --git a/internal/resource-request-operator/resourceRequest_operator_test.go b/internal/resource-request-operator/resourceRequest_operator_test.go index 5a55bbd928..34448fb533 100644 --- a/internal/resource-request-operator/resourceRequest_operator_test.go +++ b/internal/resource-request-operator/resourceRequest_operator_test.go @@ -10,9 +10,11 @@ import ( . "github.com/onsi/gomega/gstruct" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" resourcehelper "k8s.io/kubectl/pkg/util/resource" "sigs.k8s.io/controller-runtime/pkg/client" @@ -31,6 +33,10 @@ const ( homeClusterID = "2468825c-0f62-44d7-bed1-9a7bc331c0b0" ) +var ( + now = metav1.Now() +) + func createTestNodes() (*corev1.Node, *corev1.Node) { resources := corev1.ResourceList{} resources[corev1.ResourceCPU] = *resource.NewQuantity(2, resource.DecimalSI) @@ -209,6 +215,8 @@ var _ = Describe("ResourceRequest Operator", func() { return resourceRequest.Finalizers }, timeout, interval).Should(ContainElement(tenantFinalizer)) + Expect(resourceRequest.Status.OfferWithdrawalTimestamp.IsZero()).To(BeTrue()) + By("Checking Tenant creation") var tenant capsulev1alpha1.Tenant Eventually(func() error { @@ -237,6 +245,7 @@ var _ = Describe("ResourceRequest Operator", func() { Name: offerPrefix + clusterId, Namespace: ResourcesNamespace, } + klog.Info(offerName) Eventually(func() error { return k8sClient.Get(ctx, offerName, createdResourceOffer) }, timeout, interval).ShouldNot(HaveOccurred()) @@ -263,6 +272,34 @@ var _ = Describe("ResourceRequest Operator", func() { Expect(quantity.Cmp(testValue)).Should(BeZero()) } + By("Checking ResourceOffer invalidation on request set deleting phase") + resourceRequest.Spec.WithdrawalTimestamp = &now + Expect(k8sClient.Update(ctx, &resourceRequest)).ToNot(HaveOccurred()) + + // set the vk status in the ResourceOffer to created + createdResourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated + createdResourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferAccepted + Expect(k8sClient.Status().Update(ctx, createdResourceOffer)).ToNot(HaveOccurred()) + + var resourceOffer sharingv1alpha1.ResourceOffer + Eventually(func() bool { + if err := k8sClient.Get(ctx, offerName, &resourceOffer); err != nil { + return false + } + return !resourceOffer.Spec.WithdrawalTimestamp.IsZero() + }, timeout, interval).Should(BeTrue()) + + By("Checking ResourceOffer deletion") + // the ResourceOffer should be deleted when the remote VirtualKubelet will be down + Expect(k8sClient.Get(ctx, offerName, createdResourceOffer)).ToNot(HaveOccurred()) + createdResourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusNone + Expect(k8sClient.Status().Update(ctx, createdResourceOffer)).ToNot(HaveOccurred()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, offerName, &resourceOffer) + return apierrors.IsNotFound(err) + }, timeout, interval).Should(BeTrue()) + By("Checking Tenant Deletion") err := k8sClient.Delete(ctx, &resourceRequest) Expect(err).ToNot(HaveOccurred()) diff --git a/internal/resource-request-operator/utils.go b/internal/resource-request-operator/utils.go index e755089cff..7bb9c582ec 100644 --- a/internal/resource-request-operator/utils.go +++ b/internal/resource-request-operator/utils.go @@ -2,10 +2,13 @@ package resourcerequestoperator import ( "context" + "fmt" "time" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -48,9 +51,10 @@ func (r *ResourceRequestReconciler) generateResourceOffer(ctx context.Context, r }) if err != nil { + klog.Error(err) return err } - klog.Infof("%s -> %s Offer: %s", r.ClusterID, op, offer.ObjectMeta.Name) + klog.Infof("%s -> %s Offer: %s/%s", r.ClusterID, op, offer.Namespace, offer.Name) return nil } @@ -124,3 +128,50 @@ func (r *ResourceRequestReconciler) createForeignCluster(ctx context.Context, resourceRequest.Spec.ClusterIdentity.ClusterID, foreignCluster.Name) return nil } + +func (r *ResourceRequestReconciler) invalidateResourceOffer(ctx context.Context, request *discoveryv1alpha1.ResourceRequest) error { + var offer sharingv1alpha1.ResourceOffer + err := r.Client.Get(ctx, types.NamespacedName{ + Namespace: request.GetNamespace(), + Name: offerPrefix + r.ClusterID, + }, &offer) + if apierrors.IsNotFound(err) { + // ignore not found errors + return nil + } + if err != nil { + klog.Error(err) + return err + } + + switch offer.Status.VirtualKubeletStatus { + case sharingv1alpha1.VirtualKubeletStatusDeleting, sharingv1alpha1.VirtualKubeletStatusCreated: + if offer.Spec.WithdrawalTimestamp.IsZero() { + now := metav1.Now() + offer.Spec.WithdrawalTimestamp = &now + } + err = client.IgnoreNotFound(r.Client.Update(ctx, &offer)) + if err != nil { + klog.Error(err) + return err + } + klog.Infof("%s -> Offer: %s/%s", r.ClusterID, offer.Namespace, offer.Name) + return nil + case sharingv1alpha1.VirtualKubeletStatusNone: + err = client.IgnoreNotFound(r.Client.Delete(ctx, &offer)) + if err != nil { + klog.Error(err) + return err + } + if request.Status.OfferWithdrawalTimestamp.IsZero() { + now := metav1.Now() + request.Status.OfferWithdrawalTimestamp = &now + } + klog.Infof("%s -> Deleted Offer: %s/%s", r.ClusterID, offer.Namespace, offer.Name) + return nil + default: + err := fmt.Errorf("unknown VirtualKubeletStatus %v", offer.Status.VirtualKubeletStatus) + klog.Error(err) + return err + } +} diff --git a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go index 00b2498ed4..c6c7e504e1 100644 --- a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go +++ b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller.go @@ -129,13 +129,25 @@ func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Reques } } - if canDeleteVirtualKubeletDeployment(&resourceOffer) { + deletingPhase := getDeleteVirtualKubeletPhase(&resourceOffer) + switch deletingPhase { + case kubeletDeletePhaseNodeDeleted: // delete virtual kubelet deployment if err = r.deleteVirtualKubeletDeployment(ctx, &resourceOffer); err != nil { klog.Error(err) return ctrl.Result{}, err } + resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusNone return result, nil + case kubeletDeletePhaseDrainingNode: + // set virtual kubelet in deleting phase + resourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusDeleting + case kubeletDeletePhaseNone: + break + default: + err = fmt.Errorf("unknown deleting phase %v", deletingPhase) + klog.Error(err) + return result, err } // create the virtual kubelet deployment diff --git a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go index 7278d77b82..867bc5596b 100644 --- a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go +++ b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_methods.go @@ -225,11 +225,38 @@ func (r *ResourceOfferReconciler) getVirtualKubeletDeployment( return &deployList.Items[0], nil } -func canDeleteVirtualKubeletDeployment(resourceOffer *sharingv1alpha1.ResourceOffer) bool { +type kubeletDeletePhase string + +const ( + kubeletDeletePhaseNone kubeletDeletePhase = "None" + kubeletDeletePhaseDrainingNode kubeletDeletePhase = "DrainingNode" + kubeletDeletePhaseNodeDeleted kubeletDeletePhase = "NodeDeleted" +) + +// getDeleteVirtualKubeletPhase returns the delete phase for the VirtualKubelet created basing on the +// given ResourceOffer. +func getDeleteVirtualKubeletPhase(resourceOffer *sharingv1alpha1.ResourceOffer) kubeletDeletePhase { notAccepted := !isAccepted(resourceOffer) deleting := !resourceOffer.DeletionTimestamp.IsZero() + desiredDelete := !resourceOffer.Spec.WithdrawalTimestamp.IsZero() nodeDrained := !controllerutil.ContainsFinalizer(resourceOffer, consts.NodeFinalizer) - return (notAccepted || deleting) && nodeDrained + + // if the ResourceRequest has not been accepted by the local cluster, + // or it has a DeletionTimestamp not equal to zero (the resource has been deleted), + // or it has a WithdrawalTimestamp not equal to zero (the remote cluster asked for its graceful deletion), + // the VirtualKubelet is in a terminating phase, otherwise return the None phase. + if notAccepted || deleting || desiredDelete { + // if the liqo.io/node finalizer is not set, the remote cluster has been drained and the node has been delete, + // we can then proceed with the VirtualKubelet deletion. + if nodeDrained { + return kubeletDeletePhaseNodeDeleted + } + + // if the finalizer is still present, the node draining has not completed yet, we have to wait before to + // continue the unpeering process. + return kubeletDeletePhaseDrainingNode + } + return kubeletDeletePhaseNone } // isAccepted checks if a ResourceOffer is in Accepted phase. diff --git a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go index da83cf28d8..b64bd93950 100644 --- a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go +++ b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go @@ -45,6 +45,8 @@ var ( controller *ResourceOfferReconciler ctx context.Context cancel context.CancelFunc + + now = metav1.Now() ) func TestIdentityManager(t *testing.T) { @@ -311,44 +313,48 @@ var _ = Describe("ResourceOffer Controller", func() { var _ = Describe("ResourceOffer Operator util functions", func() { - Context("canDeleteVirtualKubeletDeployment", func() { + Context("getDeleteVirtualKubeletPhase", func() { - type canDeleteVirtualKubeletDeploymentTestcase struct { + type getDeleteVirtualKubeletPhaseTestcase struct { resourceOffer *sharingv1alpha1.ResourceOffer expected OmegaMatcher } - DescribeTable("canDeleteVirtualKubeletDeployment table", + DescribeTable("getDeleteVirtualKubeletPhase table", - func(c canDeleteVirtualKubeletDeploymentTestcase) { - Expect(canDeleteVirtualKubeletDeployment(c.resourceOffer)).To(c.expected) + func(c getDeleteVirtualKubeletPhaseTestcase) { + Expect(getDeleteVirtualKubeletPhase(c.resourceOffer)).To(c.expected) }, - Entry("refused ResourceOffer", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("refused ResourceOffer", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ Finalizers: []string{}, }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + WithdrawalTimestamp: &now, + }, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferRefused, }, }, - expected: BeTrue(), + expected: Equal(kubeletDeletePhaseNodeDeleted), }), - Entry("accepted ResourceOffer", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("accepted ResourceOffer", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ Finalizers: []string{}, }, + Spec: sharingv1alpha1.ResourceOfferSpec{}, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferAccepted, }, }, - expected: BeFalse(), + expected: Equal(kubeletDeletePhaseNone), }), - Entry("accepted ResourceOffer with deletion timestamp", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("accepted ResourceOffer with deletion timestamp", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &metav1.Time{ @@ -356,28 +362,30 @@ var _ = Describe("ResourceOffer Operator util functions", func() { }, Finalizers: []string{}, }, + Spec: sharingv1alpha1.ResourceOfferSpec{}, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferAccepted, }, }, - expected: BeTrue(), + expected: Equal(kubeletDeletePhaseNodeDeleted), }), - Entry("refused ResourceOffer with finalizer", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("refused ResourceOffer with finalizer", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ Finalizers: []string{ consts.NodeFinalizer, }, }, + Spec: sharingv1alpha1.ResourceOfferSpec{}, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferRefused, }, }, - expected: BeFalse(), + expected: Equal(kubeletDeletePhaseDrainingNode), }), - Entry("accepted ResourceOffer with deletion timestamp and finalizer", canDeleteVirtualKubeletDeploymentTestcase{ + Entry("accepted ResourceOffer with deletion timestamp and finalizer", getDeleteVirtualKubeletPhaseTestcase{ resourceOffer: &sharingv1alpha1.ResourceOffer{ ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &metav1.Time{ @@ -387,11 +395,44 @@ var _ = Describe("ResourceOffer Operator util functions", func() { consts.NodeFinalizer, }, }, + Spec: sharingv1alpha1.ResourceOfferSpec{}, + Status: sharingv1alpha1.ResourceOfferStatus{ + Phase: sharingv1alpha1.ResourceOfferAccepted, + }, + }, + expected: Equal(kubeletDeletePhaseDrainingNode), + }), + + Entry("desired deletion of ResourceOffer", getDeleteVirtualKubeletPhaseTestcase{ + resourceOffer: &sharingv1alpha1.ResourceOffer{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{ + consts.NodeFinalizer, + }, + }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + WithdrawalTimestamp: &now, + }, + Status: sharingv1alpha1.ResourceOfferStatus{ + Phase: sharingv1alpha1.ResourceOfferAccepted, + }, + }, + expected: Equal(kubeletDeletePhaseDrainingNode), + }), + + Entry("desired deletion of ResourceOffer without finalizer", getDeleteVirtualKubeletPhaseTestcase{ + resourceOffer: &sharingv1alpha1.ResourceOffer{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{}, + }, + Spec: sharingv1alpha1.ResourceOfferSpec{ + WithdrawalTimestamp: &now, + }, Status: sharingv1alpha1.ResourceOfferStatus{ Phase: sharingv1alpha1.ResourceOfferAccepted, }, }, - expected: BeFalse(), + expected: Equal(kubeletDeletePhaseNodeDeleted), }), ) diff --git a/pkg/virtualKubelet/liqoNodeProvider/reconciler.go b/pkg/virtualKubelet/liqoNodeProvider/reconciler.go index 1a6d164e43..410f20064d 100644 --- a/pkg/virtualKubelet/liqoNodeProvider/reconciler.go +++ b/pkg/virtualKubelet/liqoNodeProvider/reconciler.go @@ -27,6 +27,12 @@ import ( "github.com/liqotech/liqo/pkg/utils" ) +func isResourceOfferTerminating(resourceOffer *sharingv1alpha1.ResourceOffer) bool { + hasTimestamp := !resourceOffer.DeletionTimestamp.IsZero() + desiredDelete := !resourceOffer.Spec.WithdrawalTimestamp.IsZero() + return hasTimestamp || desiredDelete +} + // The reconciliation function; every time this function is called, // the node status is updated by means of r.updateFromResourceOffer. func (p *LiqoNodeProvider) reconcileNodeFromResourceOffer(event watch.Event) error { @@ -40,7 +46,7 @@ func (p *LiqoNodeProvider) reconcileNodeFromResourceOffer(event watch.Event) err return err } - if event.Type == watch.Deleted || !resourceOffer.DeletionTimestamp.IsZero() { + if event.Type == watch.Deleted || isResourceOfferTerminating(&resourceOffer) { p.updateMutex.Lock() defer p.updateMutex.Unlock() klog.Infof("resourceOffer %v is going to be deleted... set node status not ready", resourceOffer.Name) @@ -331,12 +337,12 @@ func (p *LiqoNodeProvider) updateNode() error { func (p *LiqoNodeProvider) handleResourceOfferDelete(resourceOffer *sharingv1alpha1.ResourceOffer) error { ctx := context.TODO() - if err := p.cordonNode(ctx); err != nil { + if err := client.IgnoreNotFound(p.cordonNode(ctx)); err != nil { klog.Errorf("error cordoning node: %v", err) return err } - if err := p.drainNode(ctx); err != nil { + if err := client.IgnoreNotFound(p.drainNode(ctx)); err != nil { klog.Errorf("error draining node: %v", err) return err } @@ -346,7 +352,7 @@ func (p *LiqoNodeProvider) handleResourceOfferDelete(resourceOffer *sharingv1alp } // delete the node - if err := p.client.CoreV1().Nodes().Delete(ctx, p.node.GetName(), metav1.DeleteOptions{}); err != nil && !kerrors.IsNotFound(err) { + if err := client.IgnoreNotFound(p.client.CoreV1().Nodes().Delete(ctx, p.node.GetName(), metav1.DeleteOptions{})); err != nil { klog.Errorf("error deleting node: %v", err) return err }