/
volume_zone.go
312 lines (277 loc) · 10.9 KB
/
volume_zone.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumezone
import (
"context"
"errors"
"fmt"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
storagehelpers "k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)
// VolumeZone is a plugin that checks volume zone.
type VolumeZone struct {
pvLister corelisters.PersistentVolumeLister
pvcLister corelisters.PersistentVolumeClaimLister
scLister storagelisters.StorageClassLister
}
var _ framework.FilterPlugin = &VolumeZone{}
var _ framework.PreFilterPlugin = &VolumeZone{}
var _ framework.EnqueueExtensions = &VolumeZone{}
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = names.VolumeZone
preFilterStateKey framework.StateKey = "PreFilter" + Name
// ErrReasonConflict is used for NoVolumeZoneConflict predicate error.
ErrReasonConflict = "node(s) had no available volume zone"
)
// pvTopology holds the value of a pv's topologyLabel
type pvTopology struct {
pvName string
key string
values sets.Set[string]
}
// the state is initialized in PreFilter phase. because we save the pointer in
// framework.CycleState, in the later phases we don't need to call Write method
// to update the value
type stateData struct {
// podPVTopologies holds the pv information we need
// it's initialized in the PreFilter phase
podPVTopologies []pvTopology
}
func (d *stateData) Clone() framework.StateData {
return d
}
var topologyLabels = []string{
v1.LabelFailureDomainBetaZone,
v1.LabelFailureDomainBetaRegion,
v1.LabelTopologyZone,
v1.LabelTopologyRegion,
}
func translateToGALabel(label string) string {
if label == v1.LabelFailureDomainBetaRegion {
return v1.LabelTopologyRegion
}
if label == v1.LabelFailureDomainBetaZone {
return v1.LabelTopologyZone
}
return label
}
// Name returns name of the plugin. It is used in logs, etc.
func (pl *VolumeZone) Name() string {
return Name
}
// PreFilter invoked at the prefilter extension point
//
// # It finds the topology of the PersistentVolumes corresponding to the volumes a pod requests
//
// Currently, this is only supported with PersistentVolumeClaims,
// and only looks for the bound PersistentVolume.
func (pl *VolumeZone) PreFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
podPVTopologies, status := pl.getPVbyPod(ctx, pod)
if !status.IsSuccess() {
return nil, status
}
if len(podPVTopologies) == 0 {
return nil, framework.NewStatus(framework.Skip)
}
cs.Write(preFilterStateKey, &stateData{podPVTopologies: podPVTopologies})
return nil, nil
}
func (pl *VolumeZone) getPVbyPod(ctx context.Context, pod *v1.Pod) ([]pvTopology, *framework.Status) {
logger := klog.FromContext(ctx)
podPVTopologies := make([]pvTopology, 0)
for i := range pod.Spec.Volumes {
volume := pod.Spec.Volumes[i]
if volume.PersistentVolumeClaim == nil {
continue
}
pvcName := volume.PersistentVolumeClaim.ClaimName
if pvcName == "" {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no name")
}
pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return nil, s
}
pvName := pvc.Spec.VolumeName
if pvName == "" {
scName := storagehelpers.GetPersistentVolumeClaimClass(pvc)
if len(scName) == 0 {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no pv name and storageClass name")
}
class, err := pl.scLister.Get(scName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return nil, s
}
if class.VolumeBindingMode == nil {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("VolumeBindingMode not set for StorageClass %q", scName))
}
if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
// Skip unbound volumes
continue
}
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolume had no name")
}
pv, err := pl.pvLister.Get(pvName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return nil, s
}
for _, key := range topologyLabels {
if value, ok := pv.ObjectMeta.Labels[key]; ok {
volumeVSet, err := volumehelpers.LabelZonesToSet(value)
if err != nil {
logger.Info("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err)
continue
}
podPVTopologies = append(podPVTopologies, pvTopology{
pvName: pv.Name,
key: key,
values: sets.Set[string](volumeVSet),
})
}
}
}
return podPVTopologies, nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *VolumeZone) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
// Filter invoked at the filter extension point.
//
// It evaluates if a pod can fit due to the volumes it requests, given
// that some volumes may have zone scheduling constraints. The requirement is that any
// volume zone-labels must match the equivalent zone-labels on the node. It is OK for
// the node to have more zone-label constraints (for example, a hypothetical replicated
// volume might allow region-wide access)
//
// Currently this is only supported with PersistentVolumeClaims, and looks to the labels
// only on the bound PersistentVolume.
//
// Working with volumes declared inline in the pod specification (i.e. not
// using a PersistentVolume) is likely to be harder, as it would require
// determining the zone of a volume during scheduling, and that is likely to
// require calling out to the cloud provider. It seems that we are moving away
// from inline volume declarations anyway.
func (pl *VolumeZone) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
logger := klog.FromContext(ctx)
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
if len(pod.Spec.Volumes) == 0 {
return nil
}
var podPVTopologies []pvTopology
state, err := getStateData(cs)
if err != nil {
// Fallback to calculate pv list here
var status *framework.Status
podPVTopologies, status = pl.getPVbyPod(ctx, pod)
if !status.IsSuccess() {
return status
}
} else {
podPVTopologies = state.podPVTopologies
}
node := nodeInfo.Node()
hasAnyNodeConstraint := false
for _, topologyLabel := range topologyLabels {
if _, ok := node.Labels[topologyLabel]; ok {
hasAnyNodeConstraint = true
break
}
}
if !hasAnyNodeConstraint {
// The node has no zone constraints, so we're OK to schedule.
// This is to handle a single-zone cluster scenario where the node may not have any topology labels.
return nil
}
for _, pvTopology := range podPVTopologies {
v, ok := node.Labels[pvTopology.key]
if !ok {
// if we can't match the beta label, try to match pv's beta label with node's ga label
v, ok = node.Labels[translateToGALabel(pvTopology.key)]
}
if !ok || !pvTopology.values.Has(v) {
logger.V(10).Info("Won't schedule pod onto node due to volume (mismatch on label key)", "pod", klog.KObj(pod), "node", klog.KObj(node), "PV", klog.KRef("", pvTopology.pvName), "PVLabelKey", pvTopology.key)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict)
}
}
return nil
}
func getStateData(cs *framework.CycleState) (*stateData, error) {
state, err := cs.Read(preFilterStateKey)
if err != nil {
return nil, err
}
s, ok := state.(*stateData)
if !ok {
return nil, errors.New("unable to convert state into stateData")
}
return s, nil
}
func getErrorAsStatus(err error) *framework.Status {
if err != nil {
if apierrors.IsNotFound(err) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return framework.AsStatus(err)
}
return nil
}
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{
// New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable.
// Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add}},
// A new node or updating a node's volume zone labels may make a pod schedulable.
//
// A note about UpdateNodeTaint event:
// NodeAdd QueueingHint isn't always called because of the internal feature called preCheck.
// As a common problematic scenario,
// when a node is added but not ready, NodeAdd event is filtered out by preCheck and doesn't arrive.
// In such cases, this plugin may miss some events that actually make pods schedulable.
// As a workaround, we add UpdateNodeTaint event to catch the case.
// We can remove UpdateNodeTaint when we remove the preCheck feature.
// See: https://github.com/kubernetes/kubernetes/issues/110175
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
// A new pvc may make a pod schedulable.
// Due to fields are immutable except `spec.resources`, pvc update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
// A new pv or updating a pv's volume zone labels may make a pod schedulable.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
}
}
// New initializes a new plugin and returns it.
func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
informerFactory := handle.SharedInformerFactory()
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
return &VolumeZone{
pvLister,
pvcLister,
scLister,
}, nil
}