Skip to content

Commit

Permalink
ensure no clusterID repetition
Browse files Browse the repository at this point in the history
  • Loading branch information
aleoli committed Aug 5, 2021
1 parent ad7828f commit f340ee4
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 27 deletions.
11 changes: 9 additions & 2 deletions apis/discovery/v1alpha1/foreigncluster_types.go
Expand Up @@ -47,6 +47,10 @@ const (
// This is only used by the AuthenticationCondition Type, and indicates that
// the identity verification was denied with an empty token.
PeeringConditionStatusEmptyDenied PeeringConditionStatusType = "EmptyDenied"
// PeeringConditionStatusError indicates that an error has occurred.
PeeringConditionStatusError PeeringConditionStatusType = "Error"
// PeeringConditionStatusSuccess indicates that the condition is successful.
PeeringConditionStatusSuccess PeeringConditionStatusType = "Success"
)

// PeeringEnabledType indicates the desired state for the peering with this remote cluster.
Expand Down Expand Up @@ -127,15 +131,17 @@ const (
NetworkStatusCondition PeeringConditionType = "NetworkStatus"
// AuthenticationStatusCondition informs users about the Authentication status.
AuthenticationStatusCondition PeeringConditionType = "AuthenticationStatus"
// ProcessableForeignCluster informs users about the Authentication status.
ProcessForeignClusterStatusCondition PeeringConditionType = "ProcessForeignClusterStatus"
)

// PeeringCondition contains details about state of the peering.
type PeeringCondition struct {
// Type of the peering condition.
// +kubebuilder:validation:Enum="OutgoingPeering";"IncomingPeering";"NetworkStatus";"AuthenticationStatus"
// +kubebuilder:validation:Enum="OutgoingPeering";"IncomingPeering";"NetworkStatus";"AuthenticationStatus";"ProcessForeignClusterStatus"
Type PeeringConditionType `json:"type"`
// Status of the condition.
// +kubebuilder:validation:Enum="None";"Pending";"Established";"Disconnecting";"Denied";"EmptyDenied"
// +kubebuilder:validation:Enum="None";"Pending";"Established";"Disconnecting";"Denied";"EmptyDenied";"Error";"Success"
// +kubebuilder:default="None"
Status PeeringConditionStatusType `json:"status"`
// LastTransitionTime -> timestamp for when the condition last transitioned from one status to another.
Expand All @@ -161,6 +167,7 @@ type TenantNamespaceType struct {

// ForeignCluster is the Schema for the foreignclusters API.
// +kubebuilder:printcolumn:name="ClusterID",type=string,priority=1,JSONPath=`.spec.clusterIdentity.clusterID`
// +kubebuilder:printcolumn:name="ClusterName",type=string,priority=1,JSONPath=`.spec.clusterIdentity.clusterName`
// +kubebuilder:printcolumn:name="Outgoing peering phase",type=string,JSONPath=`.status.peeringConditions[?(@.type == 'OutgoingPeering')].status`
// +kubebuilder:printcolumn:name="Incoming peering phase",type=string,JSONPath=`.status.peeringConditions[?(@.type == 'IncomingPeering')].status`
// +kubebuilder:printcolumn:name="Networking status",type=string,JSONPath=`.status.peeringConditions[?(@.type == 'NetworkStatus')].status`
Expand Down
7 changes: 7 additions & 0 deletions deployments/liqo/crds/discovery.liqo.io_foreignclusters.yaml
Expand Up @@ -21,6 +21,10 @@ spec:
name: ClusterID
priority: 1
type: string
- jsonPath: .spec.clusterIdentity.clusterName
name: ClusterName
priority: 1
type: string
- jsonPath: .status.peeringConditions[?(@.type == 'OutgoingPeering')].status
name: Outgoing peering phase
type: string
Expand Down Expand Up @@ -128,6 +132,8 @@ spec:
- Disconnecting
- Denied
- EmptyDenied
- Error
- Success
type: string
type:
description: Type of the peering condition.
Expand All @@ -136,6 +142,7 @@ spec:
- IncomingPeering
- NetworkStatus
- AuthenticationStatus
- ProcessForeignClusterStatus
type: string
required:
- status
Expand Down
@@ -1,7 +1,6 @@
package foreignclusteroperator

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

"github.com/liqotech/liqo/apis/discovery/v1alpha1"
Expand All @@ -15,10 +14,9 @@ func (r *ForeignClusterReconciler) needsClusterIdentityDefaulting(fc *v1alpha1.F
return fc.Spec.ClusterIdentity.ClusterID == ""
}

// load the default values for that ForeignCluster basing on the AuthUrl value, an HTTP request is sent and the retrieved
// values are applied for the following fields (if they are empty): Namespace, ClusterIdentity.ClusterID, ClusterIdentity.Namespace
// and the TrustMode
// if it returns no error, the ForeignCluster CR has been updated.
// clusterIdentityDefaulting loads the default values for that ForeignCluster basing on the AuthUrl value, an HTTP request
// is sent and the retrieved values are applied for the following fields (if they are empty):
// ClusterIdentity.ClusterID, ClusterIdentity.ClusterName.
func (r *ForeignClusterReconciler) clusterIdentityDefaulting(fc *v1alpha1.ForeignCluster) error {
klog.V(4).Infof("Defaulting ClusterIdentity values for ForeignCluster %v", fc.Name)
ids, err := utils.GetClusterInfo(foreignclusterutils.InsecureSkipTLSVerify(fc), fc.Spec.ForeignAuthURL)
Expand All @@ -37,11 +35,5 @@ func (r *ForeignClusterReconciler) clusterIdentityDefaulting(fc *v1alpha1.Foreig
klog.V(4).Infof("New values:\n\tClusterId:\t%v\n\tClusterName:\t%v",
fc.Spec.ClusterIdentity.ClusterID,
fc.Spec.ClusterIdentity.ClusterName)

// update the ForeignCluster
if _, err = r.crdClient.Resource("foreignclusters").Update(fc.Name, fc, &metav1.UpdateOptions{}); err != nil {
klog.Error(err)
return err
}
return nil
}
Expand Up @@ -167,6 +167,21 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
}()

// ensure that there are not multiple clusters with the same clusterID
if processable, err := r.isClusterProcessable(ctx, &foreignCluster); err != nil {
klog.Error(err)
return ctrl.Result{}, err
} else if !processable {
klog.Warningf("[%v] ClusterID not processable (%v): %v",
foreignCluster.Spec.ClusterIdentity.ClusterID,
foreignCluster.Name,
peeringconditionsutils.GetMessage(&foreignCluster, discoveryv1alpha1.ProcessForeignClusterStatusCondition))
return ctrl.Result{
Requeue: true,
RequeueAfter: r.RequeueAfter,
}, nil
}

// ------ (2) ensuring prerequirements ------

// ensure the existence of the local TenantNamespace
Expand Down
Expand Up @@ -880,6 +880,99 @@ var _ = Describe("ForeignClusterOperator", func() {

})

Context("Test isClusterProcessable", func() {

It("multiple ForeignClusters with the same clusterID", func() {

fc1 := &discoveryv1alpha1.ForeignCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-1",
Labels: map[string]string{
discovery.DiscoveryTypeLabel: string(discovery.ManualDiscovery),
discovery.ClusterIDLabel: "cluster-1",
},
},
Spec: discoveryv1alpha1.ForeignClusterSpec{
OutgoingPeeringEnabled: discoveryv1alpha1.PeeringEnabledAuto,
InsecureSkipTLSVerify: pointer.BoolPtr(true),
ForeignAuthURL: "https://example.com",
ClusterIdentity: discoveryv1alpha1.ClusterIdentity{
ClusterID: "cluster-1",
},
},
}

fc2 := fc1.DeepCopy()
fc2.Name = "cluster-2"

Expect(controller.Client.Create(ctx, fc1)).To(Succeed())
// we need at least 1 second of delay between the two creation timestamps
time.Sleep(1 * time.Second)
Expect(controller.Client.Create(ctx, fc2)).To(Succeed())

By("Create the first ForeignCluster")

processable, err := controller.isClusterProcessable(ctx, fc1)
Expect(err).To(Succeed())
Expect(processable).To(BeTrue())
Expect(peeringconditionsutils.GetStatus(fc1, discoveryv1alpha1.ProcessForeignClusterStatusCondition)).
To(Equal(discoveryv1alpha1.PeeringConditionStatusSuccess))

By("Create the second ForeignCluster")

processable, err = controller.isClusterProcessable(ctx, fc2)
Expect(err).To(Succeed())
Expect(processable).To(BeFalse())
Expect(peeringconditionsutils.GetStatus(fc2, discoveryv1alpha1.ProcessForeignClusterStatusCondition)).
To(Equal(discoveryv1alpha1.PeeringConditionStatusError))

By("Delete the first ForeignCluster")

Expect(controller.Client.Delete(ctx, fc1)).To(Succeed())

By("Check that the second ForeignCluster is now processable")

Eventually(func() bool {
processable, err = controller.isClusterProcessable(ctx, fc2)
Expect(err).To(Succeed())
return processable
}, timeout, interval).Should(BeTrue())
Expect(peeringconditionsutils.GetStatus(fc2, discoveryv1alpha1.ProcessForeignClusterStatusCondition)).
To(Equal(discoveryv1alpha1.PeeringConditionStatusSuccess))
})

It("add a cluster with the same clusterID of the local cluster", func() {

fc := &discoveryv1alpha1.ForeignCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-1",
Labels: map[string]string{
discovery.DiscoveryTypeLabel: string(discovery.ManualDiscovery),
discovery.ClusterIDLabel: controller.clusterID.GetClusterID(),
},
},
Spec: discoveryv1alpha1.ForeignClusterSpec{
OutgoingPeeringEnabled: discoveryv1alpha1.PeeringEnabledAuto,
InsecureSkipTLSVerify: pointer.BoolPtr(true),
ForeignAuthURL: "https://example.com",
ClusterIdentity: discoveryv1alpha1.ClusterIdentity{
ClusterID: controller.clusterID.GetClusterID(),
},
},
}

Expect(controller.Client.Create(ctx, fc)).To(Succeed())

processable, err := controller.isClusterProcessable(ctx, fc)
Expect(err).To(Succeed())
Expect(processable).To(BeFalse())
Expect(peeringconditionsutils.GetStatus(fc, discoveryv1alpha1.ProcessForeignClusterStatusCondition)).
To(Equal(discoveryv1alpha1.PeeringConditionStatusError))

})

})

})

var _ = Describe("PeeringPolicy", func() {
Expand Down
60 changes: 55 additions & 5 deletions internal/discovery/foreign-cluster-operator/validator.go
Expand Up @@ -2,31 +2,33 @@ package foreignclusteroperator

import (
"context"
"fmt"

"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"

discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
"github.com/liqotech/liqo/pkg/discovery"
foreignclusterutils "github.com/liqotech/liqo/pkg/utils/foreignCluster"
peeringconditionsutils "github.com/liqotech/liqo/pkg/utils/peeringConditions"
)

// validateForeignCluster contains the logic that validates and defaults labels and spec fields.
// TODO: this function will be refactored in a future pr.
func (r *ForeignClusterReconciler) validateForeignCluster(ctx context.Context,
foreignCluster *discoveryv1alpha1.ForeignCluster) (cont bool, res ctrl.Result, err error) {
requireUpdate := false

if r.needsClusterIdentityDefaulting(foreignCluster) {
// this ForeignCluster has not all the required fields, probably it has been added manually, so default to exposed values
// this ForeignCluster has not all the cluster identity fields (clusterID and clusterName),
// get them from the foreignAuthUrl.
if err := r.clusterIdentityDefaulting(foreignCluster); err != nil {
klog.Error(err)
return false, ctrl.Result{
Requeue: true,
RequeueAfter: r.RequeueAfter,
}, err
}
// the resource has been updated, no need to requeue
return false, ctrl.Result{}, nil
requireUpdate = true
}

// set cluster-id label to easy retrieve ForeignClusters by ClusterId,
Expand All @@ -40,7 +42,7 @@ func (r *ForeignClusterReconciler) validateForeignCluster(ctx context.Context,
}

if requireUpdate {
_, err := r.update(foreignCluster)
_, err = r.update(foreignCluster)
if err != nil {
klog.Error(err, err.Error())
return false, ctrl.Result{
Expand All @@ -57,3 +59,51 @@ func (r *ForeignClusterReconciler) validateForeignCluster(ctx context.Context,

return true, ctrl.Result{}, nil
}

// isClusterProcessable checks if the provided ForeignCluster is processable.
// It can not be processable if:
// * the clusterID is the same of the local cluster
// * the same clusterID is already present in a previously created ForeignCluster.
func (r *ForeignClusterReconciler) isClusterProcessable(ctx context.Context,
foreignCluster *discoveryv1alpha1.ForeignCluster) (bool, error) {
foreignClusterID := foreignCluster.Spec.ClusterIdentity.ClusterID

if foreignClusterID == r.clusterID.GetClusterID() {
// this is the local cluster, it is not processable
peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.ProcessForeignClusterStatusCondition,
discoveryv1alpha1.PeeringConditionStatusError,
"LocalCluster",
"This cluster has the same clusterID of the local cluster",
)

return false, nil
}

foreignClusterWithSameID, err := foreignclusterutils.GetForeignClusterByID(ctx,
r.Client, foreignClusterID)
if err != nil {
klog.Error(err)
return false, err
}

if foreignClusterWithSameID.GetUID() == foreignCluster.GetUID() {
// these are the same resource, no clusterID repetition
peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.ProcessForeignClusterStatusCondition,
discoveryv1alpha1.PeeringConditionStatusSuccess,
"ForeignClusterProcesssable",
"This ForeignCluster seems to be processable",
)

return true, nil
}

peeringconditionsutils.EnsureStatus(foreignCluster,
discoveryv1alpha1.ProcessForeignClusterStatusCondition,
discoveryv1alpha1.PeeringConditionStatusError,
"ClusterIDRepetition",
fmt.Sprintf("The same clusterID is already present in another ForeignCluster (%v)", foreignClusterWithSameID.GetName()),
)
return false, nil
}
2 changes: 1 addition & 1 deletion internal/discovery/foreign.go
Expand Up @@ -238,5 +238,5 @@ func (discovery *Controller) getForeignClusterByID(clusterID string) (*v1alpha1.
Resource: "foreignclusters",
}, clusterID)
}
return &fcs.Items[0], nil
return foreignclusterutils.GetOlderForeignCluster(fcs), nil
}
17 changes: 16 additions & 1 deletion pkg/utils/foreignCluster/getForeignCluster.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -31,5 +32,19 @@ func GetForeignClusterByID(ctx context.Context, cl client.Client, clusterID stri
klog.Error(err)
return nil, err
}
return &foreignClusterList.Items[0], nil
return GetOlderForeignCluster(&foreignClusterList), nil
}

// GetOlderForeignCluster returns the ForeignCluster from the list with the older creationTimestamp.
func GetOlderForeignCluster(
foreignClusterList *discoveryv1alpha1.ForeignClusterList) (foreignCluster *discoveryv1alpha1.ForeignCluster) {
var olderTime *metav1.Time = nil
for i := range foreignClusterList.Items {
fc := &foreignClusterList.Items[i]
if olderTime.IsZero() || fc.CreationTimestamp.Before(olderTime) {
olderTime = &fc.CreationTimestamp
foreignCluster = fc
}
}
return foreignCluster
}

0 comments on commit f340ee4

Please sign in to comment.