-
Notifications
You must be signed in to change notification settings - Fork 63
/
federation_controller.go
64 lines (54 loc) · 2.41 KB
/
federation_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package controllers
import (
"context"
"errors"
"fmt"
"github.com/rabbitmq/messaging-topology-operator/internal"
"github.com/rabbitmq/messaging-topology-operator/rabbitmqclient"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
)
// +kubebuilder:rbac:groups=rabbitmq.com,resources=federations,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=rabbitmq.com,resources=federations/finalizers,verbs=update
// +kubebuilder:rbac:groups=rabbitmq.com,resources=federations/status,verbs=get;update;patch

// FederationReconciler declares and deletes federation upstreams for
// topology.Federation resources.
//
// It embeds a controller-runtime client.Client, which its methods use to read
// the Secret holding the upstream URI.
type FederationReconciler struct {
	client.Client
}
// DeclareFunc declares the federation upstream described by obj on the
// RabbitMQ server reachable through client.
//
// The upstream URI is read from the secret referenced by Spec.UriSecret (see
// getUri). A failure to obtain it is wrapped and returned; otherwise the
// result of PutFederationUpstream is passed through validateResponse and that
// outcome is returned.
//
// obj must be a *topology.Federation; any other type makes the type assertion
// panic.
func (r *FederationReconciler) DeclareFunc(ctx context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error {
	federation := obj.(*topology.Federation)
	uri, err := r.getUri(ctx, federation)
	if err != nil {
		// getUri also fails when Spec.UriSecret is nil, so the reference must
		// not be dereferenced unconditionally while building the message.
		var secretName string
		if federation.Spec.UriSecret != nil {
			secretName = federation.Spec.UriSecret.Name
		}
		return fmt.Errorf("failed to parse federation uri secret; secret name: %s, error: %w", secretName, err)
	}
	return validateResponse(client.PutFederationUpstream(federation.Spec.Vhost, federation.Spec.Name, internal.GenerateFederationDefinition(federation, uri)))
}
// getUri returns the federation upstream URI stored under the "uri" key of the
// Secret named by federation.Spec.UriSecret, looked up in the federation's own
// namespace.
//
// It returns an error when no secret reference is set, when the Secret cannot
// be fetched, or when the Secret has no "uri" key.
func (r *FederationReconciler) getUri(ctx context.Context, federation *topology.Federation) (string, error) {
	ref := federation.Spec.UriSecret
	if ref == nil {
		return "", fmt.Errorf("no uri secret provided")
	}

	key := types.NamespacedName{Namespace: federation.Namespace, Name: ref.Name}
	var secret corev1.Secret
	if err := r.Get(ctx, key, &secret); err != nil {
		return "", err
	}

	if raw, found := secret.Data["uri"]; found {
		return string(raw), nil
	}
	return "", fmt.Errorf("could not find key 'uri' in secret %s", secret.Name)
}
// DeleteFunc removes the federation upstream described by obj from the
// RabbitMQ server reachable through client.
//
// A NotFound result (the server answered '404' Not Found) is only logged and
// reported as success, so the caller does not requeue; any other error is
// returned unchanged.
//
// obj must be a *topology.Federation; any other type makes the type assertion
// panic.
func (r *FederationReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error {
	log := ctrl.LoggerFrom(ctx)
	upstream := obj.(*topology.Federation)

	err := validateResponseForDeletion(client.DeleteFederationUpstream(upstream.Spec.Vhost, upstream.Spec.Name))
	switch {
	case errors.Is(err, NotFound):
		// Already gone on the server: nothing left to delete.
		log.Info("cannot find federation upstream parameter; no need to delete it", "federation", upstream.Spec.Name)
	case err != nil:
		return err
	}
	return nil
}