forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
namespace_security_allocation_controller.go
230 lines (198 loc) · 6.38 KB
/
namespace_security_allocation_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package controller
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
"github.com/golang/glog"
"github.com/openshift/origin/pkg/security"
"github.com/openshift/origin/pkg/security/mcs"
"github.com/openshift/origin/pkg/security/uid"
"github.com/openshift/origin/pkg/security/uidallocator"
)
// NamespaceSecurityDefaultsController allocates uids/labels for namespaces.
// Namespace names are placed on a work queue by informer event handlers and
// are processed by worker goroutines that invoke syncHandler for each key.
type NamespaceSecurityDefaultsController struct {
// uidAllocator hands out uid blocks and releases them again when the
// namespace update does not go through.
uidAllocator uidallocator.Interface
// mcsAllocator derives an MCS label from an allocated uid block; a nil
// result means no MCS annotation is written.
mcsAllocator MCSAllocationFunc
// client writes the annotated namespace back.
client kcoreclient.NamespaceInterface
// queue holds the names of namespaces waiting to be synced.
queue workqueue.RateLimitingInterface
// maxRetries is set to 10 by the constructor.
// NOTE(review): nothing visible in this file reads it - confirm whether a
// retry cap was intended.
maxRetries int
// controller is the namespace informer's controller; Run waits on its
// HasSynced before starting workers.
controller cache.Controller
// cache is the namespace informer's store, looked up by key during sync.
cache cache.Store
// extracted for testing
syncHandler func(key string) error
}
// NewNamespaceSecurityDefaultsController wires a controller to the given
// namespace informer. Added and updated namespaces are enqueued by name, the
// handler is registered with a ten minute resync period, and syncHandler is
// pointed at the controller's own syncNamespace.
//
// namespaces supplies the informer (its controller and store are retained),
// client is used to update namespaces, uid allocates uid blocks and mcs maps
// an allocated block to an MCS label.
func NewNamespaceSecurityDefaultsController(namespaces informers.NamespaceInformer, client kcoreclient.NamespaceInterface, uid uidallocator.Interface, mcs MCSAllocationFunc) *NamespaceSecurityDefaultsController {
	c := new(NamespaceSecurityDefaultsController)
	c.uidAllocator = uid
	c.mcsAllocator = mcs
	c.client = client
	c.controller = namespaces.Informer().GetController()
	c.cache = namespaces.Informer().GetStore()
	c.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	c.maxRetries = 10

	// Only the new state matters on update, so the old object is ignored.
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: c.enqueueNamespace,
		UpdateFunc: func(_, updated interface{}) {
			c.enqueueNamespace(updated)
		},
	}
	const resyncPeriod = 10 * time.Minute
	namespaces.Informer().AddEventHandlerWithResyncPeriod(handlers, resyncPeriod)

	c.syncHandler = c.syncNamespace
	return c
}
// Run waits for the namespace cache to sync, launches the requested number of
// worker goroutines and then blocks until stopCh delivers (normally by being
// closed). The queue is shut down on the way out, which lets the workers exit.
// If the cache sync wait reports failure, Run returns without starting any
// worker.
func (c *NamespaceSecurityDefaultsController) Run(stopCh <-chan struct{}, workers int) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	// Wait for the stores to fill before any key is processed.
	synced := cache.WaitForCacheSync(stopCh, c.controller.HasSynced)
	if !synced {
		return
	}

	glog.V(5).Infof("Starting workers")
	for remaining := workers; remaining > 0; remaining-- {
		go c.worker()
	}

	<-stopCh
	glog.V(1).Infof("Shutting down")
}
// enqueueNamespace adds the name of obj to the work queue when obj is a
// *v1.Namespace; any other value is silently ignored.
func (c *NamespaceSecurityDefaultsController) enqueueNamespace(obj interface{}) {
	if ns, isNamespace := obj.(*v1.Namespace); isNamespace {
		c.queue.Add(ns.Name)
	}
}
// worker is the body of a worker goroutine: it keeps calling work, which
// dequeues an item, processes it and marks it done, until work reports that
// the worker should stop. The syncHandler is not meant to be invoked
// concurrently with the same key.
func (c *NamespaceSecurityDefaultsController) worker() {
	for {
		if !c.work() {
			return
		}
	}
}
// work processes a single queue item and returns true if the worker thread
// should continue, false once the queue reports shutdown.
func (c *NamespaceSecurityDefaultsController) work() bool {
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(item)

	err := c.syncHandler(item.(string))
	if err != nil {
		// The item was not handled, so report the failure and put it back on
		// the queue with rate limiting.
		utilruntime.HandleError(fmt.Errorf("error syncing namespace, it will be retried: %v", err))
		c.queue.AddRateLimited(item)
		return true
	}

	// Handled successfully: forget the item so that any later retry starts
	// from a clean rate-limit history.
	c.queue.Forget(item)
	return true
}
// syncNamespace looks up the namespace stored under key and runs allocation
// for it. A lookup error is returned as is; a key that is no longer in the
// cache is treated as success. This function is not meant to be invoked
// concurrently with the same key.
func (c *NamespaceSecurityDefaultsController) syncNamespace(key string) error {
	obj, found, err := c.cache.GetByKey(key)
	switch {
	case err != nil:
		return err
	case !found:
		return nil
	}

	ns := obj.(*v1.Namespace)
	return c.allocate(ns)
}
// MCSAllocationFunc returns the MCS label to use for the given uid block.
// A nil result means no label is available for that block.
type MCSAllocationFunc func(uid.Block) *mcs.Label
// DefaultMCSAllocation returns a label from the MCS range that matches the offset
// within the overall range. blockSize must be a positive integer representing the
// number of labels to jump past in the category space (if 1, range == label, if 2
// each range will have two labels). A blockSize that is not positive leaves the
// offset unscaled.
//
// The returned function yields nil when block has no offset within from;
// otherwise it returns whatever label to.LabelAt reports for the scaled offset.
func DefaultMCSAllocation(from *uid.Range, to *mcs.Range, blockSize int) MCSAllocationFunc {
	return func(block uid.Block) *mcs.Label {
		ok, offset := from.Offset(block)
		if !ok {
			return nil
		}

		// Scale in 64 bits: the block offset is a uint32, and multiplying it by
		// blockSize in 32 bits could silently wrap around and select a label
		// that belongs to a different block.
		position := uint64(offset)
		if blockSize > 0 {
			position *= uint64(blockSize)
		}

		// The second result of LabelAt is deliberately not inspected here;
		// only the label itself is handed back to the caller.
		label, _ := to.LabelAt(position)
		return label
	}
}
// allocate processes a changed namespace and tries to allocate a uid range for it. If it is
// successful, an mcs label corresponding to the relative position of the range is also
// set (unless the namespace already carries an MCS annotation or no label is available).
//
// Namespaces that already have the uid range annotation are left alone. The
// work is done on a copy of ns. The allocated block is released again unless
// the update succeeds, and an update that fails because the namespace no
// longer exists is not reported as an error.
func (c *NamespaceSecurityDefaultsController) allocate(ns *v1.Namespace) error {
	if _, assigned := ns.Annotations[security.UIDRangeAnnotation]; assigned {
		return nil
	}

	copied, err := kapi.Scheme.Copy(ns)
	if err != nil {
		return err
	}
	updated := copied.(*v1.Namespace)
	if updated.Annotations == nil {
		updated.Annotations = map[string]string{}
	}

	// Anything registered on undo runs when this function returns, unless
	// Commit has been called first.
	undo := &tx{}
	defer undo.Rollback()

	// do uid allocation
	block, err := c.uidAllocator.AllocateNext()
	if err != nil {
		return err
	}
	undo.Add(func() error { return c.uidAllocator.Release(block) })

	updated.Annotations[security.UIDRangeAnnotation] = block.String()
	updated.Annotations[security.SupplementalGroupsAnnotation] = block.String()
	if _, hasLabel := updated.Annotations[security.MCSAnnotation]; !hasLabel {
		label := c.mcsAllocator(block)
		if label != nil {
			updated.Annotations[security.MCSAnnotation] = label.String()
		}
	}

	_, err = c.client.Update(updated)
	switch {
	case err == nil:
		// The block now belongs to the namespace; keep it.
		undo.Commit()
		return nil
	case errors.IsNotFound(err):
		// The namespace is gone: nothing to retry, and the deferred rollback
		// gives the block back.
		return nil
	default:
		return err
	}
}
// changedAndSetAnnotations reports whether ns carries a uid range, MCS or
// supplemental groups annotation whose value differs from the one on old.
// An annotation that is absent from ns never counts as a change.
func changedAndSetAnnotations(old, ns *v1.Namespace) bool {
	watched := [...]string{
		security.UIDRangeAnnotation,
		security.MCSAnnotation,
		security.SupplementalGroupsAnnotation,
	}
	for _, key := range watched {
		current, present := ns.Annotations[key]
		if present && current != old.Annotations[key] {
			return true
		}
	}
	return false
}
// tx collects undo functions for a sequence of changes. Rollback runs every
// registered function; Commit discards them so a later Rollback does nothing.
type tx struct {
// rollback holds the undo functions in the order they were added.
rollback []func() error
}
// Add registers fn to be run if the transaction is rolled back.
func (t *tx) Add(fn func() error) {
	t.rollback = append(t.rollback, fn)
}
// HasChanges reports whether at least one undo function is registered.
func (t *tx) HasChanges() bool {
	return len(t.rollback) != 0
}
// Rollback runs every registered undo function in the order it was added.
// A failing undo function is reported through utilruntime.HandleError and
// does not stop the remaining ones from running.
func (t *tx) Rollback() {
	steps := t.rollback
	for i := range steps {
		err := steps[i]()
		if err == nil {
			continue
		}
		utilruntime.HandleError(fmt.Errorf("unable to undo tx: %v", err))
	}
}
// Commit drops all registered undo functions, so a subsequent Rollback has
// nothing to run.
func (t *tx) Commit() {
	t.rollback = nil
}