Skip to content

Commit

Permalink
integration: extract statefulset stub to separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
zimnx committed Nov 16, 2020
1 parent fb43ba7 commit c9c7384
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 57 deletions.
61 changes: 6 additions & 55 deletions pkg/controllers/cluster/controller_integration_test.go
Expand Up @@ -5,20 +5,19 @@ package cluster_test

import (
"context"
"fmt"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/v1alpha1"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/test/integration"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Cluster controller", func() {
Expand All @@ -45,11 +44,14 @@ var _ = Describe("Cluster controller", func() {

Expect(waitForCluster(ctx, scylla)).To(Succeed())

Expect(testEnv.Refresh(ctx, scylla)).To(Succeed())
sst := integration.NewStatefulSetOperatorStub(GinkgoT(), testEnv, retryInterval)
sst.Start(ctx, scylla.Name, scylla.Namespace)

// Cluster should be scaled sequentially up to 3
for _, rack := range scylla.Spec.Datacenter.Racks {
for _, replicas := range clusterScaleSteps(rack.Members) {
Expect(assertRackScaled(ctx, rack, scylla, replicas)).To(Succeed())
Expect(createFakePods(ctx, testEnv, rack, scylla)).To(Succeed())
}
}

Expand Down Expand Up @@ -151,57 +153,6 @@ func assertRackScaled(ctx context.Context, rack scyllav1alpha1.RackSpec, cluster
if err != nil {
return false, err
}

return *sts.Spec.Replicas == replicas, nil
return *sts.Spec.Replicas >= replicas, nil
})
}

func createFakePods(ctx context.Context, client client.Client, rack scyllav1alpha1.RackSpec, cluster *scyllav1alpha1.ScyllaCluster) error {
sts, err := statefulSetOfRack(ctx, rack, cluster)
if err != nil {
return err
}

podTemplate := corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "template",
Namespace: sts.Namespace,
Labels: naming.RackLabels(rack, cluster),
},
Spec: sts.Spec.Template.Spec,
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}

for i, c := range podTemplate.Spec.Containers {
podTemplate.Spec.Containers[i].VolumeMounts = []corev1.VolumeMount{}
podTemplate.Status.ContainerStatuses = append(podTemplate.Status.ContainerStatuses, corev1.ContainerStatus{
Name: c.Name,
Ready: true,
})
}

for i := sts.Status.Replicas; i < *sts.Spec.Replicas; i++ {
pod := podTemplate.DeepCopy()
pod.Name = fmt.Sprintf("%s-%d", sts.Name, i)
pod.Spec.Hostname = pod.Name
pod.Spec.Subdomain = cluster.Name
if err := client.Create(ctx, pod); err != nil {
return err
}
}

sts.Status.Replicas = *sts.Spec.Replicas
sts.Status.ReadyReplicas = *sts.Spec.Replicas
sts.Status.ObservedGeneration = sts.Generation
if err := client.Status().Update(ctx, sts); err != nil {
return err
}

return nil
}
2 changes: 1 addition & 1 deletion pkg/controllers/cluster/main_integration_test.go
Expand Up @@ -49,7 +49,7 @@ func TestMain(m *testing.M) {

logger.Info(ctx, "Creating test environment")
var err error
testEnv, err = integration.NewTestEnvironment(
testEnv, err = integration.NewTestEnvironment(logger.Named("env"),
integration.WithPollRetryInterval(retryInterval),
integration.WithPollTimeout(timeout),
)
Expand Down
5 changes: 4 additions & 1 deletion pkg/test/integration/envtest.go
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/scylladb/go-log"
scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/v1alpha1"
utilyaml "github.com/scylladb/scylla-operator/pkg/util/yaml"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -58,6 +59,7 @@ type TestEnvironment struct {
Client
Config *rest.Config

logger log.Logger
cancel context.CancelFunc
}

Expand All @@ -81,7 +83,7 @@ func WithPollTimeout(timeout time.Duration) func(*option) {
}

// NewTestEnvironment creates a new environment spinning up a local api-server.
func NewTestEnvironment(options ...EnvOption) (*TestEnvironment, error) {
func NewTestEnvironment(logger log.Logger, options ...EnvOption) (*TestEnvironment, error) {

envOpts := &option{
pollRetryInterval: 200 * time.Millisecond,
Expand Down Expand Up @@ -126,6 +128,7 @@ func NewTestEnvironment(options ...EnvOption) (*TestEnvironment, error) {
Timeout: envOpts.pollTimeout,
},
Config: mgr.GetConfig(),
logger: logger,
}, nil
}

Expand Down
124 changes: 124 additions & 0 deletions pkg/test/integration/statefulset.go
@@ -0,0 +1,124 @@
// Copyright (C) 2017 ScyllaDB

package integration

import (
"context"
"fmt"
"time"

"github.com/onsi/ginkgo"
"github.com/scylladb/go-log"
scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/v1alpha1"
"github.com/scylladb/scylla-operator/pkg/naming"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type statefulSetOperatorStub struct {
t ginkgo.GinkgoTInterface
env *TestEnvironment
interval time.Duration
logger log.Logger
}

func NewStatefulSetOperatorStub(t ginkgo.GinkgoTInterface, env *TestEnvironment, interval time.Duration) *statefulSetOperatorStub {
return &statefulSetOperatorStub{
t: t,
env: env,
interval: interval,
logger: env.logger.Named("sts_stub"),
}
}

func (s *statefulSetOperatorStub) Start(ctx context.Context, name, namespace string) {
go func() {
ticker := time.NewTicker(s.interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cluster := &scyllav1alpha1.ScyllaCluster{}
if err := s.env.Get(ctx, client.ObjectKey{
Name: name,
Namespace: namespace,
}, cluster); err != nil {
s.t.Errorf("refresh scylla cluster obj, err: %s", err)
continue
}

if err := s.syncStatefulSet(ctx, cluster); err != nil {
s.t.Errorf("sync statefulset, err: %s", err)
}
}
}
}()
}

func (s *statefulSetOperatorStub) syncStatefulSet(ctx context.Context, cluster *scyllav1alpha1.ScyllaCluster) error {
stss := &appsv1.StatefulSetList{}
err := s.env.List(ctx, stss, &client.ListOptions{Namespace: cluster.Namespace})
if err != nil {
return err
}

for _, sts := range stss.Items {
var rack scyllav1alpha1.RackSpec
for _, r := range cluster.Spec.Datacenter.Racks {
if sts.Name == naming.StatefulSetNameForRack(r, cluster) {
rack = r
break
}
}

podTemplate := corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "template",
Namespace: sts.Namespace,
Labels: naming.RackLabels(rack, cluster),
},
Spec: sts.Spec.Template.Spec,
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
}

for i, c := range podTemplate.Spec.Containers {
podTemplate.Spec.Containers[i].VolumeMounts = []corev1.VolumeMount{}
podTemplate.Status.ContainerStatuses = append(podTemplate.Status.ContainerStatuses, corev1.ContainerStatus{
Name: c.Name,
Ready: true,
})
}

for i := sts.Status.Replicas; i < *sts.Spec.Replicas; i++ {
pod := podTemplate.DeepCopy()
pod.Name = fmt.Sprintf("%s-%d", sts.Name, i)
pod.Spec.Hostname = pod.Name
pod.Spec.Subdomain = cluster.Name
s.logger.Info(ctx, "Spawning fake Pod", "sts", sts.Name, "pod", pod.Name)
if err := s.env.Create(ctx, pod); err != nil {
return err
}
}

sts.Status.Replicas = *sts.Spec.Replicas
sts.Status.ReadyReplicas = *sts.Spec.Replicas
sts.Status.ObservedGeneration = sts.Generation
s.logger.Info(ctx, "Updating StatefulSet status", "replicas", sts.Status.Replicas, "observed_generation", sts.Status.ObservedGeneration)
if err := s.env.Status().Update(ctx, &sts); err != nil {
return err
}
}

return nil
}

0 comments on commit c9c7384

Please sign in to comment.