Skip to content

Commit

Permalink
Merge pull request #82 from rabbitmq/binding-delete
Browse files Browse the repository at this point in the history
Support bindings.rabbitmq.com deletion
  • Loading branch information
ChunyiLyu committed Mar 24, 2021
2 parents bf2e20f + d6910e8 commit 9b10d40
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 95 deletions.
64 changes: 58 additions & 6 deletions api/v1alpha1/binding_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"reflect"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
Expand All @@ -31,13 +32,64 @@ func (b *Binding) ValidateUpdate(old runtime.Object) error {
return apierrors.NewBadRequest(fmt.Sprintf("expected a binding but got a %T", old))
}

if oldBinding.Spec != b.Spec {
return apierrors.NewForbidden(
b.GroupResource(),
b.Name,
field.Forbidden(field.NewPath("spec"), "binding.spec is immutable"))
var allErrs field.ErrorList
detailMsg := "updates on vhost and rabbitmqClusterReference are all forbidden"

if b.Spec.Vhost != oldBinding.Spec.Vhost {
return apierrors.NewForbidden(b.GroupResource(), b.Name,
field.Forbidden(field.NewPath("spec", "vhost"), detailMsg))
}
return nil

if b.Spec.RabbitmqClusterReference != oldBinding.Spec.RabbitmqClusterReference {
return apierrors.NewForbidden(b.GroupResource(), b.Name,
field.Forbidden(field.NewPath("spec", "rabbitmqClusterReference"), detailMsg))
}

if b.Spec.Source != oldBinding.Spec.Source {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "source"),
b.Spec.Source,
"source cannot be updated",
))
}

if b.Spec.Destination != oldBinding.Spec.Destination {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "destination"),
b.Spec.Destination,
"destination cannot be updated",
))
}

if b.Spec.DestinationType != oldBinding.Spec.DestinationType {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "destinationType"),
b.Spec.DestinationType,
"destinationType cannot be updated",
))
}

if b.Spec.RoutingKey != oldBinding.Spec.RoutingKey {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "routingKey"),
b.Spec.RoutingKey,
"routingKey cannot be updated",
))
}

if !reflect.DeepEqual(b.Spec.Arguments, oldBinding.Spec.Arguments) {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "arguments"),
b.Spec.Arguments,
"arguments cannot be updated",
))
}

if len(allErrs) == 0 {
return nil
}

return apierrors.NewInvalid(GroupVersion.WithKind("Binding").GroupKind(), b.Name, allErrs)
}

// no validation logic on delete
Expand Down
28 changes: 14 additions & 14 deletions api/v1alpha1/binding_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,42 +32,42 @@ var _ = Describe("Binding webhook", func() {
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on RabbitmqClusterReference", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.RabbitmqClusterReference = RabbitmqClusterReference{
Name: "new-cluster",
Namespace: "default",
}
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on source", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.Source = "updated-source"
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on destination", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.Destination = "updated-des"
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on destination type", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.DestinationType = "exchange"
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on routing key", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.RoutingKey = "not-allowed"
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on binding arguments", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.Arguments = &runtime.RawExtension{Raw: []byte(`{"new":"new-value"}`)}
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on RabbitmqClusterReference", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.RabbitmqClusterReference = RabbitmqClusterReference{
Name: "new-cluster",
Namespace: "default",
}
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})
})
114 changes: 113 additions & 1 deletion controllers/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ package controllers
import (
"context"
"encoding/json"
"errors"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -27,6 +31,8 @@ import (
topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1"
)

const bindingFinalizer = "deletion.finalizers.bindings.rabbitmq.com"

// BindingReconciler reconciles a Binding object
type BindingReconciler struct {
client.Client
Expand All @@ -47,11 +53,23 @@ func (r *BindingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, binding.Spec.RabbitmqClusterReference)
if err != nil {

if errors.Is(err, NoSuchRabbitmqClusterError) && binding.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}

if !binding.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Deleting")
return ctrl.Result{}, r.deleteBinding(ctx, rabbitClient, binding)
}

if err := r.addFinalizerIfNeeded(ctx, binding); err != nil {
return ctrl.Result{}, err
}
spec, err := json.Marshal(binding.Spec)
if err != nil {
logger.Error(err, failedMarshalSpec)
Expand Down Expand Up @@ -106,6 +124,100 @@ func (r *BindingReconciler) declareBinding(ctx context.Context, client *rabbitho
return nil
}

// deletes binding from rabbitmq server; bindings have no name; server needs BindingInfo to delete them
// when server responds with '404' Not Found, it logs and does not requeue on error
// if no binding argument is set, generating properties key by using internal.GeneratePropertiesKey
// if binding arguments are set, list all bindings between source/destination to find the binding; if it failed to find corresponding binding, it assumes that the binding is already deleted and returns no error
func (r *BindingReconciler) deleteBinding(ctx context.Context, client *rabbithole.Client, binding *topologyv1alpha1.Binding) error {
logger := ctrl.LoggerFrom(ctx)

var info *rabbithole.BindingInfo
var err error
if binding.Spec.Arguments != nil {
info, err = r.findBindingInfo(logger, binding, client)
if err != nil {
return err
}
if info == nil {
logger.Info("cannot find the corresponding binding info in rabbitmq server; binding already deleted")
return r.removeFinalizer(ctx, binding)
}
} else {
info, err = internal.GenerateBindingInfo(binding)
if err != nil {
msg := "failed to generate binding info"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return err
}
info.PropertiesKey = internal.GeneratePropertiesKey(binding)
}

err = validateResponseForDeletion(client.DeleteBinding(binding.Spec.Vhost, *info))
if errors.Is(err, NotFound) {
logger.Info("cannot find binding in rabbitmq server; already deleted")
} else if err != nil {
msg := "failed to delete binding"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return err
}

logger.Info("Successfully deleted binding")
return r.removeFinalizer(ctx, binding)
}

func (r *BindingReconciler) findBindingInfo(logger logr.Logger, binding *topologyv1alpha1.Binding, client *rabbithole.Client) (*rabbithole.BindingInfo, error) {
logger.Info("binding arguments set; listing bindings from server to complete deletion")
arguments := make(map[string]interface{})
if binding.Spec.Arguments != nil {
if err := json.Unmarshal(binding.Spec.Arguments.Raw, &arguments); err != nil {
msg := "failed to unmarshall binding arguments"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return nil, err
}
}
var bindingInfos []rabbithole.BindingInfo
var err error
if binding.Spec.DestinationType == "queue" {
bindingInfos, err = client.ListQueueBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination)
} else {
bindingInfos, err = client.ListExchangeBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination)
}
if err != nil {
msg := "failed to list binding infos"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return nil, err
}
var info *rabbithole.BindingInfo
for _, b := range bindingInfos {
if binding.Spec.RoutingKey == b.RoutingKey && reflect.DeepEqual(b.Arguments, arguments) {
info = &b
}
}
return info, nil
}

func (r *BindingReconciler) removeFinalizer(ctx context.Context, binding *topologyv1alpha1.Binding) error {
controllerutil.RemoveFinalizer(binding, bindingFinalizer)
if err := r.Client.Update(ctx, binding); err != nil {
return err
}
return nil
}

func (r *BindingReconciler) addFinalizerIfNeeded(ctx context.Context, binding *topologyv1alpha1.Binding) error {
if binding.ObjectMeta.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(binding, bindingFinalizer) {
controllerutil.AddFinalizer(binding, bindingFinalizer)
if err := r.Client.Update(ctx, binding); err != nil {
return err
}
}
return nil
}

func (r *BindingReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&topologyv1alpha1.Binding{}).
Expand Down
21 changes: 21 additions & 0 deletions internal/binding_info.go → internal/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1"
"strings"
)

func GenerateBindingInfo(binding *topologyv1alpha1.Binding) (*rabbithole.BindingInfo, error) {
Expand All @@ -33,3 +34,23 @@ func GenerateBindingInfo(binding *topologyv1alpha1.Binding) (*rabbithole.Binding
Arguments: arguments,
}, nil
}

// Generate binding properties key which is necessary when deleting a binding
// Binding properties key is:
// when routing key and argument are not provided, properties key is "~"
// when routing key is set and no argument is provided, properties key is the routing key itself
// if routing key has character '~', it's replaced by '%7E'
// when arguments are provided, properties key is the routing key (could be empty) plus the hash of arguments
// the hash function used is 'erlang:phash2' and it's erlang specific; GeneratePropertiesKey returns empty
// string if arguments are provided (deletion not supported)

func GeneratePropertiesKey(binding *topologyv1alpha1.Binding) string {
if binding.Spec.RoutingKey == "" {
return "~"
}
if binding.Spec.Arguments == nil {
return strings.ReplaceAll(binding.Spec.RoutingKey, "~", "%7E")
}

return ""
}
72 changes: 0 additions & 72 deletions internal/binding_info_test.go

This file was deleted.

0 comments on commit 9b10d40

Please sign in to comment.