Skip to content

Commit

Permalink
resource replication phase test
Browse files Browse the repository at this point in the history
  • Loading branch information
aleoli committed Jun 17, 2021
1 parent aaac77d commit eb65c61
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 3 deletions.
65 changes: 64 additions & 1 deletion internal/crdReplicator/crdReplicator-operator_test.go
Expand Up @@ -2,6 +2,7 @@ package crdreplicator

import (
"context"
"os"
"testing"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/klog/v2"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
Expand Down Expand Up @@ -51,6 +53,34 @@ func getObj() *unstructured.Unstructured {
return networkConfig
}

func getObjNamespaced() *unstructured.Unstructured {
resourceRequest := &discoveryv1alpha1.ResourceRequest{
TypeMeta: metav1.TypeMeta{
Kind: "ResourceRequest",
APIVersion: discoveryv1alpha1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "resourcerequest",
Namespace: "default",
},
Spec: discoveryv1alpha1.ResourceRequestSpec{
AuthURL: "https://example.com",
ClusterIdentity: discoveryv1alpha1.ClusterIdentity{
ClusterID: "id",
},
},
}
resourceRequest.SetLabels(getLabels())
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resourceRequest)
if err != nil {
klog.Error(err)
os.Exit(1)
}
return &unstructured.Unstructured{
Object: obj,
}
}

func getLabels() map[string]string {
return map[string]string{
LocalLabelSelector: "true",
Expand Down Expand Up @@ -78,10 +108,22 @@ func getCRDReplicator() Controller {
IdentityManager: identitymanager.NewCertificateIdentityManager(k8sclient, clusterIDInterface, tenantmanager),
LocalToRemoteNamespaceMapper: map[string]string{},
RemoteToLocalNamespaceMapper: map[string]string{},
ClusterIDToLocalNamespaceMapper: map[string]string{},
ClusterIDToRemoteNamespaceMapper: map[string]string{},
}
}

func setupReplication(d *Controller, ownership consts.OwnershipType) {
d.ClusterIDToLocalNamespaceMapper["testRemoteClusterID"] = "default"
d.RegisteredResources = []configv1alpha1.Resource{
{
GroupVersionResource: metav1.GroupVersionResource(netv1alpha1.NetworkConfigGroupVersionResource),
PeeringPhase: consts.PeeringPhaseAll,
Ownership: ownership,
},
}
}

func TestCRDReplicatorReconciler_CreateResource(t *testing.T) {
networkConfig := getObj()
d := getCRDReplicator()
Expand Down Expand Up @@ -266,6 +308,8 @@ func TestCRDReplicatorReconciler_StartAndStopWatchers(t *testing.T) {

func TestCRDReplicatorReconciler_AddedHandler(t *testing.T) {
d := getCRDReplicator()
setupReplication(&d, consts.OwnershipShared)

//test 1
//adding a resource kind that exists on the cluster
//we expect the resource to be created
Expand All @@ -289,6 +333,7 @@ func TestCRDReplicatorReconciler_AddedHandler(t *testing.T) {
}
func TestCRDReplicatorReconciler_ModifiedHandler(t *testing.T) {
d := getCRDReplicator()
setupReplication(&d, consts.OwnershipLocal)

//test 1
//the modified resource does not exist on the cluster
Expand Down Expand Up @@ -322,7 +367,7 @@ func TestCRDReplicatorReconciler_ModifiedHandler(t *testing.T) {
assert.Nil(t, err)
obj.SetLabels(test1.GetLabels())
d.ModifiedHandler(obj, gvr)
time.Sleep(10 * time.Second)
time.Sleep(1 * time.Second)
newObj, err := dynClient.Resource(gvr).Get(context.TODO(), test1.GetName(), metav1.GetOptions{})
assert.Nil(t, err, "error should be empty")
assert.True(t, areEqual(newObj, obj), "the two objects should be equal")
Expand All @@ -333,6 +378,7 @@ func TestCRDReplicatorReconciler_ModifiedHandler(t *testing.T) {

func TestCRDReplicatorReconciler_RemoteResourceModifiedHandler(t *testing.T) {
d := getCRDReplicator()
setupReplication(&d, consts.OwnershipShared)

//test 1
//the modified resource does not exist on the cluster
Expand All @@ -358,6 +404,23 @@ func TestCRDReplicatorReconciler_RemoteResourceModifiedHandler(t *testing.T) {
assert.Nil(t, err, "error should be empty")
assert.NotEqual(t, obj.GetLabels(), test1.GetLabels(), "the labels of the two objects should be ")

// test 3
// the modified resource already exists on the cluster
// we modify some fields in the status
// we expect the resource to be modified and the error to be nil
test1, err = dynClient.Resource(gvr).Get(context.TODO(), test1.GetName(), metav1.GetOptions{})
assert.Nil(t, err, "error should be nil")
newStatus := map[string]interface{}{
"processed": true,
}
err = unstructured.SetNestedMap(obj.Object, newStatus, "status")
assert.Nil(t, err, "error should be nil")
d.RemoteResourceModifiedHandler(test1, gvr, remoteClusterID, consts.OwnershipShared)
time.Sleep(1 * time.Second)
obj, err = dynClient.Resource(gvr).Get(context.TODO(), test1.GetName(), metav1.GetOptions{})
assert.Nil(t, err, "error should be empty")
assert.Equal(t, obj, test1, "the two objects should be equal")

//clean up the resource
err = dynClient.Resource(gvr).Delete(context.TODO(), test1.GetName(), metav1.DeleteOptions{})
assert.Nil(t, err, "should be nil")
Expand Down
225 changes: 224 additions & 1 deletion internal/crdReplicator/peeringPhase_test.go
@@ -1,21 +1,46 @@
package crdreplicator

import (
"context"
"os"
"path/filepath"
"strings"
"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/manager"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
"github.com/liqotech/liqo/pkg/clusterid"
"github.com/liqotech/liqo/pkg/consts"
identitymanager "github.com/liqotech/liqo/pkg/identityManager"
tenantcontrolnamespace "github.com/liqotech/liqo/pkg/tenantControlNamespace"
testUtils "github.com/liqotech/liqo/pkg/utils/testUtils"
)

func TestDiscovery(t *testing.T) {
func TestPeeringPhase(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "PeeringPhase")
}

const (
timeout = time.Second * 30
interval = time.Millisecond * 250
)

var _ = Describe("PeeringPhase", func() {

Context("getPeeringPhase", func() {
Expand Down Expand Up @@ -89,3 +114,201 @@ var _ = Describe("PeeringPhase", func() {
})

})

var _ = Describe("PeeringPhase-Based Replication", func() {

var (
cluster testUtils.Cluster
controller Controller
mgr manager.Manager
ctx context.Context
cancel context.CancelFunc
)

BeforeEach(func() {
ctx, cancel = context.WithCancel(context.Background())

var err error
cluster, mgr, err = testUtils.NewTestCluster([]string{filepath.Join("..", "..", "deployments", "liqo", "crds")})
if err != nil {
By(err.Error())
os.Exit(1)
}

k8sclient = kubernetes.NewForConfigOrDie(mgr.GetConfig())

tenantmanager := tenantcontrolnamespace.NewTenantControlNamespaceManager(k8sclient)
clusterIDInterface := clusterid.NewStaticClusterID(localClusterID)

dynClient := dynamic.NewForConfigOrDie(mgr.GetConfig())
dynFac := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, ResyncPeriod, metav1.NamespaceAll, func(options *metav1.ListOptions) {
//we want to watch only the resources that have been created by us on the remote cluster
if options.LabelSelector == "" {
newLabelSelector := []string{RemoteLabelSelector, "=", localClusterID}
options.LabelSelector = strings.Join(newLabelSelector, "")
} else {
newLabelSelector := []string{options.LabelSelector, RemoteLabelSelector, "=", localClusterID}
options.LabelSelector = strings.Join(newLabelSelector, "")
}
})

localDynFac := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, ResyncPeriod, metav1.NamespaceAll, nil)

controller = Controller{
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
ClusterID: localClusterID,
RemoteDynClients: map[string]dynamic.Interface{remoteClusterID: dynClient},
RemoteDynSharedInformerFactory: map[string]dynamicinformer.DynamicSharedInformerFactory{remoteClusterID: dynFac},
RegisteredResources: nil,
UnregisteredResources: nil,
RemoteWatchers: map[string]map[string]chan struct{}{},
LocalDynClient: dynClient,
LocalDynSharedInformerFactory: localDynFac,
LocalWatchers: map[string]chan struct{}{},

UseNewAuth: false,
NamespaceManager: tenantmanager,
IdentityManager: identitymanager.NewCertificateIdentityManager(k8sclient, clusterIDInterface, tenantmanager),
LocalToRemoteNamespaceMapper: map[string]string{},
RemoteToLocalNamespaceMapper: map[string]string{},
ClusterIDToLocalNamespaceMapper: map[string]string{},
ClusterIDToRemoteNamespaceMapper: map[string]string{},
}

go mgr.GetCache().Start(ctx)
})

AfterEach(func() {
cancel()

err := cluster.GetEnv().Stop()
if err != nil {
By(err.Error())
os.Exit(1)
}
})

Context("Outgoing Resource Replication", func() {

type outgoingReplicationTestcase struct {
resource *unstructured.Unstructured
registeredResources []configv1alpha1.Resource
peeringPhases map[string]consts.PeeringPhase
expectedError types.GomegaMatcher
}

DescribeTable("Filter resources to replicate to the remote cluster",
func(c outgoingReplicationTestcase) {
controller.RegisteredResources = c.registeredResources
controller.peeringPhases = c.peeringPhases

controller.AddedHandler(c.resource, gvr)

_, err := controller.LocalDynClient.Resource(gvr).
Get(context.TODO(), c.resource.GetName(), metav1.GetOptions{})
Expect(err).To(c.expectedError)
},

Entry("replicated resource", outgoingReplicationTestcase{
resource: getObj(),
registeredResources: []configv1alpha1.Resource{
{
GroupVersionResource: metav1.GroupVersionResource(
netv1alpha1.NetworkConfigGroupVersionResource),
PeeringPhase: consts.PeeringPhaseAll,
},
},
peeringPhases: map[string]consts.PeeringPhase{
remoteClusterID: consts.PeeringPhaseEstablished,
},
expectedError: BeNil(),
}),

Entry("not replicated resource (phase not enabled)", outgoingReplicationTestcase{
resource: getObj(),
registeredResources: []configv1alpha1.Resource{
{
GroupVersionResource: metav1.GroupVersionResource(
netv1alpha1.NetworkConfigGroupVersionResource),
PeeringPhase: consts.PeeringPhaseOutgoing,
},
},
peeringPhases: map[string]consts.PeeringPhase{
remoteClusterID: consts.PeeringPhaseIncoming,
},
expectedError: Not(BeNil()),
}),

Entry("not replicated resource (peering not established)", outgoingReplicationTestcase{
resource: getObj(),
registeredResources: []configv1alpha1.Resource{
{
GroupVersionResource: metav1.GroupVersionResource(
netv1alpha1.NetworkConfigGroupVersionResource),
PeeringPhase: consts.PeeringPhaseEstablished,
},
},
peeringPhases: map[string]consts.PeeringPhase{
remoteClusterID: consts.PeeringPhaseNone,
},
expectedError: Not(BeNil()),
}),
)

})

Context("Enable Outgoing Replication", func() {

It("Enable Outgoing Replication", func() {

gvr := discoveryv1alpha1.GroupVersion.WithResource("resourcerequests")

controller.RegisteredResources = []configv1alpha1.Resource{
{
GroupVersionResource: metav1.GroupVersionResource(gvr),
PeeringPhase: consts.PeeringPhaseEstablished,
},
}
controller.peeringPhases = map[string]consts.PeeringPhase{
remoteClusterID: consts.PeeringPhaseNone,
}
controller.ClusterIDToLocalNamespaceMapper[remoteClusterID] = "default"
controller.LocalToRemoteNamespaceMapper["default"] = "remote-1"
controller.ClusterIDToRemoteNamespaceMapper[remoteClusterID] = "remote-1"

// this namespace will be used as a remote cluster namespace
_, err := k8sclient.CoreV1().Namespaces().Create(ctx, &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
},
}, metav1.CreateOptions{})
Expect(err).To(BeNil())

obj := getObjNamespaced()
obj, err = controller.LocalDynClient.Resource(gvr).Namespace("default").
Create(ctx, obj, metav1.CreateOptions{})
Expect(err).To(BeNil())

controller.checkResourcesOnPeeringPhaseChange(ctx, remoteClusterID,
consts.PeeringPhaseNone, consts.PeeringPhaseNone)

_, err = controller.LocalDynClient.Resource(gvr).Namespace("remote-1").
Get(context.TODO(), obj.GetName(), metav1.GetOptions{})
Expect(kerrors.IsNotFound(err)).To(BeTrue())

// change peering phase
controller.peeringPhases[remoteClusterID] = consts.PeeringPhaseOutgoing
controller.checkResourcesOnPeeringPhaseChange(ctx, remoteClusterID,
consts.PeeringPhaseOutgoing, consts.PeeringPhaseNone)

Eventually(func() error {
_, err = controller.LocalDynClient.Resource(gvr).Namespace("remote-1").
Get(context.TODO(), obj.GetName(), metav1.GetOptions{})
return err
}, timeout, interval).Should(BeNil())
})

})

})
2 changes: 1 addition & 1 deletion pkg/liqonet/routing/routing_suite_test.go
Expand Up @@ -68,7 +68,7 @@ var _ = BeforeSuite(func() {

//*** Gateway Route Manager Configuration ***/
// Create a dummy interface used as tunnel device.
link = &netlink.Dummy{netlink.LinkAttrs{Name: "dummy-tunnel"}}
link = &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "dummy-tunnel"}}
Expect(netlink.LinkAdd(link)).To(BeNil())
tunnelDevice, err = netlink.LinkByName("dummy-tunnel")
Expect(err).To(BeNil())
Expand Down

0 comments on commit eb65c61

Please sign in to comment.