Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v1beta1/rabbitmqcluster_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type RabbitmqClusterSecretReference struct {
Name string `json:"name"`
// Namespace of the Secret containing the default user credentials
Namespace string `json:"namespace"`
// Key-value pairs in the Secret corresponding to `username` and `password`
// Key-value pairs in the Secret corresponding to `username`, `password`, `host`, and `port`
Keys map[string]string `json:"keys"`
}

Expand Down
2 changes: 1 addition & 1 deletion config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3904,7 +3904,7 @@ spec:
keys:
additionalProperties:
type: string
description: Key-value pairs in the Secret corresponding to `username` and `password`
description: Key-value pairs in the Secret corresponding to `username`, `password`, `host`, and `port`
type: object
name:
description: Name of the Secret containing the default user credentials
Expand Down
14 changes: 14 additions & 0 deletions controllers/rabbitmqcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,20 @@ var _ = Describe("RabbitmqClusterController", func() {
}, 3).Should(HaveKeyWithValue("foo", "bar"))
})

When("the plugin configuration is updated", func() {
It("updates the secret port configuration", func() {
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
r.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_stream"}
})).To(Succeed())

Eventually(func() map[string][]byte {
secret, err := clientSet.CoreV1().Secrets(cluster.Namespace).Get(ctx, cluster.ChildResourceName("default-user"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return secret.Data
}).Should(HaveKeyWithValue("stream-port", []byte("5552")))
})
})

When("instance annotations are updated", func() {
annotationKey := "anno-key"
annotationValue := "anno-value"
Expand Down
2 changes: 1 addition & 1 deletion docs/api/rabbitmq.com.ref.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ Reference to the Kubernetes Secret containing the credentials of the default use
| Field | Description
| *`name`* __string__ | Name of the Secret containing the default user credentials
| *`namespace`* __string__ | Namespace of the Secret containing the default user credentials
| *`keys`* __object (keys:string, values:string)__ | Key-value pairs in the Secret corresponding to `username` and `password`
| *`keys`* __object (keys:string, values:string)__ | Key-value pairs in the Secret corresponding to `username`, `password`, `host`, and `port`
|===


Expand Down
80 changes: 74 additions & 6 deletions internal/resource/default_user_secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"gopkg.in/ini.v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/rabbitmq/cluster-operator/api/v1beta1"
)

const (
Expand Down Expand Up @@ -53,22 +55,28 @@ func (builder *DefaultUserSecretBuilder) Build() (client.Object, error) {
return nil, err
}

return &corev1.Secret{
host := fmt.Sprintf("%s.%s.svc.cluster.local", builder.Instance.Name, builder.Instance.Namespace)

// Default user secret implements the service binding Provisioned Service
// See: https://k8s-service-bindings.github.io/spec/#provisioned-service
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: builder.Instance.ChildResourceName(DefaultUserSecretName),
Namespace: builder.Instance.Namespace,
},
Type: corev1.SecretTypeOpaque,
// Default user secret implements the service binding Provisioned Service
// See: https://k8s-service-bindings.github.io/spec/#provisioned-service
Data: map[string][]byte{
"provider": []byte(bindingProvider),
"type": []byte(bindingType),
"username": []byte(username),
"password": []byte(password),
"default_user.conf": defaultUserConf,
"provider": []byte(bindingProvider),
"type": []byte(bindingType),
"host": []byte(host),
},
}, nil
}
builder.updatePorts(secret)

return secret, nil
}

func (builder *DefaultUserSecretBuilder) UpdateMayRequireStsRecreate() bool {
Expand All @@ -79,6 +87,7 @@ func (builder *DefaultUserSecretBuilder) Update(object client.Object) error {
secret := object.(*corev1.Secret)
secret.Labels = metadata.GetLabels(builder.Instance.Name, builder.Instance.Labels)
secret.Annotations = metadata.ReconcileAndFilterAnnotations(secret.GetAnnotations(), builder.Instance.Annotations)
builder.updatePorts(secret)

if err := controllerutil.SetControllerReference(builder.Instance, secret, builder.Scheme); err != nil {
return fmt.Errorf("failed setting controller reference: %v", err)
Expand All @@ -87,6 +96,65 @@ func (builder *DefaultUserSecretBuilder) Update(object client.Object) error {
return nil
}

func (builder *DefaultUserSecretBuilder) updatePorts(secret *corev1.Secret) {
const (
AMQPPort = "5672"
AMQPSPort = "5671"
)
portNames := map[v1beta1.Plugin]string{
"rabbitmq_mqtt": "mqtt-port",
"rabbitmq_stomp": "stomp-port",
"rabbitmq_stream": "stream-port",
"rabbitmq_web_mqtt": "web-mqtt-port",
"rabbitmq_web_stomp": "web-stomp-port",
}
TLSPort := map[string]string{
"mqtt-port": "8883",
"stomp-port": "61614",
"stream-port": "5551",
"web-mqtt-port": "15676",
"web-stomp-port": "15673",
}
port := map[string]string{
"mqtt-port": "1883",
"stomp-port": "61613",
"stream-port": "5552",
"web-mqtt-port": "15675",
"web-stomp-port": "15674",
}

if builder.Instance.Spec.TLS.SecretName != "" {
secret.Data["port"] = []byte(AMQPSPort)

for plugin, portName := range portNames {
if builder.pluginEnabled(plugin) {
secret.Data[portName] = []byte(TLSPort[portName])
} else {
delete(secret.Data, portName)
}
}
} else {
secret.Data["port"] = []byte(AMQPPort)

for plugin, portName := range portNames {
if builder.pluginEnabled(plugin) {
secret.Data[portName] = []byte(port[portName])
} else {
delete(secret.Data, portName)
}
}
}
}

func (builder *DefaultUserSecretBuilder) pluginEnabled(plugin v1beta1.Plugin) bool {
for _, value := range builder.Instance.Spec.Rabbitmq.AdditionalPlugins {
if value == plugin {
return true
}
}
return false
}

func generateDefaultUserConf(username, password string) ([]byte, error) {
ini.PrettySection = false // Remove trailing new line because default_user.conf has only a default section.
cfg, err := ini.Load([]byte{})
Expand Down
153 changes: 152 additions & 1 deletion internal/resource/default_user_secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ var _ = Describe("DefaultUserSecret", func() {
It("creates the necessary default-user secret", func() {
var username []byte
var password []byte
var host []byte
var port []byte
var ok bool

obj, err := defaultUserSecretBuilder.Build()
Expand Down Expand Up @@ -86,6 +88,19 @@ var _ = Describe("DefaultUserSecret", func() {
Expect(len(decodedPassword)).To(Equal(24))
})

By("Setting a host that corresponds to the service address", func() {
host, ok = secret.Data["host"]
Expect(ok).NotTo(BeFalse(), "Failed to find a key \"host\" in the generated Secret")
expectedHost := "a name.a namespace.svc.cluster.local"
Expect(host).To(BeEquivalentTo(expectedHost))
})

By("Setting a port that corresponds to the amqp port", func() {
port, ok = secret.Data["port"]
Expect(ok).NotTo(BeFalse(), "Failed to find a key \"port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("5672"))
})

By("creating a default_user.conf file that contains the correct sysctl config format to be parsed by RabbitMQ", func() {
defaultUserConf, ok := secret.Data["default_user.conf"]
Expect(ok).NotTo(BeFalse(), "Failed to find a key \"default_user.conf\" in the generated Secret")
Expand Down Expand Up @@ -114,6 +129,99 @@ var _ = Describe("DefaultUserSecret", func() {
})
})

Context("when MQTT, STOMP, streams, WebMQTT, and WebSTOMP are enabled", func() {
It("adds the MQTT, STOMP, stream, WebMQTT, and WebSTOMP ports to the user secret", func() {
var port []byte

instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{
"rabbitmq_mqtt",
"rabbitmq_stomp",
"rabbitmq_stream",
"rabbitmq_web_mqtt",
"rabbitmq_web_stomp",
}

obj, err := defaultUserSecretBuilder.Build()
Expect(err).NotTo(HaveOccurred())
secret = obj.(*corev1.Secret)

port, ok := secret.Data["mqtt-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"mqtt-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("1883"))

port, ok = secret.Data["stomp-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"stomp-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("61613"))

port, ok = secret.Data["stream-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"stream-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("5552"))

port, ok = secret.Data["web-mqtt-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"web-mqtt-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("15675"))

port, ok = secret.Data["web-stomp-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"web-stomp-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("15674"))
})
})

Context("when TLS is enabled", func() {
It("Uses the AMQPS port in the user secret", func() {
var port []byte

instance.Spec.TLS.SecretName = "tls-secret"

obj, err := defaultUserSecretBuilder.Build()
Expect(err).NotTo(HaveOccurred())
secret = obj.(*corev1.Secret)

port, ok := secret.Data["port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("5671"))
})

Context("when MQTT, STOMP, streams, WebMQTT, and WebSTOMP are enabled", func() {
It("adds the MQTTS, STOMPS, streams, WebMQTTS, and WebSTOMPS ports to the user secret", func() {
var port []byte

instance.Spec.TLS.SecretName = "tls-secret"
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{
"rabbitmq_mqtt",
"rabbitmq_stomp",
"rabbitmq_stream",
"rabbitmq_web_mqtt",
"rabbitmq_web_stomp",
}

obj, err := defaultUserSecretBuilder.Build()
Expect(err).NotTo(HaveOccurred())
secret = obj.(*corev1.Secret)

port, ok := secret.Data["mqtt-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"mqtt-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("8883"))

port, ok = secret.Data["stomp-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"stomp-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("61614"))

port, ok = secret.Data["stream-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"stream-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("5551"))

port, ok = secret.Data["web-mqtt-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"web-mqtt-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("15676"))

port, ok = secret.Data["web-stomp-port"]
Expect(ok).NotTo(BeFalse(), "Failed to find key \"web-stomp-port\" in the generated Secret")
Expect(port).To(BeEquivalentTo("15673"))
})
})
})

Context("Update with instance labels", func() {
It("Updates the secret", func() {
instance = rabbitmqv1beta1.RabbitmqCluster{
Expand All @@ -136,6 +244,7 @@ var _ = Describe("DefaultUserSecret", func() {
"this-was-the-previous-label": "should-be-deleted",
},
},
Data: map[string][]byte{},
}
err := defaultUserSecretBuilder.Update(secret)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -184,6 +293,7 @@ var _ = Describe("DefaultUserSecret", func() {
"k8s.io/name": "should-stay",
},
},
Data: map[string][]byte{},
}
err := defaultUserSecretBuilder.Update(secret)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -203,8 +313,49 @@ var _ = Describe("DefaultUserSecret", func() {
})
})

Context("When plugins or TLS are updated", func() {
It("updates the secret with the only enabled ports", func() {
instance = rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbit-labelled",
},
}
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{
"rabbitmq_mqtt",
"rabbitmq_stream",
}
instance.Spec.TLS.SecretName = "tls-secret"
secret = &corev1.Secret{
Data: map[string][]byte{
"port": []byte("5672"),
"mqtt-port": []byte("1883"),
"web-mqtt-port": []byte("15675"),
},
}
err := defaultUserSecretBuilder.Update(secret)
Expect(err).NotTo(HaveOccurred())

port, ok := secret.Data["port"]
Expect(ok).NotTo(BeFalse())
Expect(port).To(BeEquivalentTo("5671"))

port, ok = secret.Data["mqtt-port"]
Expect(ok).NotTo(BeFalse())
Expect(port).To(BeEquivalentTo("8883"))

port, ok = secret.Data["stream-port"]
Expect(ok).NotTo(BeFalse())
Expect(port).To(BeEquivalentTo("5551"))

_, ok = secret.Data["web-mqtt-port"]
Expect(ok).To(BeFalse())
})
})

It("sets owner reference", func() {
secret = &corev1.Secret{}
secret = &corev1.Secret{
Data: map[string][]byte{},
}
instance = rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbit1",
Expand Down
11 changes: 9 additions & 2 deletions system_tests/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ var _ = Describe("Operator", func() {
})

It("keeps rabbitmq server related configurations up-to-date", func() {
By("updating enabled plugins when additionalPlugins are modified", func() {
By("updating enabled plugins and the secret ports when additionalPlugins are modified", func() {
// modify rabbitmqcluster.spec.rabbitmq.additionalPlugins
Expect(updateRabbitmqCluster(ctx, rmqClusterClient, cluster.Name, cluster.Namespace, func(cluster *rabbitmqv1beta1.RabbitmqCluster) {
cluster.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_top"}
cluster.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{"rabbitmq_top", "rabbitmq_mqtt"}
})).To(Succeed())

getConfigMapAnnotations := func() map[string]string {
Expand All @@ -168,6 +168,12 @@ var _ = Describe("Operator", func() {
Eventually(getConfigMapAnnotations, 60, 1).Should(
Not(HaveKey("rabbitmq.com/pluginsUpdatedAt")), "plugins ConfigMap annotation should have been removed")

Eventually(func() map[string][]byte {
secret, err := clientSet.CoreV1().Secrets(cluster.Namespace).Get(ctx, cluster.ChildResourceName("default-user"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return secret.Data
}).Should(HaveKeyWithValue("mqtt-port", []byte("1883")))

_, err := kubectlExec(namespace,
statefulSetPodName(cluster, 0),
"rabbitmq",
Expand All @@ -177,6 +183,7 @@ var _ = Describe("Operator", func() {
"rabbitmq_peer_discovery_k8s",
"rabbitmq_prometheus",
"rabbitmq_top",
"rabbitmq_mqtt",
)
Expect(err).ToNot(HaveOccurred())
})
Expand Down