Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set Pod Management to 'Parallel' and disallow cluster scale down entirely #621

Merged
merged 2 commits into from Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml
Expand Up @@ -10,7 +10,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
controller-gen.kubebuilder.io/version: v0.5.0
creationTimestamp: null
name: rabbitmqclusters.rabbitmq.com
spec:
Expand Down
42 changes: 30 additions & 12 deletions controllers/rabbitmqcluster_controller.go
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/rabbitmq/cluster-operator/internal/resource"
"github.com/rabbitmq/cluster-operator/internal/status"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -89,7 +89,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ

if client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, err
} else if errors.IsNotFound(err) {
} else if k8serrors.IsNotFound(err) {
// No need to requeue if the resource no longer exists
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -163,13 +163,31 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ

// only StatefulSetBuilder returns true
if builder.UpdateMayRequireStsRecreate() {
if err = r.reconcilePVC(ctx, builder, rabbitmqCluster, resource); err != nil {
rabbitmqCluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
if statusErr := r.Status().Update(ctx, rabbitmqCluster); statusErr != nil {
logger.Error(statusErr, "Failed to update ReconcileSuccess condition state")
}
sts := resource.(*appsv1.StatefulSet)

current, err := r.statefulSet(ctx, rabbitmqCluster)
if client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, err
}

// only checks for PVC expansion and scale down if statefulSet is created
// else continue to CreateOrUpdate()
if !k8serrors.IsNotFound(err) {
if err := builder.Update(sts); err != nil {
return ctrl.Result{}, err
}
if err = r.reconcilePVC(ctx, rabbitmqCluster, current, sts); err != nil {
rabbitmqCluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, "FailedReconcilePVC", err.Error())
if statusErr := r.Status().Update(ctx, rabbitmqCluster); statusErr != nil {
logger.Error(statusErr, "Failed to update ReconcileSuccess condition state")
}
return ctrl.Result{}, err
}
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
// return when cluster scale down detected; unsupported operation
return ctrl.Result{}, nil
}
}
}

var operationResult controllerutil.OperationResult
Expand Down Expand Up @@ -269,7 +287,7 @@ func (r *RabbitmqClusterReconciler) updateStatus(ctx context.Context, rmq *rabbi

if !reflect.DeepEqual(rmq.Status.Conditions, oldConditions) {
if err = r.Status().Update(ctx, rmq); err != nil {
if errors.IsConflict(err) {
if k8serrors.IsConflict(err) {
logger.Info("failed to update status because of conflict; requeueing...",
"namespace", rmq.Namespace,
"name", rmq.Name)
Expand All @@ -287,17 +305,17 @@ func (r *RabbitmqClusterReconciler) getChildResources(ctx context.Context, rmq *

if err := r.Client.Get(ctx,
types.NamespacedName{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace},
sts); err != nil && !errors.IsNotFound(err) {
sts); err != nil && !k8serrors.IsNotFound(err) {
return nil, err
} else if errors.IsNotFound(err) {
} else if k8serrors.IsNotFound(err) {
sts = nil
}

if err := r.Client.Get(ctx,
types.NamespacedName{Name: rmq.ChildResourceName(resource.ServiceSuffix), Namespace: rmq.Namespace},
endPoints); err != nil && !errors.IsNotFound(err) {
endPoints); err != nil && !k8serrors.IsNotFound(err) {
return nil, err
} else if errors.IsNotFound(err) {
} else if k8serrors.IsNotFound(err) {
endPoints = nil
}

Expand Down
4 changes: 2 additions & 2 deletions controllers/rabbitmqcluster_controller_test.go
Expand Up @@ -937,7 +937,7 @@ var _ = Describe("RabbitmqClusterController", func() {

It("updates", func() {
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
cluster.Spec.Override.StatefulSet.Spec.Replicas = pointer.Int32Ptr(5)
cluster.Spec.Override.StatefulSet.Spec.Replicas = pointer.Int32Ptr(15)
cluster.Spec.Override.StatefulSet.Spec.Template.Spec.Containers = []corev1.Container{
{
Name: "additional-container-2",
Expand All @@ -949,7 +949,7 @@ var _ = Describe("RabbitmqClusterController", func() {
Eventually(func() int32 {
sts := statefulSet(ctx, cluster)
return *sts.Spec.Replicas
}, 3).Should(Equal(int32(5)))
}, 3).Should(Equal(int32(15)))

Eventually(func() string {
sts := statefulSet(ctx, cluster)
Expand Down
19 changes: 1 addition & 18 deletions controllers/reconcile_persistence.go
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"github.com/go-logr/logr"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/resource"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -18,23 +17,7 @@ import (
"time"
)

func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, builder resource.ResourceBuilder, cluster *rabbitmqv1beta1.RabbitmqCluster, resource client.Object) error {
logger := ctrl.LoggerFrom(ctx)

sts := resource.(*appsv1.StatefulSet)
current, err := r.statefulSet(ctx, cluster)

if client.IgnoreNotFound(err) != nil {
return err
} else if k8serrors.IsNotFound(err) {
logger.Info("statefulSet not created yet, skipping checks to expand PersistentVolumeClaims")
return nil
}

if err := builder.Update(sts); err != nil {
return err
}

func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, cluster *rabbitmqv1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) error {
resize, err := r.needsPVCResize(current, sts)
if err != nil {
return err
Expand Down
32 changes: 32 additions & 0 deletions controllers/reconcile_scale_down.go
@@ -0,0 +1,32 @@
package controllers

import (
"context"
"errors"
"github.com/go-logr/logr"
"github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/status"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

// cluster scale down not supported
// log error, publish warning event, and set ReconcileSuccess to false when scale down request detected
func (r *RabbitmqClusterReconciler) scaleDown(ctx context.Context, cluster *v1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) bool {
logger := logr.FromContext(ctx)

currentReplicas := *current.Spec.Replicas
desiredReplicas := *sts.Spec.Replicas
if currentReplicas > desiredReplicas {
msg := "Cluster Scale down not supported"
reason := "UnsupportedOperation"
logger.Error(errors.New(reason), msg)
r.Recorder.Event(cluster, corev1.EventTypeWarning, reason, msg)
cluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionFalse, reason, msg)
if statusErr := r.Status().Update(ctx, cluster); statusErr != nil {
logger.Error(statusErr, "Failed to update ReconcileSuccess condition state")
}
return true
}
return false
}
85 changes: 85 additions & 0 deletions controllers/reconcile_scale_down_test.go
@@ -0,0 +1,85 @@
package controllers_test

import (
"context"
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/status"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Cluster scale down", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}, 5).Should(BeTrue())
})

It("does not allow cluster scale down", func() {
By("not updating statefulSet replicas", func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-shrink",
Namespace: defaultNamespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: pointer.Int32Ptr(5),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)

Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
r.Spec.Replicas = pointer.Int32Ptr(3)
})).To(Succeed())
Consistently(func() int32 {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return *sts.Spec.Replicas
}, 10, 1).Should(Equal(int32(5)))
})

By("setting 'Warning' events", func() {
Expect(aggregateEventMsgs(ctx, cluster, "UnsupportedOperation")).To(
ContainSubstring("Cluster Scale down not supported"))
})

By("setting ReconcileSuccess to 'false'", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 5).Should(Equal("ReconcileSuccess status: False, " +
"with reason: UnsupportedOperation " +
"and message: Cluster Scale down not supported"))
})
})
})
4 changes: 3 additions & 1 deletion internal/resource/configmap.go
Expand Up @@ -34,7 +34,9 @@ cluster_formation.k8s.host = kubernetes.default
cluster_formation.k8s.address_type = hostname
cluster_partition_handling = pause_minority
queue_master_locator = min-masters
disk_free_limit.absolute = 2GB`
disk_free_limit.absolute = 2GB
cluster_formation.randomized_startup_delay_range.min = 5
cluster_formation.randomized_startup_delay_range.max = 30`

defaultTLSConf = `
ssl_options.certfile = /etc/rabbitmq-tls/tls.crt
Expand Down
2 changes: 2 additions & 0 deletions internal/resource/configmap_test.go
Expand Up @@ -33,6 +33,8 @@ cluster_formation.k8s.address_type = hostname
cluster_partition_handling = pause_minority
queue_master_locator = min-masters
disk_free_limit.absolute = 2GB
cluster_formation.randomized_startup_delay_range.min = 5
cluster_formation.randomized_startup_delay_range.max = 30
cluster_name = ` + instanceName)
}

Expand Down
1 change: 1 addition & 0 deletions internal/resource/statefulset.go
Expand Up @@ -67,6 +67,7 @@ func (builder *StatefulSetBuilder) Build() (client.Object, error) {
MatchLabels: metadata.LabelSelector(builder.Instance.Name),
},
VolumeClaimTemplates: pvc,
PodManagementPolicy: appsv1.ParallelPodManagement,
},
}

Expand Down
11 changes: 10 additions & 1 deletion internal/resource/statefulset_test.go
Expand Up @@ -65,6 +65,7 @@ var _ = Describe("StatefulSet", func() {

Expect(statefulSet.Spec.ServiceName).To(Equal(instance.ChildResourceName("nodes")))
})

It("adds the correct label selector", func() {
obj, err := stsBuilder.Build()
Expect(err).NotTo(HaveOccurred())
Expand All @@ -74,7 +75,15 @@ var _ = Describe("StatefulSet", func() {
Expect(labels["app.kubernetes.io/name"]).To(Equal(instance.Name))
})

It("references the storageclassname when specified", func() {
It("sets pod management policy to 'Parallel' ", func() {
obj, err := stsBuilder.Build()
Expect(err).NotTo(HaveOccurred())
statefulSet := obj.(*appsv1.StatefulSet)

Expect(statefulSet.Spec.PodManagementPolicy).To(Equal(appsv1.ParallelPodManagement))
})

It("references the storage class name when specified", func() {
storageClassName := "my-storage-class"
builder.Instance.Spec.Persistence.StorageClassName = &storageClassName

Expand Down