Skip to content

Commit

Permalink
Support bindings.rabbitmq.com deletion
Browse files Browse the repository at this point in the history
- when binding.spec.argument is not set, it generates
the properties key using binding.spec.routingKey; when
arguments are set, the controller lists all bindings
between a source and a destination, and find the
corresponding binding's binding info to complete deletion
  • Loading branch information
ChunyiLyu committed Mar 23, 2021
1 parent 4eb332b commit 84c0473
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 3 deletions.
114 changes: 113 additions & 1 deletion controllers/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ package controllers
import (
"context"
"encoding/json"
"errors"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -27,6 +31,8 @@ import (
topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1"
)

const bindingFinalizer = "deletion.finalizers.bindings.rabbitmq.com"

// BindingReconciler reconciles a Binding object
type BindingReconciler struct {
client.Client
Expand All @@ -47,11 +53,23 @@ func (r *BindingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, binding.Spec.RabbitmqClusterReference)
if err != nil {

if errors.Is(err, NoSuchRabbitmqClusterError) && binding.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}

if !binding.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Deleting")
return ctrl.Result{}, r.deleteBinding(ctx, rabbitClient, binding)
}

if err := r.addFinalizerIfNeeded(ctx, binding); err != nil {
return ctrl.Result{}, err
}
spec, err := json.Marshal(binding.Spec)
if err != nil {
logger.Error(err, failedMarshalSpec)
Expand Down Expand Up @@ -106,6 +124,100 @@ func (r *BindingReconciler) declareBinding(ctx context.Context, client *rabbitho
return nil
}

// deletes binding from rabbitmq server; bindings have no name; server needs BindingInfo to delete them
// when server responds with '404' Not Found, it logs and does not requeue on error
// if no binding argument is set, generating properties key by using internal.GeneratePropertiesKey
// if binding arguments are set, list all bindings between source/destination to find the binding; if it failed to find corresponding binding, it assumes that the binding is already deleted and returns no error
func (r *BindingReconciler) deleteBinding(ctx context.Context, client *rabbithole.Client, binding *topologyv1alpha1.Binding) error {
logger := ctrl.LoggerFrom(ctx)

var info *rabbithole.BindingInfo
var err error
if binding.Spec.Arguments != nil {
info, err = r.findBindingInfo(logger, binding, client)
if err != nil {
return err
}
if info == nil {
logger.Info("cannot find the corresponding binding info in rabbitmq server; binding already deleted")
return r.removeFinalizer(ctx, binding)
}
} else {
info, err = internal.GenerateBindingInfo(binding)
if err != nil {
msg := "failed to generate binding info"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return err
}
info.PropertiesKey = internal.GeneratePropertiesKey(binding)
}

err = validateResponseForDeletion(client.DeleteBinding(binding.Spec.Vhost, *info))
if errors.Is(err, NotFound) {
logger.Info("cannot find binding in rabbitmq server; already deleted")
} else if err != nil {
msg := "failed to delete binding"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return err
}

logger.Info("Successfully deleted binding")
return r.removeFinalizer(ctx, binding)
}

func (r *BindingReconciler) findBindingInfo(logger logr.Logger, binding *topologyv1alpha1.Binding, client *rabbithole.Client) (*rabbithole.BindingInfo, error) {
logger.Info("binding arguments set; listing bindings from server to complete deletion")
arguments := make(map[string]interface{})
if binding.Spec.Arguments != nil {
if err := json.Unmarshal(binding.Spec.Arguments.Raw, &arguments); err != nil {
msg := "failed to unmarshall binding arguments"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return nil, err
}
}
var bindingInfos []rabbithole.BindingInfo
var err error
if binding.Spec.DestinationType == "queue" {
bindingInfos, err = client.ListQueueBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination)
} else {
bindingInfos, err = client.ListExchangeBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination)
}
if err != nil {
msg := "failed to list binding infos"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return nil, err
}
var info *rabbithole.BindingInfo
for _, b := range bindingInfos {
if binding.Spec.RoutingKey == b.RoutingKey && reflect.DeepEqual(b.Arguments, arguments) {
info = &b
}
}
return info, nil
}

func (r *BindingReconciler) removeFinalizer(ctx context.Context, binding *topologyv1alpha1.Binding) error {
controllerutil.RemoveFinalizer(binding, bindingFinalizer)
if err := r.Client.Update(ctx, binding); err != nil {
return err
}
return nil
}

func (r *BindingReconciler) addFinalizerIfNeeded(ctx context.Context, binding *topologyv1alpha1.Binding) error {
if binding.ObjectMeta.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(binding, bindingFinalizer) {
controllerutil.AddFinalizer(binding, bindingFinalizer)
if err := r.Client.Update(ctx, binding); err != nil {
return err
}
}
return nil
}

func (r *BindingReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&topologyv1alpha1.Binding{}).
Expand Down
12 changes: 10 additions & 2 deletions system_tests/binding_system_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ var _ = Describe("Binding", func() {
})

AfterEach(func() {
Expect(k8sClient.Delete(ctx, binding)).To(Succeed())
Expect(k8sClient.Delete(ctx, queue)).To(Succeed())
Expect(k8sClient.Delete(ctx, exchange)).To(Succeed())
})
Expand Down Expand Up @@ -129,6 +128,15 @@ var _ = Describe("Binding", func() {
updateBinding := topologyv1alpha1.Binding{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &updateBinding)).To(Succeed())
updatedBinding.Spec.RoutingKey = "new-key"
Expect(k8sClient.Update(ctx, &updatedBinding).Error()).To(ContainSubstring("spec: Forbidden: binding.spec is immutable"))
Expect(k8sClient.Update(ctx, &updatedBinding).Error()).To(ContainSubstring("invalid: spec.routingKey: Invalid value: \"new-key\": routingKey cannot be updated"))

By("deleting binding from rabbitmq server")
Expect(k8sClient.Delete(ctx, binding)).To(Succeed())
Eventually(func() int {
var err error
bindings, err := rabbitClient.ListQueueBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination)
Expect(err).NotTo(HaveOccurred())
return len(bindings)
}, 10, 2).Should(Equal(0), "cannot find created binding")
})
})

0 comments on commit 84c0473

Please sign in to comment.