-
Notifications
You must be signed in to change notification settings - Fork 21
/
admission.go
210 lines (185 loc) · 6.25 KB
/
admission.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package snapshot
import (
"fmt"
"sync"
api "kubedb.dev/apimachinery/apis/kubedb/v1alpha1"
cs "kubedb.dev/apimachinery/client/clientset/versioned"
plugin "kubedb.dev/apimachinery/pkg/admission"
amv "kubedb.dev/apimachinery/pkg/validator"
admission "k8s.io/api/admission/v1beta1"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"kmodules.xyz/client-go/meta"
meta_util "kmodules.xyz/client-go/meta"
hookapi "kmodules.xyz/webhook-runtime/admission/v1beta1"
)
type SnapshotValidator struct {
client kubernetes.Interface
extClient cs.Interface
lock sync.RWMutex
initialized bool
}
var _ hookapi.AdmissionHook = &SnapshotValidator{}
func (a *SnapshotValidator) Resource() (plural schema.GroupVersionResource, singular string) {
return schema.GroupVersionResource{
Group: "validators.kubedb.com",
Version: "v1alpha1",
Resource: "snapshotvalidators",
},
"snapshotvalidator"
}
func (a *SnapshotValidator) Initialize(config *rest.Config, stopCh <-chan struct{}) error {
a.lock.Lock()
defer a.lock.Unlock()
a.initialized = true
var err error
if a.client, err = kubernetes.NewForConfig(config); err != nil {
return err
}
if a.extClient, err = cs.NewForConfig(config); err != nil {
return err
}
return err
}
func (a *SnapshotValidator) Admit(req *admission.AdmissionRequest) *admission.AdmissionResponse {
status := &admission.AdmissionResponse{}
if (req.Operation != admission.Create && req.Operation != admission.Update) ||
len(req.SubResource) != 0 ||
req.Kind.Group != api.SchemeGroupVersion.Group ||
req.Kind.Kind != api.ResourceKindSnapshot {
status.Allowed = true
return status
}
a.lock.RLock()
defer a.lock.RUnlock()
if !a.initialized {
return hookapi.StatusUninitialized()
}
obj, err := meta.UnmarshalFromJSON(req.Object.Raw, api.SchemeGroupVersion)
if err != nil {
return hookapi.StatusBadRequest(err)
}
if req.Operation == admission.Update {
oldObject, err := meta.UnmarshalFromJSON(req.OldObject.Raw, api.SchemeGroupVersion)
if err != nil {
return hookapi.StatusBadRequest(err)
}
if err := plugin.ValidateUpdate(obj, oldObject, req.Kind.Kind); err != nil {
return hookapi.StatusBadRequest(err)
}
// Skip checking validation if Spec is not changed
if meta_util.Equal(obj.(*api.Snapshot).Spec, oldObject.(*api.Snapshot).Spec) {
status.Allowed = true
return status
}
}
// validates if database of particular kind exists
if err := a.validateSnapshot(obj.(*api.Snapshot)); err != nil {
return hookapi.StatusForbidden(err)
}
// validates Snapshot Spec
if err := amv.ValidateSnapshotSpec(obj.(*api.Snapshot).Spec.Backend); err != nil {
return hookapi.StatusForbidden(err)
}
if req.Operation == admission.Create {
// isSnapshotRunning checks if a snapshot is already running. Check this only when creating snapshot,
// because Snapshot.Status will be needed to edit later and this method will give error for that update.
if err := a.isSnapshotRunning(obj.(*api.Snapshot)); err != nil {
return hookapi.StatusForbidden(err)
}
}
status.Allowed = true
return status
}
// validateSnapshot checks if the database of the particular kind actually exists.
func (a *SnapshotValidator) validateSnapshot(snapshot *api.Snapshot) error {
// Database name can't empty
databaseName := snapshot.Spec.DatabaseName
if databaseName == "" {
return fmt.Errorf(`object 'DatabaseName' is missing in '%v'`, snapshot.Spec)
}
//
kind, err := meta_util.GetStringValue(snapshot.Labels, api.LabelDatabaseKind)
if err != nil {
return fmt.Errorf("'%v:XDB' label is missing", api.LabelDatabaseKind)
}
// Check if DB exists
switch kind {
case api.ResourceKindElasticsearch:
es, err := a.extClient.KubedbV1alpha1().Elasticsearches(snapshot.Namespace).Get(databaseName, metav1.GetOptions{})
if err != nil {
return err
}
storage := es.Spec.Storage
if es.Spec.Topology != nil {
storage = es.Spec.Topology.Data.Storage
}
if err := verifyStorageType(snapshot, storage); err != nil {
return err
}
case api.ResourceKindPostgres:
pg, err := a.extClient.KubedbV1alpha1().Postgreses(snapshot.Namespace).Get(databaseName, metav1.GetOptions{})
if err != nil {
return err
}
if err := verifyStorageType(snapshot, pg.Spec.Storage); err != nil {
return err
}
case api.ResourceKindMongoDB:
mg, err := a.extClient.KubedbV1alpha1().MongoDBs(snapshot.Namespace).Get(databaseName, metav1.GetOptions{})
if err != nil {
return err
}
if err := verifyStorageType(snapshot, mg.Spec.Storage); err != nil {
return err
}
case api.ResourceKindMySQL:
my, err := a.extClient.KubedbV1alpha1().MySQLs(snapshot.Namespace).Get(databaseName, metav1.GetOptions{})
if err != nil {
return err
}
if err := verifyStorageType(snapshot, my.Spec.Storage); err != nil {
return err
}
case api.ResourceKindRedis:
if _, err := a.extClient.KubedbV1alpha1().Redises(snapshot.Namespace).Get(databaseName, metav1.GetOptions{}); err != nil {
return err
}
case api.ResourceKindMemcached:
if _, err := a.extClient.KubedbV1alpha1().Memcacheds(snapshot.Namespace).Get(databaseName, metav1.GetOptions{}); err != nil {
return err
}
}
return nil
}
func verifyStorageType(snapshot *api.Snapshot, dbPvcSpec *core.PersistentVolumeClaimSpec) error {
if snapshot.Spec.StorageType != nil &&
*snapshot.Spec.StorageType == api.StorageTypeDurable &&
snapshot.Spec.PodVolumeClaimSpec == nil &&
dbPvcSpec == nil {
return fmt.Errorf("snapshot storagetype is durable but, " +
"pvc Spec is not specified in either PodVolumeClaimSpec or db.Spec.Storage")
}
return nil
}
func (a *SnapshotValidator) isSnapshotRunning(snapshot *api.Snapshot) error {
labelMap := map[string]string{
api.LabelDatabaseKind: snapshot.Labels[api.LabelDatabaseKind],
api.LabelDatabaseName: snapshot.Spec.DatabaseName,
api.LabelSnapshotStatus: string(api.SnapshotPhaseRunning),
}
snapshotList, err := a.extClient.KubedbV1alpha1().Snapshots(snapshot.Namespace).List(metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(labelMap).String(),
})
if err != nil {
return err
}
if len(snapshotList.Items) > 0 {
return fmt.Errorf("one Snapshot is already running")
}
return nil
}