Skip to content

Commit

Permalink
Patch newly dynamically provisioned PV with volume info to restore cu…
Browse files Browse the repository at this point in the history
…stom setting of PV

Signed-off-by: allenxu404 <qix2@vmware.com>
  • Loading branch information
allenxu404 committed Mar 18, 2024
1 parent 4d54861 commit ff65e79
Show file tree
Hide file tree
Showing 10 changed files with 568 additions and 20 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7504-allenxu404
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Patch newly dynamically provisioned PV with volume info to restore custom setting of PV
6 changes: 6 additions & 0 deletions pkg/builder/persistent_volume_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,9 @@ func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelect
}
return b
}

// Phase sets the PersistentVolume's phase.
func (b *PersistentVolumeBuilder) Phase(phase corev1api.PersistentVolumePhase) *PersistentVolumeBuilder {
b.object.Status.Phase = phase
return b
}
1 change: 1 addition & 0 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
newPluginManager,
backupStoreGetter,
s.metrics,
s.crClient,

Check warning on line 995 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L995

Added line #L995 was not covered by tests
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.RestoreFinalizer)
}
Expand Down
186 changes: 183 additions & 3 deletions pkg/controller/restore_finalizer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,36 @@ package controller

import (
"context"
"fmt"
"regexp"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"

internalVolume "github.com/vmware-tanzu/velero/internal/volume"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
"github.com/vmware-tanzu/velero/pkg/restore"
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/results"
)

const (
PVPatchMaximumDuration = 10 * time.Minute
)

type restoreFinalizerReconciler struct {
client.Client
namespace string
Expand All @@ -44,6 +56,7 @@ type restoreFinalizerReconciler struct {
backupStoreGetter persistence.ObjectBackupStoreGetter
metrics *metrics.ServerMetrics
clock clock.WithTickerAndDelayedExecution
crClient client.Client
}

func NewRestoreFinalizerReconciler(
Expand All @@ -53,6 +66,7 @@ func NewRestoreFinalizerReconciler(
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
metrics *metrics.ServerMetrics,
crClient client.Client,
) *restoreFinalizerReconciler {
return &restoreFinalizerReconciler{
Client: client,
Expand All @@ -62,6 +76,7 @@ func NewRestoreFinalizerReconciler(
backupStoreGetter: backupStoreGetter,
metrics: metrics,
clock: &clock.RealClock{},
crClient: crClient,
}
}

Expand Down Expand Up @@ -123,7 +138,27 @@ func (r *restoreFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, errors.Wrap(err, "error getting backup store")
}

finalizerCtx := &finalizerContext{log: log}
volumeInfo, err := backupStore.GetBackupVolumeInfos(restore.Spec.BackupName)
if err != nil {
log.WithError(err).Errorf("error getting volumeInfo for backup %s", restore.Spec.BackupName)
return ctrl.Result{}, errors.Wrap(err, "error getting volumeInfo")
}

Check warning on line 145 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L143-L145

Added lines #L143 - L145 were not covered by tests

restoredResourceList, err := backupStore.GetRestoredResourceList(restore.Name)
if err != nil {
log.WithError(err).Error("error getting restoredResourceList")
return ctrl.Result{}, errors.Wrap(err, "error getting restoredResourceList")
}

Check warning on line 151 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L149-L151

Added lines #L149 - L151 were not covered by tests

restoredPVCList := getRestoredPVCFromRestoredResourceList(restoredResourceList)

finalizerCtx := &finalizerContext{
logger: log,
restore: restore,
crClient: r.crClient,
volumeInfo: volumeInfo,
restoredPVCList: restoredPVCList,
}
warnings, errs := finalizerCtx.execute()

warningCnt := len(warnings.Velero) + len(warnings.Cluster)
Expand Down Expand Up @@ -200,14 +235,159 @@ func (r *restoreFinalizerReconciler) finishProcessing(restorePhase velerov1api.R
// finalizerContext includes all the dependencies required by finalization tasks and
// a function execute() to orderly implement task logic.
type finalizerContext struct {
log logrus.FieldLogger
logger logrus.FieldLogger
restore *velerov1api.Restore
crClient client.Client
volumeInfo []*internalVolume.VolumeInfo
restoredPVCList map[string]struct{}
}

func (ctx *finalizerContext) execute() (results.Result, results.Result) { //nolint:unparam //temporarily ignore the lint report: result 0 is always nil (unparam)
warnings, errs := results.Result{}, results.Result{}

// implement finalization tasks
ctx.log.Debug("Starting running execute()")
pdpErrs := ctx.patchDynamicPVWithVolumeInfo()
errs.Merge(&pdpErrs)

return warnings, errs
}

// patchDynamicPV patches newly dynamically provisioned PV using volume info
// in order to restore custom settings that would otherwise be lost during dynamic PV recreation.
func (ctx *finalizerContext) patchDynamicPVWithVolumeInfo() (errs results.Result) {
ctx.logger.Info("patching newly dynamically provisioned PV starts")

var pvWaitGroup sync.WaitGroup
var resultLock sync.Mutex
maxConcurrency := 3
semaphore := make(chan struct{}, maxConcurrency)

for _, volumeItem := range ctx.volumeInfo {
if (volumeItem.BackupMethod == internalVolume.PodVolumeBackup || volumeItem.BackupMethod == internalVolume.CSISnapshot) && volumeItem.PVInfo != nil {
// Determine restored PVC namespace
restoredNamespace := volumeItem.PVCNamespace
if remapped, ok := ctx.restore.Spec.NamespaceMapping[restoredNamespace]; ok {
restoredNamespace = remapped
}

// Check if PVC was restored in previous phase
pvcKey := fmt.Sprintf("%s/%s", restoredNamespace, volumeItem.PVCName)
if _, restored := ctx.restoredPVCList[pvcKey]; !restored {
continue

Check warning on line 276 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L276

Added line #L276 was not covered by tests
}

pvWaitGroup.Add(1)
go func(volInfo internalVolume.VolumeInfo, restoredNamespace string) {
defer pvWaitGroup.Done()

semaphore <- struct{}{}

log := ctx.logger.WithField("PVC", volInfo.PVCName).WithField("PVCNamespace", restoredNamespace)
log.Debug("patching dynamic PV is in progress")
err := wait.PollImmediate(10*time.Second, PVPatchMaximumDuration, func() (bool, error) { //nolint:staticcheck //temporarily ignore the lint report as suggested func PollUntilContextTimeout is not available
// wait for PVC to be bound
pvc := &v1.PersistentVolumeClaim{}
err := ctx.crClient.Get(context.Background(), client.ObjectKey{Name: volInfo.PVCName, Namespace: restoredNamespace}, pvc)
if apierrors.IsNotFound(err) {
log.Debug("error not finding PVC")
return false, nil
}

Check warning on line 294 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L292-L294

Added lines #L292 - L294 were not covered by tests
if err != nil {
return false, err
}

Check warning on line 297 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L296-L297

Added lines #L296 - L297 were not covered by tests

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
log.Debugf("PVC: %s not ready", pvc.Name)
return false, nil
}

Check warning on line 302 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L300-L302

Added lines #L300 - L302 were not covered by tests

// wait for PV to be bound
pvName := pvc.Spec.VolumeName
pv := &v1.PersistentVolume{}
err = ctx.crClient.Get(context.Background(), client.ObjectKey{Name: pvName}, pv)
if apierrors.IsNotFound(err) {
log.Debugf("error not finding PV: %s", pvName)
return false, nil
}

Check warning on line 311 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L309-L311

Added lines #L309 - L311 were not covered by tests
if err != nil {
return false, err
}

Check warning on line 314 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L313-L314

Added lines #L313 - L314 were not covered by tests

if pv.Spec.ClaimRef == nil || pv.Status.Phase != v1.VolumeBound {
log.Debugf("PV: %s not ready", pvName)
return false, nil
}

Check warning on line 319 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L317-L319

Added lines #L317 - L319 were not covered by tests

// validate PV
if pv.Spec.ClaimRef.Name != pvc.Name || pv.Spec.ClaimRef.Namespace != restoredNamespace {
return false, fmt.Errorf("PV was bound by unexpected PVC, unexpected PVC: %s/%s, expected PVC: %s/%s",
pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name, restoredNamespace, pvc.Name)
}

// patch PV's reclaim policy and label using the corresponding data stored in volume info
if needPatch(pv, volInfo.PVInfo) {
updatedPV := pv.DeepCopy()
updatedPV.Labels = volInfo.PVInfo.Labels
updatedPV.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimPolicy(volInfo.PVInfo.ReclaimPolicy)
if err := kubeutil.PatchResource(pv, updatedPV, ctx.crClient); err != nil {
return false, err
}

Check warning on line 334 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L333-L334

Added lines #L333 - L334 were not covered by tests
log.Infof("newly dynamically provisioned PV:%s has been patched using volume info", pvName)
}

return true, nil
})

if err != nil {
err = fmt.Errorf("fail to patch dynamic PV, err: %s, PVC: %s, PV: %s", err, volInfo.PVCName, volInfo.PVName)
ctx.logger.WithError(errors.WithStack((err))).Error("err patching dynamic PV using volume info")
resultLock.Lock()
defer resultLock.Unlock()
errs.Add(restoredNamespace, err)
}

<-semaphore
}(*volumeItem, restoredNamespace)
}
}

pvWaitGroup.Wait()
ctx.logger.Info("patching newly dynamically provisioned PV ends")

return errs
}

func getRestoredPVCFromRestoredResourceList(restoredResourceList map[string][]string) map[string]struct{} {
pvcKey := "v1/PersistentVolumeClaim"
pvcList := make(map[string]struct{})

for _, pvc := range restoredResourceList[pvcKey] {
// the format of pvc string in restoredResourceList is like: "namespace/pvcName(status)"
// extract the substring before "(created)" if the status in rightmost Parenthesis is "created"
r := regexp.MustCompile(`\(([^)]+)\)`)
matches := r.FindAllStringSubmatch(pvc, -1)
if len(matches) > 0 && matches[len(matches)-1][1] == restore.ItemRestoreResultCreated {
pvcList[pvc[:len(pvc)-len("(created)")]] = struct{}{}
}
}

return pvcList
}

func needPatch(newPV *v1.PersistentVolume, pvInfo *internalVolume.PVInfo) bool {
if newPV.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimPolicy(pvInfo.ReclaimPolicy) {
return true
}

newPVLabels, pvLabels := newPV.Labels, pvInfo.Labels
for k, v := range pvLabels {
if _, ok := newPVLabels[k]; !ok {
return true
}

Check warning on line 386 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L385-L386

Added lines #L385 - L386 were not covered by tests
if newPVLabels[k] != v {
return true
}

Check warning on line 389 in pkg/controller/restore_finalizer_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/restore_finalizer_controller.go#L388-L389

Added lines #L388 - L389 were not covered by tests
}

return false
}
Loading

0 comments on commit ff65e79

Please sign in to comment.