/
changeset.go
124 lines (101 loc) · 3.02 KB
/
changeset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package relatedresource
import (
"context"
"time"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
"github.com/rancher/wrangler/v2/pkg/generic"
"github.com/rancher/wrangler/v2/pkg/kv"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
// Key identifies a resource by namespace and name.
type Key struct {
	// Namespace is the namespace part of the identifier.
	Namespace string
	// Name is the name part of the identifier.
	Name string
}

// NewKey builds a Key from the given namespace and name.
func NewKey(namespace, name string) Key {
	return Key{Namespace: namespace, Name: name}
}
// FromString converts a string key into a Key by splitting it on "/" with
// kv.RSplit; the first result becomes the namespace and the second the name.
// NOTE(review): presumably the split happens at the last "/" and a key without
// a "/" yields an empty namespace — confirm against kv.RSplit.
func FromString(key string) Key {
	namespace, name := kv.RSplit(key, "/")
	return NewKey(namespace, name)
}
// ControllerWrapper is the part of a controller that Watch relies on.
type ControllerWrapper interface {
// Informer returns the informer on which the delete event handler is registered.
Informer() cache.SharedIndexInformer
// AddGenericHandler registers handler on the controller under the given name.
AddGenericHandler(ctx context.Context, name string, handler generic.Handler)
}
// ClusterScopedEnqueuer enqueues resources that are identified by name only.
type ClusterScopedEnqueuer interface {
// Enqueue queues the resource with the given name.
Enqueue(name string)
}
// Enqueuer enqueues resources that are identified by namespace and name.
type Enqueuer interface {
// Enqueue queues the resource with the given namespace and name.
Enqueue(namespace, name string)
}
// Resolver maps a watched object, given by its namespace, name and obj, to the
// keys that should be enqueued in response. A returned Key with an empty Name
// is skipped by the caller. A non-nil error aborts the resolution and nothing
// is enqueued for that call.
// NOTE(review): obj is passed through unchanged from the controller's generic
// handler, so it may be nil if the controller invokes handlers that way — confirm.
type Resolver func(namespace, name string, obj runtime.Object) ([]Key, error)
// WatchClusterScoped behaves like Watch for a cluster-scoped target: enq is
// adapted to the Enqueuer interface by an adapter that discards the namespace
// of every resolved key and enqueues by name only.
func WatchClusterScoped(ctx context.Context, name string, resolve Resolver, enq ClusterScopedEnqueuer, watching ...ControllerWrapper) {
	adapter := &wrapper{ClusterScopedEnqueuer: enq}
	Watch(ctx, name, resolve, adapter, watching...)
}
// Watch hooks resolve into every controller in watching. Whenever one of those
// controllers handles or deletes an object, resolve is run for it and each
// resulting key is enqueued on enq. The handlers are registered under name.
func Watch(ctx context.Context, name string, resolve Resolver, enq Enqueuer, watching ...ControllerWrapper) {
	for i := range watching {
		watch(ctx, name, enq, resolve, watching[i])
	}
}
// watch connects a single controller to enq through resolve.
//
// Two hooks are installed on controller:
//   - a generic handler, registered under name, that runs resolve for every
//     key the controller processes and returns any resolve error to it;
//   - an informer delete handler that runs resolve for the deleted object
//     after a one second delay.
//
// Every Key returned by resolve with a non-empty Name is passed to enq.
// The delete handler is removed again once ctx is done, and pending delayed
// resolutions are abandoned at that point.
func watch(ctx context.Context, name string, enq Enqueuer, resolve Resolver, controller ControllerWrapper) {
	// enqueueRelated resolves obj to its related keys and enqueues each of them.
	enqueueRelated := func(ns, objName string, obj runtime.Object) error {
		keys, err := resolve(ns, objName, obj)
		if err != nil {
			return err
		}
		for _, key := range keys {
			// A key without a name is skipped rather than enqueued.
			if key.Name == "" {
				continue
			}
			enq.Enqueue(key.Namespace, key.Name)
		}
		return nil
	}

	addResourceEventHandler(ctx, controller.Informer(), cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			// If the informer missed the actual delete event it delivers a
			// tombstone carrying the last known state; unwrap it so the
			// deletion is not silently dropped by the type assertion below.
			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = tombstone.Obj
			}

			ro, ok := obj.(runtime.Object)
			if !ok {
				return
			}

			accessor, err := meta.Accessor(ro)
			if err != nil {
				return
			}
			ns, objName := accessor.GetNamespace(), accessor.GetName()

			go func() {
				// Delay the resolution by one second, but stop waiting as
				// soon as ctx is done so the goroutine does not outlive it.
				// NOTE(review): the delay presumably lets dependent caches
				// observe the deletion first — confirm.
				timer := time.NewTimer(time.Second)
				defer timer.Stop()

				select {
				case <-ctx.Done():
					return
				case <-timer.C:
				}

				// Best effort: there is no caller to report to and no retry
				// on this path, so a resolve error is deliberately dropped.
				_ = enqueueRelated(ns, objName, ro)
			}()
		},
	})

	controller.AddGenericHandler(ctx, name, func(key string, obj runtime.Object) (runtime.Object, error) {
		ns, objName := kv.RSplit(key, "/")
		return obj, enqueueRelated(ns, objName, obj)
	})
}
// wrapper adapts a ClusterScopedEnqueuer so it can be used where a
// namespace-aware Enqueue(namespace, name) method is required.
type wrapper struct {
ClusterScopedEnqueuer
}
// Enqueue forwards name to the embedded ClusterScopedEnqueuer; the namespace
// argument is ignored.
func (w *wrapper) Enqueue(_, name string) {
	w.ClusterScopedEnqueuer.Enqueue(name)
}
// informerRegisterer is a subset of cache.SharedIndexInformer, so it is easier to replace in tests.
// It contains only the two methods addResourceEventHandler calls.
type informerRegisterer interface {
// AddEventHandler registers funcs and returns a registration that can later be passed to RemoveEventHandler.
AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
// RemoveEventHandler unregisters the handler identified by the given registration.
RemoveEventHandler(cache.ResourceEventHandlerRegistration) error
}
// addResourceEventHandler registers handler on informer and starts a goroutine
// that removes it again once ctx is done. A failed registration is logged as an
// error and no goroutine is started; a failed removal is logged as a warning.
func addResourceEventHandler(ctx context.Context, informer informerRegisterer, handler cache.ResourceEventHandler) {
	registration, addErr := informer.AddEventHandler(handler)
	if addErr != nil {
		logrus.WithError(addErr).Error("failed to add ResourceEventHandler")
		return
	}

	// unregister blocks until ctx is done and then detaches the handler.
	unregister := func() {
		<-ctx.Done()
		removeErr := informer.RemoveEventHandler(registration)
		if removeErr != nil {
			logrus.WithError(removeErr).Warn("failed to remove ResourceEventHandler")
		}
	}
	go unregister()
}