/
cache.go
326 lines (268 loc) · 10.7 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
// Copyright Contributors to the Open Cluster Management project
package client
import (
"errors"
"fmt"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/klog"
)
var ErrNoCacheEntry = errors.New("there was no populated cache entry")
// ObjectCache provides a concurrency safe cache of unstructured objects. Additionally, it's able to convert GVKs to
// GVRs and cache the results.
type ObjectCache interface { //nolint: interfacebloat
// Get returns the object from the cache. A nil value can be returned to indicate a not found is cached. The error
// ErrNoCacheEntry is returned if there is no cache entry at all.
Get(gvk schema.GroupVersionKind, namespace string, name string) (*unstructured.Unstructured, error)
// List returns the objects from the cache, which can be an empty list. The error ErrNoCacheEntry is returned if
// there is no cache entry.
List(gvk schema.GroupVersionKind, namespace string, selector labels.Selector) ([]unstructured.Unstructured, error)
// FromObjectIdentifier returns the objects from the cache, which can be an empty list, based on the input object
// identifier. The error ErrNoCacheEntry is returned if there is no cache entry at all.
FromObjectIdentifier(objID ObjectIdentifier) ([]unstructured.Unstructured, error)
// CacheList will cache a list of objects for the list query.
CacheList(
gvk schema.GroupVersionKind, namespace string, selector labels.Selector, objects []unstructured.Unstructured,
)
// CacheObject allows to cache an object. The input object can be nil to indicate a cached not found result.
CacheObject(gvk schema.GroupVersionKind, namespace string, name string, object *unstructured.Unstructured)
// CacheFromObjectIdentifier will cache a list of objects for the input object identifier.
CacheFromObjectIdentifier(objID ObjectIdentifier, objects []unstructured.Unstructured)
// UncacheObject will entirely remove the cache entry of the object.
UncacheObject(gvk schema.GroupVersionKind, namespace string, name string)
// UncacheObject will entirely remove the cache entries for the list query.
UncacheList(gvk schema.GroupVersionKind, namespace string, selector labels.Selector)
// UncacheObject will entirely remove the cache entries for the object identifier.
UncacheFromObjectIdentifier(objID ObjectIdentifier)
// GVKToGVR will convert a GVK to a GVR and cache the result for a default of 10 minutes (configurable) when found,
// and not cache failed conversions by default (configurable).
GVKToGVR(gvk schema.GroupVersionKind) (ScopedGVR, error)
// Clear will entirely clear the cache.
Clear()
}
type lockedGVR struct {
lock *sync.RWMutex
gvr *ScopedGVR
expires time.Time
}
func (l *lockedGVR) isExpired() bool {
return time.Now().After(l.expires)
}
type objectCache struct {
cache *sync.Map
discoveryClient *discovery.DiscoveryClient
// gvkToGVRCache is used as a cache of GVK to GVR mappings. The cache entries automatically expire after 10 minutes.
gvkToGVRCache *sync.Map
options ObjectCacheOptions
}
type ObjectCacheOptions struct {
// The time for a GVK to GVR conversion cache entry to be considered fresh (not expired). This excludes missing API
// resources, which is configured by MissingAPIResourceCacheTTL. The default value is 10 minutes.
GVKToGVRCacheTTL time.Duration
// The time for a failed GVK to GVR conversion to not be retried. The default behavior is to not cache failures.
// Setting this can be useful if you don't want to continuously query the Kubernetes API if a CRD is missing.
MissingAPIResourceCacheTTL time.Duration
// Whether to *skip* the DeepCopy when retrieving an object from the cache.
// Be careful when disabling deep copy: it makes it possible to mutate objects inside the cache.
UnsafeDisableDeepCopy bool
}
// NewObjectCache will create an object cache with the input discovery client.
func NewObjectCache(discoveryClient *discovery.DiscoveryClient, options ObjectCacheOptions) ObjectCache {
if options.GVKToGVRCacheTTL == 0 {
options.GVKToGVRCacheTTL = 10 * time.Minute
}
return &objectCache{
cache: &sync.Map{},
gvkToGVRCache: &sync.Map{},
discoveryClient: discoveryClient,
options: options,
}
}
// Get returns the object from the cache. A nil value can be returned to indicate a not found is cached. The error
// ErrNoCacheEntry is returned if there is no cache entry at all.
func (o *objectCache) Get(
gvk schema.GroupVersionKind, namespace string, name string,
) (*unstructured.Unstructured, error) {
objID := ObjectIdentifier{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
Namespace: namespace,
Name: name,
}
result, err := o.FromObjectIdentifier(objID)
if err != nil {
return nil, err
}
if len(result) == 0 {
return nil, nil
}
return &result[0], nil
}
// List returns the objects from the cache, which can be an empty list. The error ErrNoCacheEntry is returned if
// there is no cache entry.
func (o *objectCache) List(
gvk schema.GroupVersionKind, namespace string, selector labels.Selector,
) ([]unstructured.Unstructured, error) {
if selector == nil {
selector = labels.NewSelector()
}
objID := ObjectIdentifier{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
Namespace: namespace,
Selector: selector.String(),
}
return o.FromObjectIdentifier(objID)
}
// CacheList will cache a list of objects for the list query.
func (o *objectCache) CacheList(
gvk schema.GroupVersionKind, namespace string, selector labels.Selector, objects []unstructured.Unstructured,
) {
if selector == nil {
selector = labels.NewSelector()
}
objID := ObjectIdentifier{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
Namespace: namespace,
Selector: selector.String(),
}
o.CacheFromObjectIdentifier(objID, objects)
}
// FromObjectIdentifier returns the objects from the cache, which can be an empty list, based on the input object
// identifier. The error ErrNoCacheEntry is returned if there is no cache entry at all.
func (o *objectCache) FromObjectIdentifier(objID ObjectIdentifier) ([]unstructured.Unstructured, error) {
loadedResult, loaded := o.cache.Load(objID)
if !loaded {
return nil, fmt.Errorf("%w for %s", ErrNoCacheEntry, objID)
}
// Type assertion checks aren't needed since this is the only method that stores data.
uncopiedResult := loadedResult.([]unstructured.Unstructured)
if o.options.UnsafeDisableDeepCopy {
return uncopiedResult, nil
}
result := make([]unstructured.Unstructured, 0, len(uncopiedResult))
for _, obj := range uncopiedResult {
result = append(result, *obj.DeepCopy())
}
return result, nil
}
// CacheObject allows to cache an object. The input object can be nil to indicate a cached not found result.
func (o *objectCache) CacheObject(
gvk schema.GroupVersionKind, namespace string, name string, object *unstructured.Unstructured,
) {
objID := ObjectIdentifier{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
Namespace: namespace,
Name: name,
}
o.CacheFromObjectIdentifier(objID, []unstructured.Unstructured{*object})
}
// CacheFromObjectIdentifier will cache a list of objects for the input object identifier.
func (o *objectCache) CacheFromObjectIdentifier(objID ObjectIdentifier, objects []unstructured.Unstructured) {
o.cache.Store(objID, objects)
}
// UncacheObject will entirely remove the cache entry of the object.
func (o *objectCache) UncacheObject(gvk schema.GroupVersionKind, namespace string, name string) {
objID := ObjectIdentifier{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
Namespace: namespace,
Name: name,
}
o.UncacheFromObjectIdentifier(objID)
}
// UncacheObject will entirely remove the cache entries for the list query.
func (o *objectCache) UncacheList(gvk schema.GroupVersionKind, namespace string, selector labels.Selector) {
if selector == nil {
selector = labels.NewSelector()
}
objID := ObjectIdentifier{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
Namespace: namespace,
Selector: selector.String(),
}
o.UncacheFromObjectIdentifier(objID)
}
// UncacheObject will entirely remove the cache entries for the object identifier.
func (o *objectCache) UncacheFromObjectIdentifier(objID ObjectIdentifier) {
o.cache.Delete(objID)
}
// GVKToGVR will convert a GVK to a GVR and cache the result for a default of 10 minutes (configurable) when found, and
// not cache failed conversions by default (configurable).
func (o *objectCache) GVKToGVR(gvk schema.GroupVersionKind) (ScopedGVR, error) {
now := time.Now()
lockedGVRObj := &lockedGVR{lock: &sync.RWMutex{}, expires: now.Add(o.options.GVKToGVRCacheTTL)}
lockedGVRObj.lock.Lock()
loadedLockedGVRObj, loaded := o.gvkToGVRCache.LoadOrStore(gvk, lockedGVRObj)
if loaded {
// If the value was loaded (not stored), there means there is a cached value or it is being populated.
lockedGVRObj = loadedLockedGVRObj.(*lockedGVR)
lockedGVRObj.lock.RLock()
if !lockedGVRObj.isExpired() {
lockedGVRObj.lock.RUnlock()
// If stored but nil, this means the previous call failed.
if lockedGVRObj.gvr == nil {
return ScopedGVR{}, ErrNoVersionedResource
}
return *lockedGVRObj.gvr, nil
}
// The cache is expired, get a write lock to update the entry.
lockedGVRObj.lock.RUnlock()
lockedGVRObj.lock.Lock()
}
defer lockedGVRObj.lock.Unlock()
groupVersion := gvk.GroupVersion().String()
resources, err := o.discoveryClient.ServerResourcesForGroupVersion(groupVersion)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("The group version was not found: %s", groupVersion)
lockedGVRObj.expires = now.Add(o.options.MissingAPIResourceCacheTTL)
return ScopedGVR{}, ErrNoVersionedResource
}
return ScopedGVR{}, err
}
for _, apiRes := range resources.APIResources {
if apiRes.Kind == gvk.Kind {
klog.V(2).Infof("Found the API resource: %v", apiRes)
gv := gvk.GroupVersion()
gvr := ScopedGVR{
GroupVersionResource: schema.GroupVersionResource{
Group: gv.Group,
Version: gv.Version,
Resource: apiRes.Name,
},
Namespaced: apiRes.Namespaced,
}
lockedGVRObj.gvr = &gvr
return gvr, nil
}
}
klog.V(2).Infof("The APIResource for the GVK wasn't found: %v", gvk)
lockedGVRObj.expires = now.Add(o.options.MissingAPIResourceCacheTTL)
return ScopedGVR{}, ErrNoVersionedResource
}
// Clear will entirely clear the cache.
func (o *objectCache) Clear() {
o.cache.Range(func(key, value any) bool {
o.cache.Delete(key)
return true
})
o.gvkToGVRCache.Range(func(key, value any) bool {
o.gvkToGVRCache.Delete(key)
return true
})
}