Skip to content

Commit

Permalink
Merge pull request #144 from rabbitmq/federation-uri-secret
Browse files Browse the repository at this point in the history
Refactor federation spec.uri to spec.uriSecret
  • Loading branch information
ChunyiLyu committed May 27, 2021
2 parents 282ba99 + 499827a commit 19ce30d
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 43 deletions.
8 changes: 5 additions & 3 deletions api/v1beta1/federation_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1beta1

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
Expand All @@ -18,11 +19,12 @@ type FederationSpec struct {
// Required property.
// +kubebuilder:validation:Required
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
// The AMQP URI(s) for the upstream.
// Secret contains the AMQP URI(s) for the upstream.
// The Secret must contain the key `uri` or operator will error.
// Required property.
// +kubebuilder:validation:Required
Uri string `json:"uri"`
PrefetchCount int `json:"prefetch-count,omitempty"`
UriSecret *corev1.LocalObjectReference `json:"uriSecret"`
PrefetchCount int `json:"prefetch-count,omitempty"`
// +kubebuilder:validation:Enum=on-confirm;on-publish;no-ack
AckMode string `json:"ackMode,omitempty"`
Expires int `json:"expires,omitempty"`
Expand Down
25 changes: 17 additions & 8 deletions api/v1beta1/federation_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
Expand All @@ -21,7 +22,9 @@ var _ = Describe("Federation spec", func() {
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
Uri: "a-rabbit-connection-uri",
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
}

federation := Federation{
Expand All @@ -31,7 +34,9 @@ var _ = Describe("Federation spec", func() {
},
Spec: FederationSpec{
Name: "test-federation",
Uri: "a-rabbit-connection-uri",
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
Expand All @@ -53,9 +58,11 @@ var _ = Describe("Federation spec", func() {
Namespace: namespace,
},
Spec: FederationSpec{
Name: "configured-federation",
Vhost: "/hello",
Uri: "a-rabbit-connection-uri",
Name: "configured-federation",
Vhost: "/hello",
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
Expires: 1000,
MessageTTL: 1000,
MaxHops: 100,
Expand Down Expand Up @@ -83,7 +90,7 @@ var _ = Describe("Federation spec", func() {
Name: "some-cluster",
}))

Expect(fetched.Spec.Uri).To(Equal("a-rabbit-connection-uri"))
Expect(fetched.Spec.UriSecret.Name).To(Equal("a-secret"))
Expect(fetched.Spec.AckMode).To(Equal("no-ack"))
Expect(fetched.Spec.Exchange).To(Equal("an-exchange"))
Expect(fetched.Spec.Expires).To(Equal(1000))
Expand All @@ -101,8 +108,10 @@ var _ = Describe("Federation spec", func() {
Namespace: namespace,
},
Spec: FederationSpec{
Name: "test-federation",
Uri: "a-rabbit-connection-uri",
Name: "test-federation",
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
AckMode: "non-existing-ackmode",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
Expand Down
13 changes: 8 additions & 5 deletions api/v1beta1/federation_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1beta1
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -13,9 +14,11 @@ var _ = Describe("federation webhook", func() {
Name: "test",
},
Spec: FederationSpec{
Name: "test-upstream",
Vhost: "/a-vhost",
Uri: "a-rabbit-connection-uri",
Name: "test-upstream",
Vhost: "/a-vhost",
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
Expires: 1000,
MessageTTL: 1000,
MaxHops: 100,
Expand Down Expand Up @@ -50,9 +53,9 @@ var _ = Describe("federation webhook", func() {
Expect(apierrors.IsForbidden(newFederation.ValidateUpdate(&federation))).To(BeTrue())
})

It("allows updates on federation.spec.uri", func() {
It("allows updates on federation.spec.uriSecret", func() {
newFederation := federation.DeepCopy()
newFederation.Spec.Uri = "amqps://new-uri"
newFederation.Spec.UriSecret = &corev1.LocalObjectReference{Name: "a-new-secret"}
Expect(newFederation.ValidateUpdate(&federation)).To(Succeed())
})

Expand Down
7 changes: 6 additions & 1 deletion api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions config/crd/bases/rabbitmq.com_federations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,24 @@ spec:
type: integer
trustUserId:
type: boolean
uri:
description: The AMQP URI(s) for the upstream. Required property.
type: string
uriSecret:
description: Secret contains the AMQP URI(s) for the upstream. The
Secret must contain the key `uri` or operator will error. Required
property.
properties:
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
TODO: Add other useful fields. apiVersion, kind, uid?'
type: string
type: object
vhost:
default: /
description: Default to vhost '/'; cannot be updated
type: string
required:
- name
- rabbitmqClusterReference
- uri
- uriSecret
type: object
status:
description: FederationStatus defines the observed state of Federation
Expand Down
28 changes: 27 additions & 1 deletion controllers/federation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -113,7 +115,15 @@ func (r *FederationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *FederationReconciler) setFederation(ctx context.Context, client internal.RabbitMQClient, federation *topology.Federation) error {
logger := ctrl.LoggerFrom(ctx)

if err := validateResponse(client.PutFederationUpstream(federation.Spec.Vhost, federation.Spec.Name, internal.GenerateFederationDefinition(federation))); err != nil {
uri, err := r.getUri(ctx, federation)
if err != nil {
msg := "failed to parse federation uri secret"
r.Recorder.Event(federation, corev1.EventTypeWarning, "FailedUpdate", msg)
logger.Error(err, msg, "uri secret", federation.Spec.UriSecret.Name)
return err
}

if err := validateResponse(client.PutFederationUpstream(federation.Spec.Vhost, federation.Spec.Name, internal.GenerateFederationDefinition(federation, uri))); err != nil {
msg := "failed to set federation upstream parameter"
r.Recorder.Event(federation, corev1.EventTypeWarning, "FailedUpdate", msg)
logger.Error(err, msg, "federation", federation.Spec.Name)
Expand All @@ -124,6 +134,22 @@ func (r *FederationReconciler) setFederation(ctx context.Context, client interna
r.Recorder.Event(federation, corev1.EventTypeNormal, "SuccessfulUpdate", "Successfully set federation Upstream parameter")
return nil
}
func (r *FederationReconciler) getUri(ctx context.Context, federation *topology.Federation) (string, error) {
if federation.Spec.UriSecret == nil {
return "", fmt.Errorf("no uri secret provided")
}
secret := &corev1.Secret{}
if err := r.Get(ctx, types.NamespacedName{Name: federation.Spec.UriSecret.Name, Namespace: federation.Namespace}, secret); err != nil {
return "", err
}

uri, ok := secret.Data["uri"]
if !ok {
return "", fmt.Errorf("could not find key 'uri' in secret %s", secret.Name)
}

return string(uri), nil
}

// addFinalizerIfNeeded adds a deletion finalizer if the Federation does not have one yet and is not marked for deletion
func (r *FederationReconciler) addFinalizerIfNeeded(ctx context.Context, e *topology.Federation) error {
Expand Down
5 changes: 3 additions & 2 deletions controllers/federation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ var _ = Describe("federation-controller", func() {
Namespace: "default",
},
Spec: topology.FederationSpec{
Name: "my-federation-upstream",
Vhost: "/test",
Name: "my-federation-upstream",
Vhost: "/test",
UriSecret: &corev1.LocalObjectReference{Name: "federation-uri"},
RabbitmqClusterReference: topology.RabbitmqClusterReference{
Name: "example-rabbit",
},
Expand Down
13 changes: 13 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,19 @@ var _ = BeforeSuite(func() {
},
}
Expect(client.Create(ctx, &secret)).To(Succeed())

// used in federation-controller test
federationUri := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "federation-uri",
Namespace: "default",
},
Type: corev1.SecretTypeOpaque,
Data: map[string][]byte{
"uri": []byte("amqp://rabbit@rabbit:a-rabbitmq-uri.test.com"),
},
}
Expect(client.Create(ctx, &federationUri)).To(Succeed())
})

var _ = BeforeEach(func() {
Expand Down
2 changes: 1 addition & 1 deletion docs/api/rabbitmq.com.ref.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ FederationSpec defines the desired state of Federation For how to configure fede
| *`name`* __string__ | Required property; cannot be updated
| *`vhost`* __string__ | Default to vhost '/'; cannot be updated
| *`rabbitmqClusterReference`* __xref:{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that the exchange will be created in. Required property.
| *`uri`* __string__ | The AMQP URI(s) for the upstream. Required property.
| *`uriSecret`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#localobjectreference-v1-core[$$LocalObjectReference$$]__ | Secret contains the AMQP URI(s) for the upstream. The Secret must contain the key `uri` or operator will error. Required property.
| *`prefetch-count`* __integer__ |
| *`ackMode`* __string__ |
| *`expires`* __integer__ |
Expand Down
4 changes: 2 additions & 2 deletions internal/federation_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
)

func GenerateFederationDefinition(f *topology.Federation) rabbithole.FederationDefinition {
func GenerateFederationDefinition(f *topology.Federation, uri string) rabbithole.FederationDefinition {
return rabbithole.FederationDefinition{
Uri: f.Spec.Uri,
Uri: uri,
Expires: f.Spec.Expires,
MessageTTL: int32(f.Spec.MessageTTL),
MaxHops: f.Spec.MaxHops,
Expand Down
21 changes: 10 additions & 11 deletions internal/federation_definition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,62 +23,61 @@ var _ = Describe("GenerationFederationDefinition", func() {
})

It("sets 'uri' correctly", func() {
f.Spec.Uri = "a-rabbitmq-uri@test.com"
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "a-rabbitmq-uri@test.com")
Expect(definition.Uri).To(Equal("a-rabbitmq-uri@test.com"))
})

It("sets 'PrefetchCount' correctly", func() {
f.Spec.PrefetchCount = 200
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "")
Expect(definition.PrefetchCount).To(Equal(200))
})

It("sets 'AckMode' correctly", func() {
f.Spec.AckMode = "no-ack"
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "")
Expect(definition.AckMode).To(Equal("no-ack"))
})

It("sets 'Expires' correctly", func() {
f.Spec.Expires = 100000
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "")
Expect(definition.Expires).To(Equal(100000))
})

It("sets 'MessageTTL' correctly", func() {
f.Spec.MessageTTL = 300
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "")
Expect(definition.MessageTTL).To(Equal(int32(300)))
})

It("sets 'MaxHops' correctly", func() {
f.Spec.MaxHops = 5
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "")
Expect(definition.MaxHops).To(Equal(5))
})

It("sets 'ReconnectDelay' correctly", func() {
f.Spec.ReconnectDelay = 100
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "")
Expect(definition.ReconnectDelay).To(Equal(100))
})

It("sets 'TrustUserId' correctly", func() {
f.Spec.TrustUserId = false
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "")
Expect(definition.TrustUserId).To(BeFalse())
})

It("sets 'Exchange' correctly", func() {
f.Spec.Exchange = "an-exchange"
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "")
Expect(definition.Exchange).To(Equal("an-exchange"))
})

It("sets 'Queue' correctly", func() {
f.Spec.Queue = "a-great-queue"
definition := GenerateFederationDefinition(f)
definition := GenerateFederationDefinition(f, "")
Expect(definition.Queue).To(Equal("a-great-queue"))
})
})
23 changes: 18 additions & 5 deletions system_tests/federation_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,33 @@ import (

var _ = Describe("federation", func() {
var (
namespace = MustHaveEnv("NAMESPACE")
ctx = context.Background()
federation = &topology.Federation{}
namespace = MustHaveEnv("NAMESPACE")
ctx = context.Background()
federation = &topology.Federation{}
federationUri = "amqp://server-name-my-upstream-test-uri"
)

BeforeEach(func() {
federationUriSecret := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "federation-uri",
Namespace: namespace,
},
Type: corev1.SecretTypeOpaque,
Data: map[string][]byte{
"uri": []byte(federationUri),
},
}
Expect(k8sClient.Create(ctx, &federationUriSecret)).To(Succeed())

federation = &topology.Federation{
ObjectMeta: metav1.ObjectMeta{
Name: "federation",
Namespace: namespace,
},
Spec: topology.FederationSpec{
Name: "my-upstream",
Uri: "amqp://server-name-my-upstream-test-uri",
UriSecret: &corev1.LocalObjectReference{Name: "federation-uri"},
MessageTTL: 3000,
Queue: "a-queue",
AckMode: "on-publish",
Expand All @@ -51,7 +64,7 @@ var _ = Describe("federation", func() {

Expect(upstream.Name).To(Equal(federation.Spec.Name))
Expect(upstream.Vhost).To(Equal(federation.Spec.Vhost))
Expect(upstream.Definition.Uri).To(Equal(federation.Spec.Uri))
Expect(upstream.Definition.Uri).To(Equal(federationUri))
Expect(upstream.Definition.Queue).To(Equal(federation.Spec.Queue))
Expect(upstream.Definition.MessageTTL).To(Equal(int32(federation.Spec.MessageTTL)))
Expect(upstream.Definition.AckMode).To(Equal(federation.Spec.AckMode))
Expand Down

0 comments on commit 19ce30d

Please sign in to comment.