/
watch_based_manager.go
365 lines (318 loc) · 10.6 KB
/
watch_based_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package manager
import (
"fmt"
"sync"
"time"
"k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/utils/clock"
)
type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
type newObjectFunc func() runtime.Object
type isImmutableFunc func(runtime.Object) bool
// objectCacheItem is a single item stored in objectCache.
type objectCacheItem struct {
refCount int
store *cacheStore
reflector *cache.Reflector
hasSynced func() (bool, error)
// waitGroup is used to ensure that there won't be two concurrent calls to reflector.Run
waitGroup sync.WaitGroup
// lock is to ensure the access and modify of lastAccessTime, stopped, and immutable are thread safety,
// and protecting from closing stopCh multiple times.
lock sync.Mutex
lastAccessTime time.Time
stopped bool
immutable bool
stopCh chan struct{}
}
func (i *objectCacheItem) stop() bool {
i.lock.Lock()
defer i.lock.Unlock()
return i.stopThreadUnsafe()
}
func (i *objectCacheItem) stopThreadUnsafe() bool {
if i.stopped {
return false
}
i.stopped = true
close(i.stopCh)
if !i.immutable {
i.store.unsetInitialized()
}
return true
}
func (i *objectCacheItem) setLastAccessTime(time time.Time) {
i.lock.Lock()
defer i.lock.Unlock()
i.lastAccessTime = time
}
func (i *objectCacheItem) setImmutable() {
i.lock.Lock()
defer i.lock.Unlock()
i.immutable = true
}
func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
i.lock.Lock()
defer i.lock.Unlock()
// Ensure that we don't try to stop not yet initialized reflector.
// In case of overloaded kube-apiserver, if the list request is
// already being processed, all the work would lost and would have
// to be retried.
if !i.stopped && i.store.hasSynced() && now.After(i.lastAccessTime.Add(maxIdleTime)) {
return i.stopThreadUnsafe()
}
return false
}
func (i *objectCacheItem) restartReflectorIfNeeded() {
i.lock.Lock()
defer i.lock.Unlock()
if i.immutable || !i.stopped {
return
}
i.stopCh = make(chan struct{})
i.stopped = false
go i.startReflector()
}
func (i *objectCacheItem) startReflector() {
i.waitGroup.Wait()
i.waitGroup.Add(1)
defer i.waitGroup.Done()
i.reflector.Run(i.stopCh)
}
// cacheStore is in order to rewrite Replace function to mark initialized flag
type cacheStore struct {
cache.Store
lock sync.Mutex
initialized bool
}
func (c *cacheStore) Replace(list []interface{}, resourceVersion string) error {
c.lock.Lock()
defer c.lock.Unlock()
err := c.Store.Replace(list, resourceVersion)
if err != nil {
return err
}
c.initialized = true
return nil
}
func (c *cacheStore) hasSynced() bool {
c.lock.Lock()
defer c.lock.Unlock()
return c.initialized
}
func (c *cacheStore) unsetInitialized() {
c.lock.Lock()
defer c.lock.Unlock()
c.initialized = false
}
// objectCache is a local cache of objects propagated via
// individual watches.
type objectCache struct {
listObject listObjectFunc
watchObject watchObjectFunc
newObject newObjectFunc
isImmutable isImmutableFunc
groupResource schema.GroupResource
clock clock.Clock
maxIdleTime time.Duration
lock sync.RWMutex
items map[objectKey]*objectCacheItem
}
const minIdleTime = 1 * time.Minute
// NewObjectCache returns a new watch-based instance of Store interface.
func NewObjectCache(
listObject listObjectFunc,
watchObject watchObjectFunc,
newObject newObjectFunc,
isImmutable isImmutableFunc,
groupResource schema.GroupResource,
clock clock.Clock,
maxIdleTime time.Duration) Store {
if maxIdleTime < minIdleTime {
maxIdleTime = minIdleTime
}
store := &objectCache{
listObject: listObject,
watchObject: watchObject,
newObject: newObject,
isImmutable: isImmutable,
groupResource: groupResource,
clock: clock,
maxIdleTime: maxIdleTime,
items: make(map[objectKey]*objectCacheItem),
}
// TODO propagate stopCh from the higher level.
go wait.Until(store.startRecycleIdleWatch, time.Minute, wait.NeverStop)
return store
}
func (c *objectCache) newStore() *cacheStore {
// TODO: We may consider created a dedicated store keeping just a single
// item, instead of using a generic store implementation for this purpose.
// However, simple benchmarks show that memory overhead in that case is
// decrease from ~600B to ~300B per object. So we are not optimizing it
// until we will see a good reason for that.
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
return &cacheStore{store, sync.Mutex{}, false}
}
func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return c.listObject(namespace, options)
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return c.watchObject(namespace, options)
}
store := c.newStore()
reflector := cache.NewNamedReflector(
fmt.Sprintf("object-%q/%q", namespace, name),
&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
c.newObject(),
store,
0,
)
item := &objectCacheItem{
refCount: 0,
store: store,
reflector: reflector,
hasSynced: func() (bool, error) { return store.hasSynced(), nil },
stopCh: make(chan struct{}),
}
go item.startReflector()
return item
}
func (c *objectCache) AddReference(namespace, name string) {
key := objectKey{namespace: namespace, name: name}
// AddReference is called from RegisterPod thus it needs to be efficient.
// Thus, it is only increasing refCount and in case of first registration
// of a given object it starts corresponding reflector.
// It's responsibility of the first Get operation to wait until the
// reflector propagated the store.
c.lock.Lock()
defer c.lock.Unlock()
item, exists := c.items[key]
if !exists {
item = c.newReflector(namespace, name)
c.items[key] = item
}
item.refCount++
}
func (c *objectCache) DeleteReference(namespace, name string) {
key := objectKey{namespace: namespace, name: name}
c.lock.Lock()
defer c.lock.Unlock()
if item, ok := c.items[key]; ok {
item.refCount--
if item.refCount == 0 {
// Stop the underlying reflector.
item.stop()
delete(c.items, key)
}
}
}
// key returns key of an object with a given name and namespace.
// This has to be in-sync with cache.MetaNamespaceKeyFunc.
func (c *objectCache) key(namespace, name string) string {
if len(namespace) > 0 {
return namespace + "/" + name
}
return name
}
func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
key := objectKey{namespace: namespace, name: name}
c.lock.RLock()
item, exists := c.items[key]
c.lock.RUnlock()
if !exists {
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
}
// Record last access time independently if it succeeded or not.
// This protects from premature (racy) reflector closure.
item.setLastAccessTime(c.clock.Now())
item.restartReflectorIfNeeded()
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
}
obj, exists, err := item.store.GetByKey(c.key(namespace, name))
if err != nil {
return nil, err
}
if !exists {
return nil, apierrors.NewNotFound(c.groupResource, name)
}
if object, ok := obj.(runtime.Object); ok {
// If the returned object is immutable, stop the reflector.
//
// NOTE: we may potentially not even start the reflector if the object is
// already immutable. However, given that:
// - we want to also handle the case when object is marked as immutable later
// - Secrets and ConfigMaps are periodically fetched by volumemanager anyway
// - doing that wouldn't provide visible scalability/performance gain - we
// already have it from here
// - doing that would require significant refactoring to reflector
// we limit ourselves to just quickly stop the reflector here.
if c.isImmutable(object) {
item.setImmutable()
if item.stop() {
klog.V(4).InfoS("Stopped watching for changes - object is immutable", "obj", klog.KRef(namespace, name))
}
}
return object, nil
}
return nil, fmt.Errorf("unexpected object type: %v", obj)
}
func (c *objectCache) startRecycleIdleWatch() {
c.lock.Lock()
defer c.lock.Unlock()
for key, item := range c.items {
if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) {
klog.V(4).InfoS("Not acquired for long time, Stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime)
}
}
}
// NewWatchBasedManager creates a manager that keeps a cache of all objects
// necessary for registered pods.
// It implements the following logic:
// - whenever a pod is created or updated, we start individual watches for all
// referenced objects that aren't referenced from other registered pods
// - every GetObject() returns a value from local cache propagated via watches
func NewWatchBasedManager(
listObject listObjectFunc,
watchObject watchObjectFunc,
newObject newObjectFunc,
isImmutable isImmutableFunc,
groupResource schema.GroupResource,
resyncInterval time.Duration,
getReferencedObjects func(*v1.Pod) sets.String) Manager {
// If a configmap/secret is used as a volume, the volumeManager will visit the objectCacheItem every resyncInterval cycle,
// We just want to stop the objectCacheItem referenced by environment variables,
// So, maxIdleTime is set to an integer multiple of resyncInterval,
// We currently set it to 5 times.
maxIdleTime := resyncInterval * 5
objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime)
return NewCacheBasedManager(objectStore, getReferencedObjects)
}