/
option.go
155 lines (138 loc) · 4.41 KB
/
option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package controllerlib
import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"go.pinniped.dev/internal/plog"
)
type Option func(*controller)
func WithMaxRetries(maxRetries int) Option {
return func(c *controller) {
c.maxRetries = maxRetries
}
}
func WithInitialEvent(key Key) Option {
return toNaiveRunOpt(func(c *controller) {
c.queueWrapper.Add(key)
})
}
func WithRateLimiter(limiter workqueue.RateLimiter) Option {
return func(c *controller) {
c.queue = workqueue.NewNamedRateLimitingQueue(limiter, c.Name())
c.queueWrapper = &queueWrapper{queue: c.queue}
}
}
func WithRecorder(recorder events.EventRecorder) Option {
return func(c *controller) {
c.recorder = recorder
}
}
func WithInformer(getter InformerGetter, filter Filter, opt InformerOption) Option {
informer := getter.Informer() // immediately signal that we intend to use this informer in case it is lazily initialized
return toRunOpt(func(c *controller) {
if opt.SkipSync && opt.SkipEvents {
panic(die("cannot skip syncing and event handlers at the same time"))
}
if !opt.SkipSync {
c.cacheSyncs = append(c.cacheSyncs, informer.HasSynced)
}
if opt.SkipEvents {
return
}
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
object := metaOrDie(obj)
if filter.Add(object) {
plog.Debug("handling add",
"controller", c.Name(),
"namespace", object.GetNamespace(),
"name", object.GetName(),
"selfLink", object.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?)
"kind", fmt.Sprintf("%T", object),
)
c.add(filter, object)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldObject := metaOrDie(oldObj)
newObject := metaOrDie(newObj)
if filter.Update(oldObject, newObject) {
plog.Debug("handling update",
"controller", c.Name(),
"namespace", newObject.GetNamespace(),
"name", newObject.GetName(),
"selfLink", newObject.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?)
"kind", fmt.Sprintf("%T", newObject),
)
c.add(filter, newObject)
}
},
DeleteFunc: func(obj interface{}) {
accessor, err := meta.Accessor(obj)
if err != nil {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("%s: could not get object from tombstone: %+v", c.Name(), obj))
return
}
accessor, err = meta.Accessor(tombstone.Obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: tombstone contained object that is not an accessor: %+v", c.Name(), obj))
return
}
}
if filter.Delete(accessor) {
plog.Debug("handling delete",
"controller", c.Name(),
"namespace", accessor.GetNamespace(),
"name", accessor.GetName(),
"selfLink", accessor.GetSelfLink(), // TODO: self link is deprecated so we need to extract the GVR in some other way (using a series of schemes?)
"kind", fmt.Sprintf("%T", accessor),
)
c.add(filter, accessor)
}
},
})
if err != nil {
// Shouldn't really happen.
panic(die(fmt.Sprintf("got error from AddEventHandler: %s", err.Error())))
}
})
}
// toRunOpt guarantees that an Option only runs once on the first call to Run (and not New), even if a controller is stopped and restarted.
func toRunOpt(opt Option) Option {
return toOnceOpt(toNaiveRunOpt(opt))
}
// toNaiveRunOpt guarantees that an Option only runs on calls to Run (and not New), even if a controller is stopped and restarted.
func toNaiveRunOpt(opt Option) Option {
return func(c *controller) {
if c.run {
opt(c)
return
}
c.runOpts = append(c.runOpts, opt)
}
}
// toOnceOpt guarantees that an Option only runs once.
func toOnceOpt(opt Option) Option {
var once sync.Once
return func(c *controller) {
once.Do(func() {
opt(c)
})
}
}
func metaOrDie(obj interface{}) metav1.Object {
accessor, err := meta.Accessor(obj)
if err != nil {
panic(err) // this should never happen
}
return accessor
}