Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 141 additions & 63 deletions controllers/rabbitmqcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ var (
)

const (
ownerKey = ".metadata.controller"
ownerKind = "RabbitmqCluster"
deletionFinalizer = "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
ownerKey = ".metadata.controller"
ownerKind = "RabbitmqCluster"
deletionFinalizer = "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
pluginsUpdateAnnotation = "rabbitmq.com/pluginsUpdatedAt"
)

// RabbitmqClusterReconciler reconciles a RabbitmqCluster object
Expand Down Expand Up @@ -183,7 +184,6 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
operationResult, apiError = controllerutil.CreateOrUpdate(ctx, r, resource, func() error {
return builder.Update(resource)
})

return apiError
})
r.logAndRecordOperationResult(rabbitmqCluster, resource, operationResult, err)
Expand All @@ -194,11 +194,13 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name)
}

return ctrl.Result{}, err
}

r.restartStatefulSetIfNeeded(ctx, builder, operationResult, rabbitmqCluster)
r.annotatePluginsConfigMapIfUpdated(ctx, builder, operationResult, rabbitmqCluster)
if restarted := r.restartStatefulSetIfNeeded(ctx, builder, operationResult, rabbitmqCluster); restarted {
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
}
}

// Set ReconcileSuccess to true here because all CRUD operations to Kube API related
Expand All @@ -214,18 +216,13 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
return ctrl.Result{}, err
}

if ok, err := r.allReplicasReady(ctx, rabbitmqCluster); !ok {
// only enable plugins when all pods of the StatefulSet become ready
// requeue request after 10 seconds without error
logger.Info("Not all replicas ready yet; requeuing request to enable plugins on RabbitmqCluster",
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name)
return ctrl.Result{RequeueAfter: time.Second * 10}, err
}

if err := r.enablePlugins(rabbitmqCluster); err != nil {
requeueAfter, err := r.setPluginsIfNeeded(ctx, rabbitmqCluster)
if err != nil {
return ctrl.Result{}, err
}
if requeueAfter > 0 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}

logger.Info("Finished reconciling RabbitmqCluster",
"namespace", rabbitmqCluster.Namespace,
Expand Down Expand Up @@ -312,70 +309,154 @@ func (r *RabbitmqClusterReconciler) setAdminStatus(ctx context.Context, rmq *rab
return nil
}

// restartStatefulSetIfNeeded - helper function that annotates the StatefulSet PodTemplate with current timestamp
// to trigger a restart of the all pods in the StatefulSet when builder requires StatefulSet to be updated
func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(ctx context.Context, builder resource.ResourceBuilder, operationResult controllerutil.OperationResult, rmq *rabbitmqv1beta1.RabbitmqCluster) {
if builder.UpdateRequiresStsRestart() && operationResult == controllerutil.OperationResultUpdated {
if err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
sts := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}}
if err := r.Get(ctx, types.NamespacedName{Name: sts.Name, Namespace: sts.Namespace}, sts); err != nil {
return err
}
if sts.Spec.Template.ObjectMeta.Annotations == nil {
sts.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
}
sts.Spec.Template.ObjectMeta.Annotations["rabbitmq.com/restartAt"] = time.Now().Format(time.RFC3339)
return r.Update(ctx, sts)
}); err != nil {
msg := fmt.Sprintf("Failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Error(err, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
// Adds an arbitrary annotation (rabbitmq.com/lastRestartAt) to the StatefulSet PodTemplate to trigger a StatefulSet restart
// if builder requires StatefulSet to be updated.
func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(
ctx context.Context,
builder resource.ResourceBuilder,
operationResult controllerutil.OperationResult,
rmq *rabbitmqv1beta1.RabbitmqCluster) (restarted bool) {

if !(builder.UpdateRequiresStsRestart() && operationResult == controllerutil.OperationResultUpdated) {
return false
}

if err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
sts := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}}
if err := r.Get(ctx, types.NamespacedName{Name: sts.Name, Namespace: sts.Namespace}, sts); err != nil {
return err
}
msg := fmt.Sprintf("Restarted StatefulSet %s of Namespace %s", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Info(msg)
r.Recorder.Event(rmq, corev1.EventTypeNormal, "SuccessfulUpdate", msg)
if sts.Spec.Template.ObjectMeta.Annotations == nil {
sts.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
}
sts.Spec.Template.ObjectMeta.Annotations["rabbitmq.com/lastRestartAt"] = time.Now().Format(time.RFC3339)
return r.Update(ctx, sts)
}); err != nil {
msg := fmt.Sprintf("failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Error(err, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
return false
}

msg := fmt.Sprintf("restarted StatefulSet %s of Namespace %s", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Info(msg)
r.Recorder.Event(rmq, corev1.EventTypeNormal, "SuccessfulUpdate", msg)
return true
}

// allReplicasReady - helper function that checks if StatefulSet replicas are all ready
func (r *RabbitmqClusterReconciler) allReplicasReady(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (bool, error) {
sts := &appsv1.StatefulSet{}
// There are 2 paths how plugins are set:
// 1. When SatefulSet is (re)started, the up-to-date plugins list (ConfigMap copied by the init container) is read by RabbitMQ nodes during node start up.
// 2. When the plugins ConfigMap is changed, 'rabbitmq-plugins set' updates the plugins on every node (without the need to re-start the nodes).
// This method implements the 2nd path.
func (r *RabbitmqClusterReconciler) setPluginsIfNeeded(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (requeueAfter time.Duration, err error) {
configMap := corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: rmq.ChildResourceName(resource.PluginsConfig)}, &configMap); err != nil {
return 0, client.IgnoreNotFound(err)
}

if err := r.Get(ctx, types.NamespacedName{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}, sts); err != nil {
return false, client.IgnoreNotFound(err)
pluginsUpdatedAt, ok := configMap.Annotations[pluginsUpdateAnnotation]
if !ok {
return 0, nil // plugins configMap was not updated
}

if sts.Status.ReadyReplicas < *sts.Spec.Replicas {
return false, nil
annotationTime, err := time.Parse(time.RFC3339, pluginsUpdatedAt)
if err != nil {
return 0, err
}
if time.Since(annotationTime).Seconds() < 2 {
// plugins configMap was updated very recently
// give StatefulSet controller some time to trigger restart of StatefulSet if necessary
// otherwise, there would be race conditions where we exec into containers losing the connection due to pods being terminated
r.Log.Info("requeuing request to set plugins on RabbitmqCluster",
"namespace", rmq.Namespace,
"name", rmq.Name)
return 2 * time.Second, nil
}

return true, nil
}
ready, err := r.allReplicasReadyAndUpdated(ctx, rmq)
if err != nil {
return 0, err
}
if !ready {
r.Log.Info("not all replicas ready yet; requeuing request to set plugins on RabbitmqCluster",
"namespace", rmq.Namespace,
"name", rmq.Name)
return 15 * time.Second, err
}

// enablePlugins - helper function to set the list of enabled plugins in a given RabbitmqCluster pods
// `rabbitmq-plugins set` disables plugins that are not in the provided list
func (r *RabbitmqClusterReconciler) enablePlugins(rmq *rabbitmqv1beta1.RabbitmqCluster) error {
plugins := resource.NewRabbitmqPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
for i := int32(0); i < *rmq.Spec.Replicas; i++ {
podName := fmt.Sprintf("%s-%d", rmq.ChildResourceName("server"), i)
rabbitCommand := fmt.Sprintf("rabbitmq-plugins set %s", plugins.AsString(" "))

stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", rabbitCommand)

if err != nil {

r.Log.Error(err, fmt.Sprintf(
"Failed to enable plugins on pod %s in namespace %s, running command %s with output: %s %s",
podName, rmq.Namespace, rabbitCommand, stdout, stderr))

return err
r.Log.Error(err, "failed to set plugins",
"namespace", rmq.Namespace,
"name", rmq.Name,
"pod", podName,
"command", rabbitCommand,
"stdout", stdout,
"stderr", stderr)
return 0, err
}
}

r.Log.Info("Successfully enabled plugins on RabbitmqCluster",
r.Log.Info("successfully set plugins on RabbitmqCluster",
"namespace", rmq.Namespace,
"name", rmq.Name)
return nil

delete(configMap.Annotations, pluginsUpdateAnnotation)
if err := r.Update(ctx, &configMap); err != nil {
return 0, client.IgnoreNotFound(err)
}

return 0, nil
}

func (r *RabbitmqClusterReconciler) allReplicasReadyAndUpdated(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (bool, error) {
sts := &appsv1.StatefulSet{}

if err := r.Get(ctx, types.NamespacedName{Name: rmq.ChildResourceName("server"), Namespace: rmq.Namespace}, sts); err != nil {
return false, client.IgnoreNotFound(err)
}

desiredReplicas := *sts.Spec.Replicas
if sts.Status.ReadyReplicas < desiredReplicas ||
sts.Status.UpdatedReplicas < desiredReplicas { // StatefulSet rolling update is ongoing
return false, nil
}

return true, nil
}

// Annotates the plugins ConfigMap if it was updated such that 'rabbitmq-plugins set' will be called on the RabbitMQ nodes at a later point in time
func (r *RabbitmqClusterReconciler) annotatePluginsConfigMapIfUpdated(
ctx context.Context,
builder resource.ResourceBuilder,
operationResult controllerutil.OperationResult,
rmq *rabbitmqv1beta1.RabbitmqCluster) {

if _, ok := builder.(*resource.RabbitmqPluginsConfigMapBuilder); !ok {
return
}
if operationResult != controllerutil.OperationResultUpdated {
return
}

if retryOnConflictErr := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
configMap := corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: rmq.ChildResourceName(resource.PluginsConfig)}, &configMap); err != nil {
return client.IgnoreNotFound(err)
}
if configMap.Annotations == nil {
configMap.Annotations = make(map[string]string)
}
configMap.Annotations[pluginsUpdateAnnotation] = time.Now().Format(time.RFC3339)
return r.Update(ctx, &configMap)
}); retryOnConflictErr != nil {
msg := fmt.Sprintf("Failed to annotate ConfigMap %s of Namespace %s; enabled_plugins may be outdated", rmq.ChildResourceName(resource.PluginsConfig), rmq.Namespace)
r.Log.Error(retryOnConflictErr, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
}
}

func (r *RabbitmqClusterReconciler) exec(namespace, podName, containerName string, command ...string) (string, string, error) {
Expand Down Expand Up @@ -407,12 +488,9 @@ func (r *RabbitmqClusterReconciler) exec(namespace, podName, containerName strin
Stdin: nil,
Tty: false,
})

if err != nil {

return stdOut.String(), stdErr.String(), err
}

if stdErr.Len() > 0 {
return stdOut.String(), stdErr.String(), fmt.Errorf("%v", stdErr)
}
Expand Down
24 changes: 12 additions & 12 deletions controllers/rabbitmqcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,16 @@ var _ = Describe("RabbitmqclusterController", func() {
})
By("recording SuccessfullCreate events for all child resources", func() {
allEventMsgs := aggregateEventMsgs(ctx, rabbitmqCluster, "SuccessfulCreate")
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.StatefulSet", rabbitmqCluster.ChildResourceName("server"))))
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("client"))))
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("headless"))))
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.ConfigMap", rabbitmqCluster.ChildResourceName("plugins-conf"))))
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.ConfigMap", rabbitmqCluster.ChildResourceName("server-conf"))))
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Secret", rabbitmqCluster.ChildResourceName("erlang-cookie"))))
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Secret", rabbitmqCluster.ChildResourceName("admin"))))
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.ServiceAccount", rabbitmqCluster.ChildResourceName("server"))))
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.Role", rabbitmqCluster.ChildResourceName("peer-discovery"))))
Expect(allEventMsgs).To(ContainSubstring(fmt.Sprintf("created resource %s of Type *v1.RoleBinding", rabbitmqCluster.ChildResourceName("server"))))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.StatefulSet", rabbitmqCluster.ChildResourceName("server")))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("client")))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("headless")))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.ConfigMap", rabbitmqCluster.ChildResourceName("plugins-conf")))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.ConfigMap", rabbitmqCluster.ChildResourceName("server-conf")))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Secret", rabbitmqCluster.ChildResourceName("erlang-cookie")))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Secret", rabbitmqCluster.ChildResourceName("admin")))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.ServiceAccount", rabbitmqCluster.ChildResourceName("server")))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.Role", rabbitmqCluster.ChildResourceName("peer-discovery")))
Expect(allEventMsgs).To(ContainSubstring("created resource %s of Type *v1.RoleBinding", rabbitmqCluster.ChildResourceName("server")))
})

By("adding the deletion finalizer", func() {
Expand Down Expand Up @@ -665,7 +665,7 @@ var _ = Describe("RabbitmqclusterController", func() {

// verify that SuccessfulUpdate event is recorded for the client service
Expect(aggregateEventMsgs(ctx, rabbitmqCluster, "SuccessfulUpdate")).To(
ContainSubstring(fmt.Sprintf("updated resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("client"))))
ContainSubstring("updated resource %s of Type *v1.Service", rabbitmqCluster.ChildResourceName("client")))
})

It("the CPU and memory requirements are updated", func() {
Expand Down Expand Up @@ -699,7 +699,7 @@ var _ = Describe("RabbitmqclusterController", func() {

// verify that SuccessfulUpdate event is recorded for the StatefulSet
Expect(aggregateEventMsgs(ctx, rabbitmqCluster, "SuccessfulUpdate")).To(
ContainSubstring(fmt.Sprintf("updated resource %s of Type *v1.StatefulSet", rabbitmqCluster.ChildResourceName("server"))))
ContainSubstring("updated resource %s of Type *v1.StatefulSet", rabbitmqCluster.ChildResourceName("server")))
})

It("the rabbitmq image is updated", func() {
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1 h1:q4XQuHFC6I28BKZpo6IYyb3mNO+l7lSOxRuYTCiDfXk=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (builder *RabbitmqResourceBuilder) ServerConfigMap() *ServerConfigMapBuilde
}

func (builder *ServerConfigMapBuilder) UpdateRequiresStsRestart() bool {
return true
return true // because rabbitmq.conf and advanced.config changes take effect only after a node restart
}

func (builder *ServerConfigMapBuilder) Update(object runtime.Object) error {
Expand Down
4 changes: 2 additions & 2 deletions internal/resource/rabbitmq_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var requiredPlugins = []string{
"rabbitmq_management",
}

const pluginsConfig = "plugins-conf"
const PluginsConfig = "plugins-conf"

type RabbitmqPlugins struct {
requiredPlugins []string
Expand Down Expand Up @@ -82,7 +82,7 @@ func (builder *RabbitmqPluginsConfigMapBuilder) Update(object runtime.Object) er
func (builder *RabbitmqPluginsConfigMapBuilder) Build() (runtime.Object, error) {
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: builder.Instance.ChildResourceName(pluginsConfig),
Name: builder.Instance.ChildResourceName(PluginsConfig),
Namespace: builder.Instance.Namespace,
},
Data: map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion internal/resource/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (builder *StatefulSetBuilder) podTemplateSpec(annotations, labels map[strin
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: builder.Instance.ChildResourceName(pluginsConfig),
Name: builder.Instance.ChildResourceName(PluginsConfig),
},
},
},
Expand Down
Loading