/
controller.go
173 lines (138 loc) · 4.34 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package controller
import (
"context"
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
var (
	// KeyFunc maps an object to its namespace/name work-queue key and
	// handles the DeletedFinalStateUnknown tombstones delivered on delete.
	KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
)

// ControleeGetterFunction fetches the controlled ("controlee") object with the
// given namespace and name, typically from an informer's lister cache.
// NOTE(review): "Controlee" looks like a misspelling of "controllee", but
// renaming the exported type would break callers, so it is kept as-is.
type ControleeGetterFunction func(namespace, name string) (interface{}, error)

// SyncHandlerFunction reconciles the object identified by the queue key.
type SyncHandlerFunction func(ctx context.Context, key string) error
// Controller is a generic controller skeleton: it owns a rate-limited work
// queue and a pool of workers that drain the queue by invoking SyncHandler.
// (The previous comment referenced OrchestClusterController; this type is
// generic over any client.Object.)
type Controller[Object client.Object] struct {
	// name of this Controller; used in log lines and as the queue name.
	name string
	// the kubernetes client
	kubeClient kubernetes.Interface
	// GroupVersionKind of the object this controller handles; used by
	// resolveControllerRef to reject owner references of the wrong Kind.
	gvk *schema.GroupVersionKind
	// workerLoopPeriod is the re-run interval handed to wait.UntilWithContext
	// for each worker goroutine in Run.
	workerLoopPeriod time.Duration
	// queue for the keys that need to be synced.
	queue workqueue.RateLimitingInterface
	// number of workers consuming the queue
	threadiness int
	// sync function called for each dequeued key; must be set by the embedder
	// before Run is called.
	SyncHandler SyncHandlerFunction
	// The lister of controlee objects; consulted by resolveControllerRef.
	ControleeGetter ControleeGetterFunction
	// the list of informerSynced functions Run waits on before starting workers
	InformerSyncedList []cache.InformerSynced
}
// NewController constructs a generic Controller with the given name, worker
// count (threadiness), kubernetes client and the GroupVersionKind of the
// objects it manages. The caller must still populate SyncHandler,
// ControleeGetter and InformerSyncedList before calling Run.
func NewController[Object client.Object](name string, threadiness int,
	kubeClient kubernetes.Interface,
	gvk *schema.GroupVersionKind) *Controller[Object] {

	controller := &Controller[Object]{
		name:        name,
		kubeClient:  kubeClient,
		threadiness: threadiness,
		gvk:         gvk,
		// Fix: workerLoopPeriod was previously left at its zero value, which
		// makes wait.UntilWithContext re-spawn a finished worker with no
		// pause; one second is the conventional controller restart period.
		workerLoopPeriod: time.Second,
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
	}

	return controller
}
// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (c *Controller[Object]) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) interface{} {
	// A UID lookup is not available, so fetch by name and confirm the UID
	// afterwards. Skip the lookup entirely when the Kind cannot match.
	if controllerRef.Kind != c.gvk.Kind {
		return nil
	}

	controlee, getErr := c.ControleeGetter(namespace, controllerRef.Name)
	if getErr != nil {
		return nil
	}

	metadata, accessErr := meta.Accessor(controlee)
	if accessErr != nil {
		return nil
	}

	// Names are reusable: the object we fetched may be a different
	// incarnation than the one the ControllerRef actually points to.
	if metadata.GetUID() != controllerRef.UID {
		return nil
	}

	return controlee
}
// Run will not return until stopCh is closed. workers determines how many
// objects will be handled in parallel.
// Run will not return until stopCh is closed. It first waits for all
// registered informer caches to sync, then launches `threadiness` worker
// goroutines that drain the queue.
func (c *Controller[Object]) Run(stopCh <-chan struct{}) {
	klog.Infof("Starting %s controller", c.name)
	defer klog.Infof("Shutting down %s controller", c.name)
	defer utilruntime.HandleCrash()
	// Shutting down the queue unblocks any worker parked in queue.Get.
	defer c.queue.ShutDown()

	if !cache.WaitForCacheSync(stopCh, c.InformerSyncedList...) {
		// Fix: the original returned silently here, hiding sync failures
		// from operators; surface the condition before bailing out.
		utilruntime.HandleError(fmt.Errorf("%s controller: timed out waiting for caches to sync", c.name))
		return
	}

	// Cancelling this context on return gives workers a stop signal in
	// addition to the queue shutdown above.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for i := 0; i < c.threadiness; i++ {
		go wait.UntilWithContext(ctx, c.worker, c.workerLoopPeriod)
	}

	<-stopCh
}
// worker is a long-running processing loop: it keeps consuming items from
// the queue until the queue reports shutdown.
func (c *Controller[Object]) worker(ctx context.Context) {
	for {
		if !c.processNextWorkItem(ctx) {
			return
		}
	}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *Controller[Object]) processNextWorkItem(ctx context.Context) bool {
	key, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	// Done must run once processing finishes so the queue can re-dispatch
	// the key if it was added again while being worked on.
	defer c.queue.Done(key)

	c.handleErr(c.SyncHandler(ctx, key.(string)), key)
	return true
}
// Enqueue derives the namespace/name key of obj and adds it to the queue.
func (c *Controller[Object]) Enqueue(obj interface{}) {
	if key, err := KeyFunc(obj); err == nil {
		c.queue.Add(key)
	} else {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", obj, err))
	}
}
// EnqueueAfter derives the namespace/name key of obj and adds it to the queue
// after a delay. The delay defaults to one second; callers may override it by
// passing a single optional duration — this resolves the previous TODO
// ("make it configurable") while remaining backward compatible.
func (c *Controller[Object]) EnqueueAfter(obj interface{}, delay ...time.Duration) {
	key, err := KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", obj, err))
		return
	}

	after := time.Second
	if len(delay) > 0 {
		after = delay[0]
	}
	c.queue.AddAfter(key, after)
}
// Client returns the kubernetes client this controller was constructed with.
func (c *Controller[Object]) Client() kubernetes.Interface {
	return c.kubeClient
}
// handleErr resolves the outcome of a sync attempt for key. On success the
// key's retry history is cleared; on failure the key is requeued with
// rate-limited backoff up to maxRetries times before being dropped.
//
// Fix: the original Forget+dropped the key on every error, which made the
// rate-limiting queue's retry machinery dead code — transient sync failures
// were never retried.
func (c *Controller[Object]) handleErr(err error, key interface{}) {
	if err == nil {
		c.queue.Forget(key)
		return
	}

	// Bound on rate-limited retries before the key is abandoned.
	const maxRetries = 5
	if c.queue.NumRequeues(key) < maxRetries {
		klog.Warningf("error syncing Object %q, requeuing: %v", key, err)
		c.queue.AddRateLimited(key)
		return
	}

	klog.Warningf("dropping Object %q out of the queue: %v", key, err)
	c.queue.Forget(key)
	utilruntime.HandleError(err)
}