Skip to content

Commit

Permalink
Add shovels.rabbitmq.com
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunyiLyu committed Jun 3, 2021
1 parent b4c5739 commit 56babcc
Show file tree
Hide file tree
Showing 23 changed files with 1,613 additions and 2 deletions.
3 changes: 3 additions & 0 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ resources:
- group: rabbitmq.com
kind: Federation
version: v1beta1
- group: rabbitmq.com
kind: Shovel
version: v1beta1
version: "2"
85 changes: 85 additions & 0 deletions api/v1beta1/shovel_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package v1beta1

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ShovelSpec defines the desired state of Shovel
// For how to configure Shovel, see: https://www.rabbitmq.com/shovel.html.
type ShovelSpec struct {
// Required property; cannot be updated
// +kubebuilder:validation:Required
Name string `json:"name"`
// Default to vhost '/'; cannot be updated
// +kubebuilder:default:=/
Vhost string `json:"vhost,omitempty"`
// Reference to the RabbitmqCluster that this Shovel will be created in.
// Required property.
// +kubebuilder:validation:Required
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
// Secret contains the AMQP URI(s) to configure Shovel destination and source.
// The Secret must contain the key `destUri` and `srcUri` or operator will error.
// Both fields should be one or multiple uris separated by ','.
// Required property.
// +kubebuilder:validation:Required
UriSecret *corev1.LocalObjectReference `json:"uriSecret"`
// +kubebuilder:validation:Enum=on-confirm;on-publish;no-ack
AckMode string `json:"ackMode,omitempty"`
AddForwardHeaders bool `json:"addForwardHeaders,omitempty"`
DeleteAfter string `json:"deleteAfter,omitempty"`
DestinationAddForwardHeaders bool `json:"destAddForwardHeaders,omitempty"`
DestinationAddTimestampHeader bool `json:"destAddTimestampHeader,omitempty"`
DestinationAddress string `json:"destAddress,omitempty"`
DestinationApplicationProperties string `json:"destApplicationProperties,omitempty"`
DestinationExchange string `json:"destExchange,omitempty"`
DestinationExchangeKey string `json:"destExchangeKey,omitempty"`
DestinationProperties string `json:"destProperties,omitempty"`
DestinationProtocol string `json:"destProtocol,omitempty"`
DestinationPublishProperties string `json:"destPublishProperties,omitempty"`
DestinationQueue string `json:"destQueue,omitempty"`
PrefetchCount int `json:"prefetchCount,omitempty"`
ReconnectDelay int `json:"reconnectDelay,omitempty"`
SourceAddress string `json:"srcAddress,omitempty"`
SourceDeleteAfter string `json:"srcDeleteAfter,omitempty"`
SourceExchange string `json:"srcExchange,omitempty"`
SourceExchangeKey string `json:"srcExchangeKey,omitempty"`
SourcePrefetchCount int `json:"srcPrefetchCount,omitempty"`
SourceProtocol string `json:"srcProtocol,omitempty"`
SourceQueue string `json:"srcQueue,omitempty"`
}

// ShovelStatus defines the observed state of Shovel
type ShovelStatus struct {
// observedGeneration is the most recent successful generation observed for this Shovel. It corresponds to the
// Federation's generation, which is updated on mutation by the API Server.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
Conditions []Condition `json:"conditions,omitempty"`
}

// +genclient
// +kubebuilder:object:root=true
// +kubebuilder:resource:categories=all
// +kubebuilder:subresource:status

// Shovel is the Schema for the shovels API
type Shovel struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec ShovelSpec `json:"spec,omitempty"`
Status ShovelStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// ShovelList contains a list of Shovel
type ShovelList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Shovel `json:"items"`
}

func init() {
SchemeBuilder.Register(&Shovel{}, &ShovelList{})
}
139 changes: 139 additions & 0 deletions api/v1beta1/shovel_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package v1beta1

import (
"context"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

var _ = Describe("Shovel spec", func() {
var (
namespace = "default"
ctx = context.Background()
)

It("creates a shovel with minimal configurations", func() {
shovel := Shovel{
ObjectMeta: metav1.ObjectMeta{
Name: "test-shovel",
Namespace: namespace,
},
Spec: ShovelSpec{
Name: "test-shovel",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
}}
Expect(k8sClient.Create(ctx, &shovel)).To(Succeed())
fetched := &Shovel{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: shovel.Name,
Namespace: shovel.Namespace,
}, fetched)).To(Succeed())
Expect(fetched.Spec.Name).To(Equal("test-shovel"))
Expect(fetched.Spec.Vhost).To(Equal("/"))
Expect(fetched.Spec.RabbitmqClusterReference.Name).To(Equal("some-cluster"))
Expect(fetched.Spec.UriSecret.Name).To(Equal("a-secret"))
})

It("creates shovel with configurations", func() {
shovel := Shovel{
ObjectMeta: metav1.ObjectMeta{
Name: "test-shovel-configurations",
Namespace: namespace,
},
Spec: ShovelSpec{
Name: "test-shovel-configurations",
Vhost: "test-vhost",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
AckMode: "no-ack",
AddForwardHeaders: true,
DeleteAfter: "never",
DestinationAddForwardHeaders: true,
DestinationAddTimestampHeader: true,
DestinationAddress: "myQueue",
DestinationApplicationProperties: "a-property",
DestinationExchange: "an-exchange",
DestinationExchangeKey: "a-key",
DestinationProperties: "a-property",
DestinationProtocol: "amqp091",
DestinationPublishProperties: "a-property",
DestinationQueue: "a-queue",
PrefetchCount: 10,
ReconnectDelay: 10,
SourceAddress: "myQueue",
SourceDeleteAfter: "never",
SourceExchange: "an-exchange",
SourceExchangeKey: "a-key",
SourcePrefetchCount: 10,
SourceProtocol: "amqp091",
SourceQueue: "a-queue",
}}
Expect(k8sClient.Create(ctx, &shovel)).To(Succeed())
fetched := &Shovel{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: shovel.Name,
Namespace: shovel.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.Name).To(Equal("test-shovel-configurations"))
Expect(fetched.Spec.Vhost).To(Equal("test-vhost"))
Expect(fetched.Spec.RabbitmqClusterReference.Name).To(Equal("some-cluster"))
Expect(fetched.Spec.UriSecret.Name).To(Equal("a-secret"))
Expect(fetched.Spec.AckMode).To(Equal("no-ack"))
Expect(fetched.Spec.AddForwardHeaders).To(BeTrue())
Expect(fetched.Spec.DeleteAfter).To(Equal("never"))

Expect(fetched.Spec.DestinationAddTimestampHeader).To(BeTrue())
Expect(fetched.Spec.DestinationAddForwardHeaders).To(BeTrue())
Expect(fetched.Spec.DestinationAddress).To(Equal("myQueue"))
Expect(fetched.Spec.DestinationApplicationProperties).To(Equal("a-property"))
Expect(fetched.Spec.DestinationExchange).To(Equal("an-exchange"))
Expect(fetched.Spec.DestinationExchangeKey).To(Equal("a-key"))
Expect(fetched.Spec.DestinationProperties).To(Equal("a-property"))
Expect(fetched.Spec.DestinationQueue).To(Equal("a-queue"))
Expect(fetched.Spec.PrefetchCount).To(Equal(10))
Expect(fetched.Spec.ReconnectDelay).To(Equal(10))

Expect(fetched.Spec.SourceAddress).To(Equal("myQueue"))
Expect(fetched.Spec.SourceDeleteAfter).To(Equal("never"))
Expect(fetched.Spec.SourceExchange).To(Equal("an-exchange"))
Expect(fetched.Spec.SourceExchangeKey).To(Equal("a-key"))
Expect(fetched.Spec.SourcePrefetchCount).To(Equal(10))
Expect(fetched.Spec.SourceProtocol).To(Equal("amqp091"))
Expect(fetched.Spec.SourceQueue).To(Equal("a-queue"))
})

When("creating a shovel with an invalid 'AckMode' value", func() {
It("fails with validation errors", func() {
shovel := Shovel{
ObjectMeta: metav1.ObjectMeta{
Name: "an-invalid-ackmode",
Namespace: namespace,
},
Spec: ShovelSpec{
Name: "an-invalid-ackmode",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
AckMode: "an-invalid-ackmode",
}}
Expect(k8sClient.Create(ctx, &shovel)).To(HaveOccurred())
Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-ackmode" is invalid: spec.ackMode: Unsupported value: "an-invalid-ackmode": supported values: "on-confirm", "on-publish", "no-ack"`))
})
})
})
102 changes: 102 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 56babcc

Please sign in to comment.