/
controller.go
223 lines (176 loc) · 5.78 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// Copyright 2020 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package controllerlib
import (
"context"
"errors"
"fmt"
"sync"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"go.pinniped.dev/internal/plog"
)
// Controller represents a runnable Kubernetes controller.
// Cancelling the context passed to Run will cause the controller to shut down.
// The number of workers determines how many work items may be processed in parallel.
type Controller interface {
// Run runs the controller and blocks until the controller is finished.
// The number of workers can be specified via the workers parameter.
// This function will return when all internal loops are finished.
// Note that having more than one worker usually means handling parallelization of Sync().
Run(ctx context.Context, workers int)
// Name returns the controller name string.
Name() string
// The methods below should only be called during tests via the Test* functions.
// sync contains the main controller logic.
// This can be used in unit tests to exercise the Syncer by directly calling it.
sync(ctx Context) error
// wrap wraps the main controller logic provided via the Syncer.
// This can be used in tests to synchronize asynchronous events as seen by a running controller.
// The wrapping must be done after New is called and before Run is called.
wrap(wrapper SyncWrapperFunc)
// These are called by the Run() method but also need to be called by Test* functions sometimes.
// waitForCacheSyncWithTimeout reports whether the controller's informer caches
// synced before an internal timeout elapsed.
waitForCacheSyncWithTimeout() bool
// invokeAllRunOpts marks the controller as running and applies the options
// that were deferred until run time (for example those registered by wrap).
invokeAllRunOpts()
}
// Compile-time assertion that *controller satisfies the Controller interface.
var _ Controller = &controller{}
// Config holds the required settings for a controller created via New.
type Config struct {
// Name is the controller name; it is returned by Controller.Name and
// included in the controller's log and error messages.
Name string
// Syncer provides the main controller logic; its Sync method is invoked
// for each key pulled off the work queue.
Syncer Syncer
}
// New constructs a Controller from config.
//
// A default rate limiter and a klog-backed event recorder are installed
// first; the caller's opts are then applied in the order given, so they can
// override either default.
func New(config Config, opts ...Option) Controller {
	ctrl := &controller{config: config}

	// Defaults go in before any user-supplied option is applied.
	defaultRateLimiter := WithRateLimiter(workqueue.DefaultControllerRateLimiter())
	defaultRateLimiter(ctrl)
	defaultRecorder := WithRecorder(klogRecorder{})
	defaultRecorder(ctrl)

	for i := range opts {
		opts[i](ctrl)
	}

	return ctrl
}
// controller is the package's implementation of the Controller interface.
type controller struct {
// config carries the controller name and the Syncer holding the sync logic.
config Config
// queue is the rate-limited work queue that workers pull keys from.
queue workqueue.RateLimitingInterface
// queueWrapper is the Queue used to enqueue keys (see add) and is also
// handed to the Syncer through the sync Context.
queueWrapper Queue
// maxRetries bounds how many times a failing key is requeued;
// a value <= 0 means the key is retried forever (see handleKey).
maxRetries int
// recorder is passed to the Syncer through the sync Context.
recorder events.EventRecorder
// run is set to true by invokeAllRunOpts.
run bool
// runOpts are options whose application is deferred until invokeAllRunOpts.
runOpts []Option
// cacheSyncs are the informer sync checks awaited by waitForCacheSyncWithTimeout.
cacheSyncs []cache.InformerSynced
}
// Run starts the controller and blocks until ctx is cancelled.
//
// It first applies the deferred run options and waits for the informer caches
// to sync, panicking if that wait times out. It then starts the requested
// number of worker goroutines. Once ctx is cancelled, the queue is shut down,
// the workers are told to stop, and Run waits for every worker to exit before
// returning.
func (c *controller) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash(crash) // prevent panics from killing the process

	plog.Debug("starting controller", "controller", c.Name(), "workers", workers)

	c.invokeAllRunOpts()

	if synced := c.waitForCacheSyncWithTimeout(); !synced {
		panic(die(fmt.Sprintf("%s: timed out waiting for caches to sync", c.Name())))
	}

	var running sync.WaitGroup

	// workerCtx is deliberately not derived from ctx: workers are only told to
	// stop after the queue has been shut down in the teardown below.
	workerCtx, stopWorkers := context.WithCancel(context.Background())

	teardown := func() {
		c.queue.ShutDown() // shut down the controller queue first
		stopWorkers()      // then tell the workers to initiate shutdown

		// Wait for all workers to finish their job. At this point Run can hang,
		// and callers have to implement the logic that will kill this
		// controller (SIGKILL).
		running.Wait()

		plog.Debug("all workers have been terminated, shutting down", "controller", c.Name(), "workers", workers)
	}
	defer teardown()

	// work is the body of a single worker goroutine.
	work := func(idx int) {
		defer utilruntime.HandleCrash(crash) // prevent panics from killing the process
		defer func() {
			plog.Debug("shutting down worker", "controller", c.Name(), "worker", idx)
			running.Done()
		}()

		c.runWorker(workerCtx)
	}

	for idx := 1; idx <= workers; idx++ {
		plog.Debug("starting worker", "controller", c.Name(), "worker", idx)
		running.Add(1)
		go work(idx)
	}

	// Block until the controller context is cancelled; teardown runs on return.
	<-ctx.Done()
}
// invokeAllRunOpts flags the controller as running and then applies every
// deferred run option, in registration order.
func (c *controller) invokeAllRunOpts() {
	c.run = true

	deferred := c.runOpts
	for i := range deferred {
		deferred[i](c)
	}
}
// Name returns the controller name taken from its Config.
func (c *controller) Name() string {
	name := c.config.Name
	return name
}
// sync runs the configured Syncer once for the given sync context and
// returns whatever error the Syncer reports.
func (c *controller) sync(ctx Context) error {
	syncer := c.config.Syncer
	return syncer.Sync(ctx)
}
// wrap registers a run option that replaces the configured Syncer with
// wrapper(Syncer). The replacement is not performed here; it happens when
// the run options are invoked.
func (c *controller) wrap(wrapper SyncWrapperFunc) {
	applyWrapper := func(target *controller) {
		target.config.Syncer = wrapper(target.config.Syncer)
	}
	c.runOpts = append(c.runOpts, toRunOpt(applyWrapper))
}
// waitForCacheSyncWithTimeout waits for the controller's informer caches to
// sync and returns the result of cache.WaitForCacheSync. The wait is bounded
// by a fixed timeout so that a broken informer cannot block forever.
func (c *controller) waitForCacheSyncWithTimeout() bool {
	const syncTimeout = 10 * time.Minute

	timeoutCtx, release := context.WithTimeout(context.Background(), syncTimeout)
	defer release()

	stopCh := timeoutCtx.Done()
	return cache.WaitForCacheSync(stopCh, c.cacheSyncs...)
}
// add enqueues the key that filter.Parent derives from object.
func (c *controller) add(filter Filter, object metav1.Object) {
	c.queueWrapper.Add(filter.Parent(object))
}
// runWorker is the loop executed by a single worker: it keeps processing
// work items until the passed context is cancelled.
// Cancellation is only observed between items.
func (c *controller) runWorker(ctx context.Context) {
	stop := ctx.Done()

	for {
		// Non-blocking check for a termination request.
		select {
		case <-stop:
			return
		default:
		}

		c.processNextWorkItem(ctx)
	}
}
// processNextWorkItem takes one item off the queue, runs sync for it and
// hands the outcome to handleKey. It returns without doing anything when the
// queue reports that it is shutting down.
func (c *controller) processNextWorkItem(ctx context.Context) {
	item, shuttingDown := c.queue.Get()
	if shuttingDown {
		return
	}

	// NOTE(review): unchecked assertion; presumably only Key values are ever
	// enqueued - confirm against the Queue implementation.
	key := item.(Key)

	// Always mark the key as done once this attempt has been handled.
	defer c.queue.Done(key)

	err := c.sync(Context{
		Context:  ctx,
		Name:     c.Name(),
		Key:      key,
		Queue:    c.queueWrapper,
		Recorder: c.recorder,
	})
	c.handleKey(key, err)
}
// handleKey decides what happens to key after a sync attempt finished with err.
//
//   - nil error: the key's retry history is forgotten.
//   - retries exhausted (maxRetries > 0 and the key was already requeued at
//     least maxRetries times): the error is reported and the key is dropped.
//   - otherwise: the key is requeued with rate limiting. A synthetic requeue
//     is only logged at debug level; any other error is reported.
func (c *controller) handleKey(key Key, err error) {
	if err == nil {
		c.queue.Forget(key)
		return
	}

	// A non-positive maxRetries means "retry forever", so the requeue count is
	// only consulted when a positive limit is configured.
	if c.maxRetries > 0 && c.queue.NumRequeues(key) >= c.maxRetries {
		utilruntime.HandleError(fmt.Errorf("%s: dropping key %v out of the queue: %w", c.Name(), key, err))
		c.queue.Forget(key)
		return
	}

	switch {
	case errors.Is(err, ErrSyntheticRequeue):
		// logging this helps detecting wedged controllers with missing pre-requirements
		plog.Debug("requested synthetic requeue", "controller", c.Name(), "key", key)
	default:
		utilruntime.HandleError(fmt.Errorf("%s: %v failed with: %w", c.Name(), key, err))
	}

	c.queue.AddRateLimited(key)
}