forked from knative/serving
/
default.go
128 lines (104 loc) · 3.29 KB
/
default.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configmap
import (
"errors"
"sync"
corev1 "k8s.io/api/core/v1"
informers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
)
// defaultImpl provides a default informer-based implementation of Watcher.
type defaultImpl struct {
sif informers.SharedInformerFactory
informer corev1informers.ConfigMapInformer
ns string
// Guards mutations to defaultImpl fields
m sync.Mutex
observers map[string][]Observer
started bool
}
// Asserts that defaultImpl implements Watcher.
var _ Watcher = (*defaultImpl)(nil)
// Watch implements Watcher
func (di *defaultImpl) Watch(name string, w Observer) {
di.m.Lock()
defer di.m.Unlock()
if di.observers == nil {
di.observers = make(map[string][]Observer)
}
wl, _ := di.observers[name]
di.observers[name] = append(wl, w)
}
// Start implements Watcher
func (di *defaultImpl) Start(stopCh <-chan struct{}) error {
if err := di.registerCallbackAndStartInformer(stopCh); err != nil {
return err
}
// Wait until it has been synced (WITHOUT holding the mutex, so callbacks happen)
if ok := cache.WaitForCacheSync(stopCh, di.informer.Informer().HasSynced); !ok {
return errors.New("Error waiting for ConfigMap informer to sync.")
}
return di.checkObservedResourcesExist()
}
func (di *defaultImpl) registerCallbackAndStartInformer(stopCh <-chan struct{}) error {
di.m.Lock()
defer di.m.Unlock()
if di.started {
return errors.New("Watcher already started!")
}
di.started = true
di.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: di.addConfigMapEvent,
UpdateFunc: di.updateConfigMapEvent,
})
// Start the shared informer factory (non-blocking)
di.sif.Start(stopCh)
return nil
}
func (di *defaultImpl) checkObservedResourcesExist() error {
di.m.Lock()
defer di.m.Unlock()
// Check that all objects with Observers exist in our informers.
for k := range di.observers {
_, err := di.informer.Lister().ConfigMaps(di.ns).Get(k)
if err != nil {
return err
}
}
return nil
}
func (di *defaultImpl) addConfigMapEvent(obj interface{}) {
// If the ConfigMap update is outside of our namespace, then quickly disregard it.
configMap := obj.(*corev1.ConfigMap)
if configMap.Namespace != di.ns {
// Outside of our namespace.
// This shouldn't happen due to our filtered informer.
return
}
// Within our namespace, take the lock and see if there are any registered observers.
di.m.Lock()
defer di.m.Unlock()
wl, ok := di.observers[configMap.Name]
if !ok {
return // No observers.
}
// Iterate over the observers and invoke their callbacks.
for _, w := range wl {
w(configMap)
}
}
func (di *defaultImpl) updateConfigMapEvent(old, new interface{}) {
di.addConfigMapEvent(new)
}