Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions controllers/reconcile_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, cluster *rabbitmqv1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) error {
resize, err := r.needsPVCResize(current, sts)
func (r *RabbitmqClusterReconciler) reconcilePVC(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, sts *appsv1.StatefulSet) error {
resize, err := r.needsPVCExpand(ctx, rmq, current, sts)
if err != nil {
return err
}

if resize {
if err := r.expandPVC(ctx, cluster, current, sts); err != nil {
if err := r.expandPVC(ctx, rmq, current, sts); err != nil {
return err
}
}
Expand Down Expand Up @@ -84,7 +84,11 @@ func (r *RabbitmqClusterReconciler) updatePVC(ctx context.Context, rmq *rabbitmq
return nil
}

func (r *RabbitmqClusterReconciler) needsPVCResize(current, desired *appsv1.StatefulSet) (bool, error) {
// returns true if desired storage capacity is larger than the current storage; returns false when current and desired capacity is the same
// errors when desired capacity is less than current capacity because PVC shrink is not supported by k8s
func (r *RabbitmqClusterReconciler) needsPVCExpand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, current, desired *appsv1.StatefulSet) (bool, error) {
logger := ctrl.LoggerFrom(ctx)

currentCapacity, err := persistenceStorageCapacity(current.Spec.VolumeClaimTemplates)
if err != nil {
return false, err
Expand All @@ -95,10 +99,20 @@ func (r *RabbitmqClusterReconciler) needsPVCResize(current, desired *appsv1.Stat
return false, err
}

if currentCapacity.Cmp(desiredCapacity) != 0 {
cmp := currentCapacity.Cmp(desiredCapacity)

// desired storage capacity is larger than the current capacity; PVC needs expansion
if cmp == -1 {
return true, nil
}

// desired storage capacity is less than the current capacity; logs and records a warning event
if cmp == 1 {
msg := "shrinking persistent volumes is not supported"
logger.Error(errors.New("unsupported operation"), msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedReconcilePersistence", msg)
return false, errors.New(msg)
}
return false, nil
}

Expand Down
91 changes: 91 additions & 0 deletions controllers/reconcile_persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package controllers_test

import (
"context"
"fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/status"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
k8sresource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Persistence", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

BeforeEach(func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-shrink",
Namespace: defaultNamespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: pointer.Int32Ptr(5),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)
})

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}, 5).Should(BeTrue())
})

It("does not allow PVC shrink", func() {
By("not updating statefulSet volume claim storage capacity", func() {
tenG := k8sresource.MustParse("10Gi")
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
storage := k8sresource.MustParse("1Gi")
cluster.Spec.Persistence.Storage = &storage
})).To(Succeed())
Consistently(func() k8sresource.Quantity {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests[corev1.ResourceStorage]
}, 10, 1).Should(Equal(tenG))
})

By("setting 'Warning' events", func() {
Expect(aggregateEventMsgs(ctx, cluster, "FailedReconcilePersistence")).To(
ContainSubstring("shrinking persistent volumes is not supported"))
})

By("setting ReconcileSuccess to 'false' with failed reason and message", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 5).Should(Equal("ReconcileSuccess status: False, " +
"with reason: FailedReconcilePVC " +
"and message: shrinking persistent volumes is not supported"))
})
})
})