fix: use pv annotation to trigger filesystem resize when necessary #99326

Merged: 1 commit merged on Mar 9, 2021
38 changes: 31 additions & 7 deletions pkg/controller/volume/expand/expand_controller.go
@@ -151,13 +151,18 @@ func NewExpandController(
return
}

oldSize := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
oldReq := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
oldCap := oldPVC.Status.Capacity[v1.ResourceStorage]
newPVC, ok := new.(*v1.PersistentVolumeClaim)
if !ok {
return
}
newSize := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
if newSize.Cmp(oldSize) > 0 {
newReq := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
newCap := newPVC.Status.Capacity[v1.ResourceStorage]
// The PVC will be enqueued in two cases:
// 1. The user has increased the PVC's requested capacity --> the volume needs to be expanded.
// 2. The PVC's status capacity has been expanded --> the claim's bound PV has likely just gone through a filesystem resize, so remove the AnnPreResizeCapacity annotation from the PV.
if newReq.Cmp(oldReq) > 0 || newCap.Cmp(oldCap) > 0 {
expc.enqueuePVC(new)
}
},
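The two comparisons in the UpdateFunc above are the heart of the new enqueue behavior. As a minimal standalone sketch (not part of this diff; the quantities are made up), `resource.Quantity.Cmp` distinguishes the two triggers like this:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Trigger 1: the user raised spec.resources.requests[storage].
	oldReq, newReq := resource.MustParse("1Gi"), resource.MustParse("2Gi")
	fmt.Println("request grew:", newReq.Cmp(oldReq) > 0) // true -> enqueue for expansion

	// Trigger 2: status.capacity grew, e.g. after the bound PV finished a
	// filesystem resize; the PVC is enqueued so the controller can drop
	// the pre-resize annotation from the PV.
	oldCap, newCap := resource.MustParse("1Gi"), resource.MustParse("2Gi")
	fmt.Println("capacity grew:", newCap.Cmp(oldCap) > 0) // true -> enqueue for annotation cleanup
}
```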
@@ -173,10 +178,7 @@ func (expc *expandController) enqueuePVC(obj interface{}) {
return
}

size := pvc.Spec.Resources.Requests[v1.ResourceStorage]
statusSize := pvc.Status.Capacity[v1.ResourceStorage]

if pvc.Status.Phase == v1.ClaimBound && size.Cmp(statusSize) > 0 {
if pvc.Status.Phase == v1.ClaimBound {
key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err))
@@ -233,6 +235,16 @@ func (expc *expandController) syncHandler(key string) error {
return err
}

pvcRequestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage]

// Call the expand operation only under two conditions:
// 1. The PVC's requested size has been expanded and is larger than the PVC's current status size.
// 2. The PV has a pre-resize capacity annotation.
if pvcRequestSize.Cmp(pvcStatusSize) <= 0 && !metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) {
return nil
}

volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
migratable, err := expc.csiMigratedPluginManager.IsMigratable(volumeSpec)
if err != nil {
@@ -285,6 +297,11 @@ func (expc *expandController) syncHandler(key string) error {
}

func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error {
// If node expansion is complete and the PV still carries the pre-resize capacity annotation, remove the annotation from the PV and return.
if expc.isNodeExpandComplete(pvc, pv) && metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) {
return util.DeleteAnnPreResizeCapacity(pv, expc.GetKubeClient())
}

pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
if err != nil {
klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
@@ -337,6 +354,13 @@ func (expc *expandController) getPersistentVolume(pvc *v1.PersistentVolumeClaim)
return pv.DeepCopy(), nil
}

// isNodeExpandComplete returns true if pvc.Status.Capacity >= pv.Spec.Capacity
func (expc *expandController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
klog.V(4).Infof("pv %q capacity = %v, pvc %s capacity = %v", pv.Name, pv.Spec.Capacity[v1.ResourceStorage], pvc.ObjectMeta.Name, pvc.Status.Capacity[v1.ResourceStorage])
pvcCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage]
return pvcCap.Cmp(pvCap) >= 0
}

// Implementing VolumeHost interface
func (expc *expandController) GetPluginDir(pluginName string) string {
return ""
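Taken together, the expand_controller.go changes gate work on two signals: whether the claim's requested size exceeds its status size, and whether the bound PV still carries the pre-resize capacity annotation, with isNodeExpandComplete deciding when that annotation may be dropped. Below is a minimal, self-contained sketch of those two checks (the objects and sizes are invented; the annotation key mirrors the constant added in resize_util.go further down):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// annPreResizeCapacity mirrors util.AnnPreResizeCapacity for this standalone sketch.
const annPreResizeCapacity = "volume.alpha.kubernetes.io/pre-resize-capacity"

func main() {
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "vol-1",
			Annotations: map[string]string{annPreResizeCapacity: "1Gi"},
		},
		Spec: v1.PersistentVolumeSpec{
			Capacity: v1.ResourceList{v1.ResourceStorage: resource.MustParse("2Gi")},
		},
	}
	pvc := &v1.PersistentVolumeClaim{
		Status: v1.PersistentVolumeClaimStatus{
			Capacity: v1.ResourceList{v1.ResourceStorage: resource.MustParse("2Gi")},
		},
	}

	// Mirrors the syncHandler gate: expansion is skipped only when the request
	// is not larger than status AND the PV carries no pre-resize annotation.
	fmt.Println("pre-resize annotation present:", metav1.HasAnnotation(pv.ObjectMeta, annPreResizeCapacity))

	// Mirrors isNodeExpandComplete: pvc.Status.Capacity >= pv.Spec.Capacity.
	pvcCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage]
	fmt.Println("node expand complete:", pvcCap.Cmp(pvCap) >= 0)
}
```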
48 changes: 38 additions & 10 deletions pkg/controller/volume/expand/expand_controller_test.go
@@ -24,6 +24,7 @@ import (
"testing"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@@ -40,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/awsebs"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
@@ -57,30 +59,42 @@ func TestSyncHandler(t *testing.T) {
}{
{
name: "when pvc has no PV binding",
pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", ""),
pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", "1Gi", "1Gi", ""),
pvcKey: "default/no-pv-pvc",
hasError: true,
},
{
name: "when pvc and pv has everything for in-tree plugin",
pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"),
pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "good-pvc-vol-3"),
pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "1Gi", "good-pvc-vol-3"),
pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "1Gi", "2Gi", "good-pvc-vol-3"),
pvcKey: "default/good-pvc",
expansionCalled: true,
expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSInTreePluginName},
},
{
name: "if pv has pre-resize capacity annotation, generate expand operation should not be called",
pv: func() *v1.PersistentVolume {
pv := getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "2Gi", "good-pvc-vol-4")
pv.ObjectMeta.Annotations = make(map[string]string)
pv.ObjectMeta.Annotations[util.AnnPreResizeCapacity] = "1Gi"
return pv
}(),
pvc: getFakePersistentVolumeClaim("good-pvc", "vol-4", "2Gi", "2Gi", "good-pvc-vol-4"),
pvcKey: "default/good-pvc",
expansionCalled: false,
},
{
name: "when csi migration is enabled for a in-tree plugin",
csiMigrationEnabled: true,
pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "csi-pvc-vol-4"),
pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "csi-pvc-vol-4"),
pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "1Gi", "csi-pvc-vol-5"),
pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "1Gi", "2Gi", "csi-pvc-vol-5"),
pvcKey: "default/csi-pvc",
expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSDriverName},
},
{
name: "for csi plugin without migration path",
pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "ceph-csi-pvc-vol-5"),
pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "ceph-csi-pvc-vol-5"),
pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "1Gi", "ceph-csi-pvc-vol-6"),
pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "1Gi", "2Gi", "ceph-csi-pvc-vol-6"),
pvcKey: "default/ceph-csi-pvc",
expansionCalled: false,
hasError: false,
@@ -177,14 +191,17 @@ func applyPVCPatch(originalPVC *v1.PersistentVolumeClaim, patch []byte) (*v1.Per
return updatedPVC, nil
}

func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume {
func getFakePersistentVolume(volumeName, pluginName string, size string, pvcUID types.UID) *v1.PersistentVolume {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{Name: volumeName},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{},
ClaimRef: &v1.ObjectReference{
Namespace: "default",
},
Capacity: map[v1.ResourceName]resource.Quantity{
v1.ResourceStorage: resource.MustParse(size),
},
},
}
if pvcUID != "" {
@@ -205,10 +222,21 @@ func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v
return pv
}

func getFakePersistentVolumeClaim(pvcName, volumeName string, uid types.UID) *v1.PersistentVolumeClaim {
func getFakePersistentVolumeClaim(pvcName, volumeName, statusSize, requestSize string, uid types.UID) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default", UID: uid},
Spec: v1.PersistentVolumeClaimSpec{},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceStorage: resource.MustParse(requestSize),
},
},
},
Status: v1.PersistentVolumeClaimStatus{
Capacity: map[v1.ResourceName]resource.Quantity{
v1.ResourceStorage: resource.MustParse(statusSize),
},
},
}
if volumeName != "" {
pvc.Spec.VolumeName = volumeName
16 changes: 15 additions & 1 deletion pkg/controller/volume/persistentvolume/pv_controller.go
@@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -783,7 +784,20 @@ func (ctrl *PersistentVolumeController) updateClaimStatus(claim *v1.PersistentVo
return nil, fmt.Errorf("PersistentVolume %q is without a storage capacity", volume.Name)
}
claimCap, ok := claim.Status.Capacity[v1.ResourceStorage]
if !ok || volumeCap.Cmp(claimCap) != 0 {
// If the PV carries a pre-resize capacity annotation, set the claim's status capacity to the annotated value
if metav1.HasAnnotation(volume.ObjectMeta, util.AnnPreResizeCapacity) {
klog.V(2).Infof("volume %q requires filesystem resize: setting pvc %s status capacity to %s", volume.Name, claimToClaimKey(claim), volume.ObjectMeta.Annotations[util.AnnPreResizeCapacity])
preQty, err := resource.ParseQuantity(volume.ObjectMeta.Annotations[util.AnnPreResizeCapacity])
if err != nil {
klog.Warningf("Parsing pre-resize-capacity from PV(%q) failed", volume.Name, err)
preQty = volume.Spec.Capacity[v1.ResourceStorage]
}
if claimClone.Status.Capacity == nil {
claimClone.Status.Capacity = make(map[v1.ResourceName]resource.Quantity)
}
claimClone.Status.Capacity[v1.ResourceStorage] = preQty
dirty = true
} else if !ok || volumeCap.Cmp(claimCap) != 0 {
claimClone.Status.Capacity = volume.Spec.Capacity
dirty = true
}
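The branch above is what makes the annotation useful at bind time: when a PVC binds to a PV that carries AnnPreResizeCapacity, the claim's status capacity is set to the annotated pre-resize value rather than the PV's spec capacity, so the expand controller later sees request > status and performs the filesystem resize. A small standalone sketch of the parse-with-fallback pattern (the quantities are illustrative, not taken from the PR):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	annotated := "1Gi"    // stands in for the PV's pre-resize-capacity annotation value
	specCapacity := "2Gi" // stands in for pv.Spec.Capacity[v1.ResourceStorage]

	preQty, err := resource.ParseQuantity(annotated)
	if err != nil {
		// Unparseable annotation: fall back to the PV's spec capacity,
		// mirroring the warning-and-fallback path in updateClaimStatus.
		preQty = resource.MustParse(specCapacity)
	}
	fmt.Println("claim status capacity set to:", preQty.String()) // 1Gi
}
```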
12 changes: 12 additions & 0 deletions pkg/controller/volume/persistentvolume/pv_controller_test.go
@@ -42,6 +42,7 @@ import (
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/util"
)

var (
@@ -89,6 +90,17 @@ func TestControllerSync(t *testing.T) {
return nil
},
},
{
"5-2-3 - complete bind when PV and PVC both exist and PV has AnnPreResizeCapacity annotation",
volumesWithAnnotation(util.AnnPreResizeCapacity, "1Gi", newVolumeArray("volume5-2", "2Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRetain, classEmpty, pvutil.AnnBoundByController)),
volumesWithAnnotation(util.AnnPreResizeCapacity, "1Gi", newVolumeArray("volume5-2", "2Gi", "uid5-2", "claim5-2", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classEmpty, pvutil.AnnBoundByController)),
withExpectedCapacity("2Gi", newClaimArray("claim5-2", "uid5-2", "2Gi", "", v1.ClaimPending, nil)),
withExpectedCapacity("1Gi", newClaimArray("claim5-2", "uid5-2", "2Gi", "volume5-2", v1.ClaimBound, nil, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)),
noevents, noerrors,
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
return nil
},
},
{
// deleteClaim with a bound claim makes bound volume released.
"5-3 - delete claim",
8 changes: 8 additions & 0 deletions pkg/volume/util/operationexecutor/operation_generator.go
@@ -1551,6 +1551,14 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
klog.Warning(detailedErr)
return volumetypes.NewOperationContext(nil, nil, migrated)
}
oldCapacity := pvc.Status.Capacity[v1.ResourceStorage]
err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient)
Member:
Is there a race here? Should the annotation be added before marking for fsresize?

Contributor (author):
I can't think of a scenario where there could be a race off the top of my head. Could you point to when a race could happen?

if err != nil {
detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, util.AnnPreResizeCapacity, oldCapacity.String(), err)
klog.Warning(detailedErr)
return volumetypes.NewOperationContext(nil, nil, migrated)
}

}
return volumetypes.NewOperationContext(nil, nil, migrated)
}
68 changes: 57 additions & 11 deletions pkg/volume/util/resize_util.go
@@ -21,7 +21,7 @@ import (
"encoding/json"
"fmt"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -39,6 +39,12 @@ var (
v1.PersistentVolumeClaimFileSystemResizePending: true,
v1.PersistentVolumeClaimResizing: true,
}

// AnnPreResizeCapacity annotation is added to a PV when the volume is expanded.
// Its value is the status capacity of the PVC prior to the volume expansion.
// It is set by the external-resizer when it determines that a filesystem resize is required after resizing the volume.
// It is used by pv_controller to determine the PVC's status capacity when binding the PVC and PV.
Member:
Can you add a comment when the annotation is supposed to be removed?

Contributor (author):
Will do :)
AnnPreResizeCapacity = "volume.alpha.kubernetes.io/pre-resize-capacity"
)

type resizeProcessStatus struct {
Expand All @@ -57,27 +63,67 @@ func UpdatePVSize(
newSize resource.Quantity,
kubeClient clientset.Interface) error {
pvClone := pv.DeepCopy()
pvClone.Spec.Capacity[v1.ResourceStorage] = newSize

oldData, err := json.Marshal(pvClone)
if err != nil {
return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", pvClone.Name, err)
return PatchPV(pv, pvClone, kubeClient)
}

// AddAnnPreResizeCapacity adds the volume.alpha.kubernetes.io/pre-resize-capacity annotation to the pv
func AddAnnPreResizeCapacity(
pv *v1.PersistentVolume,
oldCapacity resource.Quantity,
kubeClient clientset.Interface) error {
// if the pv already has a resize annotation skip the process
if metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) {
return nil
}

pvClone.Spec.Capacity[v1.ResourceStorage] = newSize
pvClone := pv.DeepCopy()
if pvClone.ObjectMeta.Annotations == nil {
pvClone.ObjectMeta.Annotations = make(map[string]string)
}
pvClone.ObjectMeta.Annotations[AnnPreResizeCapacity] = oldCapacity.String()

return PatchPV(pv, pvClone, kubeClient)
}

// DeleteAnnPreResizeCapacity deletes volume.alpha.kubernetes.io/pre-resize-capacity from the pv
func DeleteAnnPreResizeCapacity(
pv *v1.PersistentVolume,
kubeClient clientset.Interface) error {
// if the pv does not have a resize annotation skip the entire process
if !metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) {
return nil
}
pvClone := pv.DeepCopy()
delete(pvClone.ObjectMeta.Annotations, AnnPreResizeCapacity)

return PatchPV(pv, pvClone, kubeClient)
}

// PatchPV creates and executes a patch for pv
func PatchPV(
oldPV *v1.PersistentVolume,
newPV *v1.PersistentVolume,
kubeClient clientset.Interface) error {
oldData, err := json.Marshal(oldPV)
if err != nil {
return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", oldPV.Name, err)
}

newData, err := json.Marshal(pvClone)
newData, err := json.Marshal(newPV)
if err != nil {
return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", pvClone.Name, err)
return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", newPV.Name, err)
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvClone)
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV)
if err != nil {
return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", pvClone.Name, err)
return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", oldPV.Name, err)
}

_, err = kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), pvClone.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
_, err = kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("error Patching PV %q with error : %v", pvClone.Name, err)
return fmt.Errorf("error Patching PV %q with error : %v", oldPV.Name, err)
}
return nil
}
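The add/delete helpers above both funnel into PatchPV, which computes a two-way strategic merge patch between the old PV and its modified clone. The following self-contained sketch (fake object, not from the PR) shows the same call and the kind of patch it produces when only an annotation changes:

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldPV := &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "vol-1"}}

	newPV := oldPV.DeepCopy()
	newPV.ObjectMeta.Annotations = map[string]string{
		"volume.alpha.kubernetes.io/pre-resize-capacity": "1Gi",
	}

	oldData, err := json.Marshal(oldPV)
	if err != nil {
		panic(err)
	}
	newData, err := json.Marshal(newPV)
	if err != nil {
		panic(err)
	}

	// Same call PatchPV makes; the result is the minimal patch that adds
	// (or, on the delete path, removes) the annotation.
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch))
	// Prints roughly: {"metadata":{"annotations":{"volume.alpha.kubernetes.io/pre-resize-capacity":"1Gi"}}}
}
```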