/
util.go
163 lines (139 loc) · 4.9 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/*
Copyright 2020 The cert-manager Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"reflect"
"time"
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
logf "github.com/jetstack/cert-manager/pkg/logs"
)
// KeyFunc computes the string key for an API object. It is an alias for
// cache.DeletionHandlingMetaNamespaceKeyFunc. The resulting key is what gets
// placed on a workqueue and can later be handed to a worker function that
// processes queued items, such as ProcessItem.
var KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
// DefaultItemBasedRateLimiter constructs a fresh per-item exponential failure
// rate limiter whose base delay is 5 seconds and whose delay is capped at 5
// minutes.
func DefaultItemBasedRateLimiter() workqueue.RateLimiter {
	const (
		baseDelay = 5 * time.Second
		maxDelay  = 5 * time.Minute
	)
	return workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay)
}
// HandleOwnedResourceNamespacedFunc returns a function that accepts a
// Kubernetes object and enqueues the owners named in its owner references.
// https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#owners-and-dependents
//
// Only owner references whose API group and kind equal those of ownerGVK are
// considered; the version is not compared. For each matching reference the
// owner is fetched with get, using the namespace of the owned object and the
// name from the reference, and the key computed by KeyFunc is added to queue.
// Any failure for a single reference is logged and that reference is skipped;
// the remaining references are still processed.
func HandleOwnedResourceNamespacedFunc(log logr.Logger, queue workqueue.RateLimitingInterface, ownerGVK schema.GroupVersionKind, get func(namespace, name string) (interface{}, error)) func(obj interface{}) {
	return func(obj interface{}) {
		handlerLog := log.WithName("handleOwnedResource")

		owned, isMetaObject := obj.(metav1.Object)
		if !isMetaObject {
			handlerLog.Error(nil, "item passed to handleOwnedResource does not implement metav1.Object")
			return
		}
		handlerLog = logf.WithResource(handlerLog, owned)

		for _, ref := range owned.GetOwnerReferences() {
			refLog := handlerLog.WithValues(
				logf.RelatedResourceNamespaceKey, owned.GetNamespace(),
				logf.RelatedResourceNameKey, ref.Name,
				logf.RelatedResourceKindKey, ref.Kind,
			)

			// The reference only carries an apiVersion string, so the group
			// has to be parsed out before it can be compared with ownerGVK.
			refGV, err := schema.ParseGroupVersion(ref.APIVersion)
			if err != nil {
				refLog.Error(err, "could not parse OwnerReference GroupVersion")
				continue
			}

			// Skip references that point at a different kind of owner.
			if refGV.Group != ownerGVK.Group || ref.Kind != ownerGVK.Kind {
				continue
			}

			// TODO: how to handle namespace of owner references?
			owner, err := get(owned.GetNamespace(), ref.Name)
			if err != nil {
				refLog.Error(err, "error getting referenced owning resource")
				continue
			}

			ownerKey, err := KeyFunc(owner)
			if err != nil {
				refLog.Error(err, "error computing key for resource")
				continue
			}

			queue.Add(ownerKey)
		}
	}
}
// QueuingEventHandler is an implementation of cache.ResourceEventHandler
// whose only job is to put the key of every added, updated or deleted object
// onto a workqueue.
type QueuingEventHandler struct {
	// Queue receives the key of each object seen by the handler.
	Queue workqueue.RateLimitingInterface
}
// Enqueue computes the key of obj with KeyFunc and adds it to the workqueue.
// If the key cannot be computed, the error is passed to runtime.HandleError
// and nothing is queued.
func (q *QueuingEventHandler) Enqueue(obj interface{}) {
	if key, err := KeyFunc(obj); err != nil {
		runtime.HandleError(err)
	} else {
		q.Queue.Add(key)
	}
}
// OnAdd handles a newly created object by enqueuing its key.
func (q *QueuingEventHandler) OnAdd(obj interface{}) {
	q.Enqueue(obj)
}
// OnUpdate enqueues the key of the new version of an updated object. Updates
// where the old and new objects are deeply equal are ignored.
func (q *QueuingEventHandler) OnUpdate(old, new interface{}) {
	if !reflect.DeepEqual(old, new) {
		q.Enqueue(new)
	}
}
// OnDelete enqueues the key of a deleted object for processing. When obj is
// a cache.DeletedFinalStateUnknown tombstone, the object it wraps is enqueued
// instead of the tombstone itself.
func (q *QueuingEventHandler) OnDelete(obj interface{}) {
	if tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown); isTombstone {
		obj = tombstone.Obj
	}
	q.Enqueue(obj)
}
// BlockingEventHandler is an implementation of cache.ResourceEventHandler
// that does no queuing: every call to OnAdd, OnUpdate or OnDelete invokes its
// WorkFunc synchronously, in the caller's goroutine.
type BlockingEventHandler struct {
	// WorkFunc is called with the object carried by each handled event.
	WorkFunc func(obj interface{})
}

// Enqueue synchronously calls WorkFunc with obj. Despite its name, nothing is
// placed on a workqueue.
func (b *BlockingEventHandler) Enqueue(obj interface{}) {
	b.WorkFunc(obj)
}

// OnAdd synchronously calls WorkFunc with a newly created object.
func (b *BlockingEventHandler) OnAdd(obj interface{}) {
	b.Enqueue(obj)
}

// OnUpdate synchronously calls WorkFunc with the new version of an updated
// object. Updates where the old and new objects are deeply equal are ignored.
func (b *BlockingEventHandler) OnUpdate(old, new interface{}) {
	if !reflect.DeepEqual(old, new) {
		b.Enqueue(new)
	}
}
// OnDelete synchronously calls WorkFunc with a deleted object. When obj is a
// cache.DeletedFinalStateUnknown tombstone, WorkFunc receives the object it
// wraps rather than the tombstone itself.
func (b *BlockingEventHandler) OnDelete(obj interface{}) {
	if tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown); isTombstone {
		obj = tombstone.Obj
	}
	b.Enqueue(obj)
}