From 68493b1eac34c5b5d30db3455342001ace2e2c4f Mon Sep 17 00:00:00 2001
From: Naidenov Ivan
Date: Sat, 20 Feb 2021 16:15:48 +0300
Subject: [PATCH] operator: fix problem with non-existent cluster leader (#82)

---
 CHANGELOG.md                                 |   1 +
 pkg/controller/cluster/cluster_controller.go |  31 ++-
 pkg/controller/cluster/cluster_test.go       | 230 +++++++++++++++++++
 pkg/controller/cluster/suite_test.go         |  97 ++++++++
 4 files changed, 350 insertions(+), 9 deletions(-)
 create mode 100644 pkg/controller/cluster/cluster_test.go
 create mode 100644 pkg/controller/cluster/suite_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bf48a96d..0ca0621e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ### Fixed
 - Not working update of replicaset roles
 - Not working update of container env vars
+- Problem with a non-existent cluster leader
 
 ## [0.0.8] - 2020-12-16
diff --git a/pkg/controller/cluster/cluster_controller.go b/pkg/controller/cluster/cluster_controller.go
index 6484ebeb..0bde8fcb 100644
--- a/pkg/controller/cluster/cluster_controller.go
+++ b/pkg/controller/cluster/cluster_controller.go
@@ -80,6 +80,21 @@ func SetInstanceUUID(o *corev1.Pod) *corev1.Pod {
 	return o
 }
 
+// IsLeaderExists checks that the leader recorded in the cluster Endpoints annotation still points to one of the endpoint addresses
+func IsLeaderExists(ep *corev1.Endpoints) bool {
+	leader, ok := ep.Annotations["tarantool.io/leader"]
+	if !ok || leader == "" {
+		return false
+	}
+
+	for _, addr := range ep.Subsets[0].Addresses {
+		if leader == fmt.Sprintf("%s:%s", addr.IP, "8081") {
+			return true
+		}
+	}
+	return false
+}
+
 // Add creates a new Cluster Controller and adds it to the Manager. The Manager will set fields on the Controller
 // and Start it when the Manager is Started.
 func Add(mgr manager.Manager) error {
@@ -234,14 +249,8 @@ func (r *ReconcileCluster) Reconcile(request reconcile.Request) (reconcile.Resul
 		return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
 	}
 
-	leader, ok := ep.Annotations["tarantool.io/leader"]
-	if !ok {
-		if leader == "" {
-			reqLogger.Info("leader is not elected")
-			// return reconcile.Result{RequeueAfter: time.Duration(5000 * time.Millisecond)}, nil
-		}
-
-		leader = fmt.Sprintf("%s:%s", ep.Subsets[0].Addresses[0].IP, "8081")
+	if !IsLeaderExists(ep) {
+		leader := fmt.Sprintf("%s:%s", ep.Subsets[0].Addresses[0].IP, "8081")
 
 		if ep.Annotations == nil {
 			ep.Annotations = make(map[string]string)
@@ -262,7 +271,11 @@ func (r *ReconcileCluster) Reconcile(request reconcile.Request) (reconcile.Resul
 		return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
 	}
 
-	topologyClient := topology.NewBuiltInTopologyService(topology.WithTopologyEndpoint(fmt.Sprintf("http://%s/admin/api", leader)), topology.WithClusterID(cluster.GetName()))
+	topologyClient := topology.NewBuiltInTopologyService(
+		topology.WithTopologyEndpoint(fmt.Sprintf("http://%s/admin/api", ep.Annotations["tarantool.io/leader"])),
+		topology.WithClusterID(cluster.GetName()),
+	)
+
 	for _, sts := range stsList.Items {
 		for i := 0; i < int(*sts.Spec.Replicas); i++ {
 			pod := &corev1.Pod{}
diff --git a/pkg/controller/cluster/cluster_test.go b/pkg/controller/cluster/cluster_test.go
new file mode 100644
index 00000000..dbe45ff5
--- /dev/null
+++ b/pkg/controller/cluster/cluster_test.go
@@ -0,0 +1,230 @@
+package cluster
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	helpers "github.com/tarantool/tarantool-operator/test/helpers"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	tarantoolv1alpha1 "github.com/tarantool/tarantool-operator/pkg/apis/tarantool/v1alpha1"
+
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var _ = Describe("cluster_controller unit testing", func() {
+	var (
+		namespace = "default"
+		ctx       = context.TODO()
+
+		roleName       = "" // setup for every spec in hook
+		rsTemplateName = ""
+
+		clusterName = "test"
+		clusterId   = clusterName
+
+		defaultRolesToAssign = "[\"A\",\"B\"]"
+	)
+
+	Describe("cluster_controller manages cluster resources", func() {
+		BeforeEach(func() {
+			// setup variables for each spec
+			roleName = fmt.Sprintf("test-role-%s", RandStringRunes(4))
+			rsTemplateName = fmt.Sprintf("test-rs-%s", RandStringRunes(4))
+
+			By("create new Role " + roleName)
+			role := helpers.NewRole(helpers.RoleParams{
+				Name:           roleName,
+				Namespace:      namespace,
+				RolesToAssign:  defaultRolesToAssign,
+				RsNum:          int32(1),
+				RsTemplateName: rsTemplateName,
+				ClusterId:      clusterId,
+			})
+			// mock owner reference
+			role.SetOwnerReferences([]metav1.OwnerReference{
+				{
+					APIVersion: "v0",
+					Kind:       "mockRef",
+					Name:       "mockRef",
+					UID:        "-",
+				},
+			})
+			Expect(k8sClient.Create(ctx, &role)).NotTo(HaveOccurred(), "failed to create Role")
+
+			By("create new Cluster " + clusterName)
+			cluster := helpers.NewCluster(helpers.ClusterParams{
+				Name:      clusterName,
+				Namespace: namespace,
+				Id:        clusterId,
+			})
+			Expect(k8sClient.Create(ctx, &cluster)).NotTo(HaveOccurred(), "failed to create Cluster")
+		})
+
+		AfterEach(func() {
+			By("remove Role object " + roleName)
+			role := &tarantoolv1alpha1.Role{}
+			Expect(
+				k8sClient.Get(ctx, client.ObjectKey{Name: roleName, Namespace: namespace}, role),
+			).NotTo(HaveOccurred(), "failed to get Role")
+
+			Expect(k8sClient.Delete(ctx, role)).NotTo(HaveOccurred(), "failed to delete Role")
+
+			By("remove Cluster object " + clusterName)
+			cluster := &tarantoolv1alpha1.Cluster{}
+			Expect(
+				k8sClient.Get(ctx, client.ObjectKey{Name: clusterName, Namespace: namespace}, cluster),
+			).NotTo(HaveOccurred(), "failed to get Cluster")
+
+			Expect(k8sClient.Delete(ctx, cluster)).NotTo(HaveOccurred(), "failed to delete Cluster")
+		})
+
+		Context("manage cluster leader: tarantool instance accepting admin requests", func() {
+			BeforeEach(func() {
+				By("create cluster endpoints")
+				ep := corev1.Endpoints{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      clusterId,
+						Namespace: namespace,
+					},
+					Subsets: []corev1.EndpointSubset{
+						{
+							Addresses: []corev1.EndpointAddress{
+								{IP: "1.1.1.1"},
+								{IP: "2.2.2.2"},
+								{IP: "3.3.3.3"},
+							},
+						},
+					},
+				}
+				Expect(k8sClient.Create(ctx, &ep)).NotTo(HaveOccurred(), "failed to create cluster endpoints")
+			})
+
+			AfterEach(func() {
+				ep := corev1.Endpoints{}
+				Expect(
+					k8sClient.Get(ctx, client.ObjectKey{Name: clusterId, Namespace: namespace}, &ep),
+				).NotTo(HaveOccurred(), "failed to get cluster endpoints")
+
+				Expect(k8sClient.Delete(ctx, &ep)).NotTo(HaveOccurred(), "failed to delete endpoints")
+			})
+
+			It("changes the leader if the previous one does not exist", func() {
+				By("get the chosen leader")
+				ep := corev1.Endpoints{}
+				Eventually(
+					func() bool {
+						err := k8sClient.Get(ctx, client.ObjectKey{Name: clusterId, Namespace: namespace}, &ep)
+						if err != nil {
+							return false
+						}
+
+						if ep.GetAnnotations()["tarantool.io/leader"] != "" {
+							return true
+						}
+
+						return false
+					},
+					time.Second*10, time.Millisecond*500,
+				).Should(BeTrue())
+
+				By("save old leader")
+				oldLeader := ep.GetAnnotations()["tarantool.io/leader"]
+
+				By("set all new IP addresses")
+				ep.Subsets = []corev1.EndpointSubset{
+					{
+						Addresses: []corev1.EndpointAddress{
+							{IP: "4.4.4.4"},
+							{IP: "5.5.5.5"},
+							{IP: "6.6.6.6"},
+						},
+					},
+				}
+				Expect(k8sClient.Update(ctx, &ep)).NotTo(HaveOccurred(), "failed to update cluster endpoints")
+
+				By("check that the leader has changed")
+				Eventually(
+					func() bool {
+						err := k8sClient.Get(ctx, client.ObjectKey{Name: clusterId, Namespace: namespace}, &ep)
+						if err != nil {
+							return false
+						}
+
+						if ep.GetAnnotations()["tarantool.io/leader"] != oldLeader {
+							return true
+						}
+						return false
+					},
+					time.Second*10, time.Millisecond*500,
+				).Should(BeTrue())
+			})
+		})
+	})
+
+	Describe("cluster_controller unit testing functions", func() {
+		Describe("function IsLeaderExists must check for the existence of the leader in the cluster Endpoints annotation", func() {
+			Context("positive cases (leader exists)", func() {
+				It("should return True if the leader is assigned and exists", func() {
+					leaderIP := "1.1.1.1"
+
+					ep := &corev1.Endpoints{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "name",
+							Namespace: "namespace",
+							Annotations: map[string]string{
+								"tarantool.io/leader": fmt.Sprintf("%s:8081", leaderIP),
+							},
+						},
+						Subsets: []corev1.EndpointSubset{
+							{
+								Addresses: []corev1.EndpointAddress{
+									{IP: leaderIP},
+								},
+							},
+						},
+					}
+					Expect(IsLeaderExists(ep)).To(BeTrue())
+				})
+			})
+
+			Context("negative cases (leader does not exist)", func() {
+				It("should return False if the leader is not assigned", func() {
+					ep := &corev1.Endpoints{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "name",
+							Namespace: "namespace",
+						},
+					}
+					Expect(IsLeaderExists(ep)).To(BeFalse())
+				})
+
+				It("should return False if the leader is assigned but its IP does not exist", func() {
+					ep := &corev1.Endpoints{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "name",
+							Namespace: "namespace",
+							Annotations: map[string]string{
+								"tarantool.io/leader": "6.6.6.6:8081",
+							},
+						},
+						Subsets: []corev1.EndpointSubset{
+							{
+								Addresses: []corev1.EndpointAddress{
+									{IP: "0.0.0.0"},
+								},
+							},
+						},
+					}
+					Expect(IsLeaderExists(ep)).To(BeFalse())
+				})
+			})
+		})
+	})
+})
diff --git a/pkg/controller/cluster/suite_test.go b/pkg/controller/cluster/suite_test.go
new file mode 100644
index 00000000..09ae4d45
--- /dev/null
+++ b/pkg/controller/cluster/suite_test.go
@@ -0,0 +1,97 @@
+package cluster
+
+import (
+	"math/rand"
+	"path/filepath"
+	"testing"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"github.com/operator-framework/operator-sdk/pkg/log/zap"
+
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/envtest"
+	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+	// +kubebuilder:scaffold:imports
+	"github.com/tarantool/tarantool-operator/pkg/apis"
+)
+
+// These tests use Ginkgo (BDD-style Go testing framework). Refer to
+// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
+
+var cfg *rest.Config
+var k8sClient client.Client
+var testEnv *envtest.Environment
+var stopCh chan struct{}
+
+func TestClusterController(t *testing.T) {
+	RegisterFailHandler(Fail)
+
+	RunSpecsWithDefaultAndCustomReporters(t,
+		"Cluster Controller Suite",
+		[]Reporter{envtest.NewlineReporter{}})
+}
+
+var _ = BeforeSuite(func(done Done) {
+	logf.SetLogger(zap.LoggerTo(GinkgoWriter))
+
+	By("Bootstrapping test environment")
+	testEnv = &envtest.Environment{
+		CRDDirectoryPaths:  []string{filepath.Join("..", "..", "..", "ci", "helm-chart", "crds")},
+		UseExistingCluster: false,
+	}
+
+	var err error
+	cfg, err = testEnv.Start()
+	Expect(err).ToNot(HaveOccurred())
+	Expect(cfg).ToNot(BeNil())
+
+	err = apis.AddToScheme(scheme.Scheme)
+	Expect(err).NotTo(HaveOccurred())
+
+	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
+	Expect(err).ToNot(HaveOccurred())
+	Expect(k8sClient).ToNot(BeNil())
+
+	// create channel for stopping manager
+	stopCh = make(chan struct{})
+
+	mgr, err := ctrl.NewManager(cfg, ctrl.Options{})
+	Expect(err).NotTo(HaveOccurred(), "failed to create manager")
+
+	err = Add(mgr)
+	Expect(err).NotTo(HaveOccurred(), "failed to setup controller")
+
+	go func() {
+		err = mgr.Start(stopCh)
+		Expect(err).NotTo(HaveOccurred(), "failed to start manager")
+	}()
+
+	close(done)
+}, 60)
+
+var _ = AfterSuite(func() {
+	close(stopCh)
+	By("Tearing down the test environment")
+	err := testEnv.Stop()
+	Expect(err).ToNot(HaveOccurred())
+})
+
+func init() {
+	rand.Seed(time.Now().UnixNano())
+}
+
+var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz")
+
+func RandStringRunes(n int) string {
+	b := make([]rune, n)
+	for i := range b {
+		b[i] = letterRunes[rand.Intn(len(letterRunes))]
+	}
+	return string(b)
+}