Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Fixed
- Not working update of replicaset roles
- Not working update of container env vars
- Problem with a non-existent cluster leader

## [0.0.8] - 2020-12-16

Expand Down
31 changes: 22 additions & 9 deletions pkg/controller/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,21 @@ func SetInstanceUUID(o *corev1.Pod) *corev1.Pod {
return o
}

// IsLeaderExists reports whether the leader recorded in the cluster
// Endpoints annotation ("tarantool.io/leader") still corresponds to an
// address present in the Endpoints subsets. It returns false when the
// annotation is missing or empty, or when no subset address matches.
func IsLeaderExists(ep *corev1.Endpoints) bool {
	leader, ok := ep.Annotations["tarantool.io/leader"]
	if !ok || leader == "" {
		return false
	}

	// Scan every subset, not just Subsets[0]: an Endpoints object may carry
	// several subsets, and indexing Subsets[0] unconditionally would panic
	// with "index out of range" when the subsets list is empty.
	for _, subset := range ep.Subsets {
		for _, addr := range subset.Addresses {
			// The leader annotation is stored as "<ip>:8081" (admin port).
			if leader == fmt.Sprintf("%s:%s", addr.IP, "8081") {
				return true
			}
		}
	}
	return false
}

// Add creates a new Cluster Controller and adds it to the Manager. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
Expand Down Expand Up @@ -234,14 +249,8 @@ func (r *ReconcileCluster) Reconcile(request reconcile.Request) (reconcile.Resul
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, nil
}

leader, ok := ep.Annotations["tarantool.io/leader"]
if !ok {
if leader == "" {
reqLogger.Info("leader is not elected")
// return reconcile.Result{RequeueAfter: time.Duration(5000 * time.Millisecond)}, nil
}

leader = fmt.Sprintf("%s:%s", ep.Subsets[0].Addresses[0].IP, "8081")
if !IsLeaderExists(ep) {
leader := fmt.Sprintf("%s:%s", ep.Subsets[0].Addresses[0].IP, "8081")

if ep.Annotations == nil {
ep.Annotations = make(map[string]string)
Expand All @@ -262,7 +271,11 @@ func (r *ReconcileCluster) Reconcile(request reconcile.Request) (reconcile.Resul
return reconcile.Result{RequeueAfter: time.Duration(5 * time.Second)}, err
}

topologyClient := topology.NewBuiltInTopologyService(topology.WithTopologyEndpoint(fmt.Sprintf("http://%s/admin/api", leader)), topology.WithClusterID(cluster.GetName()))
topologyClient := topology.NewBuiltInTopologyService(
topology.WithTopologyEndpoint(fmt.Sprintf("http://%s/admin/api", ep.Annotations["tarantool.io/leader"])),
topology.WithClusterID(cluster.GetName()),
)

for _, sts := range stsList.Items {
for i := 0; i < int(*sts.Spec.Replicas); i++ {
pod := &corev1.Pod{}
Expand Down
230 changes: 230 additions & 0 deletions pkg/controller/cluster/cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package cluster

import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

helpers "github.com/tarantool/tarantool-operator/test/helpers"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

tarantoolv1alpha1 "github.com/tarantool/tarantool-operator/pkg/apis/tarantool/v1alpha1"

"sigs.k8s.io/controller-runtime/pkg/client"
)

// Specs below exercise the cluster controller end to end against the
// envtest control plane started in suite_test.go: resource management
// (Role/Cluster lifecycle, leader election via Endpoints annotations)
// and the IsLeaderExists helper in isolation.
var _ = Describe("cluster_controller unit testing", func() {
	var (
		namespace = "default"
		ctx       = context.TODO()

		roleName       = "" // setup for every spec in hook
		rsTemplateName = ""

		clusterName = "test"
		clusterId   = clusterName

		defaultRolesToAssign = "[\"A\",\"B\"]"
	)

	Describe("cluster_controller manage cluster resources", func() {
		BeforeEach(func() {
			// setup variables for each spec: random suffixes keep resources
			// from colliding between specs in the shared test cluster
			roleName = fmt.Sprintf("test-role-%s", RandStringRunes(4))
			rsTemplateName = fmt.Sprintf("test-rs-%s", RandStringRunes(4))

			By("create new Role " + roleName)
			role := helpers.NewRole(helpers.RoleParams{
				Name:           roleName,
				Namespace:      namespace,
				RolesToAssign:  defaultRolesToAssign,
				RsNum:          int32(1),
				RsTemplateName: rsTemplateName,
				ClusterId:      clusterId,
			})
			// mock owner reference
			role.SetOwnerReferences([]metav1.OwnerReference{
				{
					APIVersion: "v0",
					Kind:       "mockRef",
					Name:       "mockRef",
					UID:        "-",
				},
			})
			Expect(k8sClient.Create(ctx, &role)).NotTo(HaveOccurred(), "failed to create Role")

			By("create new Cluster " + clusterName)
			cluster := helpers.NewCluster(helpers.ClusterParams{
				Name:      clusterName,
				Namespace: namespace,
				Id:        clusterId,
			})
			Expect(k8sClient.Create(ctx, &cluster)).NotTo(HaveOccurred(), "failed to create Cluster")
		})

		AfterEach(func() {
			By("remove role object " + roleName)
			role := &tarantoolv1alpha1.Role{}
			Expect(
				k8sClient.Get(ctx, client.ObjectKey{Name: roleName, Namespace: namespace}, role),
			).NotTo(HaveOccurred(), "failed to get Role")

			Expect(k8sClient.Delete(ctx, role)).NotTo(HaveOccurred(), "failed to delete Role")

			By("remove Cluster object " + clusterName)
			cluster := &tarantoolv1alpha1.Cluster{}
			Expect(
				k8sClient.Get(ctx, client.ObjectKey{Name: clusterName, Namespace: namespace}, cluster),
			).NotTo(HaveOccurred(), "failed to get Cluster")

			Expect(k8sClient.Delete(ctx, cluster)).NotTo(HaveOccurred(), "failed to delete Cluster")
		})

		Context("manage cluster leader: tarantool instance accepting admin requests", func() {
			BeforeEach(func() {
				By("create cluster endpoints")
				ep := corev1.Endpoints{
					ObjectMeta: metav1.ObjectMeta{
						Name:      clusterId,
						Namespace: namespace,
					},
					Subsets: []corev1.EndpointSubset{
						{
							Addresses: []corev1.EndpointAddress{
								{IP: "1.1.1.1"},
								{IP: "2.2.2.2"},
								{IP: "3.3.3.3"},
							},
						},
					},
				}
				Expect(k8sClient.Create(ctx, &ep)).NotTo(HaveOccurred(), "failed to create cluster endpoints")
			})

			AfterEach(func() {
				ep := corev1.Endpoints{}
				Expect(
					k8sClient.Get(ctx, client.ObjectKey{Name: clusterId, Namespace: namespace}, &ep),
				).NotTo(HaveOccurred(), "failed to get cluster endpoints")

				Expect(k8sClient.Delete(ctx, &ep)).NotTo(HaveOccurred(), "failed to delete endpoints")
			})

			It("change the leader if the previous one does not exist", func() {
				By("get the chosen leader")
				ep := corev1.Endpoints{}
				// wait until the controller elects a leader and records it
				// in the Endpoints annotation
				Eventually(
					func() bool {
						err := k8sClient.Get(ctx, client.ObjectKey{Name: clusterId, Namespace: namespace}, &ep)
						if err != nil {
							return false
						}

						if ep.GetAnnotations()["tarantool.io/leader"] != "" {
							return true
						}

						return false
					},
					time.Second*10, time.Millisecond*500,
				).Should(BeTrue())

				By("save old leader")
				oldLeader := ep.GetAnnotations()["tarantool.io/leader"]

				By("set all new IP addresses")
				// none of the new addresses matches the recorded leader, so
				// the controller must elect a new one
				ep.Subsets = []corev1.EndpointSubset{
					{
						Addresses: []corev1.EndpointAddress{
							{IP: "4.4.4.4"},
							{IP: "5.5.5.5"},
							{IP: "6.6.6.6"},
						},
					},
				}
				Expect(k8sClient.Update(ctx, &ep)).NotTo(HaveOccurred(), "failed to update cluster endpoints")

				By("check that the leader has changed")
				Eventually(
					func() bool {
						err := k8sClient.Get(ctx, client.ObjectKey{Name: clusterId, Namespace: namespace}, &ep)
						if err != nil {
							return false
						}

						if ep.GetAnnotations()["tarantool.io/leader"] != oldLeader {
							return true
						}
						return false
					},
					time.Second*10, time.Millisecond*500,
				).Should(BeTrue())
			})
		})
	})

	// NOTE: description fixed from "cluster_contriller" (typo).
	Describe("cluster_controller unit testing functions", func() {
		Describe("function IsLeaderExists must check for existence of leader in annotation of cluster Endpoints", func() {
			Context("positive cases (leader exist)", func() {
				It("should return True if leader assigned and exist", func() {
					leaderIP := "1.1.1.1"

					ep := &corev1.Endpoints{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "name",
							Namespace: "namespace",
							Annotations: map[string]string{
								"tarantool.io/leader": fmt.Sprintf("%s:8081", leaderIP),
							},
						},
						Subsets: []corev1.EndpointSubset{
							{
								Addresses: []corev1.EndpointAddress{
									{IP: leaderIP},
								},
							},
						},
					}
					Expect(IsLeaderExists(ep)).To(BeTrue())
				})
			})

			Context("negative cases (leader does not exist)", func() {
				It("should return False if leader not assigned", func() {
					ep := &corev1.Endpoints{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "name",
							Namespace: "namespace",
						},
					}
					Expect(IsLeaderExists(ep)).To(BeFalse())
				})

				It("should return False if leader assigned, but IP not exists", func() {
					ep := &corev1.Endpoints{
						ObjectMeta: metav1.ObjectMeta{
							Name:      "name",
							Namespace: "namespace",
							Annotations: map[string]string{
								"tarantool.io/leader": "6.6.6.6:8081",
							},
						},
						Subsets: []corev1.EndpointSubset{
							{
								Addresses: []corev1.EndpointAddress{
									{IP: "0.0.0.0"},
								},
							},
						},
					}
					Expect(IsLeaderExists(ep)).To(BeFalse())
				})
			})
		})
	})
})
97 changes: 97 additions & 0 deletions pkg/controller/cluster/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package cluster

import (
"math/rand"
"path/filepath"
"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/operator-framework/operator-sdk/pkg/log/zap"

"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
// +kubebuilder:scaffold:imports
"github.com/tarantool/tarantool-operator/pkg/apis"
)

// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.

var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment
var stopCh chan struct{}

// TestClusterController is the go-test entry point that runs the Ginkgo
// spec suite for the cluster controller. Renamed from TestRoleController,
// which was a copy-paste leftover from the role controller suite and
// contradicted the suite name below; Go test discovery only requires the
// "Test" prefix, so nothing else depends on the old name.
func TestClusterController(t *testing.T) {
	RegisterFailHandler(Fail)

	RunSpecsWithDefaultAndCustomReporters(t,
		"Cluster Controller Suite",
		[]Reporter{envtest.NewlineReporter{}})
}

// BeforeSuite bootstraps a local control plane via envtest, registers the
// operator API types, builds a direct client for the specs, and starts the
// cluster controller manager in a background goroutine. The order matters:
// the API server must be up before the scheme/client/manager can use it.
var _ = BeforeSuite(func(done Done) {
logf.SetLogger(zap.LoggerTo(GinkgoWriter))

By("Bootstrapping test environment")
// CRDs are loaded from the helm chart so the tests exercise the same
// definitions that are shipped to users.
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "ci", "helm-chart", "crds")},
UseExistingCluster: false,
}

var err error
cfg, err = testEnv.Start()
Expect(err).ToNot(HaveOccurred())
Expect(cfg).ToNot(BeNil())

// Register tarantool operator types with the global scheme so both the
// test client and the manager can (de)serialize them.
err = apis.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).ToNot(HaveOccurred())
Expect(k8sClient).ToNot(BeNil())

// create channel for stopping manager; closed in AfterSuite to stop the
// goroutine started below
stopCh = make(chan struct{})

mgr, err := ctrl.NewManager(cfg, ctrl.Options{})
Expect(err).NotTo(HaveOccurred(), "failed to create manager")

err = Add(mgr)
Expect(err).NotTo(HaveOccurred(), "failed to setup controller")

// Start blocks, so run the manager in its own goroutine while the specs
// execute against the API server.
go func() {
err = mgr.Start(stopCh)
Expect(err).NotTo(HaveOccurred(), "failed to start manager")
}()

close(done)
}, 60) // 60s timeout for the whole bootstrap (etcd + apiserver startup)

// AfterSuite tears down everything BeforeSuite started: it signals the
// manager goroutine to exit, then stops the envtest control plane.
var _ = AfterSuite(func() {
	// Stop the controller manager before shutting down the API server it
	// talks to.
	close(stopCh)

	By("Tearing down the test environment")
	Expect(testEnv.Stop()).ToNot(HaveOccurred())
})

// Seed math/rand once per test binary so RandStringRunes produces different
// resource-name suffixes on every run.
func init() {
rand.Seed(time.Now().UnixNano())
}

// letterRunes is the alphabet used for random resource-name suffixes.
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz")

// RandStringRunes returns a random lowercase ASCII string of length n.
// It is used to give per-spec test resources unique names.
func RandStringRunes(n int) string {
	out := make([]rune, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, letterRunes[rand.Intn(len(letterRunes))])
	}
	return string(out)
}