Skip to content

Commit

Permalink
Avoid unnecessary object updates (#438)
Browse files Browse the repository at this point in the history
* Avoid unnecessary update for headless service

Before this commit the headless service was updated unnecessarily in
every reconile loop.

* Avoid unnecessary update for client service

Before this commit the client service was updated unnecessarily in
every reconile loop.

* Avoid unnecessary update for StatefulSet

Before this commit the stateful set was updated unnecessarily in
every reconile loop.

* Revert "Avoid unnecessary update for StatefulSet"

This reverts commit 3c28bc5.
  • Loading branch information
ansd committed Nov 3, 2020
1 parent bab4c6e commit d4b6129
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 91 deletions.
1 change: 1 addition & 0 deletions docs/examples/additionalPorts/rabbitmq.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ spec:
- name: additional-port # adds an additional port on the client service
protocol: TCP
port: 12345
targetPort: 12345
statefulSet:
spec:
template:
Expand Down
50 changes: 29 additions & 21 deletions internal/resource/client_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"

"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"

rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
Expand Down Expand Up @@ -107,49 +108,56 @@ func applySvcOverride(svc *corev1.Service, override *rabbitmqv1beta1.ClientServi
func (builder *ClientServiceBuilder) updatePorts(servicePorts []corev1.ServicePort) []corev1.ServicePort {
servicePortsMap := map[string]corev1.ServicePort{
"amqp": {
Protocol: corev1.ProtocolTCP,
Port: 5672,
Name: "amqp",
Protocol: corev1.ProtocolTCP,
Port: 5672,
TargetPort: intstr.FromInt(5672),
Name: "amqp",
},
"management": {
Protocol: corev1.ProtocolTCP,
Port: 15672,
Name: "management",
Protocol: corev1.ProtocolTCP,
Port: 15672,
TargetPort: intstr.FromInt(15672),
Name: "management",
},
}
if builder.Instance.AdditionalPluginEnabled("rabbitmq_mqtt") {
servicePortsMap["mqtt"] = corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: 1883,
Name: "mqtt",
Protocol: corev1.ProtocolTCP,
Port: 1883,
TargetPort: intstr.FromInt(1883),
Name: "mqtt",
}
}
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_mqtt") {
servicePortsMap["web-mqtt"] = corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: 15675,
Name: "web-mqtt",
Protocol: corev1.ProtocolTCP,
Port: 15675,
TargetPort: intstr.FromInt(15675),
Name: "web-mqtt",
}
}
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stomp") {
servicePortsMap["stomp"] = corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: 61613,
Name: "stomp",
Protocol: corev1.ProtocolTCP,
Port: 61613,
TargetPort: intstr.FromInt(61613),
Name: "stomp",
}
}
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_stomp") {
servicePortsMap["web-stomp"] = corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: 15674,
Name: "web-stomp",
Protocol: corev1.ProtocolTCP,
Port: 15674,
TargetPort: intstr.FromInt(15674),
Name: "web-stomp",
}
}
if builder.Instance.TLSEnabled() {
servicePortsMap["amqps"] = corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: 5671,
Name: "amqps",
Protocol: corev1.ProtocolTCP,
Port: 5671,
TargetPort: intstr.FromInt(5671),
Name: "amqps",
}
}

Expand Down
129 changes: 73 additions & 56 deletions internal/resource/client_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
defaultscheme "k8s.io/client-go/kubernetes/scheme"
)

Expand Down Expand Up @@ -94,9 +95,10 @@ var _ = Context("ClientServices", func() {

Expect(serviceBuilder.Update(svc)).To(Succeed())
Expect(svc.Spec.Ports).Should(ContainElement(corev1.ServicePort{
Name: "amqps",
Protocol: "TCP",
Port: 5671,
Name: "amqps",
Protocol: "TCP",
Port: 5671,
TargetPort: intstr.FromInt(5671),
}))
})
})
Expand Down Expand Up @@ -291,14 +293,16 @@ var _ = Context("ClientServices", func() {
Expect(err).NotTo(HaveOccurred())

amqpPort := corev1.ServicePort{
Name: "amqp",
Port: 5672,
Protocol: corev1.ProtocolTCP,
Name: "amqp",
Port: 5672,
TargetPort: intstr.FromInt(5672),
Protocol: corev1.ProtocolTCP,
}
managementPort := corev1.ServicePort{
Name: "management",
Port: 15672,
Protocol: corev1.ProtocolTCP,
Name: "management",
Port: 15672,
TargetPort: intstr.FromInt(15672),
Protocol: corev1.ProtocolTCP,
}
Expect(svc.Spec.Ports).To(ConsistOf(amqpPort, managementPort))
})
Expand All @@ -309,9 +313,10 @@ var _ = Context("ClientServices", func() {
Expect(serviceBuilder.Update(svc)).To(Succeed())

expectedPort := corev1.ServicePort{
Name: servicePortName,
Port: int32(port),
Protocol: corev1.ProtocolTCP,
Name: servicePortName,
Port: int32(port),
TargetPort: intstr.FromInt(port),
Protocol: corev1.ProtocolTCP,
}
Expect(svc.Spec.Ports).To(ContainElement(expectedPort))
},
Expand All @@ -335,16 +340,18 @@ var _ = Context("ClientServices", func() {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
svc.Spec.Ports = []corev1.ServicePort{
{
Protocol: corev1.ProtocolTCP,
Port: 5672,
Name: "amqp",
NodePort: 12345,
Protocol: corev1.ProtocolTCP,
Port: 5672,
TargetPort: intstr.FromInt(5672),
Name: "amqp",
NodePort: 12345,
},
{
Protocol: corev1.ProtocolTCP,
Port: 15672,
Name: "management",
NodePort: 1234,
Protocol: corev1.ProtocolTCP,
Port: 15672,
TargetPort: intstr.FromInt(15672),
Name: "management",
NodePort: 1234,
},
}

Expand All @@ -353,16 +360,18 @@ var _ = Context("ClientServices", func() {
Expect(err).NotTo(HaveOccurred())

expectedAmqpServicePort := corev1.ServicePort{
Name: "amqp",
Protocol: corev1.ProtocolTCP,
Port: 5672,
NodePort: 12345,
Name: "amqp",
Protocol: corev1.ProtocolTCP,
Port: 5672,
TargetPort: intstr.FromInt(5672),
NodePort: 12345,
}
expectedManagementServicePort := corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: 15672,
Name: "management",
NodePort: 1234,
Protocol: corev1.ProtocolTCP,
Port: 15672,
TargetPort: intstr.FromInt(15672),
Name: "management",
NodePort: 1234,
}

Expect(svc.Spec.Ports).To(ContainElement(expectedAmqpServicePort))
Expand All @@ -373,10 +382,11 @@ var _ = Context("ClientServices", func() {
svc.Spec.Type = corev1.ServiceTypeNodePort
svc.Spec.Ports = []corev1.ServicePort{
{
Protocol: corev1.ProtocolTCP,
Port: 5672,
Name: "amqp",
NodePort: 12345,
Protocol: corev1.ProtocolTCP,
Port: 5672,
TargetPort: intstr.FromInt(5672),
Name: "amqp",
NodePort: 12345,
},
}

Expand All @@ -387,10 +397,11 @@ var _ = Context("ClientServices", func() {
// We cant set nodePort to nil because its a primitive
// For Kubernetes API, setting it to 0 is the same as not setting it at all
expectedServicePort := corev1.ServicePort{
Name: "amqp",
Protocol: corev1.ProtocolTCP,
Port: 5672,
NodePort: 0,
Name: "amqp",
Protocol: corev1.ProtocolTCP,
Port: 5672,
TargetPort: intstr.FromInt(5672),
NodePort: 0,
}

Expect(svc.Spec.Ports).To(ContainElement(expectedServicePort))
Expand All @@ -400,10 +411,11 @@ var _ = Context("ClientServices", func() {
svc.Spec.Type = corev1.ServiceTypeNodePort
svc.Spec.Ports = []corev1.ServicePort{
{
Protocol: corev1.ProtocolTCP,
Port: 5672,
Name: "amqp",
NodePort: 12345,
Protocol: corev1.ProtocolTCP,
Port: 5672,
TargetPort: intstr.FromInt(5672),
Name: "amqp",
NodePort: 12345,
},
}

Expand All @@ -412,10 +424,11 @@ var _ = Context("ClientServices", func() {
Expect(err).NotTo(HaveOccurred())

expectedServicePort := corev1.ServicePort{
Name: "amqp",
Protocol: corev1.ProtocolTCP,
Port: 5672,
NodePort: 0,
Name: "amqp",
Protocol: corev1.ProtocolTCP,
Port: 5672,
TargetPort: intstr.FromInt(5672),
NodePort: 0,
}

Expect(svc.Spec.Ports).To(ContainElement(expectedServicePort))
Expand Down Expand Up @@ -470,9 +483,10 @@ var _ = Context("ClientServices", func() {
Spec: &corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Protocol: corev1.ProtocolUDP,
Port: 12345,
Name: "my-new-port",
Protocol: corev1.ProtocolUDP,
Port: 12345,
TargetPort: intstr.FromInt(12345),
Name: "my-new-port",
},
},
Selector: map[string]string{
Expand All @@ -499,19 +513,22 @@ var _ = Context("ClientServices", func() {
Expect(err).NotTo(HaveOccurred())
Expect(svc.Spec.Ports).To(ConsistOf(
corev1.ServicePort{
Name: "amqp",
Port: 5672,
Protocol: corev1.ProtocolTCP,
Name: "amqp",
Port: 5672,
TargetPort: intstr.FromInt(5672),
Protocol: corev1.ProtocolTCP,
},
corev1.ServicePort{
Name: "management",
Port: 15672,
Protocol: corev1.ProtocolTCP,
Name: "management",
Port: 15672,
TargetPort: intstr.FromInt(15672),
Protocol: corev1.ProtocolTCP,
},
corev1.ServicePort{
Protocol: corev1.ProtocolUDP,
Port: 12345,
Name: "my-new-port",
Protocol: corev1.ProtocolUDP,
Port: 12345,
TargetPort: intstr.FromInt(12345),
Name: "my-new-port",
},
))
Expect(svc.Spec.Selector).To(Equal(map[string]string{"a-selector": "a-label", "app.kubernetes.io/name": "foo"}))
Expand Down
22 changes: 14 additions & 8 deletions internal/resource/headless_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ package resource

import (
"fmt"

rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/metadata"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

Expand Down Expand Up @@ -49,18 +51,22 @@ func (builder *HeadlessServiceBuilder) Update(object runtime.Object) error {
service.Labels = metadata.GetLabels(builder.Instance.Name, builder.Instance.Labels)
service.Annotations = metadata.ReconcileAndFilterAnnotations(service.GetAnnotations(), builder.Instance.Annotations)
service.Spec = corev1.ServiceSpec{
ClusterIP: "None",
Selector: metadata.LabelSelector(builder.Instance.Name),
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "None",
SessionAffinity: corev1.ServiceAffinityNone,
Selector: metadata.LabelSelector(builder.Instance.Name),
Ports: []corev1.ServicePort{
{
Protocol: corev1.ProtocolTCP,
Port: 4369,
Name: "epmd",
Protocol: corev1.ProtocolTCP,
Port: 4369,
TargetPort: intstr.FromInt(4369),
Name: "epmd",
},
{
Protocol: corev1.ProtocolTCP,
Port: 25672,
Name: "cluster-rpc", // aka distribution port
Protocol: corev1.ProtocolTCP,
Port: 25672,
TargetPort: intstr.FromInt(25672),
Name: "cluster-rpc", // aka distribution port
},
},
PublishNotReadyAddresses: true,
Expand Down

0 comments on commit d4b6129

Please sign in to comment.