Skip to content

Commit

Permalink
fix permission management with one-way peering
Browse files Browse the repository at this point in the history
This commit fixes an issue, related to missing permission, that prevented the successful establishment of a one-way peering.

Now the ForeignCluster Operator handles the permission to assign to the remote cluster and enforces their status.
  • Loading branch information
aleoli committed Jul 9, 2021
1 parent 804313f commit 0cf3eda
Show file tree
Hide file tree
Showing 10 changed files with 504 additions and 153 deletions.
10 changes: 5 additions & 5 deletions internal/crdReplicator/crdReplicator-operator.go
Expand Up @@ -171,7 +171,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
}

currentPhase := getPeeringPhase(&fc)
currentPhase := foreigncluster.GetPeeringPhase(&fc)
if oldPhase := c.getPeeringPhase(remoteClusterID); oldPhase != currentPhase {
c.setPeeringPhase(remoteClusterID, currentPhase)
defer c.checkResourcesOnPeeringPhaseChange(ctx, remoteClusterID, currentPhase, oldPhase)
Expand Down Expand Up @@ -398,7 +398,7 @@ func (c *Controller) StartWatchers() {
}
for i := range c.RegisteredResources {
res := &c.RegisteredResources[i]
if !isReplicationEnabled(c.getPeeringPhase(remCluster), res) {
if !foreigncluster.IsReplicationEnabled(c.getPeeringPhase(remCluster), res) {
continue
}

Expand Down Expand Up @@ -450,7 +450,7 @@ func (c *Controller) StopWatchers() {
// stop watchers for those resources no more needed
for i := range c.RegisteredResources {
res := &c.RegisteredResources[i]
if isReplicationEnabled(c.getPeeringPhase(remCluster), res) {
if foreigncluster.IsReplicationEnabled(c.getPeeringPhase(remCluster), res) {
continue
}

Expand Down Expand Up @@ -678,7 +678,7 @@ func (c *Controller) AddedHandler(obj *unstructured.Unstructured, gvr schema.Gro
}

resource := c.getResource(&gvr)
if resource == nil || !isReplicationEnabled(c.getPeeringPhase(remoteClusterID), resource) {
if resource == nil || !foreigncluster.IsReplicationEnabled(c.getPeeringPhase(remoteClusterID), resource) {
return
}

Expand Down Expand Up @@ -717,7 +717,7 @@ func (c *Controller) ModifiedHandler(obj *unstructured.Unstructured, gvr schema.
}

resource := c.getResource(&gvr)
if resource == nil || !isReplicationEnabled(c.getPeeringPhase(remoteClusterID), resource) {
if resource == nil || !foreigncluster.IsReplicationEnabled(c.getPeeringPhase(remoteClusterID), resource) {
return
}

Expand Down
44 changes: 1 addition & 43 deletions internal/crdReplicator/peeringPhase.go
Expand Up @@ -9,7 +9,6 @@ import (
"k8s.io/klog/v2"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
"github.com/liqotech/liqo/pkg/consts"
foreigncluster "github.com/liqotech/liqo/pkg/utils/foreignCluster"
)
Expand All @@ -23,7 +22,7 @@ func (c *Controller) checkResourcesOnPeeringPhaseChange(ctx context.Context,
remoteClusterID string, currentPhase, oldPhase consts.PeeringPhase) {
for i := range c.RegisteredResources {
res := &c.RegisteredResources[i]
if !isReplicationEnabled(oldPhase, res) && isReplicationEnabled(currentPhase, res) {
if !foreigncluster.IsReplicationEnabled(oldPhase, res) && foreigncluster.IsReplicationEnabled(currentPhase, res) {
// this change has triggered the replication on this resource
klog.Infof("phase from %v to %v triggers replication on resource %v",
oldPhase, currentPhase, res.GroupVersionResource)
Expand Down Expand Up @@ -90,44 +89,3 @@ func (c *Controller) setPeeringPhase(clusterID string, phase consts.PeeringPhase
}
c.peeringPhases[clusterID] = phase
}

// getPeeringPhase returns the peering phase for a fiver ForignCluster CR.
func getPeeringPhase(fc *discoveryv1alpha1.ForeignCluster) consts.PeeringPhase {
incoming := foreigncluster.IsIncomingEnabled(fc)
outgoing := foreigncluster.IsOutgoingEnabled(fc)
if incoming && outgoing {
return consts.PeeringPhaseBidirectional
}
if incoming {
return consts.PeeringPhaseIncoming
}
if outgoing {
return consts.PeeringPhaseOutgoing
}
return consts.PeeringPhaseNone
}

// isReplicationEnabled indicates if the replication has to be enabled for a given peeringPhase
// and a given CRD.
func isReplicationEnabled(peeringPhase consts.PeeringPhase, resource *configv1alpha1.Resource) bool {
switch resource.PeeringPhase {
case consts.PeeringPhaseNone:
return false
case consts.PeeringPhaseAll:
return true
case consts.PeeringPhaseBidirectional:
return peeringPhase == consts.PeeringPhaseBidirectional
case consts.PeeringPhaseIncoming:
return peeringPhase == consts.PeeringPhaseBidirectional || peeringPhase == consts.PeeringPhaseIncoming
case consts.PeeringPhaseOutgoing:
return peeringPhase == consts.PeeringPhaseBidirectional || peeringPhase == consts.PeeringPhaseOutgoing
case consts.PeeringPhaseEstablished:
bidirectional := peeringPhase == consts.PeeringPhaseBidirectional
incoming := peeringPhase == consts.PeeringPhaseIncoming
outgoing := peeringPhase == consts.PeeringPhaseOutgoing
return bidirectional || incoming || outgoing
default:
klog.Info("unknown peeringPhase %v", resource.PeeringPhase)
return false
}
}
98 changes: 0 additions & 98 deletions internal/crdReplicator/peeringPhase_test.go
Expand Up @@ -41,104 +41,6 @@ const (
interval = time.Millisecond * 250
)

var _ = Describe("PeeringPhase", func() {

Context("getPeeringPhase", func() {
type getPeeringPhaseTestcase struct {
foreignCluster *discoveryv1alpha1.ForeignCluster
expectedPhase consts.PeeringPhase
}

DescribeTable("getPeeringPhase table",
func(c getPeeringPhaseTestcase) {
phase := getPeeringPhase(c.foreignCluster)
Expect(phase).To(Equal(c.expectedPhase))
},

Entry("bidirectional", getPeeringPhaseTestcase{
foreignCluster: &discoveryv1alpha1.ForeignCluster{
Status: discoveryv1alpha1.ForeignClusterStatus{
PeeringConditions: []discoveryv1alpha1.PeeringCondition{
{
Type: discoveryv1alpha1.IncomingPeeringCondition,
Status: discoveryv1alpha1.PeeringConditionStatusEstablished,
LastTransitionTime: metav1.Now(),
},
{
Type: discoveryv1alpha1.OutgoingPeeringCondition,
Status: discoveryv1alpha1.PeeringConditionStatusEstablished,
LastTransitionTime: metav1.Now(),
},
},
},
},
expectedPhase: consts.PeeringPhaseBidirectional,
}),

Entry("incoming", getPeeringPhaseTestcase{
foreignCluster: &discoveryv1alpha1.ForeignCluster{
Status: discoveryv1alpha1.ForeignClusterStatus{
PeeringConditions: []discoveryv1alpha1.PeeringCondition{
{
Type: discoveryv1alpha1.IncomingPeeringCondition,
Status: discoveryv1alpha1.PeeringConditionStatusEstablished,
LastTransitionTime: metav1.Now(),
},
{
Type: discoveryv1alpha1.OutgoingPeeringCondition,
Status: discoveryv1alpha1.PeeringConditionStatusNone,
LastTransitionTime: metav1.Now(),
},
},
},
},
expectedPhase: consts.PeeringPhaseIncoming,
}),

Entry("outgoing", getPeeringPhaseTestcase{
foreignCluster: &discoveryv1alpha1.ForeignCluster{
Status: discoveryv1alpha1.ForeignClusterStatus{
PeeringConditions: []discoveryv1alpha1.PeeringCondition{
{
Type: discoveryv1alpha1.IncomingPeeringCondition,
Status: discoveryv1alpha1.PeeringConditionStatusNone,
LastTransitionTime: metav1.Now(),
},
{
Type: discoveryv1alpha1.OutgoingPeeringCondition,
Status: discoveryv1alpha1.PeeringConditionStatusEstablished,
LastTransitionTime: metav1.Now(),
},
},
},
},
expectedPhase: consts.PeeringPhaseOutgoing,
}),

Entry("none", getPeeringPhaseTestcase{
foreignCluster: &discoveryv1alpha1.ForeignCluster{
Status: discoveryv1alpha1.ForeignClusterStatus{
PeeringConditions: []discoveryv1alpha1.PeeringCondition{
{
Type: discoveryv1alpha1.IncomingPeeringCondition,
Status: discoveryv1alpha1.PeeringConditionStatusNone,
LastTransitionTime: metav1.Now(),
},
{
Type: discoveryv1alpha1.OutgoingPeeringCondition,
Status: discoveryv1alpha1.PeeringConditionStatusNone,
LastTransitionTime: metav1.Now(),
},
},
},
},
expectedPhase: consts.PeeringPhaseNone,
}),
)
})

})

var _ = Describe("PeeringPhase-Based Replication", func() {

var (
Expand Down
Expand Up @@ -228,7 +228,15 @@ func (r *ForeignClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, err
}

// ------ (5) garbage collection ------
// ------ (5) ensuring permission ------

// ensure the permission for the current peering phase
if err = r.ensurePermission(ctx, &foreignCluster); err != nil {
klog.Error(err)
return ctrl.Result{}, err
}

// ------ (6) garbage collection ------

// check if this ForeignCluster needs to be deleted. It could happen, for example, if it has been discovered
// thanks to incoming peeringRequest and it has no active connections
Expand Down

0 comments on commit 0cf3eda

Please sign in to comment.