forked from openshift/origin
/
admission.go
148 lines (123 loc) · 4.63 KB
/
admission.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package nodeenv
import (
"errors"
"fmt"
"io"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/client-go/informers"
corev1listers "k8s.io/client-go/listers/core/v1"
coreapi "k8s.io/kubernetes/pkg/apis/core"
oadmission "github.com/openshift/origin/pkg/cmd/server/admission"
"github.com/openshift/origin/pkg/util/labelselector"
)
func Register(plugins *admission.Plugins) {
plugins.Register("scheduling.openshift.io/OriginPodNodeEnvironment",
func(config io.Reader) (admission.Interface, error) {
return NewPodNodeEnvironment()
})
}
const (
timeToWaitForCacheSync = 10 * time.Second
kubeProjectNodeSelector = "scheduler.alpha.kubernetes.io/node-selector"
openShiftProjectNodeSelector = "openshift.io/node-selector"
)
// podNodeEnvironment is an implementation of admission.MutationInterface.
type podNodeEnvironment struct {
*admission.Handler
nsLister corev1listers.NamespaceLister
nsListerSynced func() bool
// TODO this should become a piece of config passed to the admission plugin
defaultNodeSelector string
}
var _ = initializer.WantsExternalKubeInformerFactory(&podNodeEnvironment{})
var _ = oadmission.WantsDefaultNodeSelector(&podNodeEnvironment{})
var _ = admission.ValidationInterface(&podNodeEnvironment{})
var _ = admission.MutationInterface(&podNodeEnvironment{})
// Admit enforces that pod and its project node label selectors matches at least a node in the cluster.
func (p *podNodeEnvironment) admit(a admission.Attributes, mutationAllowed bool) (err error) {
resource := a.GetResource().GroupResource()
if resource != corev1.Resource("pods") {
return nil
}
if a.GetSubresource() != "" {
// only run the checks below on pods proper and not subresources
return nil
}
obj := a.GetObject()
pod, ok := obj.(*coreapi.Pod)
if !ok {
return nil
}
name := pod.Name
if !p.waitForSyncedStore(time.After(timeToWaitForCacheSync)) {
return admission.NewForbidden(a, errors.New("scheduling.openshift.io/OriginPodNodeEnvironment: caches not synchronized"))
}
namespace, err := p.nsLister.Get(a.GetNamespace())
if err != nil {
return apierrors.NewForbidden(resource, name, err)
}
// If scheduler.alpha.kubernetes.io/node-selector is set on the pod,
// do not process the pod further.
if _, ok := namespace.ObjectMeta.Annotations[kubeProjectNodeSelector]; ok {
return nil
}
selector := p.defaultNodeSelector
if projectNodeSelector, ok := namespace.ObjectMeta.Annotations[openShiftProjectNodeSelector]; ok {
selector = projectNodeSelector
}
projectNodeSelector, err := labelselector.Parse(selector)
if err != nil {
return err
}
if labelselector.Conflicts(projectNodeSelector, pod.Spec.NodeSelector) {
return apierrors.NewForbidden(resource, name, fmt.Errorf("pod node label selector conflicts with its project node label selector"))
}
if !mutationAllowed && len(labelselector.Merge(projectNodeSelector, pod.Spec.NodeSelector)) != len(pod.Spec.NodeSelector) {
// no conflict, different size => pod.Spec.NodeSelector does not contain projectNodeSelector
return apierrors.NewForbidden(resource, name, fmt.Errorf("pod node label selector does not extend project node label selector"))
}
// modify pod node selector = project node selector + current pod node selector
pod.Spec.NodeSelector = labelselector.Merge(projectNodeSelector, pod.Spec.NodeSelector)
return nil
}
func (p *podNodeEnvironment) Admit(a admission.Attributes) (err error) {
return p.admit(a, true)
}
func (p *podNodeEnvironment) Validate(a admission.Attributes) (err error) {
return p.admit(a, false)
}
func (p *podNodeEnvironment) SetDefaultNodeSelector(in string) {
p.defaultNodeSelector = in
}
func (p *podNodeEnvironment) SetExternalKubeInformerFactory(kubeInformers informers.SharedInformerFactory) {
p.nsLister = kubeInformers.Core().V1().Namespaces().Lister()
p.nsListerSynced = kubeInformers.Core().V1().Namespaces().Informer().HasSynced
}
func (p *podNodeEnvironment) waitForSyncedStore(timeout <-chan time.Time) bool {
for !p.nsListerSynced() {
select {
case <-time.After(100 * time.Millisecond):
case <-timeout:
return p.nsListerSynced()
}
}
return true
}
func (p *podNodeEnvironment) ValidateInitialization() error {
if p.nsLister == nil {
return fmt.Errorf("project node environment plugin needs a namespace lister")
}
if p.nsListerSynced == nil {
return fmt.Errorf("project node environment plugin needs a namespace lister synced")
}
return nil
}
func NewPodNodeEnvironment() (admission.Interface, error) {
return &podNodeEnvironment{
Handler: admission.NewHandler(admission.Create),
}, nil
}