-
Notifications
You must be signed in to change notification settings - Fork 0
/
watch.go
243 lines (202 loc) · 6.89 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package auth
import (
"errors"
"sync"
"github.com/golang/glog"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/auth/user"
kstorage "k8s.io/kubernetes/pkg/storage"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
projectcache "github.com/openshift/origin/pkg/project/cache"
projectutil "github.com/openshift/origin/pkg/project/util"
)
// CacheWatcher is the callback interface through which a watcher is told that the
// membership of a namespace has changed.
type CacheWatcher interface {
// GroupMembershipChanged is called serially for all changes for all watchers. This method MUST NOT BLOCK.
// The serial nature makes reasoning about the code easy, but if you block in this method you will doom all watchers.
//
// namespaceName identifies the namespace that changed. users and groups are presumably the
// subjects that currently have access to it (that is how userProjectWatcher reads them) —
// confirm against the caller.
GroupMembershipChanged(namespaceName string, users, groups sets.String)
}
// WatchableCache is the part of the authorization cache that a watcher depends on:
// it can deregister a CacheWatcher and list the namespaces visible to a user.
type WatchableCache interface {
// RemoveWatcher removes a watcher so that it is not notified again.
RemoveWatcher(CacheWatcher)
// List returns the namespaces the user has access to view.
List(userInfo user.Info) (*kapi.NamespaceList, error)
}
// userProjectWatcher reports changes in the set of projects visible to a single user.
// It receives membership notifications through GroupMembershipChanged, queues the
// resulting events, and serves them through ResultChan and Stop, the methods it
// provides for watch.Interface.
type userProjectWatcher struct {
// user is the identity whose name and groups are matched against membership changes.
user user.Info
// visibleNamespaces are the namespaces that the scopes allow; the entry "*" is treated
// as allowing every namespace.
visibleNamespaces sets.String
// cacheIncoming is a buffered channel used for notification to watcher. If the buffer fills up,
// then the watcher will be removed and the connection will be broken.
cacheIncoming chan watch.Event
// cacheError is a buffered channel that is put to serially. In theory, only one item will
// ever be placed on it.
cacheError chan error
// outgoing is the unbuffered `ResultChan` used for the watch. Backups of this channel will block
// the default `emit` call. That's why cacheError is a buffered channel.
outgoing chan watch.Event
// userStop lets a user stop their watch; Stop closes it.
userStop chan struct{}
// stopLock keeps parallel stops from doing crazy things
stopLock sync.Mutex
// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)
// projectCache is used to look up the current namespace object when the user has access to it.
projectCache *projectcache.ProjectCache
// authCache supplies the initial namespace list and is where this watcher deregisters itself.
authCache WatchableCache
// initialProjects are emitted as Added events when Watch starts; it is empty unless
// existing projects were requested at construction.
initialProjects []kapi.Namespace
// knownProjects maps name to resourceVersion
knownProjects map[string]string
}
var (
// watchChannelHWM tracks how backed up the most backed up channel got. This mirrors etcd watch behavior and allows tuning
// of channel depth. It is shared by every watcher in the package and is updated by Watch each
// time an event is taken off a cacheIncoming channel.
watchChannelHWM kstorage.HighWaterMark
)
// NewUserProjectWatcher builds a watcher that reports project visibility changes for user.
//
// The namespaces currently returned by authCache.List seed the set of known projects, so
// that later notifications can be classified as additions, modifications or deletions.
// When includeAllExistingProjects is true those namespaces are also emitted as Added
// events at the start of Watch.
//
// The constructor has no error return, so a failure of authCache.List is reported through
// utilruntime.HandleError and the watcher starts from whatever list (possibly none) was
// returned instead of dereferencing a nil result.
//
// The returned watcher is not registered with the cache and is not running; the caller
// is expected to do both.
func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projectCache *projectcache.ProjectCache, authCache WatchableCache, includeAllExistingProjects bool) *userProjectWatcher {
	namespaces, err := authCache.List(user)
	if err != nil {
		utilruntime.HandleError(err)
	}
	if namespaces == nil {
		// List failed without producing a list; start with nothing known.
		namespaces = &kapi.NamespaceList{}
	}

	knownProjects := make(map[string]string, len(namespaces.Items))
	for i := range namespaces.Items {
		namespace := &namespaces.Items[i]
		knownProjects[namespace.Name] = namespace.ResourceVersion
	}

	// this is optional. If they don't request it, don't include it.
	initialProjects := []kapi.Namespace{}
	if includeAllExistingProjects {
		initialProjects = append(initialProjects, namespaces.Items...)
	}

	w := &userProjectWatcher{
		user:              user,
		visibleNamespaces: visibleNamespaces,

		cacheIncoming: make(chan watch.Event, 1000),
		cacheError:    make(chan error, 1),
		outgoing:      make(chan watch.Event),
		userStop:      make(chan struct{}),

		projectCache:    projectCache,
		authCache:       authCache,
		initialProjects: initialProjects,
		knownProjects:   knownProjects,
	}
	// The default emit gives up as soon as the user stops the watch, so a reader that
	// has gone away cannot wedge the Watch goroutine on the unbuffered outgoing channel.
	w.emit = func(e watch.Event) {
		select {
		case w.outgoing <- e:
		case <-w.userStop:
		}
	}
	return w
}
// GroupMembershipChanged implements CacheWatcher. It compares the new membership of
// namespaceName with what this watcher last reported and queues at most one event:
//
//   - Deleted when the user no longer has access to a project it knew about,
//   - Added when the user gains access to a project it did not know about,
//   - Modified when a known project is seen with a different resourceVersion.
//
// Namespaces outside visibleNamespaces are ignored, as are notifications that do not
// change anything for this user. The method never blocks: if the event buffer is full
// the watcher is removed from the cache and an error is left for Watch to report.
func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, users, groups sets.String) {
	if !w.visibleNamespaces.Has("*") && !w.visibleNamespaces.Has(namespaceName) {
		// this user is scoped to a level that shouldn't see this update
		return
	}

	hasAccess := users.Has(w.user.GetName()) || groups.HasAny(w.user.GetGroups()...)
	lastResourceVersion, known := w.knownProjects[namespaceName]

	switch {
	// this means that we were removed from the project
	case !hasAccess && known:
		delete(w.knownProjects, namespaceName)
		w.enqueue(watch.Event{
			Type:   watch.Deleted,
			Object: projectutil.ConvertNamespace(&kapi.Namespace{ObjectMeta: kapi.ObjectMeta{Name: namespaceName}}),
		}, "delete notification timeout")

	case hasAccess:
		namespace, err := w.projectCache.GetNamespace(namespaceName)
		if err != nil {
			utilruntime.HandleError(err)
			return
		}

		eventType := watch.Added
		// if we already have this in our list, then we're getting notified because the object changed
		if known {
			// if we've already notified for this particular resourceVersion, there's no work to do
			if lastResourceVersion == namespace.ResourceVersion {
				return
			}
			eventType = watch.Modified
		}
		w.knownProjects[namespaceName] = namespace.ResourceVersion

		w.enqueue(watch.Event{
			Type:   eventType,
			Object: projectutil.ConvertNamespace(namespace),
		}, "add notification timeout")
	}
}

// enqueue hands event to the Watch goroutine without blocking. When the cacheIncoming
// buffer is full, the watcher is removed from the cache so that it won't be notified
// again, and an error carrying overflowMessage is recorded for Watch to deliver.
func (w *userProjectWatcher) enqueue(event watch.Event, overflowMessage string) {
	select {
	case w.cacheIncoming <- event:
		return
	default:
	}

	w.authCache.RemoveWatcher(w)
	select {
	case w.cacheError <- errors.New(overflowMessage):
	default:
		// An error is already pending and Watch will end on it. Sending again would
		// block, and blocking here stalls every other watcher.
	}
}
// Watch forwards events to the outgoing channel: first one Added event per initial
// project, then every event queued on cacheIncoming. It returns when the user stops the
// watch or when an error arrives on cacheError, in which case the error is emitted as a
// final event. Meant to be called as a goroutine.
func (w *userProjectWatcher) Watch() {
	// Deferred calls run in reverse order: crash handling first, then removal from the
	// cache (so the watcher is never leaked), and finally closing the result channel.
	defer close(w.outgoing)
	defer func() {
		w.authCache.RemoveWatcher(w)
	}()
	defer utilruntime.HandleCrash()

	// Replay the projects that existed when the watcher was created.
	projects := w.initialProjects
	for idx := range projects {
		// Check for a pending failure before each emit so that a broken watcher is
		// not kept open while the backlog is replayed.
		select {
		case cacheErr := <-w.cacheError:
			w.emit(makeErrorEvent(cacheErr))
			return
		default:
		}

		added := watch.Event{
			Type:   watch.Added,
			Object: projectutil.ConvertNamespace(&projects[idx]),
		}
		w.emit(added)
	}

	// Steady state: relay queued events until stopped or failed.
	for {
		select {
		case cacheErr := <-w.cacheError:
			w.emit(makeErrorEvent(cacheErr))
			return

		case <-w.userStop:
			return

		case queued := <-w.cacheIncoming:
			// Monitor if this gets backed up, and how much.
			depth := int64(len(w.cacheIncoming))
			if watchChannelHWM.Update(depth) {
				glog.V(2).Infof("watch: %v objects queued in project cache watching channel.", depth)
			}
			w.emit(queued)
		}
	}
}
// makeErrorEvent wraps err in a watch.Error event whose object is a failure Status
// carrying the error's message.
func makeErrorEvent(err error) watch.Event {
	failure := &unversioned.Status{
		Status:  unversioned.StatusFailure,
		Message: err.Error(),
	}
	return watch.Event{Type: watch.Error, Object: failure}
}
// ResultChan implements watch.Interface. It exposes the outgoing channel as
// receive-only; Watch closes that channel when it returns.
func (w *userProjectWatcher) ResultChan() <-chan watch.Event {
	var results <-chan watch.Event = w.outgoing
	return results
}
// Stop implements watch.Interface. It signals the watch to end by closing userStop and
// may be called any number of times, including concurrently.
func (w *userProjectWatcher) Stop() {
	// Serialize callers so that two of them cannot both see the channel as open.
	w.stopLock.Lock()
	defer w.stopLock.Unlock()

	select {
	case <-w.userStop:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(w.userStop)
	}
}