/
udproute_controller_watch_utils.go
108 lines (91 loc) · 3.41 KB
/
udproute_controller_watch_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"reflect"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
"github.com/kubernetes-sigs/blixt/pkg/vars"
)
// mapDataPlaneDaemonsetToUDPRoutes is a mapping function to map dataplane
// DaemonSet updates to UDPRoute reconciliations. This enables changes to the
// DaemonSet such as adding new Pods for a new Node to result in new dataplane
// instances getting fully configured.
//
// A request is produced for every UDPRoute returned by the client List call,
// but only when obj is a DaemonSet whose selector match labels carry both the
// dataplane "app" and "component" label values. Any other object, a DaemonSet
// without a selector, or a failed List yields no requests.
func (r *UDPRouteReconciler) mapDataPlaneDaemonsetToUDPRoutes(ctx context.Context, obj client.Object) (reqs []reconcile.Request) {
	daemonset, ok := obj.(*appsv1.DaemonSet)
	if !ok {
		return nil
	}

	// Spec.Selector is a pointer; guard against a nil selector so that an
	// incompletely populated object cannot panic the event handler.
	selector := daemonset.Spec.Selector
	if selector == nil {
		return nil
	}
	matchLabels := selector.MatchLabels

	// determine if this is a blixt daemonset
	if app, ok := matchLabels["app"]; !ok || app != vars.DefaultDataPlaneAppLabel {
		return nil
	}

	// verify that it's the dataplane daemonset
	if component, ok := matchLabels["component"]; !ok || component != vars.DefaultDataPlaneComponentLabel {
		return nil
	}

	udproutes := &gatewayv1alpha2.UDPRouteList{}
	if err := r.Client.List(ctx, udproutes); err != nil {
		// TODO: https://github.com/kubernetes-sigs/controller-runtime/issues/1996
		r.log.Error(err, "could not enqueue UDPRoutes for DaemonSet update")
		return nil
	}

	// The number of requests is known up front, so size the slice once and
	// index into Items to avoid copying each UDPRoute.
	reqs = make([]reconcile.Request, 0, len(udproutes.Items))
	for i := range udproutes.Items {
		udproute := &udproutes.Items[i]
		reqs = append(reqs, reconcile.Request{
			NamespacedName: types.NamespacedName{
				Namespace: udproute.Namespace,
				Name:      udproute.Name,
			},
		})
	}

	return reqs
}
// mapGatewayToUDPRoutes enqueues reconciliation for every UDPRoute that
// references the Gateway an event occurred on.
//
// A UDPRoute is considered to reference the Gateway when one of its parentRefs
// has the Gateway's name and resolves to the Gateway's namespace; a parentRef
// without an explicit namespace resolves to the namespace of the route itself.
// Each matching route is enqueued at most once. If obj is not a Gateway, or
// the UDPRoutes cannot be listed, the error is logged and no requests are
// returned.
func (r *UDPRouteReconciler) mapGatewayToUDPRoutes(ctx context.Context, obj client.Object) (reqs []reconcile.Request) {
	gateway, ok := obj.(*gatewayv1beta1.Gateway)
	if !ok {
		r.log.Error(fmt.Errorf("invalid type in map func"), "failed to map gateways to udproutes", "expected", "*gatewayv1beta1.Gateway", "received", reflect.TypeOf(obj))
		return nil
	}

	// Use the caller-supplied context so that cancellation and deadlines
	// apply to the List call.
	udproutes := &gatewayv1alpha2.UDPRouteList{}
	if err := r.Client.List(ctx, udproutes); err != nil {
		// TODO: https://github.com/kubernetes-sigs/controller-runtime/issues/1996
		r.log.Error(err, "could not enqueue UDPRoutes for Gateway update")
		return nil
	}

	for i := range udproutes.Items {
		udproute := &udproutes.Items[i]
		for _, parentRef := range udproute.Spec.ParentRefs {
			// An unset parentRef namespace means "same namespace as the route".
			namespace := udproute.Namespace
			if parentRef.Namespace != nil {
				namespace = string(*parentRef.Namespace)
			}

			if string(parentRef.Name) != gateway.Name || namespace != gateway.Namespace {
				continue
			}

			reqs = append(reqs, reconcile.Request{
				NamespacedName: types.NamespacedName{
					Namespace: udproute.Namespace,
					Name:      udproute.Name,
				},
			})
			// One request per route is enough even when several parentRefs
			// point at the same Gateway.
			break
		}
	}

	return reqs
}