Skip to content

Commit

Permalink
resources phase signaling
Browse files Browse the repository at this point in the history
  • Loading branch information
aleoli committed Jul 2, 2021
1 parent 277e93b commit 1f90e60
Show file tree
Hide file tree
Showing 15 changed files with 364 additions and 40 deletions.
11 changes: 10 additions & 1 deletion apis/discovery/v1alpha1/resourcerequest_types.go
Expand Up @@ -6,6 +6,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"

advtypes "github.com/liqotech/liqo/apis/sharing/v1alpha1"
"github.com/liqotech/liqo/pkg/consts"
crdclient "github.com/liqotech/liqo/pkg/crdClient"
object_references "github.com/liqotech/liqo/pkg/object-references"
)
Expand All @@ -22,15 +23,24 @@ type ResourceRequestSpec struct {
ClusterIdentity ClusterIdentity `json:"clusterIdentity"`
// Local auth service address
AuthURL string `json:"authUrl"`
// OfferPhase indicate the desired state for the ResourceOffer
// +kubebuilder:validation:Enum="Create";"Delete"
// +kubebuilder:default="Create"
OfferPhase consts.ResourceDesiredPhase `json:"offerPhase,omitempty"`
}

// ResourceRequestStatus defines the observed state of ResourceRequest.
type ResourceRequestStatus struct {
BroadcasterRef *object_references.DeploymentReference `json:"broadcasterRef,omitempty"`
AdvertisementStatus advtypes.AdvPhase `json:"advertisementStatus,omitempty"`
// OfferPhase indicate the observed state for the ResourceOffer
// +kubebuilder:validation:Enum="Created";"Deleted"
// +kubebuilder:default="Deleted"
OfferPhase consts.ResourceObservedPhase `json:"offerPhase,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// ResourceRequest is the Schema for the ResourceRequests API.
type ResourceRequest struct {
Expand All @@ -42,7 +52,6 @@ type ResourceRequest struct {
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// ResourceRequestList contains a list of ResourceRequest.
type ResourceRequestList struct {
Expand Down
6 changes: 6 additions & 0 deletions apis/sharing/v1alpha1/resourceoffer_types.go
Expand Up @@ -3,6 +3,8 @@ package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/liqotech/liqo/pkg/consts"
)

// ResourceOfferSpec defines the desired state of ResourceOffer.
Expand All @@ -23,6 +25,10 @@ type ResourceOfferSpec struct {
// TimeToLive is the time instant until this ResourceOffer will be valid.
// If not refreshed, an ResourceOffer will expire after 30 minutes.
TimeToLive metav1.Time `json:"timeToLive"`
// VirtualNodePhase indicate the desired state for the virtual node
// +kubebuilder:validation:Enum="Create";"Delete"
// +kubebuilder:default="Create"
VirtualNodePhase consts.ResourceDesiredPhase `json:"virtualKubeletPhase,omitempty"`
}

// OfferPhase describes the phase of the ResourceOffer.
Expand Down
16 changes: 16 additions & 0 deletions deployments/liqo/crds/discovery.liqo.io_resourcerequests.yaml
Expand Up @@ -52,6 +52,13 @@ spec:
required:
- clusterID
type: object
offerPhase:
default: Create
description: OfferPhase indicate the desired state for the ResourceOffer
enum:
- Create
- Delete
type: string
required:
- authUrl
- clusterIdentity
Expand All @@ -75,10 +82,19 @@ spec:
name must be unique.
type: string
type: object
offerPhase:
default: Deleted
description: OfferPhase indicate the observed state for the ResourceOffer
enum:
- Created
- Deleted
type: string
type: object
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""
Expand Down
8 changes: 8 additions & 0 deletions deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml
Expand Up @@ -166,6 +166,14 @@ spec:
was created.
format: date-time
type: string
virtualKubeletPhase:
default: Create
description: VirtualNodePhase indicate the desired state for the virtual
node
enum:
- Create
- Delete
type: string
required:
- clusterId
- timeToLive
Expand Down
Expand Up @@ -167,7 +167,8 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// ------ (3) peering/unpeering logic ------

// read the ForeignCluster status and ensure the peering state
switch r.getDesiredOutgoingPeeringState(&foreignCluster) {
phase := r.getDesiredOutgoingPeeringState(&foreignCluster)
switch phase {
case desiredPeeringPhasePeering:
if err = r.peerNamespaced(ctx, &foreignCluster); err != nil {
klog.Error(err)
Expand All @@ -178,6 +179,10 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque
klog.Error(err)
return ctrl.Result{}, err
}
default:
err := fmt.Errorf("unknown phase %v", phase)
klog.Error(err)
return ctrl.Result{}, err
}

// ------ (4) update peering conditions ------
Expand Down Expand Up @@ -337,14 +342,41 @@ func (r *ForeignClusterReconciler) Unpeer(fc *discoveryv1alpha1.ForeignCluster,
// unpeerNamespaced disables the peering deleting the resources in the correct TenantControlNamespace.
func (r *ForeignClusterReconciler) unpeerNamespaced(ctx context.Context,
foreignCluster *discoveryv1alpha1.ForeignCluster) error {
// resource request has to be removed
err := r.deleteResourceRequest(ctx, foreignCluster)
if err != nil && !errors.IsNotFound(err) {
var resourceRequest discoveryv1alpha1.ResourceRequest
err := r.Client.Get(ctx, types.NamespacedName{
Namespace: foreignCluster.Status.TenantControlNamespace.Local,
Name: r.clusterID.GetClusterID(),
}, &resourceRequest)
if errors.IsNotFound(err) {
foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseNone
return nil
}
if err != nil {
klog.Error(err)
return err
}

foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseDisconnecting
switch resourceRequest.Status.OfferPhase {
case liqoconst.ResourceObservedPhaseCreated:
resourceRequest.Spec.OfferPhase = liqoconst.ResourceDesiredPhaseDelete
err = r.Client.Update(ctx, &resourceRequest)
if err != nil && !errors.IsNotFound(err) {
klog.Error(err)
return err
}
foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseDisconnecting
case liqoconst.ResourceObservedPhaseDeleted:
err = r.deleteResourceRequest(ctx, foreignCluster)
if err != nil && !errors.IsNotFound(err) {
klog.Error(err)
return err
}
foreignCluster.Status.Outgoing.PeeringPhase = discoveryv1alpha1.PeeringPhaseNone
default:
err := fmt.Errorf("unknown phase %v", resourceRequest.Status.OfferPhase)
klog.Error(err)
return err
}
return nil
}

Expand Down Expand Up @@ -400,6 +432,14 @@ func getPeeringPhase(resourceRequestList *discoveryv1alpha1.ResourceRequestList)
case 0:
return discoveryv1alpha1.PeeringPhaseNone, nil
case 1:
resourceRequest := &resourceRequestList.Items[0]
desired := resourceRequest.Spec.OfferPhase
observed := resourceRequest.Status.OfferPhase
if desired == liqoconst.ResourceDesiredPhaseCreate && observed == liqoconst.ResourceObservedPhaseDeleted {
return discoveryv1alpha1.PeeringPhasePending, nil
} else if desired == liqoconst.ResourceDesiredPhaseDelete && observed == liqoconst.ResourceObservedPhaseCreated {
return discoveryv1alpha1.PeeringPhaseDisconnecting, nil
}
return discoveryv1alpha1.PeeringPhaseEstablished, nil
default:
err := fmt.Errorf("more than one resource request found")
Expand Down
Expand Up @@ -19,6 +19,7 @@ import (
discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
crdreplicator "github.com/liqotech/liqo/internal/crdReplicator"
"github.com/liqotech/liqo/pkg/clusterid/test"
"github.com/liqotech/liqo/pkg/consts"
"github.com/liqotech/liqo/pkg/discovery"
identitymanager "github.com/liqotech/liqo/pkg/identityManager"
tenantcontrolnamespace "github.com/liqotech/liqo/pkg/tenantControlNamespace"
Expand Down Expand Up @@ -422,6 +423,7 @@ var _ = Describe("ForeignClusterOperator", func() {
// populate the resourcerequest CR
c.rr.Name = controller.clusterID.GetClusterID()
c.rr.Spec.ClusterIdentity.ClusterID = c.fc.Spec.ClusterIdentity.ClusterID
c.rr.Spec.OfferPhase = consts.ResourceDesiredPhaseCreate
c.rr.Labels = resourceRequestLabels(c.fc.Spec.ClusterIdentity.ClusterID)

// create the foreigncluster CR
Expand Down Expand Up @@ -451,6 +453,16 @@ var _ = Describe("ForeignClusterOperator", func() {
Expect(ok).To(BeTrue())
Expect(rr).NotTo(BeNil())

// set the ResourceRequest status to created
rr.Status.OfferPhase = consts.ResourceObservedPhaseCreated
obj, err = controller.crdClient.Resource("resourcerequests").Namespace(tenantNamespace.Name).UpdateStatus(rr.Name, rr, &metav1.UpdateOptions{})
Expect(err).To(BeNil())
Expect(obj).NotTo(BeNil())

rr, ok = obj.(*discoveryv1alpha1.ResourceRequest)
Expect(ok).To(BeTrue())
Expect(rr).NotTo(BeNil())

// disable the peering for that foreigncluster
err = controller.unpeerNamespaced(ctx, fc)
Expect(err).To(BeNil())
Expand All @@ -469,8 +481,37 @@ var _ = Describe("ForeignClusterOperator", func() {
Expect(rrs).NotTo(BeNil())

// check that the length of the resource request list is the expected one,
// and the resource request has been deleted in the correct namespace
// and the resource request has been set for deletion in the correct namespace
Expect(len(rrs.Items)).To(c.expectedPeeringLength)
if len(rrs.Items) > 0 {
Expect(rrs.Items[0].Spec.OfferPhase).To(Equal(consts.ResourceDesiredPhaseDelete))
rr = &rrs.Items[0]
}

// set the ResourceRequest status to deleted
rr.Status.OfferPhase = consts.ResourceObservedPhaseDeleted
obj, err = controller.crdClient.Resource("resourcerequests").Namespace(tenantNamespace.Name).UpdateStatus(rr.Name, rr, &metav1.UpdateOptions{})
Expect(err).To(BeNil())
Expect(obj).NotTo(BeNil())

rr, ok = obj.(*discoveryv1alpha1.ResourceRequest)
Expect(ok).To(BeTrue())
Expect(rr).NotTo(BeNil())

// call for the second time the unpeer function to delete the ResourceRequest
err = controller.unpeerNamespaced(ctx, fc)
Expect(err).To(BeNil())

// get the resource requests in the local tenant namespace
obj, err = controller.crdClient.Resource("resourcerequests").Namespace(tenantNamespace.Name).List(&metav1.ListOptions{})
Expect(err).To(BeNil())
Expect(obj).NotTo(BeNil())

rrs, ok = obj.(*discoveryv1alpha1.ResourceRequestList)
Expect(ok).To(BeTrue())
Expect(rrs).NotTo(BeNil())

Expect(len(rrs.Items)).To(BeNumerically("==", 0))
},

Entry("unpeer", unpeerTestcase{
Expand Down Expand Up @@ -512,7 +553,7 @@ var _ = Describe("ForeignClusterOperator", func() {
AuthURL: "",
},
},
expectedPeeringLength: Equal(0),
expectedPeeringLength: Equal(1),
expectedOutgoing: Equal(discoveryv1alpha1.Outgoing{
PeeringPhase: discoveryv1alpha1.PeeringPhaseDisconnecting,
}),
Expand Down
Expand Up @@ -10,6 +10,7 @@ import (

discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
crdreplicator "github.com/liqotech/liqo/internal/crdReplicator"
"github.com/liqotech/liqo/pkg/consts"
)

// createResourceRequest creates a resource request to be sent to the specified ForeignCluster.
Expand Down Expand Up @@ -55,7 +56,8 @@ func (r *ForeignClusterReconciler) createResourceRequest(ctx context.Context,
ClusterID: localClusterID,
ClusterName: r.ConfigProvider.GetConfig().ClusterName,
},
AuthURL: authURL,
AuthURL: authURL,
OfferPhase: consts.ResourceDesiredPhaseCreate,
}

return controllerutil.SetControllerReference(foreignCluster, resourceRequest, r.Scheme)
Expand All @@ -64,7 +66,8 @@ func (r *ForeignClusterReconciler) createResourceRequest(ctx context.Context,
klog.Error(err)
return controllerutil.OperationResultNone, err
}
klog.Infof("[%v] ensured the existence of ResourceRequest (with %v operation)", remoteClusterID, result)
klog.Infof("[%v] ensured the existence of ResourceRequest with phase %v (with %v operation)",
remoteClusterID, result, resourceRequest.Spec.OfferPhase)

return result, nil
}
Expand Down
33 changes: 28 additions & 5 deletions internal/resource-request-operator/resourceRequest_controller.go
Expand Up @@ -2,6 +2,7 @@ package resourcerequestoperator

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -14,6 +15,7 @@ import (
discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1"
crdreplicator "github.com/liqotech/liqo/internal/crdReplicator"
"github.com/liqotech/liqo/pkg/consts"
)

// ResourceRequestReconciler reconciles a ResourceRequest object.
Expand All @@ -37,9 +39,9 @@ const (
// +kubebuilder:rbac:groups=capsule.clastix.io,resources=tenants,verbs=get;list;watch;create;update;patch;delete;

// Reconcile is the main function of the controller which reconciles ResourceRequest resources.
func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
var resourceRequest discoveryv1alpha1.ResourceRequest
err := r.Get(ctx, req.NamespacedName, &resourceRequest)
err = r.Get(ctx, req.NamespacedName, &resourceRequest)
if err != nil {
klog.Errorf("unable to get resourceRequest %s: %s", req.NamespacedName, err)
return ctrl.Result{}, nil
Expand Down Expand Up @@ -80,9 +82,30 @@ func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}

err = r.generateResourceOffer(ctx, &resourceRequest)
if err != nil {
klog.Errorf("%s -> Error generating resourceOffer: %s", remoteClusterID, err)
defer func() {
newErr := r.Client.Status().Update(ctx, &resourceRequest)
if newErr != nil {
klog.Error(newErr)
err = newErr
}
}()

switch resourceRequest.Spec.OfferPhase {
case consts.ResourceDesiredPhaseCreate:
err = r.generateResourceOffer(ctx, &resourceRequest)
if err != nil {
klog.Errorf("%s -> Error generating resourceOffer: %s", remoteClusterID, err)
return ctrl.Result{}, err
}
case consts.ResourceDesiredPhaseDelete:
err = r.invalidateResourceOffer(ctx, &resourceRequest)
if err != nil {
klog.Errorf("%s -> Error invalidating resourceOffer: %s", remoteClusterID, err)
return ctrl.Result{}, err
}
default:
err = fmt.Errorf("unknown phase %v", resourceRequest.Spec.OfferPhase)
klog.Error(err)
return ctrl.Result{}, err
}

Expand Down

0 comments on commit 1f90e60

Please sign in to comment.