diff --git a/internal/crdReplicator/crdReplicator-operator_test.go b/internal/crdReplicator/crdReplicator-operator_test.go index 96cfe453cd..dda8c06814 100644 --- a/internal/crdReplicator/crdReplicator-operator_test.go +++ b/internal/crdReplicator/crdReplicator-operator_test.go @@ -2,6 +2,7 @@ package crdreplicator import ( "context" + "os" "testing" "time" @@ -13,6 +14,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/klog/v2" configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1" discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" @@ -51,6 +53,34 @@ func getObj() *unstructured.Unstructured { return networkConfig } +func getObjNamespaced() *unstructured.Unstructured { + resourceRequest := &discoveryv1alpha1.ResourceRequest{ + TypeMeta: metav1.TypeMeta{ + Kind: "ResourceRequest", + APIVersion: discoveryv1alpha1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "resourcerequest", + Namespace: "default", + }, + Spec: discoveryv1alpha1.ResourceRequestSpec{ + AuthURL: "https://example.com", + ClusterIdentity: discoveryv1alpha1.ClusterIdentity{ + ClusterID: "id", + }, + }, + } + resourceRequest.SetLabels(getLabels()) + obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resourceRequest) + if err != nil { + klog.Error(err) + os.Exit(1) + } + return &unstructured.Unstructured{ + Object: obj, + } +} + func getLabels() map[string]string { return map[string]string{ LocalLabelSelector: "true", @@ -78,10 +108,22 @@ func getCRDReplicator() Controller { IdentityManager: identitymanager.NewCertificateIdentityManager(k8sclient, clusterIDInterface, tenantmanager), LocalToRemoteNamespaceMapper: map[string]string{}, RemoteToLocalNamespaceMapper: map[string]string{}, + ClusterIDToLocalNamespaceMapper: map[string]string{}, ClusterIDToRemoteNamespaceMapper: map[string]string{}, } } +func setupReplication(d *Controller, ownership consts.OwnershipType) { + d.ClusterIDToLocalNamespaceMapper["testRemoteClusterID"] = "default" + d.RegisteredResources = []configv1alpha1.Resource{ + { + GroupVersionResource: metav1.GroupVersionResource(netv1alpha1.NetworkConfigGroupVersionResource), + PeeringPhase: consts.PeeringPhaseAll, + Ownership: ownership, + }, + } +} + func TestCRDReplicatorReconciler_CreateResource(t *testing.T) { networkConfig := getObj() d := getCRDReplicator() @@ -266,6 +308,8 @@ func TestCRDReplicatorReconciler_StartAndStopWatchers(t *testing.T) { func TestCRDReplicatorReconciler_AddedHandler(t *testing.T) { d := getCRDReplicator() + setupReplication(&d, consts.OwnershipShared) + //test 1 //adding a resource kind that exists on the cluster //we expect the resource to be created @@ -289,6 +333,7 @@ func TestCRDReplicatorReconciler_AddedHandler(t *testing.T) { } func TestCRDReplicatorReconciler_ModifiedHandler(t *testing.T) { d := getCRDReplicator() + setupReplication(&d, consts.OwnershipLocal) //test 1 //the modified resource does not exist on the cluster @@ -322,7 +367,7 @@ func TestCRDReplicatorReconciler_ModifiedHandler(t *testing.T) { assert.Nil(t, err) obj.SetLabels(test1.GetLabels()) d.ModifiedHandler(obj, gvr) - time.Sleep(10 * time.Second) + time.Sleep(1 * time.Second) newObj, err := dynClient.Resource(gvr).Get(context.TODO(), test1.GetName(), metav1.GetOptions{}) assert.Nil(t, err, "error should be empty") assert.True(t, areEqual(newObj, obj), "the two objects should be equal") @@ -333,6 +378,7 @@ func TestCRDReplicatorReconciler_ModifiedHandler(t *testing.T) 
{ func TestCRDReplicatorReconciler_RemoteResourceModifiedHandler(t *testing.T) { d := getCRDReplicator() + setupReplication(&d, consts.OwnershipShared) //test 1 //the modified resource does not exist on the cluster @@ -358,6 +404,23 @@ func TestCRDReplicatorReconciler_RemoteResourceModifiedHandler(t *testing.T) { assert.Nil(t, err, "error should be empty") assert.NotEqual(t, obj.GetLabels(), test1.GetLabels(), "the labels of the two objects should not be equal") + // test 3 + // the modified resource already exists on the cluster + // we modify some fields in the status + // we expect the resource to be modified and the error to be nil + test1, err = dynClient.Resource(gvr).Get(context.TODO(), test1.GetName(), metav1.GetOptions{}) + assert.Nil(t, err, "error should be nil") + newStatus := map[string]interface{}{ + "processed": true, + } + err = unstructured.SetNestedMap(test1.Object, newStatus, "status") + assert.Nil(t, err, "error should be nil") + d.RemoteResourceModifiedHandler(test1, gvr, remoteClusterID, consts.OwnershipShared) + time.Sleep(1 * time.Second) + obj, err = dynClient.Resource(gvr).Get(context.TODO(), test1.GetName(), metav1.GetOptions{}) + assert.Nil(t, err, "error should be nil") + assert.Equal(t, obj, test1, "the two objects should be equal") + //clean up the resource err = dynClient.Resource(gvr).Delete(context.TODO(), test1.GetName(), metav1.DeleteOptions{}) assert.Nil(t, err, "should be nil") diff --git a/internal/crdReplicator/peeringPhase_test.go b/internal/crdReplicator/peeringPhase_test.go index 7871b12dea..22d4f76533 100644 --- a/internal/crdReplicator/peeringPhase_test.go +++ b/internal/crdReplicator/peeringPhase_test.go @@ -1,21 +1,46 @@ package crdreplicator import ( + "context" + "os" + "path/filepath" + "strings" "testing" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo/extensions/table" . 
"github.com/onsi/gomega" + "github.com/onsi/gomega/types" + v1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/manager" + configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1" discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" + netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" + "github.com/liqotech/liqo/pkg/clusterid" "github.com/liqotech/liqo/pkg/consts" + identitymanager "github.com/liqotech/liqo/pkg/identityManager" + tenantcontrolnamespace "github.com/liqotech/liqo/pkg/tenantControlNamespace" + testUtils "github.com/liqotech/liqo/pkg/utils/testUtils" ) -func TestDiscovery(t *testing.T) { +func TestPeeringPhase(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "PeeringPhase") } +const ( + timeout = time.Second * 30 + interval = time.Millisecond * 250 +) + var _ = Describe("PeeringPhase", func() { Context("getPeeringPhase", func() { @@ -89,3 +114,201 @@ var _ = Describe("PeeringPhase", func() { }) }) + +var _ = Describe("PeeringPhase-Based Replication", func() { + + var ( + cluster testUtils.Cluster + controller Controller + mgr manager.Manager + ctx context.Context + cancel context.CancelFunc + ) + + BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) + + var err error + cluster, mgr, err = testUtils.NewTestCluster([]string{filepath.Join("..", "..", "deployments", "liqo", "crds")}) + if err != nil { + By(err.Error()) + os.Exit(1) + } + + k8sclient = kubernetes.NewForConfigOrDie(mgr.GetConfig()) + + tenantmanager := tenantcontrolnamespace.NewTenantControlNamespaceManager(k8sclient) + clusterIDInterface := clusterid.NewStaticClusterID(localClusterID) + + dynClient := dynamic.NewForConfigOrDie(mgr.GetConfig()) + dynFac := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, ResyncPeriod, metav1.NamespaceAll, func(options *metav1.ListOptions) { + //we want to watch only the resources that have been created by us on the remote cluster + if options.LabelSelector == "" { + newLabelSelector := []string{RemoteLabelSelector, "=", localClusterID} + options.LabelSelector = strings.Join(newLabelSelector, "") + } else { + newLabelSelector := []string{options.LabelSelector, RemoteLabelSelector, "=", localClusterID} + options.LabelSelector = strings.Join(newLabelSelector, "") + } + }) + + localDynFac := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, ResyncPeriod, metav1.NamespaceAll, nil) + + controller = Controller{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + ClusterID: localClusterID, + RemoteDynClients: map[string]dynamic.Interface{remoteClusterID: dynClient}, + RemoteDynSharedInformerFactory: map[string]dynamicinformer.DynamicSharedInformerFactory{remoteClusterID: dynFac}, + RegisteredResources: nil, + UnregisteredResources: nil, + RemoteWatchers: map[string]map[string]chan struct{}{}, + LocalDynClient: dynClient, + LocalDynSharedInformerFactory: localDynFac, + LocalWatchers: map[string]chan struct{}{}, + + UseNewAuth: false, + NamespaceManager: tenantmanager, + IdentityManager: identitymanager.NewCertificateIdentityManager(k8sclient, clusterIDInterface, tenantmanager), + LocalToRemoteNamespaceMapper: map[string]string{}, + RemoteToLocalNamespaceMapper: map[string]string{}, + ClusterIDToLocalNamespaceMapper: map[string]string{}, 
+ ClusterIDToRemoteNamespaceMapper: map[string]string{}, + } + + go mgr.GetCache().Start(ctx) + }) + + AfterEach(func() { + cancel() + + err := cluster.GetEnv().Stop() + if err != nil { + By(err.Error()) + os.Exit(1) + } + }) + + Context("Outgoing Resource Replication", func() { + + type outgoingReplicationTestcase struct { + resource *unstructured.Unstructured + registeredResources []configv1alpha1.Resource + peeringPhases map[string]consts.PeeringPhase + expectedError types.GomegaMatcher + } + + DescribeTable("Filter resources to replicate to the remote cluster", + func(c outgoingReplicationTestcase) { + controller.RegisteredResources = c.registeredResources + controller.peeringPhases = c.peeringPhases + + controller.AddedHandler(c.resource, gvr) + + _, err := controller.LocalDynClient.Resource(gvr). + Get(context.TODO(), c.resource.GetName(), metav1.GetOptions{}) + Expect(err).To(c.expectedError) + }, + + Entry("replicated resource", outgoingReplicationTestcase{ + resource: getObj(), + registeredResources: []configv1alpha1.Resource{ + { + GroupVersionResource: metav1.GroupVersionResource( + netv1alpha1.NetworkConfigGroupVersionResource), + PeeringPhase: consts.PeeringPhaseAll, + }, + }, + peeringPhases: map[string]consts.PeeringPhase{ + remoteClusterID: consts.PeeringPhaseEstablished, + }, + expectedError: BeNil(), + }), + + Entry("not replicated resource (phase not enabled)", outgoingReplicationTestcase{ + resource: getObj(), + registeredResources: []configv1alpha1.Resource{ + { + GroupVersionResource: metav1.GroupVersionResource( + netv1alpha1.NetworkConfigGroupVersionResource), + PeeringPhase: consts.PeeringPhaseOutgoing, + }, + }, + peeringPhases: map[string]consts.PeeringPhase{ + remoteClusterID: consts.PeeringPhaseIncoming, + }, + expectedError: Not(BeNil()), + }), + + Entry("not replicated resource (peering not established)", outgoingReplicationTestcase{ + resource: getObj(), + registeredResources: []configv1alpha1.Resource{ + { + GroupVersionResource: metav1.GroupVersionResource( + netv1alpha1.NetworkConfigGroupVersionResource), + PeeringPhase: consts.PeeringPhaseEstablished, + }, + }, + peeringPhases: map[string]consts.PeeringPhase{ + remoteClusterID: consts.PeeringPhaseNone, + }, + expectedError: Not(BeNil()), + }), + ) + + }) + + Context("Enable Outgoing Replication", func() { + + It("Enable Outgoing Replication", func() { + + gvr := discoveryv1alpha1.GroupVersion.WithResource("resourcerequests") + + controller.RegisteredResources = []configv1alpha1.Resource{ + { + GroupVersionResource: metav1.GroupVersionResource(gvr), + PeeringPhase: consts.PeeringPhaseEstablished, + }, + } + controller.peeringPhases = map[string]consts.PeeringPhase{ + remoteClusterID: consts.PeeringPhaseNone, + } + controller.ClusterIDToLocalNamespaceMapper[remoteClusterID] = "default" + controller.LocalToRemoteNamespaceMapper["default"] = "remote-1" + controller.ClusterIDToRemoteNamespaceMapper[remoteClusterID] = "remote-1" + + // this namespace will be used as a remote cluster namespace + _, err := k8sclient.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-1", + }, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + obj := getObjNamespaced() + obj, err = controller.LocalDynClient.Resource(gvr).Namespace("default"). 
+ Create(ctx, obj, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + controller.checkResourcesOnPeeringPhaseChange(ctx, remoteClusterID, + consts.PeeringPhaseNone, consts.PeeringPhaseNone) + + _, err = controller.LocalDynClient.Resource(gvr).Namespace("remote-1"). + Get(context.TODO(), obj.GetName(), metav1.GetOptions{}) + Expect(kerrors.IsNotFound(err)).To(BeTrue()) + + // change peering phase + controller.peeringPhases[remoteClusterID] = consts.PeeringPhaseOutgoing + controller.checkResourcesOnPeeringPhaseChange(ctx, remoteClusterID, + consts.PeeringPhaseOutgoing, consts.PeeringPhaseNone) + + Eventually(func() error { + _, err = controller.LocalDynClient.Resource(gvr).Namespace("remote-1"). + Get(context.TODO(), obj.GetName(), metav1.GetOptions{}) + return err + }, timeout, interval).Should(BeNil()) + }) + + }) + +}) diff --git a/pkg/liqonet/routing/routing_suite_test.go b/pkg/liqonet/routing/routing_suite_test.go index e0c82cd63d..2fd6d9474e 100644 --- a/pkg/liqonet/routing/routing_suite_test.go +++ b/pkg/liqonet/routing/routing_suite_test.go @@ -68,7 +68,7 @@ var _ = BeforeSuite(func() { //*** Gateway Route Manager Configuration ***/ // Create a dummy interface used as tunnel device. - link = &netlink.Dummy{netlink.LinkAttrs{Name: "dummy-tunnel"}} + link = &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Name: "dummy-tunnel"}} Expect(netlink.LinkAdd(link)).To(BeNil()) tunnelDevice, err = netlink.LinkByName("dummy-tunnel") Expect(err).To(BeNil()) diff --git a/test/unit/crdReplicator/crdReplicator-operator_test.go b/test/unit/crdReplicator/crdReplicator-operator_test.go index f6e5e27883..846580dd14 100644 --- a/test/unit/crdReplicator/crdReplicator-operator_test.go +++ b/test/unit/crdReplicator/crdReplicator-operator_test.go @@ -18,6 +18,7 @@ import ( netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" crdreplicator "github.com/liqotech/liqo/internal/crdReplicator" + "github.com/liqotech/liqo/pkg/consts" ) var ( @@ -184,6 +185,7 @@ func TestReplication2(t *testing.T) { // we label it to be replicated on all the three clusters, so we expect to find it on the remote clusters // we update the status on the peering clusters and expect it to be replicated on the local cluster as well func TestReplication4(t *testing.T) { + updateOwnership(consts.OwnershipShared) time.Sleep(10 * time.Second) localResources := map[string]*netv1alpha1.TunnelEndpoint{} // we create the resource on the localcluster to be replicated on all the peeringClusters @@ -243,6 +245,7 @@ func TestReplication4(t *testing.T) { // err = dOperator.LocalDynClient.Resource(fcGVR).Delete(context.TODO(), newFc.GetName(), metav1.DeleteOptions{}) time.Sleep(3 * time.Second) + updateOwnership(consts.OwnershipLocal) } // we create a resource which type has been registered for the replication diff --git a/test/unit/crdReplicator/env_test.go b/test/unit/crdReplicator/env_test.go index 87fadbeeb4..2838c5d732 100644 --- a/test/unit/crdReplicator/env_test.go +++ b/test/unit/crdReplicator/env_test.go @@ -22,6 +22,7 @@ import ( discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1" crdreplicator "github.com/liqotech/liqo/internal/crdReplicator" + "github.com/liqotech/liqo/pkg/consts" crdclient "github.com/liqotech/liqo/pkg/crdClient" ) @@ -186,6 +187,23 @@ func tearDown() { } } +func updateOwnership(ownership consts.OwnershipType) { + tmp, err := configClusterClient.Resource("clusterconfigs").Get("configuration", &metav1.GetOptions{}) + if err != nil { + 
klog.Error(err) + os.Exit(1) + } + cc, ok := tmp.(*configv1alpha1.ClusterConfig) + if !ok { + klog.Error("retrieved object is not a ClusterConfig") + os.Exit(1) + } + for i := range cc.Spec.DispatcherConfig.ResourcesToReplicate { + cc.Spec.DispatcherConfig.ResourcesToReplicate[i].Ownership = ownership + } + _, err = configClusterClient.Resource("clusterconfigs").Update("configuration", cc, &metav1.UpdateOptions{}) + if err != nil { + klog.Error(err) + os.Exit(1) + } +} + func getClusterConfig() *configv1alpha1.ClusterConfig { return &configv1alpha1.ClusterConfig{ ObjectMeta: metav1.ObjectMeta{ @@ -224,6 +242,8 @@ func getClusterConfig() *configv1alpha1.ClusterConfig { Version: netv1alpha1.GroupVersion.Version, Resource: "tunnelendpoints", }, + PeeringPhase: consts.PeeringPhaseAll, + Ownership: consts.OwnershipLocal, + }}}, }, }
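A note on the ownership semantics these tests exercise: with OwnershipLocal the local cluster owns the whole replicated resource, so modifications performed on the remote copy are reverted (as TestCRDReplicatorReconciler_ModifiedHandler checks), while with OwnershipShared the remote cluster owns the status, so remote status updates are copied back to the local resource (as test 3 of the RemoteResourceModifiedHandler test checks). A minimal standalone sketch of that rule follows; the type and constant names are illustrative stand-ins for the real ones in github.com/liqotech/liqo/pkg/consts, not the actual implementation:

package main

import "fmt"

// OwnershipType stands in for consts.OwnershipType; the values mirror the
// ones used in the diff above.
type OwnershipType string

const (
	// OwnershipLocal: the local cluster owns the whole resource, so any
	// change made by the remote cluster is overwritten with the local copy.
	OwnershipLocal OwnershipType = "Local"
	// OwnershipShared: the remote cluster owns the status, so remote status
	// changes are propagated back to the local resource.
	OwnershipShared OwnershipType = "Shared"
)

// keepRemoteStatus sketches the decision point the handlers under test
// implement: a remote status change is reflected locally only for
// shared-ownership resources.
func keepRemoteStatus(ownership OwnershipType) bool {
	return ownership == OwnershipShared
}

func main() {
	fmt.Println(keepRemoteStatus(OwnershipShared)) // true: status flows back locally
	fmt.Println(keepRemoteStatus(OwnershipLocal))  // false: the local copy wins
}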