-
Notifications
You must be signed in to change notification settings - Fork 38.9k
/
volume_restrictions.go
343 lines (302 loc) · 12.4 KB
/
volume_restrictions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumerestrictions
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)
// VolumeRestrictions is a plugin that checks volume restrictions.
// It rejects nodes where one of the pod's volumes would conflict with a volume
// of a pod already on the node and, when enableReadWriteOncePod is set, pods
// whose ReadWriteOncePod PVCs are already in use.
type VolumeRestrictions struct {
// pvcLister looks up the PVCs referenced by a pod so their access modes can be inspected.
pvcLister corelisters.PersistentVolumeClaimLister
// sharedLister is the scheduler's snapshot lister; it is queried to learn whether a PVC is used by pods.
sharedLister framework.SharedLister
// enableReadWriteOncePod gates every ReadWriteOncePod code path of the plugin.
enableReadWriteOncePod bool
}
// Compile-time assertions that the plugin and its cycle state implement the
// framework interfaces they are used as.
var _ framework.PreFilterPlugin = &VolumeRestrictions{}
var _ framework.FilterPlugin = &VolumeRestrictions{}
var _ framework.EnqueueExtensions = &VolumeRestrictions{}
var _ framework.StateData = &preFilterState{}
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = names.VolumeRestrictions
// preFilterStateKey is the key in CycleState to VolumeRestrictions pre-computed data for Filtering.
// Using the name of the plugin will likely help us avoid collisions with other plugins.
preFilterStateKey = "PreFilter" + Name
// ErrReasonDiskConflict is used for NoDiskConflict predicate error.
// Filter reports it when one of the pod's volumes conflicts with a volume of a pod on the node.
ErrReasonDiskConflict = "node(s) had no available disk"
// ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode.
ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode"
)
// preFilterState is computed at PreFilter and used at Filter. AddPod and
// RemovePod adjust it while the scheduler evaluates adding or removing pods.
type preFilterState struct {
// Names of the PVCs referenced by the pod being scheduled that use the
// ReadWriteOncePod access mode. These are bare PVC names, not namespaced keys.
readWriteOncePodPVCs sets.Set[string]
// The number of references to these ReadWriteOncePod PVCs by scheduled pods.
// A value greater than zero makes Filter reject the pod.
conflictingPVCRefCount int
}
// updateWithPod adjusts conflictingPVCRefCount by the number of conflicting
// PVC references held by podInfo, scaled by multiplier. AddPod passes 1 and
// RemovePod passes -1.
func (s *preFilterState) updateWithPod(podInfo *framework.PodInfo, multiplier int) {
	delta := s.conflictingPVCRefCountForPod(podInfo) * multiplier
	s.conflictingPVCRefCount += delta
}
// conflictingPVCRefCountForPod returns how many of podInfo's volumes reference
// a PVC whose claim name is in s.readWriteOncePodPVCs. Every matching volume is
// counted, so a pod that lists the same claim twice contributes two.
//
// NOTE(review): only the claim name is compared; the namespace of podInfo's
// pod is not checked here — confirm that same-named PVCs in other namespaces
// cannot reach this code path.
func (s *preFilterState) conflictingPVCRefCountForPod(podInfo *framework.PodInfo) int {
	count := 0
	for i := range podInfo.Pod.Spec.Volumes {
		claim := podInfo.Pod.Spec.Volumes[i].PersistentVolumeClaim
		if claim != nil && s.readWriteOncePodPVCs.Has(claim.ClaimName) {
			count++
		}
	}
	return count
}
// Clone returns a copy of the prefilter state, or nil for a nil receiver.
// The copy carries its own conflictingPVCRefCount but shares the
// readWriteOncePodPVCs set with the receiver; no code visible in this file
// mutates that set after it has been built.
func (s *preFilterState) Clone() framework.StateData {
	if s == nil {
		return nil
	}
	// A shallow struct copy duplicates the counter and aliases the set.
	clone := *s
	return &clone
}
// Name returns the name of the plugin (the package-level Name constant).
// It is used in logs, etc.
func (pl *VolumeRestrictions) Name() string {
return Name
}
// isVolumeConflict reports whether volume conflicts with any volume declared
// by pod. Only GCE PD, AWS EBS, iSCSI and Ceph RBD volume sources are examined;
// any other volume source never produces a conflict.
func isVolumeConflict(volume *v1.Volume, pod *v1.Pod) bool {
	for i := range pod.Spec.Volumes {
		existing := &pod.Spec.Volumes[i]

		// GCE PD: the same disk mounted by multiple pods conflicts unless
		// every mount is read-only.
		if gce, other := volume.GCEPersistentDisk, existing.GCEPersistentDisk; gce != nil && other != nil {
			if gce.PDName == other.PDName && (!gce.ReadOnly || !other.ReadOnly) {
				return true
			}
		}

		// AWS EBS: the same volume ID always conflicts, read-only or not.
		if ebs, other := volume.AWSElasticBlockStore, existing.AWSElasticBlockStore; ebs != nil && other != nil {
			if ebs.VolumeID == other.VolumeID {
				return true
			}
		}

		// iSCSI: two volumes are the same when they share an IQN. Only one
		// read-write mount is permitted, so the same volume conflicts unless
		// both mounts are read-only.
		if iscsi, other := volume.ISCSI, existing.ISCSI; iscsi != nil && other != nil {
			if iscsi.IQN == other.IQN && (!iscsi.ReadOnly || !other.ReadOnly) {
				return true
			}
		}

		// Ceph RBD: two images are the same when they share at least one
		// monitor, live in the same RADOS pool and have the same image name.
		// The same image conflicts unless both mounts are read-only.
		if rbd, other := volume.RBD, existing.RBD; rbd != nil && other != nil {
			sameImage := haveOverlap(rbd.CephMonitors, other.CephMonitors) &&
				rbd.RBDPool == other.RBDPool &&
				rbd.RBDImage == other.RBDImage
			if sameImage && (!rbd.ReadOnly || !other.ReadOnly) {
				return true
			}
		}
	}

	return false
}
// haveOverlap reports whether a1 and a2 share at least one element.
// The shorter slice is loaded into a set and the longer one is scanned
// against it.
func haveOverlap(a1, a2 []string) bool {
	shorter, longer := a1, a2
	if len(shorter) > len(longer) {
		shorter, longer = longer, shorter
	}

	lookup := sets.New(shorter...)
	for _, candidate := range longer {
		if lookup.Has(candidate) {
			return true
		}
	}
	return false
}
// PreFilter computes and stores the cycleState data needed to enforce
// ReadWriteOncePod. It does nothing when the feature is disabled.
//
// A PVC that cannot be found yields an UnschedulableAndUnresolvable status;
// any other error is converted to a status with framework.AsStatus.
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	if !pl.enableReadWriteOncePod {
		return nil, nil
	}

	rwopPVCs, err := pl.readWriteOncePodPVCsForPod(ctx, pod)
	switch {
	case err == nil:
		// Lookup succeeded; continue below.
	case apierrors.IsNotFound(err):
		return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
	default:
		return nil, framework.AsStatus(err)
	}

	state, err := pl.calPreFilterState(ctx, pod, rwopPVCs)
	if err != nil {
		return nil, framework.AsStatus(err)
	}

	cycleState.Write(preFilterStateKey, state)
	return nil, nil
}
// AddPod updates the pre-computed data in cycleState to account for
// podInfoToAdd being added. It is a no-op when ReadWriteOncePod handling is
// disabled, and returns an error status if the prefilter state cannot be read.
func (pl *VolumeRestrictions) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
	if pl.enableReadWriteOncePod {
		s, err := getPreFilterState(cycleState)
		if err != nil {
			return framework.AsStatus(err)
		}
		s.updateWithPod(podInfoToAdd, 1)
	}
	return nil
}
// RemovePod updates the pre-computed data in cycleState to account for
// podInfoToRemove being removed. It is a no-op when ReadWriteOncePod handling
// is disabled, and returns an error status if the prefilter state cannot be
// read.
func (pl *VolumeRestrictions) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
	if pl.enableReadWriteOncePod {
		s, err := getPreFilterState(cycleState)
		if err != nil {
			return framework.AsStatus(err)
		}
		s.updateWithPod(podInfoToRemove, -1)
	}
	return nil
}
// getPreFilterState fetches the preFilterState that PreFilter stored in
// cycleState under preFilterStateKey.
//
// It returns an error when the key cannot be read (most likely because
// PreFilter was not invoked) or when the stored value is not a
// *preFilterState. The read error is wrapped, so callers can still inspect
// the underlying cause with errors.Is / errors.As.
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
	c, err := cycleState.Read(preFilterStateKey)
	if err != nil {
		// preFilterState doesn't exist, likely PreFilter wasn't invoked.
		return nil, fmt.Errorf("cannot read %q from cycleState: %w", preFilterStateKey, err)
	}

	s, ok := c.(*preFilterState)
	if !ok {
		return nil, fmt.Errorf("%+v convert to volumerestrictions.state error", c)
	}
	return s, nil
}
// calPreFilterState builds the preFilterState for pod: it records the given
// ReadWriteOncePod PVC names and counts how many of them the snapshot reports
// as already used by pods. Each PVC contributes at most one to the count,
// because at most one pod can be using a ReadWriteOncePod PVC.
// The returned error is always nil.
func (pl *VolumeRestrictions) calPreFilterState(ctx context.Context, pod *v1.Pod, pvcs sets.Set[string]) (*preFilterState, error) {
	state := &preFilterState{readWriteOncePodPVCs: pvcs}
	for name := range pvcs {
		namespacedKey := framework.GetNamespacedName(pod.Namespace, name)
		if !pl.sharedLister.StorageInfos().IsPVCUsedByPods(namespacedKey) {
			continue
		}
		state.conflictingPVCRefCount++
	}
	return state, nil
}
// readWriteOncePodPVCsForPod returns the names of the PVCs referenced by pod
// whose access modes include ReadWriteOncePod. Each PVC is looked up in the
// pod's namespace through pvcLister; the first lookup error is returned as-is
// together with a nil set.
func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, pod *v1.Pod) (sets.Set[string], error) {
	result := sets.New[string]()
	for i := range pod.Spec.Volumes {
		claimSource := pod.Spec.Volumes[i].PersistentVolumeClaim
		if claimSource == nil {
			// Not a PVC-backed volume.
			continue
		}

		claim, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(claimSource.ClaimName)
		if err != nil {
			return nil, err
		}

		if v1helper.ContainsAccessMode(claim.Spec.AccessModes, v1.ReadWriteOncePod) {
			result.Insert(claim.Name)
		}
	}
	return result, nil
}
// satisfyVolumeConflicts reports whether pod can be placed on the node without
// any of its volumes conflicting with a volume of a pod already on the node.
// It returns false as soon as the first conflict is found.
func satisfyVolumeConflicts(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
	for i := range pod.Spec.Volumes {
		vol := &pod.Spec.Volumes[i]

		// Fast path: only these four volume sources are ever checked for conflicts.
		checkable := vol.GCEPersistentDisk != nil ||
			vol.AWSElasticBlockStore != nil ||
			vol.RBD != nil ||
			vol.ISCSI != nil
		if !checkable {
			continue
		}

		for _, existing := range nodeInfo.Pods {
			if isVolumeConflict(vol, existing.Pod) {
				return false
			}
		}
	}
	return true
}
// satisfyReadWriteOncePod returns an Unschedulable status when state records
// at least one conflicting reference to the pod's ReadWriteOncePod PVCs.
// A nil state, or a count of zero or less, yields a nil status.
func satisfyReadWriteOncePod(ctx context.Context, state *preFilterState) *framework.Status {
	if state != nil && state.conflictingPVCRefCount > 0 {
		return framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict)
	}
	return nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
// The plugin itself is returned because it implements AddPod and RemovePod.
func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
return pl
}
// Filter is invoked at the filter extension point.
// It evaluates whether a pod can fit on a node given the volumes it requests
// and the volumes of pods already on that node. If a conflicting volume is
// already in use on the node, the pod can't be scheduled there.
// This is GCE, Amazon EBS, ISCSI and Ceph RBD specific for now:
// - GCE PD allows multiple mounts of the same disk as long as they're all read-only
// - AWS EBS forbids any two pods mounting the same volume ID
// - Ceph RBD forbids two pods that share at least one monitor and match pool and image, unless both mount the image read-only
// - ISCSI forbids two pods that share the same IQN, unless both mount the volume read-only
//
// If ReadWriteOncePod handling is enabled, it additionally rejects the pod
// when the prefilter state records that its ReadWriteOncePod PVCs are already
// in use; that state is adjusted by AddPod/RemovePod, which lets the scheduler
// evaluate whether preemption will help.
func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if conflictFree := satisfyVolumeConflicts(pod, nodeInfo); !conflictFree {
		return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
	}

	if pl.enableReadWriteOncePod {
		state, err := getPreFilterState(cycleState)
		if err != nil {
			return framework.AsStatus(err)
		}
		return satisfyReadWriteOncePod(ctx, state)
	}
	return nil
}
// EventsToRegister returns the cluster events that may make a Pod rejected by
// this plugin schedulable again.
func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEvent {
	// Pods may fail to schedule because of volumes conflicting with other pods on the same node.
	// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
	// Because `spec.volumes` is immutable, pod update events are ignored.
	podDeleted := framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}

	// A new Node may make a pod schedulable.
	nodeAdded := framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}

	// Pods may fail to schedule because a PVC they use has not been created yet.
	// The PVC is required to exist so its access modes can be checked.
	pvcAddedOrUpdated := framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}

	return []framework.ClusterEvent{podDeleted, nodeAdded, pvcAddedOrUpdated}
}
// New initializes a new VolumeRestrictions plugin and returns it.
// The plugin arguments object is ignored. The PVC lister comes from the
// handle's shared informer factory, the shared lister from the handle's
// snapshot, and ReadWriteOncePod handling follows fts.EnableReadWriteOncePod.
// The returned error is always nil.
func New(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
	pl := &VolumeRestrictions{
		pvcLister:              handle.SharedInformerFactory().Core().V1().PersistentVolumeClaims().Lister(),
		sharedLister:           handle.SnapshotSharedLister(),
		enableReadWriteOncePod: fts.EnableReadWriteOncePod,
	}
	return pl, nil
}