Skip to content

Commit

Permalink
Broker / Trigger can use RabbitmqBrokerConfig or a RabbitMQCluster as…
Browse files Browse the repository at this point in the history
… its config (#780)

* Broker / Trigger can use RabbitmqBrokerConfig or a RabbitMQCluster as its config

- adds brokerConfig service to dedupe between Trigger and Broker

* codestyle fixes
  • Loading branch information
gab-satchi committed May 24, 2022
1 parent d52104c commit fb7c6d1
Show file tree
Hide file tree
Showing 13 changed files with 1,982 additions and 1,888 deletions.
12 changes: 12 additions & 0 deletions config/broker/200-rabbitmq-broker-clusterrole.yaml
Expand Up @@ -61,3 +61,15 @@ rules:
- "delete"
- "patch"
- "watch"
- apiGroups:
- eventing.knative.dev
resources:
- rabbitmqbrokerconfigs
verbs:
- "get"
- "list"
- "create"
- "update"
- "delete"
- "patch"
- "watch"
5 changes: 4 additions & 1 deletion pkg/apis/eventing/v1alpha1/broker_validation.go
Expand Up @@ -100,8 +100,11 @@ func (b *RabbitBroker) Validate(ctx context.Context) *apis.FieldError {

switch gvk {
case "RabbitmqCluster.rabbitmq.com/v1beta1":
case "RabbitmqBrokerConfig.eventing.knative.dev/v1alpha1":
default:
errs = errs.Also(apis.ErrGeneric("Configuration not supported, only [kind: RabbitmqCluster, apiVersion: rabbitmq.com/v1beta1]")).ViaField("spec").ViaField("config")
errs = errs.Also(apis.ErrGeneric("Configuration not supported, only [kind: RabbitmqCluster, apiVersion: rabbitmq.com/v1beta1] or [kind: RabbitmqBrokerConfig, apiVersion: eventing.knative.dev/v1alpha1]")).
ViaField("spec").
ViaField("config")
}
}
if errs.Error() == "" {
Expand Down
17 changes: 16 additions & 1 deletion pkg/apis/eventing/v1alpha1/broker_validation_test.go
Expand Up @@ -185,7 +185,7 @@ func TestValidate(t *testing.T) {
},
},
}},
want: apis.ErrGeneric("Configuration not supported, only [kind: RabbitmqCluster, apiVersion: rabbitmq.com/v1beta1]").ViaField("spec").ViaField("config"),
want: apis.ErrGeneric("Configuration not supported, only [kind: RabbitmqCluster, apiVersion: rabbitmq.com/v1beta1] or [kind: RabbitmqBrokerConfig, apiVersion: eventing.knative.dev/v1alpha1]").ViaField("spec").ViaField("config"),
}, {
name: "invalid config, no namespace",
b: RabbitBroker{eventingv1.Broker{
Expand Down Expand Up @@ -261,6 +261,21 @@ func TestValidate(t *testing.T) {
},
},
}},
}, {
name: "valid config, rabbitmqBrokerConfig",
b: RabbitBroker{eventingv1.Broker{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"eventing.knative.dev/broker.class": "RabbitMQBroker"},
},
Spec: eventingv1.BrokerSpec{
Config: &duckv1.KReference{
Namespace: "othernamespace",
Name: "name",
Kind: "RabbitmqBrokerConfig",
APIVersion: "eventing.knative.dev/v1alpha1",
},
},
}},
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down
158 changes: 158 additions & 0 deletions pkg/brokerconfig/service.go
@@ -0,0 +1,158 @@
/*
Copyright 2022 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package brokerconfig

import (
"context"
"errors"
"fmt"
"net/url"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
duckv1beta1 "knative.dev/eventing-rabbitmq/pkg/apis/duck/v1beta1"
"knative.dev/eventing-rabbitmq/pkg/rabbit"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/network"

rmqeventingclientset "knative.dev/eventing-rabbitmq/pkg/client/clientset/versioned"
rmqeventingclient "knative.dev/eventing-rabbitmq/pkg/client/injection/client"
rabbitv1 "knative.dev/eventing-rabbitmq/pkg/client/injection/ducks/duck/v1beta1/rabbit"
rabbitv1beta1 "knative.dev/eventing-rabbitmq/third_party/pkg/apis/rabbitmq.com/v1beta1"
apisduck "knative.dev/pkg/apis/duck"
)

func New(ctx context.Context) *BrokerConfigService {
return &BrokerConfigService{
rmqeventingClientSet: rmqeventingclient.Get(ctx),
kubeClientSet: kubeclient.Get(ctx),
rabbitLister: rabbitv1.Get(ctx),
}
}

type BrokerConfigService struct {
rmqeventingClientSet rmqeventingclientset.Interface
kubeClientSet kubernetes.Interface
rabbitLister apisduck.InformerFactory
}

func (r *BrokerConfigService) GetExchangeArgs(ctx context.Context, b *eventingv1.Broker) (*rabbit.ExchangeArgs, error) {
rabbitmqClusterRef, err := r.GetRabbitMQClusterRef(ctx, b)
if err != nil {
return nil, err
}

rabbitmqURL, err := r.RabbitmqURL(ctx, rabbitmqClusterRef)
if err != nil {
return nil, err
}

return &rabbit.ExchangeArgs{
Namespace: b.Namespace,
Broker: b,
RabbitmqClusterReference: rabbitmqClusterRef,
RabbitMQURL: rabbitmqURL,
}, nil
}

func (r *BrokerConfigService) GetRabbitMQClusterRef(ctx context.Context, b *eventingv1.Broker) (*rabbitv1beta1.RabbitmqClusterReference, error) {
if b.Spec.Config == nil {
return nil, errors.New("Broker.Spec.Config is required")
}

if b.Spec.Config.Namespace == "" || b.Spec.Config.Name == "" {
return nil, errors.New("broker.spec.config.[name, namespace] are required")
}

gvk := fmt.Sprintf("%s.%s", b.Spec.Config.Kind, b.Spec.Config.APIVersion)
switch gvk {
case "RabbitmqCluster.rabbitmq.com/v1beta1":
return &rabbitv1beta1.RabbitmqClusterReference{
Name: b.Spec.Config.Name,
Namespace: b.Spec.Config.Namespace,
}, nil
case "RabbitmqBrokerConfig.eventing.knative.dev/v1alpha1":
config, err := r.rmqeventingClientSet.EventingV1alpha1().RabbitmqBrokerConfigs(b.Spec.Config.Namespace).Get(ctx, b.Spec.Config.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if config.Spec.RabbitmqClusterReference == nil {
return nil, fmt.Errorf("rabbitmqBrokerConfig %s/%s spec is empty", b.Spec.Config.Namespace, b.Spec.Config.Name)
}
return config.Spec.RabbitmqClusterReference, nil
default:
return nil, errors.New("Broker.Spec.Config configuration not supported, only [kind: RabbitmqCluster, apiVersion: rabbitmq.com/v1beta1]")
}
}

func (r *BrokerConfigService) RabbitmqURL(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error) {
// TODO: make this better.
ref := &duckv1.KReference{
Kind: "RabbitmqCluster",
APIVersion: "rabbitmq.com/v1beta1",
Name: clusterRef.Name,
Namespace: clusterRef.Namespace,
}
gv, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil {
return nil, err
}
gvk := gv.WithKind(ref.Kind)

gvr, _ := meta.UnsafeGuessKindToResource(gvk)

_, lister, err := r.rabbitLister.Get(ctx, gvr)
if err != nil {
return nil, err
}

o, err := lister.ByNamespace(ref.Namespace).Get(ref.Name)
if err != nil {
return nil, err
}

rab := o.(*duckv1beta1.Rabbit)

if rab.Status.DefaultUser == nil || rab.Status.DefaultUser.SecretReference == nil || rab.Status.DefaultUser.ServiceReference == nil {
return nil, fmt.Errorf("rabbit \"%s/%s\" not ready", ref.Namespace, ref.Name)
}

_ = rab.Status.DefaultUser.SecretReference

s, err := r.kubeClientSet.CoreV1().Secrets(rab.Status.DefaultUser.SecretReference.Namespace).Get(ctx, rab.Status.DefaultUser.SecretReference.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

password, ok := s.Data[rab.Status.DefaultUser.SecretReference.Keys["password"]]
if !ok {
return nil, fmt.Errorf("rabbit Secret missing key %s", rab.Status.DefaultUser.SecretReference.Keys["password"])
}

username, ok := s.Data[rab.Status.DefaultUser.SecretReference.Keys["username"]]
if !ok {
return nil, fmt.Errorf("rabbit Secret missing key %s", rab.Status.DefaultUser.SecretReference.Keys["username"])
}

host := network.GetServiceHostname(rab.Status.DefaultUser.ServiceReference.Name, rab.Status.DefaultUser.ServiceReference.Namespace)

return url.Parse(fmt.Sprintf("amqp://%s:%s@%s:%d", username, password, host, 5672))
}
76 changes: 28 additions & 48 deletions pkg/reconciler/broker/broker.go
Expand Up @@ -22,16 +22,13 @@ import (

"k8s.io/utils/pointer"

rabbitv1beta1 "knative.dev/eventing-rabbitmq/third_party/pkg/apis/rabbitmq.com/v1beta1"

"go.uber.org/zap"
v1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
appsv1listers "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
Expand All @@ -40,6 +37,7 @@ import (
"knative.dev/pkg/logging"
"knative.dev/pkg/network"

"knative.dev/eventing-rabbitmq/pkg/brokerconfig"
"knative.dev/eventing-rabbitmq/pkg/rabbit"
naming "knative.dev/eventing-rabbitmq/pkg/rabbitmqnaming"
"knative.dev/eventing-rabbitmq/pkg/reconciler/broker/resources"
Expand All @@ -59,7 +57,6 @@ import (

type Reconciler struct {
eventingClientSet clientset.Interface
dynamicClientSet dynamic.Interface
kubeClientSet kubernetes.Interface
rabbitClientSet rabbitclientset.Interface

Expand Down Expand Up @@ -89,7 +86,8 @@ type Reconciler struct {

rabbit rabbit.Service
// config accessor for observability/logging/tracing
configs reconcilersource.ConfigAccessor
configs reconcilersource.ConfigAccessor
brokerConfig *brokerconfig.BrokerConfigService
}

// Check that our Reconciler implements Interface
Expand Down Expand Up @@ -119,31 +117,18 @@ var rabbitBrokerCondSet = apis.NewLivingConditionSet(
BrokerConditionAddressable,
)

// isUsingOperator checks the Spec for a Broker and determines if we should be using the
// messaging-topology-operator or the libraries.
func isUsingOperator(b *eventingv1.Broker) bool {
if b != nil && b.Spec.Config != nil {
return b.Spec.Config.Kind == "RabbitmqCluster"
}
return false
}

func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pkgreconciler.Event {
logging.FromContext(ctx).Infow("Reconciling", zap.Any("Broker", b))

// 1. RabbitMQ Exchange
// 2. Ingress Deployment
// 3. K8s Service that points to the Ingress Deployment
args, err := r.getExchangeArgs(ctx, b)
args, err := r.brokerConfig.GetExchangeArgs(ctx, b)
if err != nil {
MarkExchangeFailed(&b.Status, "ExchangeCredentialsUnavailable", "Failed to get arguments for creating exchange: %s", err)
return err
}

if !isUsingOperator(b) {
MarkExchangeFailed(&b.Status, "ReconcileFailure", "using secret is not supported with this controller")
return nil
}
return r.reconcileRabbitResources(ctx, b, args)
}

Expand Down Expand Up @@ -308,15 +293,16 @@ func (r *Reconciler) reconcileDeadLetterResources(ctx context.Context, b *eventi
return nil, false
}

clusterRef, err := r.brokerConfig.GetRabbitMQClusterRef(ctx, b)
if err != nil {
return err, false
}
queue, err := r.rabbit.ReconcileQueue(ctx, &rabbit.QueueArgs{
Name: naming.CreateBrokerDeadLetterQueueName(b),
Namespace: b.Namespace,
RabbitmqClusterReference: &rabbitv1beta1.RabbitmqClusterReference{
Name: b.Spec.Config.Name,
Namespace: b.Spec.Config.Namespace,
},
Owner: *kmeta.NewControllerRef(b),
Labels: rabbit.Labels(b, nil, nil),
Name: naming.CreateBrokerDeadLetterQueueName(b),
Namespace: b.Namespace,
RabbitmqClusterReference: clusterRef,
Owner: *kmeta.NewControllerRef(b),
Labels: rabbit.Labels(b, nil, nil),
})
if err != nil {
MarkDLXFailed(&b.Status, "QueueFailure", fmt.Sprintf("Failed to reconcile Dead Letter Queue %q : %s", naming.CreateBrokerDeadLetterQueueName(b), err))
Expand All @@ -331,17 +317,14 @@ func (r *Reconciler) reconcileDeadLetterResources(ctx context.Context, b *eventi

bindingName := naming.CreateBrokerDeadLetterQueueName(b)
binding, err := r.rabbit.ReconcileBinding(ctx, &rabbit.BindingArgs{
Name: bindingName,
Namespace: b.Namespace,
RabbitmqClusterReference: &rabbitv1beta1.RabbitmqClusterReference{
Name: b.Spec.Config.Name,
Namespace: b.Spec.Config.Namespace,
},
Vhost: "/",
Source: naming.BrokerExchangeName(b, true),
Destination: bindingName,
Owner: *kmeta.NewControllerRef(b),
Labels: rabbit.Labels(b, nil, nil),
Name: bindingName,
Namespace: b.Namespace,
RabbitmqClusterReference: clusterRef,
Vhost: "/",
Source: naming.BrokerExchangeName(b, true),
Destination: bindingName,
Owner: *kmeta.NewControllerRef(b),
Labels: rabbit.Labels(b, nil, nil),
Filters: map[string]string{
rabbit.DLQBindingKey: b.Name,
},
Expand All @@ -359,16 +342,13 @@ func (r *Reconciler) reconcileDeadLetterResources(ctx context.Context, b *eventi

policyName := naming.CreateBrokerDeadLetterQueueName(b) // same name as the DL queue
policy, err := r.rabbit.ReconcileBrokerDLXPolicy(ctx, &rabbit.QueueArgs{
Name: policyName,
Namespace: b.Namespace,
RabbitmqClusterReference: &rabbitv1beta1.RabbitmqClusterReference{
Name: b.Spec.Config.Name,
Namespace: b.Spec.Config.Namespace,
},
Owner: *kmeta.NewControllerRef(b),
Labels: rabbit.Labels(b, nil, nil),
DLXName: pointer.String(args.Name),
BrokerUID: string(b.GetUID()),
Name: policyName,
Namespace: b.Namespace,
RabbitmqClusterReference: clusterRef,
Owner: *kmeta.NewControllerRef(b),
Labels: rabbit.Labels(b, nil, nil),
DLXName: pointer.String(args.Name),
BrokerUID: string(b.GetUID()),
})
if err != nil {
MarkDeadLetterSinkFailed(&b.Status, "PolicyFailure", fmt.Sprintf("Failed to reconcile RabbitMQ Policy %q : %s", policyName, err))
Expand Down

0 comments on commit fb7c6d1

Please sign in to comment.