-
Notifications
You must be signed in to change notification settings - Fork 0
/
dynamic.go
196 lines (174 loc) · 5.72 KB
/
dynamic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package main
import (
	"errors"
	"reflect"
	"sync"

	"golang.org/x/time/rate"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/rest"
)
// dynamicCachable is a Cachable that dynamically discovers resource
// types at runtime.
type dynamicCachable struct {
mu sync.RWMutex // protects the following fields
staticCachable Cachable
limiter *rate.Limiter
newCachable func() (Cachable, error)
lazy bool
// Used for lazy init.
initOnce sync.Once
}
// DynamicCachableOption is a functional option that configures a
// dynamicCachable. NewDynamicCachable applies the options in the order given
// and aborts construction with the first non-nil error an option returns.
type DynamicCachableOption func(*dynamicCachable) error
// WithLimiter returns an option that replaces the Cachable's rate limiter
// with lim. The limiter is consulted before each reload of the underlying
// static Cachable.
func WithLimiter(lim *rate.Limiter) DynamicCachableOption {
	apply := func(dc *dynamicCachable) error {
		dc.limiter = lim
		return nil
	}
	return apply
}
// WithLazyDiscovery is an option that marks the Cachable as lazy: the
// constructor skips the initial build of the static Cachable, which is
// instead performed when the first lookup is made.
var WithLazyDiscovery = DynamicCachableOption(func(dc *dynamicCachable) error {
	dc.lazy = true
	return nil
})
// WithCustomCachable returns an option that overrides how the static Cachable
// is (re)built. By default NewDynamicCachable builds it from a discovery
// client; with this option newCachable is called instead.
//
// It exists mainly for testing, but is also useful when tighter control over
// how discovery is performed (for example, which endpoints are queried) is
// needed.
func WithCustomCachable(newCachable func() (Cachable, error)) DynamicCachableOption {
	apply := func(dc *dynamicCachable) error {
		dc.newCachable = newCachable
		return nil
	}
	return apply
}
// NewDynamicCachable builds a dynamic Cachable for cfg and applies opts to it
// in order.
//
// A discovery client is always created from cfg and used by the default
// factory for the static Cachable; WithCustomCachable can replace that
// factory. Unless WithLazyDiscovery is supplied, the static Cachable is built
// before returning.
//
// It returns an error if the discovery client cannot be created, if any
// option fails, or if the eager initial build fails.
func NewDynamicCachable(cfg *rest.Config, opts ...DynamicCachableOption) (Cachable, error) {
	discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
	if err != nil {
		return nil, err
	}

	defaultFactory := func() (Cachable, error) {
		return NewCachable(discoveryClient)
	}
	dc := &dynamicCachable{
		limiter:     rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
		newCachable: defaultFactory,
	}

	for i := range opts {
		if optErr := opts[i](dc); optErr != nil {
			return nil, optErr
		}
	}

	// Lazy instances postpone the first build until a lookup is made.
	if dc.lazy {
		return dc, nil
	}
	if buildErr := dc.setStaticCachable(); buildErr != nil {
		return nil, buildErr
	}
	return dc, nil
}
var (
	// defaultRefillRate is the default number of reload tokens added back to
	// the limiter's bucket per second.
	defaultRefillRate int = 5

	// defaultLimitSize is the default bucket size of the limiter, i.e. the
	// starting and maximum number of reload tokens. Each token that is spent
	// is replenished at defaultRefillRate tokens per second.
	defaultLimitSize int = 5
)
// setStaticCachable unconditionally rebuilds drm's static Cachable by calling
// drm.newCachable; it does not consult the rate limiter. On failure the
// previous static Cachable is left in place and the error is returned.
//
// It takes no lock itself; callers are responsible for synchronization.
func (drm *dynamicCachable) setStaticCachable() error {
	fresh, err := drm.newCachable()
	if err == nil {
		drm.staticCachable = fresh
	}
	return err
}
// init makes sure a lazy drm has a static Cachable before it is used.
//
// For non-lazy instances it is a no-op, because NewDynamicCachable already
// built the static Cachable. For lazy instances it builds the static Cachable
// on first use and, unlike a sync.Once-based init, retries on later calls if
// an earlier attempt failed. A sync.Once would record the failed attempt as
// done, so every subsequent call would report success and the caller would
// then invoke a method on a nil staticCachable.
//
// It returns the error from building the static Cachable, if any. Retries
// after a failure are not rate-limited.
func (drm *dynamicCachable) init() error {
	if !drm.lazy {
		return nil
	}

	// Fast path: already initialized.
	drm.mu.RLock()
	ready := drm.staticCachable != nil
	drm.mu.RUnlock()
	if ready {
		return nil
	}

	drm.mu.Lock()
	defer drm.mu.Unlock()
	// Re-check under the write lock: another goroutine may have finished
	// initialization while we were waiting.
	if drm.staticCachable != nil {
		return nil
	}
	return drm.setStaticCachable()
}
// checkAndReload calls checkNeedsReload, which is assumed to depend on the
// data held by the static Cachable.
//
// If the callback returns an error whose chain contains an error of the same
// dynamic type as needsReloadErr, the static Cachable is reloaded and the
// callback is called again; the result of that final call is returned. If the
// callback returns nil or any other kind of error, that result is returned
// immediately without reloading.
//
// Reloads are rate-limited by drm.limiter. If a reload would exceed the
// limiter's rate, the error returned by the callback is returned unchanged so
// that clients can handle it like an ordinary NoResourceMatchError /
// NoKindMatchError. If the reload itself fails, the reload error is returned.
//
// The callback is always invoked with drm.mu held (read lock on the first
// attempt, write lock afterwards), so it must not try to lock drm.mu itself.
func (drm *dynamicCachable) checkAndReload(needsReloadErr error, checkNeedsReload func() error) error {
	// First, check the common path -- data is fresh enough.
	// (An immediately-invoked closure scopes the read lock's defer.)
	err := func() error {
		drm.mu.RLock()
		defer drm.mu.RUnlock()
		return checkNeedsReload()
	}()
	if !errorMatchesType(err, needsReloadErr) {
		return err
	}

	// The data wasn't fresh, so grab the write lock...
	drm.mu.Lock()
	defer drm.mu.Unlock()

	// ...and check again in case another goroutine reloaded in the meantime.
	err = checkNeedsReload()
	if !errorMatchesType(err, needsReloadErr) {
		return err
	}

	// Still stale: take a rate-limit token if one is available.
	if !drm.limiter.Allow() {
		// We have refreshed often enough already; surface the callback's
		// error as-is.
		return err
	}

	// Reload...
	if reloadErr := drm.setStaticCachable(); reloadErr != nil {
		return reloadErr
	}

	// ...and return whatever the callback reports against the fresh data.
	return checkNeedsReload()
}

// errorMatchesType reports whether any error in err's chain is assignable to
// the dynamic type of want. It returns false if either argument is nil.
//
// Passing a *error target to errors.As would match every non-nil error, so a
// target pointer of want's concrete type is built via reflection instead.
func errorMatchesType(err, want error) bool {
	if err == nil || want == nil {
		return false
	}
	target := reflect.New(reflect.TypeOf(want))
	return errors.As(err, target.Interface())
}
// GVK returns the result of the static Cachable's GVK lookup for gvk,
// initializing drm first if needed. If the lookup fails with a
// *meta.NoKindMatchError, checkAndReload may reload the static Cachable and
// retry the lookup.
func (drm *dynamicCachable) GVK(gvk schema.GroupVersionKind) (bool, error) {
	if initErr := drm.init(); initErr != nil {
		return false, initErr
	}

	result := false
	lookup := func() (lookupErr error) {
		result, lookupErr = drm.staticCachable.GVK(gvk)
		return lookupErr
	}

	// Keep the call and the return separate: result is written by lookup and
	// must be read only after checkAndReload has finished.
	err := drm.checkAndReload(&meta.NoKindMatchError{}, lookup)
	return result, err
}
// GVR returns the result of the static Cachable's GVR lookup for gvr,
// initializing drm first if needed. If the lookup fails with a
// *meta.NoResourceMatchError, checkAndReload may reload the static Cachable
// and retry the lookup.
func (drm *dynamicCachable) GVR(gvr schema.GroupVersionResource) (bool, error) {
	if initErr := drm.init(); initErr != nil {
		return false, initErr
	}

	result := false
	lookup := func() (lookupErr error) {
		result, lookupErr = drm.staticCachable.GVR(gvr)
		return lookupErr
	}

	// Keep the call and the return separate: result is written by lookup and
	// must be read only after checkAndReload has finished.
	err := drm.checkAndReload(&meta.NoResourceMatchError{}, lookup)
	return result, err
}