-
Notifications
You must be signed in to change notification settings - Fork 9
/
worker.go
128 lines (107 loc) · 3.85 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*
Copyright AppsCode Inc. and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
// Worker continuously runs a Reconcile function against a message Queue
type Worker struct {
name string
queue workqueue.RateLimitingInterface
maxRetries int
threadiness int
reconcile func(key string) error
}
func New(name string, maxRetries, threadiness int, fn func(key string) error) *Worker {
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
return &Worker{name, q, maxRetries, threadiness, fn}
}
func (w *Worker) GetQueue() workqueue.RateLimitingInterface {
return w.queue
}
// Run schedules a routine to continuously process Queue messages
// until shutdown is closed
func (w *Worker) Run(shutdown <-chan struct{}) {
defer runtime.HandleCrash()
// Every second, process all messages in the Queue until it is time to shutdown
for i := 0; i < w.threadiness; i++ {
go wait.Until(w.processQueue, time.Second, shutdown)
}
go func() {
<-shutdown
// Stop accepting messages into the Queue
klog.V(1).Infof("Shutting down %s Queue\n", w.name)
w.queue.ShutDown()
}()
}
// ProcessAllMessages tries to process all messages in the Queue
func (w *Worker) processQueue() {
for w.processNextEntry() {
}
}
// ProcessMessage tries to process the next message in the Queue, and requeues on an error
func (w *Worker) processNextEntry() bool {
// Wait until there is a new item in the working queue
key, quit := w.queue.Get()
if quit {
return false
}
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
// This allows safe parallel processing because two deployments with the same key are never processed in
// parallel.
defer w.queue.Done(key)
// Invoke the method containing the business logic
paniced, err := w.panicSafeReconcile(key.(string))
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
w.queue.Forget(key)
return true
}
klog.Errorf("Failed to process key %v. Reason: %s", key, err)
// This controller retries 5 times if something goes wrong. After that, it stops trying.
if !paniced && w.queue.NumRequeues(key) < w.maxRetries {
klog.Infof("Error syncing key %v: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
w.queue.AddRateLimited(key)
return true
}
w.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
if !paniced {
runtime.HandleError(err)
}
klog.Infof("Dropping key %q out of the queue: %v", key, err)
return true
}
func (w *Worker) panicSafeReconcile(key string) (paniced bool, err error) {
// xref: https://github.com/kubernetes-sigs/controller-runtime/blob/v0.10.0/pkg/internal/controller/controller.go#L102-L111
defer func() {
if r := recover(); r != nil {
for _, fn := range runtime.PanicHandlers {
fn(r)
}
paniced = true
err = fmt.Errorf("panic: %v [recovered]", r)
}
}()
err = w.reconcile(key)
return
}