/
enqueue_watching.go
111 lines (95 loc) · 3.44 KB
/
enqueue_watching.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package dynamiccache
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// Enqueues all objects watching the object mentioned in the event, filtered by WatcherType.
type EnqueueWatchingObjects struct {
WatcherRefGetter ownerRefGetter
// WatcherType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared.
WatcherType runtime.Object
scheme *runtime.Scheme
// groupKind is the cached Group and Kind from WatcherType
groupKind schema.GroupKind
}
var _ handler.EventHandler = (*EnqueueWatchingObjects)(nil)
func NewEnqueueWatchingObjects(watcherRefGetter ownerRefGetter,
watcherType runtime.Object,
scheme *runtime.Scheme,
) *EnqueueWatchingObjects {
e := &EnqueueWatchingObjects{
WatcherRefGetter: watcherRefGetter,
WatcherType: watcherType,
scheme: scheme,
}
if err := e.parseWatcherTypeGroupKind(scheme); err != nil {
// This (passing a type that is not in the scheme) HAS
// to be a programmer error and can't be recovered at runtime anyways.
panic(err)
}
return e
}
type ownerRefGetter interface {
OwnersForGKV(gvk schema.GroupVersionKind) []OwnerReference
}
func (e *EnqueueWatchingObjects) Create(_ context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
e.enqueueWatchers(evt.Object, q)
}
func (e *EnqueueWatchingObjects) Update(_ context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
e.enqueueWatchers(evt.ObjectNew, q)
e.enqueueWatchers(evt.ObjectOld, q)
}
func (e *EnqueueWatchingObjects) Delete(_ context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
e.enqueueWatchers(evt.Object, q)
}
func (e *EnqueueWatchingObjects) Generic(_ context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
e.enqueueWatchers(evt.Object, q)
}
func (e *EnqueueWatchingObjects) enqueueWatchers(obj client.Object, q workqueue.RateLimitingInterface) {
if obj == nil {
return
}
gvk, err := apiutil.GVKForObject(obj, e.scheme)
if err != nil {
// TODO: error reporting?
panic(err)
}
ownerRefs := e.WatcherRefGetter.OwnersForGKV(gvk)
for _, ownerRef := range ownerRefs {
if ownerRef.Kind != e.groupKind.Kind ||
ownerRef.Group != e.groupKind.Group {
continue
}
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: ownerRef.Name,
Namespace: ownerRef.Namespace,
},
})
}
}
// parseOwnerTypeGroupKind parses the WatcherType into a Group and Kind and caches the result. Returns false
// if the WatcherType could not be parsed using the scheme.
func (e *EnqueueWatchingObjects) parseWatcherTypeGroupKind(scheme *runtime.Scheme) error {
// Get the kinds of the type
kinds, _, err := scheme.ObjectKinds(e.WatcherType)
if err != nil {
return err
}
// Expect only 1 kind. If there is more than one kind this is probably an edge case such as ListOptions.
if len(kinds) != 1 {
panic(fmt.Sprintf("Expected exactly 1 kind for WatcherType %T, but found %s kinds", e.WatcherType, kinds))
}
// Cache the Group and Kind for the WatcherType
e.groupKind = schema.GroupKind{Group: kinds[0].Group, Kind: kinds[0].Kind}
return nil
}