From b09b47a7d17c951bac162a0e38561692ebc21c03 Mon Sep 17 00:00:00 2001
From: Chunyi Lyu
Date: Mon, 1 Mar 2021 15:39:53 +0000
Subject: [PATCH] Disable scale down

- prevent cluster scale down by comparing the current number of
  replicas against the desired number of replicas after running
  statefulSetBuilder.Update()
- log an error, publish a warning event, and set ReconcileSuccess
  to false when a scale down request is detected
---
 .../bases/rabbitmq.com_rabbitmqclusters.yaml  |  2 +-
 controllers/rabbitmqcluster_controller.go     | 42 ++++++---
 .../rabbitmqcluster_controller_test.go        |  4 +-
 controllers/reconcile_persistence.go          | 19 +----
 controllers/reconcile_scale_down.go           | 32 +++++++
 controllers/reconcile_scale_down_test.go      | 85 +++++++++++++++++++
 6 files changed, 151 insertions(+), 33 deletions(-)
 create mode 100644 controllers/reconcile_scale_down.go
 create mode 100644 controllers/reconcile_scale_down_test.go

diff --git a/config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml b/config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml
index 34cabbedd..2ace008d3 100644
--- a/config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml
+++ b/config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml
@@ -10,7 +10,7 @@ apiVersion: apiextensions.k8s.io/v1
 kind: CustomResourceDefinition
 metadata:
   annotations:
-    controller-gen.kubebuilder.io/version: v0.4.1
+    controller-gen.kubebuilder.io/version: v0.5.0
   creationTimestamp: null
   name: rabbitmqclusters.rabbitmq.com
 spec:
diff --git a/controllers/rabbitmqcluster_controller.go b/controllers/rabbitmqcluster_controller.go
index d1593f6c3..41e282f61 100644
--- a/controllers/rabbitmqcluster_controller.go
+++ b/controllers/rabbitmqcluster_controller.go
@@ -22,7 +22,7 @@ import (
 	"github.com/rabbitmq/cluster-operator/internal/resource"
 	"github.com/rabbitmq/cluster-operator/internal/status"
 
-	"k8s.io/apimachinery/pkg/api/errors"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/record"
@@ -89,7 +89,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 
 	if client.IgnoreNotFound(err) != nil {
 		return ctrl.Result{}, err
-	} else if errors.IsNotFound(err) {
+	} else if k8serrors.IsNotFound(err) {
 		// No need to requeue if the resource no longer exists
 		return ctrl.Result{}, nil
 	}
@@ -163,13 +163,31 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 
 		// only StatefulSetBuilder returns true
 		if builder.UpdateMayRequireStsRecreate() {
-			if err = r.reconcilePVC(ctx, builder, rabbitmqCluster, resource); err != nil {
-				rabbitmqCluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
-				if statusErr := r.Status().Update(ctx, rabbitmqCluster); statusErr != nil {
-					logger.Error(statusErr, "Failed to update ReconcileSuccess condition state")
-				}
+			sts := resource.(*appsv1.StatefulSet)
+
+			current, err := r.statefulSet(ctx, rabbitmqCluster)
+			if client.IgnoreNotFound(err) != nil {
 				return ctrl.Result{}, err
 			}
+
+			// check for PVC expansion and scale down only when the statefulSet already exists;
+			// otherwise, continue to CreateOrUpdate()
+			if !k8serrors.IsNotFound(err) {
+				if err := builder.Update(sts); err != nil {
+					return ctrl.Result{}, err
+				}
+				if err = r.reconcilePVC(ctx, rabbitmqCluster, current, sts); err != nil {
+					rabbitmqCluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
+					if statusErr := r.Status().Update(ctx, rabbitmqCluster); statusErr != nil {
logger.Error(statusErr, "Failed to update ReconcileSuccess condition state") + } + return ctrl.Result{}, err + } + if r.scaleDown(ctx, rabbitmqCluster, current, sts) { + // return when cluster scale down detected; unsupported operation + return ctrl.Result{}, nil + } + } } var operationResult controllerutil.OperationResult @@ -269,7 +287,7 @@ func (r *RabbitmqClusterReconciler) updateStatus(ctx context.Context, rmq *rabbi if !reflect.DeepEqual(rmq.Status.Conditions, oldConditions) { if err = r.Status().Update(ctx, rmq); err != nil { - if errors.IsConflict(err) { + if k8serrors.IsConflict(err) { logger.Info("failed to update status because of conflict; requeueing...", "namespace", rmq.Namespace, "name", rmq.Name) @@ -287,17 +305,17 @@ func (r *RabbitmqClusterReconciler) getChildResources(ctx context.Context, rmq * if err := r.Client.Get(ctx, types.NamespacedName{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}, - sts); err != nil && !errors.IsNotFound(err) { + sts); err != nil && !k8serrors.IsNotFound(err) { return nil, err - } else if errors.IsNotFound(err) { + } else if k8serrors.IsNotFound(err) { sts = nil } if err := r.Client.Get(ctx, types.NamespacedName{Name: rmq.ChildResourceName(resource.ServiceSuffix), Namespace: rmq.Namespace}, - endPoints); err != nil && !errors.IsNotFound(err) { + endPoints); err != nil && !k8serrors.IsNotFound(err) { return nil, err - } else if errors.IsNotFound(err) { + } else if k8serrors.IsNotFound(err) { endPoints = nil } diff --git a/controllers/rabbitmqcluster_controller_test.go b/controllers/rabbitmqcluster_controller_test.go index 7cb6315aa..861d1e5d3 100644 --- a/controllers/rabbitmqcluster_controller_test.go +++ b/controllers/rabbitmqcluster_controller_test.go @@ -937,7 +937,7 @@ var _ = Describe("RabbitmqClusterController", func() { It("updates", func() { Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) { - cluster.Spec.Override.StatefulSet.Spec.Replicas = pointer.Int32Ptr(5) + cluster.Spec.Override.StatefulSet.Spec.Replicas = pointer.Int32Ptr(15) cluster.Spec.Override.StatefulSet.Spec.Template.Spec.Containers = []corev1.Container{ { Name: "additional-container-2", @@ -949,7 +949,7 @@ var _ = Describe("RabbitmqClusterController", func() { Eventually(func() int32 { sts := statefulSet(ctx, cluster) return *sts.Spec.Replicas - }, 3).Should(Equal(int32(5))) + }, 3).Should(Equal(int32(15))) Eventually(func() string { sts := statefulSet(ctx, cluster) diff --git a/controllers/reconcile_persistence.go b/controllers/reconcile_persistence.go index 7dc2102e1..e44bc7d4c 100644 --- a/controllers/reconcile_persistence.go +++ b/controllers/reconcile_persistence.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/go-logr/logr" rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1" - "github.com/rabbitmq/cluster-operator/internal/resource" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -18,23 +17,7 @@ import ( "time" ) -func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, builder resource.ResourceBuilder, cluster *rabbitmqv1beta1.RabbitmqCluster, resource client.Object) error { - logger := ctrl.LoggerFrom(ctx) - - sts := resource.(*appsv1.StatefulSet) - current, err := r.statefulSet(ctx, cluster) - - if client.IgnoreNotFound(err) != nil { - return err - } else if k8serrors.IsNotFound(err) { - logger.Info("statefulSet not created yet, skipping checks to expand PersistentVolumeClaims") - return nil - } - - if err := builder.Update(sts); err != nil { 
-		return err
-	}
-
+func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, cluster *rabbitmqv1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) error {
 	resize, err := r.needsPVCResize(current, sts)
 	if err != nil {
 		return err
diff --git a/controllers/reconcile_scale_down.go b/controllers/reconcile_scale_down.go
new file mode 100644
index 000000000..984c59a14
--- /dev/null
+++ b/controllers/reconcile_scale_down.go
@@ -0,0 +1,32 @@
+package controllers
+
+import (
+	"context"
+	"errors"
+	"github.com/rabbitmq/cluster-operator/api/v1beta1"
+	"github.com/rabbitmq/cluster-operator/internal/status"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	ctrl "sigs.k8s.io/controller-runtime"
+)
+
+// scaleDown returns true when a cluster scale down is requested; scale down is not supported.
+// It logs an error, publishes a warning event, and sets ReconcileSuccess to false when a scale down request is detected.
+func (r *RabbitmqClusterReconciler) scaleDown(ctx context.Context, cluster *v1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) bool {
+	logger := ctrl.LoggerFrom(ctx)
+
+	currentReplicas := *current.Spec.Replicas
+	desiredReplicas := *sts.Spec.Replicas
+	if currentReplicas > desiredReplicas {
+		msg := "Cluster scale down not supported"
+		reason := "UnsupportedOperation"
+		logger.Error(errors.New(reason), msg)
+		r.Recorder.Event(cluster, corev1.EventTypeWarning, reason, msg)
+		cluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, reason, msg)
+		if statusErr := r.Status().Update(ctx, cluster); statusErr != nil {
+			logger.Error(statusErr, "Failed to update ReconcileSuccess condition state")
+		}
+		return true
+	}
+	return false
+}
diff --git a/controllers/reconcile_scale_down_test.go b/controllers/reconcile_scale_down_test.go
new file mode 100644
index 000000000..9065c9c45
--- /dev/null
+++ b/controllers/reconcile_scale_down_test.go
@@ -0,0 +1,85 @@
+package controllers_test
+
+import (
+	"context"
+	"fmt"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
"github.com/onsi/gomega" + rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1" + "github.com/rabbitmq/cluster-operator/internal/status" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ = Describe("Cluster scale down", func() { + var ( + cluster *rabbitmqv1beta1.RabbitmqCluster + defaultNamespace = "default" + ctx = context.Background() + ) + + AfterEach(func() { + Expect(client.Delete(ctx, cluster)).To(Succeed()) + Eventually(func() bool { + rmq := &rabbitmqv1beta1.RabbitmqCluster{} + err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq) + return apierrors.IsNotFound(err) + }, 5).Should(BeTrue()) + }) + + It("does not allow cluster scale down", func() { + By("not updating statefulSet replicas", func() { + cluster = &rabbitmqv1beta1.RabbitmqCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rabbitmq-shrink", + Namespace: defaultNamespace, + }, + Spec: rabbitmqv1beta1.RabbitmqClusterSpec{ + Replicas: pointer.Int32Ptr(5), + }, + } + Expect(client.Create(ctx, cluster)).To(Succeed()) + waitForClusterCreation(ctx, cluster, client) + + Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) { + r.Spec.Replicas = pointer.Int32Ptr(3) + })).To(Succeed()) + Consistently(func() int32 { + sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + return *sts.Spec.Replicas + }, 10, 1).Should(Equal(int32(5))) + }) + + By("setting 'Warning' events", func() { + Expect(aggregateEventMsgs(ctx, cluster, "UnsupportedOperation")).To( + ContainSubstring("Cluster Scale down not supported")) + }) + + By("setting ReconcileSuccess to 'false'", func() { + Eventually(func() string { + rabbit := &rabbitmqv1beta1.RabbitmqCluster{} + Expect(client.Get(ctx, runtimeClient.ObjectKey{ + Name: cluster.Name, + Namespace: defaultNamespace, + }, rabbit)).To(Succeed()) + + for i := range rabbit.Status.Conditions { + if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess { + return fmt.Sprintf( + "ReconcileSuccess status: %s, with reason: %s and message: %s", + rabbit.Status.Conditions[i].Status, + rabbit.Status.Conditions[i].Reason, + rabbit.Status.Conditions[i].Message) + } + } + return "ReconcileSuccess status: condition not present" + }, 5).Should(Equal("ReconcileSuccess status: False, " + + "with reason: UnsupportedOperation " + + "and message: Cluster Scale down not supported")) + }) + }) +})