Skip to content

Commit

Permalink
Disable scale down
Browse files Browse the repository at this point in the history
- prevent cluster scale down from happening by checking
current number of replicas vs desired number of replicas
after running statefulSetBuilder.Update()
- return errors, logs, publish events and set ReconcileSuccess
to false if scale down request detected
  • Loading branch information
ChunyiLyu committed Mar 1, 2021
1 parent ba81e8e commit b09b47a
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 33 deletions.
2 changes: 1 addition & 1 deletion config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml
Expand Up @@ -10,7 +10,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.5.0
creationTimestamp: null
name: rabbitmqclusters.rabbitmq.com
spec:
Expand Down
42 changes: 30 additions & 12 deletions controllers/rabbitmqcluster_controller.go
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/rabbitmq/cluster-operator/internal/resource"
"github.com/rabbitmq/cluster-operator/internal/status"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -89,7 +89,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ

if client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, err
} else if errors.IsNotFound(err) {
} else if k8serrors.IsNotFound(err) {
// No need to requeue if the resource no longer exists
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -163,13 +163,31 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ

// only StatefulSetBuilder returns true
if builder.UpdateMayRequireStsRecreate() {
if err = r.reconcilePVC(ctx, builder, rabbitmqCluster, resource); err != nil {
rabbitmqCluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
if statusErr := r.Status().Update(ctx, rabbitmqCluster); statusErr != nil {
logger.Error(statusErr, "Failed to update ReconcileSuccess condition state")
}
sts := resource.(*appsv1.StatefulSet)

current, err := r.statefulSet(ctx, rabbitmqCluster)
if client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, err
}

// only checks for PVC expansion and scale down if statefulSet is created
// else continue to CreateOrUpdate()
if !k8serrors.IsNotFound(err) {
if err := builder.Update(sts); err != nil {
return ctrl.Result{}, err
}
if err = r.reconcilePVC(ctx, rabbitmqCluster, current, sts); err != nil {
rabbitmqCluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
if statusErr := r.Status().Update(ctx, rabbitmqCluster); statusErr != nil {
logger.Error(statusErr, "Failed to update ReconcileSuccess condition state")
}
return ctrl.Result{}, err
}
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
// return when cluster scale down detected; unsupported operation
return ctrl.Result{}, nil
}
}
}

var operationResult controllerutil.OperationResult
Expand Down Expand Up @@ -269,7 +287,7 @@ func (r *RabbitmqClusterReconciler) updateStatus(ctx context.Context, rmq *rabbi

if !reflect.DeepEqual(rmq.Status.Conditions, oldConditions) {
if err = r.Status().Update(ctx, rmq); err != nil {
if errors.IsConflict(err) {
if k8serrors.IsConflict(err) {
logger.Info("failed to update status because of conflict; requeueing...",
"namespace", rmq.Namespace,
"name", rmq.Name)
Expand All @@ -287,17 +305,17 @@ func (r *RabbitmqClusterReconciler) getChildResources(ctx context.Context, rmq *

if err := r.Client.Get(ctx,
types.NamespacedName{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace},
sts); err != nil && !errors.IsNotFound(err) {
sts); err != nil && !k8serrors.IsNotFound(err) {
return nil, err
} else if errors.IsNotFound(err) {
} else if k8serrors.IsNotFound(err) {
sts = nil
}

if err := r.Client.Get(ctx,
types.NamespacedName{Name: rmq.ChildResourceName(resource.ServiceSuffix), Namespace: rmq.Namespace},
endPoints); err != nil && !errors.IsNotFound(err) {
endPoints); err != nil && !k8serrors.IsNotFound(err) {
return nil, err
} else if errors.IsNotFound(err) {
} else if k8serrors.IsNotFound(err) {
endPoints = nil
}

Expand Down
4 changes: 2 additions & 2 deletions controllers/rabbitmqcluster_controller_test.go
Expand Up @@ -937,7 +937,7 @@ var _ = Describe("RabbitmqClusterController", func() {

It("updates", func() {
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
cluster.Spec.Override.StatefulSet.Spec.Replicas = pointer.Int32Ptr(5)
cluster.Spec.Override.StatefulSet.Spec.Replicas = pointer.Int32Ptr(15)
cluster.Spec.Override.StatefulSet.Spec.Template.Spec.Containers = []corev1.Container{
{
Name: "additional-container-2",
Expand All @@ -949,7 +949,7 @@ var _ = Describe("RabbitmqClusterController", func() {
Eventually(func() int32 {
sts := statefulSet(ctx, cluster)
return *sts.Spec.Replicas
}, 3).Should(Equal(int32(5)))
}, 3).Should(Equal(int32(15)))

Eventually(func() string {
sts := statefulSet(ctx, cluster)
Expand Down
19 changes: 1 addition & 18 deletions controllers/reconcile_persistence.go
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"github.com/go-logr/logr"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/resource"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -18,23 +17,7 @@ import (
"time"
)

func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, builder resource.ResourceBuilder, cluster *rabbitmqv1beta1.RabbitmqCluster, resource client.Object) error {
logger := ctrl.LoggerFrom(ctx)

sts := resource.(*appsv1.StatefulSet)
current, err := r.statefulSet(ctx, cluster)

if client.IgnoreNotFound(err) != nil {
return err
} else if k8serrors.IsNotFound(err) {
logger.Info("statefulSet not created yet, skipping checks to expand PersistentVolumeClaims")
return nil
}

if err := builder.Update(sts); err != nil {
return err
}

func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, cluster *rabbitmqv1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) error {
resize, err := r.needsPVCResize(current, sts)
if err != nil {
return err
Expand Down
32 changes: 32 additions & 0 deletions controllers/reconcile_scale_down.go
@@ -0,0 +1,32 @@
package controllers

import (
"context"
"errors"
"github.com/go-logr/logr"
"github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/status"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

// cluster scale down not supported
// log error, publish warning event, and set ReconcileSuccess to false when scale down request detected
func (r *RabbitmqClusterReconciler) scaleDown(ctx context.Context, cluster *v1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) bool {
logger := logr.FromContext(ctx)

currentReplicas := *current.Spec.Replicas
desiredReplicas := *sts.Spec.Replicas
if currentReplicas > desiredReplicas {
msg := "Cluster Scale down not supported"
reason := "UnsupportedOperation"
logger.Error(errors.New(reason), msg)
r.Recorder.Event(cluster, corev1.EventTypeWarning, reason, msg)
cluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, reason, msg)
if statusErr := r.Status().Update(ctx, cluster); statusErr != nil {
logger.Error(statusErr, "Failed to update ReconcileSuccess condition state")
}
return true
}
return false
}
85 changes: 85 additions & 0 deletions controllers/reconcile_scale_down_test.go
@@ -0,0 +1,85 @@
package controllers_test

import (
"context"
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/status"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Cluster scale down", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}, 5).Should(BeTrue())
})

It("does not allow cluster scale down", func() {
By("not updating statefulSet replicas", func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-shrink",
Namespace: defaultNamespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: pointer.Int32Ptr(5),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)

Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
r.Spec.Replicas = pointer.Int32Ptr(3)
})).To(Succeed())
Consistently(func() int32 {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return *sts.Spec.Replicas
}, 10, 1).Should(Equal(int32(5)))
})

By("setting 'Warning' events", func() {
Expect(aggregateEventMsgs(ctx, cluster, "UnsupportedOperation")).To(
ContainSubstring("Cluster Scale down not supported"))
})

By("setting ReconcileSuccess to 'false'", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 5).Should(Equal("ReconcileSuccess status: False, " +
"with reason: UnsupportedOperation " +
"and message: Cluster Scale down not supported"))
})
})
})

0 comments on commit b09b47a

Please sign in to comment.