/
node_expander.go
151 lines (127 loc) · 5.85 KB
/
node_expander.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operationexecutor
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// NodeExpander holds the state used to expand a volume on the node: the
// embedded resize operation options, an API client, an event recorder, and
// values that runPreCheck derives from the PVC and PV.
type NodeExpander struct {
	// nodeResizeOperationOpts is embedded, so its fields (for example pvc, pv,
	// vmt and pluginResizeOpts) are accessed directly on the expander.
	nodeResizeOperationOpts
	// kubeClient is handed to the util helpers that update the PVC.
	kubeClient clientset.Interface
	// recorder emits events on the pod and the PVC after a successful expansion.
	recorder record.EventRecorder
	// The remaining fields are computed by runPreCheck.
	// pvcStatusCap is the storage capacity read from the PVC's status.
	pvcStatusCap resource.Quantity
	// pvCap is the storage capacity read from the PV's spec.
	pvCap resource.Quantity
	// resizeStatus is the resize status read from the PVC's status; it may be nil.
	resizeStatus *v1.PersistentVolumeClaimResizeStatus
	// pvcAlreadyUpdated, if true, indicates that although we are calling NodeExpandVolume on the kubelet,
	// the PVC has already been updated - possibly because expansion already succeeded on a different node.
	// This can happen when a RWX PVC is expanded.
	pvcAlreadyUpdated bool
}
// newNodeExpander builds a NodeExpander from the given resize options, API
// client and event recorder. The fields computed by runPreCheck are left at
// their zero values.
func newNodeExpander(resizeOp nodeResizeOperationOpts, client clientset.Interface, recorder record.EventRecorder) *NodeExpander {
	expander := new(NodeExpander)
	expander.nodeResizeOperationOpts = resizeOp
	expander.kubeClient = client
	expander.recorder = recorder
	return expander
}
// testResponseData is merely used for doing sanity checks in unit tests.
// NOTE: expandOnPlugin builds this struct with positional literals, so the
// field order below must not change.
type testResponseData struct {
	// resizeCalledOnPlugin indicates that the resize operation was called on
	// the underlying volume driver; mainly useful for testing.
	resizeCalledOnPlugin bool
	// assumeResizeFinished indicates whether kubelet should assume the resize
	// operation is finished. For kubelet, a resize operation could be assumed
	// finished even if actual resizing is *not* finished. This can happen
	// because certain prechecks are failing and kubelet should not retry
	// expansion, or because the resize operation is genuinely finished.
	assumeResizeFinished bool
}
// runPreCheck caches the PVC status capacity, the PV capacity and the PVC
// resize status on the expander, and reports whether expansion on the node
// may proceed.
func (ne *NodeExpander) runPreCheck() bool {
	ne.pvcStatusCap = ne.pvc.Status.Capacity[v1.ResourceStorage]
	ne.pvCap = ne.pv.Spec.Capacity[v1.ResourceStorage]
	ne.resizeStatus = ne.pvc.Status.ResizeStatus

	// The PVC may already report the requested size while we are still asked
	// to expand, because the size last recorded in ASOW is older. This can
	// happen for RWX volume types.
	sizeReached := ne.pvcStatusCap.Cmp(ne.pluginResizeOpts.NewSize) >= 0
	nothingInProgress := ne.resizeStatus == nil ||
		*ne.resizeStatus == v1.PersistentVolumeClaimNoExpansionInProgress
	if sizeReached && nothingInProgress {
		ne.pvcAlreadyUpdated = true
	}

	// A nil resizeStatus is allowed so that cases where it is never set
	// (old control-plane expansion controller etc) keep working.
	switch {
	case ne.resizeStatus == nil, ne.pvcAlreadyUpdated:
		return true
	}

	// Otherwise expansion proceeds only when node expansion is pending or
	// already in progress.
	switch *ne.resizeStatus {
	case v1.PersistentVolumeClaimNodeExpansionPending,
		v1.PersistentVolumeClaimNodeExpansionInProgress:
		return true
	default:
		return false
	}
}
// expandOnPlugin runs the pre-checks and, if they pass, calls NodeExpand on
// the volume plugin, updating the PVC before and after the call.
//
// It returns, in order:
//   - a bool that is true once the plugin's NodeExpand call has succeeded
//     (even if the follow-up PVC update then fails), false otherwise;
//   - an error: the failure to mark the PVC as in progress, the plugin's
//     resize error (except when it is a failed-precondition error, which is
//     only logged), or the failure to record the finished resize on the PVC;
//   - testResponseData describing which steps ran, used by unit tests.
func (ne *NodeExpander) expandOnPlugin() (bool, error, testResponseData) {
	if !ne.runPreCheck() {
		// Pre-checks rejected the expansion: the plugin is not called and the
		// resize is reported as finished.
		return false, nil, testResponseData{resizeCalledOnPlugin: false, assumeResizeFinished: true}
	}

	var err error
	nodeName := ne.vmt.Pod.Spec.NodeName

	if !ne.pvcAlreadyUpdated {
		ne.pvc, err = util.MarkNodeExpansionInProgress(ne.pvc, ne.kubeClient)
		if err != nil {
			msg := ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err)
			// The message is already built, so it must not be used as a format string.
			klog.Error(msg)
			return false, err, testResponseData{}
		}
	}

	_, resizeErr := ne.volumePlugin.NodeExpand(ne.pluginResizeOpts)
	if resizeErr != nil {
		if volumetypes.IsOperationFinishedError(resizeErr) {
			var markFailedError error
			ne.pvc, markFailedError = util.MarkNodeExpansionFailed(ne.pvc, ne.kubeClient)
			if markFailedError != nil {
				// Report the error from marking the PVC, not the unrelated
				// (and at this point nil) err variable.
				klog.Error(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion as failed: %v", markFailedError))
			}
		}

		// if driver returned FailedPrecondition error that means
		// volume expansion should not be retried on this node but
		// expansion operation should not block mounting
		if volumetypes.IsFailedPreconditionError(resizeErr) {
			ne.actualStateOfWorld.MarkForInUseExpansionError(ne.vmt.VolumeName)
			klog.Error(ne.vmt.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed with %v", resizeErr))
			return false, nil, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true}
		}
		return false, resizeErr, testResponseData{assumeResizeFinished: true, resizeCalledOnPlugin: true}
	}

	simpleMsg, detailedMsg := ne.vmt.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName)
	ne.recorder.Eventf(ne.vmt.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
	ne.recorder.Eventf(ne.pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
	klog.InfoS(detailedMsg, "pod", klog.KObj(ne.vmt.Pod))

	finished := testResponseData{resizeCalledOnPlugin: true, assumeResizeFinished: true}

	// no need to update PVC object if we already updated it
	if ne.pvcAlreadyUpdated {
		return true, nil, finished
	}

	// File system resize succeeded, now update the PVC's Capacity to match the PV's
	ne.pvc, err = util.MarkFSResizeFinished(ne.pvc, ne.pluginResizeOpts.NewSize, ne.kubeClient)
	if err != nil {
		return true, fmt.Errorf("mountVolume.NodeExpandVolume update pvc status failed: %v", err), finished
	}
	return true, nil, finished
}