Remove Wait VolumeSnapshot to ReadyToUse logic.
Because this logic has moved to the plugin, and the plugin cannot
read the Velero server's resourceTimeout setting, add resourceTimeout
as a backup annotation so it can be passed to the plugin.

Signed-off-by: Xun Jiang <blackpiglet@gmail.com>
Xun Jiang committed May 31, 2023
1 parent 3ad091d commit 800f6cc
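
The commit wires the server's resourceTimeout into the backup's annotations (see the prepareBackupRequest hunk below) so plugins, which cannot see server flags, can recover it. As a minimal sketch of the plugin-side half, assuming the plugin has the Backup object in hand; the helper name readResourceTimeout and its fallback parameter are illustrative, not part of this commit:

package example

import (
    "time"

    velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
)

// readResourceTimeout is a hypothetical helper: it parses the duration the
// server stored via resourceTimeout.String() (e.g. "10m0s") and falls back
// to a caller-supplied default when the annotation is absent or malformed.
func readResourceTimeout(backup *velerov1api.Backup, fallback time.Duration) time.Duration {
    val, ok := backup.Annotations[velerov1api.ResourceTimeoutAnnotation]
    if !ok {
        return fallback
    }
    d, err := time.ParseDuration(val)
    if err != nil {
        return fallback
    }
    return d
}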
Showing 5 changed files with 26 additions and 332 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/6327-blackpiglet
@@ -0,0 +1 @@
+Remove Wait VolumeSnapshot to ReadyToUse logic.
2 changes: 1 addition & 1 deletion go.mod
@@ -38,7 +38,6 @@ require (
golang.org/x/mod v0.10.0
golang.org/x/net v0.9.0
golang.org/x/oauth2 v0.7.0
-golang.org/x/sync v0.1.0
golang.org/x/text v0.9.0
google.golang.org/api v0.120.0
google.golang.org/grpc v1.54.0
@@ -141,6 +140,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f // indirect
+golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
4 changes: 4 additions & 0 deletions pkg/apis/velero/v1/labels_annotations.go
@@ -65,4 +65,8 @@ const (
// SourceClusterK8sMajorVersionAnnotation is the label key used to identify the k8s
// minor version of the backup , i.e. 16
SourceClusterK8sMinorVersionAnnotation = "velero.io/source-cluster-k8s-minor-version"
+
+// ResourceTimeoutAnnotation is the annotation key used to carry the global resource
+// timeout value for backup to plugins.
+ResourceTimeoutAnnotation = "velero.io/resource-timeout"
)
229 changes: 9 additions & 220 deletions pkg/controller/backup_controller.go
@@ -21,15 +21,13 @@ import (
"context"
"fmt"
"os"
-"sync"
"time"

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
-"golang.org/x/sync/errgroup"
corev1api "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -420,6 +418,7 @@ func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logg
request.Annotations[velerov1api.SourceClusterK8sGitVersionAnnotation] = b.discoveryHelper.ServerVersion().String()
request.Annotations[velerov1api.SourceClusterK8sMajorVersionAnnotation] = b.discoveryHelper.ServerVersion().Major
request.Annotations[velerov1api.SourceClusterK8sMinorVersionAnnotation] = b.discoveryHelper.ServerVersion().Minor
+request.Annotations[velerov1api.ResourceTimeoutAnnotation] = b.resourceTimeout.String()

// Add namespaces with label velero.io/exclude-from-backup=true into request.Spec.ExcludedNamespaces
// Essentially, adding the label velero.io/exclude-from-backup=true to a namespace would be equivalent to setting spec.ExcludedNamespaces
@@ -654,9 +653,14 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
selector := label.NewSelectorForBackup(backup.Name)
vscList := &snapshotv1api.VolumeSnapshotContentList{}

-volumeSnapshots, err = b.waitVolumeSnapshotReadyToUse(context.Background(), backup.Spec.CSISnapshotTimeout.Duration, backup.Name)
-if err != nil {
-backupLog.Errorf("fail to wait VolumeSnapshot change to Ready: %s", err.Error())
+if b.volumeSnapshotLister != nil {
+tmpVSs, err := b.volumeSnapshotLister.List(label.NewSelectorForBackup(backup.Name))
+if err != nil {
+backupLog.Error(err)
+}
+for _, vs := range tmpVSs {
+volumeSnapshots = append(volumeSnapshots, *vs)
+}
}

backup.CSISnapshots = volumeSnapshots
@@ -686,11 +690,6 @@ func (b *backupReconciler) runBackup(backup *pkgbackup.Request) error {
backupLog.Error(err)
}
}
-
-// Delete the VolumeSnapshots created in the backup, when CSI feature is enabled.
-if len(volumeSnapshots) > 0 && len(volumeSnapshotContents) > 0 {
-b.deleteVolumeSnapshots(volumeSnapshots, volumeSnapshotContents, backupLog, b.maxConcurrentK8SConnections)
-}
}

backup.Status.VolumeSnapshotsAttempted = len(backup.VolumeSnapshots)
@@ -929,216 +928,6 @@ func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
}
}

-// waitVolumeSnapshotReadyToUse is used to wait VolumeSnapshot turned to ReadyToUse.
-// Waiting for VolumeSnapshot ReadyToUse to true is time consuming. Try to make the process parallel by
-// using goroutine here instead of waiting in CSI plugin, because it's not easy to make BackupItemAction
-// parallel by now. After BackupItemAction parallel is implemented, this logic should be moved to CSI plugin
-// as https://github.com/vmware-tanzu/velero-plugin-for-csi/pull/100
-func (b *backupReconciler) waitVolumeSnapshotReadyToUse(ctx context.Context,
-csiSnapshotTimeout time.Duration, backupName string) ([]snapshotv1api.VolumeSnapshot, error) {
-eg, _ := errgroup.WithContext(ctx)
-timeout := csiSnapshotTimeout
-interval := 5 * time.Second
-volumeSnapshots := make([]snapshotv1api.VolumeSnapshot, 0)
-
-if b.volumeSnapshotLister != nil {
-tmpVSs, err := b.volumeSnapshotLister.List(label.NewSelectorForBackup(backupName))
-if err != nil {
-b.logger.Error(err)
-return volumeSnapshots, err
-}
-for _, vs := range tmpVSs {
-volumeSnapshots = append(volumeSnapshots, *vs)
-}
-}
-
-vsChannel := make(chan snapshotv1api.VolumeSnapshot, len(volumeSnapshots))
-defer close(vsChannel)
-
-for index := range volumeSnapshots {
-volumeSnapshot := volumeSnapshots[index]
-eg.Go(func() error {
-err := wait.PollImmediate(interval, timeout, func() (bool, error) {
-tmpVS, err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(volumeSnapshot.Namespace).Get(b.ctx, volumeSnapshot.Name, metav1.GetOptions{})
-if err != nil {
-return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name))
-}
-if tmpVS.Status == nil || tmpVS.Status.BoundVolumeSnapshotContentName == nil || !boolptr.IsSetToTrue(tmpVS.Status.ReadyToUse) {
-b.logger.Infof("Waiting for CSI driver to reconcile volumesnapshot %s/%s. Retrying in %ds", volumeSnapshot.Namespace, volumeSnapshot.Name, interval/time.Second)
-return false, nil
-}
-
-b.logger.Debugf("VolumeSnapshot %s/%s turned into ReadyToUse.", volumeSnapshot.Namespace, volumeSnapshot.Name)
-// Put the ReadyToUse VolumeSnapshot element in the result channel.
-vsChannel <- *tmpVS
-return true, nil
-})
-if err == wait.ErrWaitTimeout {
-b.logger.Errorf("Timed out awaiting reconciliation of volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name)
-}
-return err
-})
-}
-
-err := eg.Wait()
-
-result := make([]snapshotv1api.VolumeSnapshot, 0)
-length := len(vsChannel)
-for index := 0; index < length; index++ {
-result = append(result, <-vsChannel)
-}
-
-return result, err
-}
-
-// deleteVolumeSnapshots delete VolumeSnapshot created during backup.
-// This is used to avoid deleting namespace in cluster triggers the VolumeSnapshot deletion,
-// which will cause snapshot deletion on cloud provider, then backup cannot restore the PV.
-// If DeletionPolicy is Retain, just delete it. If DeletionPolicy is Delete, need to
-// change DeletionPolicy to Retain before deleting VS, then change DeletionPolicy back to Delete.
-func (b *backupReconciler) deleteVolumeSnapshots(volumeSnapshots []snapshotv1api.VolumeSnapshot,
-volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent,
-logger logrus.FieldLogger, maxConcurrent int) {
-var wg sync.WaitGroup
-vscMap := make(map[string]snapshotv1api.VolumeSnapshotContent)
-for _, vsc := range volumeSnapshotContents {
-vscMap[vsc.Name] = vsc
-}
-
-ch := make(chan snapshotv1api.VolumeSnapshot, maxConcurrent)
-defer func() {
-if _, ok := <-ch; ok {
-close(ch)
-}
-}()
-
-wg.Add(maxConcurrent)
-for i := 0; i < maxConcurrent; i++ {
-go func() {
-for {
-vs, ok := <-ch
-if !ok {
-wg.Done()
-return
-}
-b.deleteVolumeSnapshot(vs, vscMap, logger)
-}
-}()
-}
-
-for _, vs := range volumeSnapshots {
-ch <- vs
-}
-close(ch)
-
-wg.Wait()
-}
-
-// deleteVolumeSnapshot is called by deleteVolumeSnapshots and handles the single VolumeSnapshot
-// instance.
-func (b *backupReconciler) deleteVolumeSnapshot(vs snapshotv1api.VolumeSnapshot, vscMap map[string]snapshotv1api.VolumeSnapshotContent, logger logrus.FieldLogger) {
-var vsc snapshotv1api.VolumeSnapshotContent
-modifyVSCFlag := false
-if vs.Status != nil &&
-vs.Status.BoundVolumeSnapshotContentName != nil &&
-len(*vs.Status.BoundVolumeSnapshotContentName) > 0 {
-var found bool
-if vsc, found = vscMap[*vs.Status.BoundVolumeSnapshotContentName]; !found {
-logger.Errorf("Not find %s from the vscMap", *vs.Status.BoundVolumeSnapshotContentName)
-return
-}
-
-if vsc.Spec.DeletionPolicy == snapshotv1api.VolumeSnapshotContentDelete {
-modifyVSCFlag = true
-}
-} else {
-logger.Errorf("VolumeSnapshot %s/%s is not ready. This is not expected.", vs.Namespace, vs.Name)
-}
-
-// Change VolumeSnapshotContent's DeletionPolicy to Retain before deleting VolumeSnapshot,
-// because VolumeSnapshotContent will be deleted by deleting VolumeSnapshot, when
-// DeletionPolicy is set to Delete, but Velero needs VSC for cleaning snapshot on cloud
-// in backup deletion.
-if modifyVSCFlag {
-logger.Debugf("Patching VolumeSnapshotContent %s", vsc.Name)
-original := vsc.DeepCopy()
-vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain
-if err := b.kbClient.Patch(context.Background(), &vsc, kbclient.MergeFrom(original)); err != nil {
-logger.Errorf("fail to modify VolumeSnapshotContent %s DeletionPolicy to Retain: %s", vsc.Name, err.Error())
-return
-}
-
-defer func() {
-logger.Debugf("Start to recreate VolumeSnapshotContent %s", vsc.Name)
-err := b.recreateVolumeSnapshotContent(vsc)
-if err != nil {
-logger.Errorf("fail to recreate VolumeSnapshotContent %s: %s", vsc.Name, err.Error())
-}
-}()
-}
-
-// Delete VolumeSnapshot from cluster
-logger.Debugf("Deleting VolumeSnapshot %s/%s", vs.Namespace, vs.Name)
-err := b.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(context.TODO(), vs.Name, metav1.DeleteOptions{})
-if err != nil {
-logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error())
-}
-}
-
-// recreateVolumeSnapshotContent will delete then re-create VolumeSnapshotContent,
-// because some parameter in VolumeSnapshotContent Spec is immutable, e.g. VolumeSnapshotRef
-// and Source. Source is updated to let csi-controller thinks the VSC is statically provsisioned with VS.
-// Set VolumeSnapshotRef's UID to nil will let the csi-controller finds out the related VS is gone, then
-// VSC can be deleted.
-func (b *backupReconciler) recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent) error {
-timeout := b.resourceTimeout
-interval := 1 * time.Second
-
-err := b.kbClient.Delete(context.TODO(), &vsc)
-if err != nil {
-return errors.Wrapf(err, "fail to delete VolumeSnapshotContent: %s", vsc.Name)
-}
-
-// Check VolumeSnapshotContents is already deleted, before re-creating it.
-err = wait.PollImmediate(interval, timeout, func() (bool, error) {
-tmpVSC := &snapshotv1api.VolumeSnapshotContent{}
-err := b.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: vsc.Name}, tmpVSC)
-if err != nil {
-if apierrors.IsNotFound(err) {
-return true, nil
-}
-return false, errors.Wrapf(err, fmt.Sprintf("failed to get VolumeSnapshotContent %s", vsc.Name))
-}
-return false, nil
-})
-if err != nil {
-return errors.Wrapf(err, "fail to retrieve VolumeSnapshotContent %s info", vsc.Name)
-}
-
-// Make the VolumeSnapshotContent static
-vsc.Spec.Source = snapshotv1api.VolumeSnapshotContentSource{
-SnapshotHandle: vsc.Status.SnapshotHandle,
-}
-// Set VolumeSnapshotRef to none exist one, because VolumeSnapshotContent
-// validation webhook will check whether name and namespace are nil.
-// external-snapshotter needs Source pointing to snapshot and VolumeSnapshot
-// reference's UID to nil to determine the VolumeSnapshotContent is deletable.
-vsc.Spec.VolumeSnapshotRef = corev1api.ObjectReference{
-APIVersion: snapshotv1api.SchemeGroupVersion.String(),
-Kind: "VolumeSnapshot",
-Namespace: "ns-" + string(vsc.UID),
-Name: "name-" + string(vsc.UID),
-}
-// ResourceVersion shouldn't exist for new creation.
-vsc.ResourceVersion = ""
-err = b.kbClient.Create(context.TODO(), &vsc)
-if err != nil {
-return errors.Wrapf(err, "fail to create VolumeSnapshotContent %s", vsc.Name)
-}
-
-return nil
-}
-
func oldAndNewFilterParametersUsedTogether(backupSpec velerov1api.BackupSpec) bool {
haveOldResourceFilterParameters := len(backupSpec.IncludedResources) > 0 ||
(len(backupSpec.ExcludedResources) > 0) ||
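
The ReadyToUse wait removed above now belongs in the CSI plugin (the deleted comment points at vmware-tanzu/velero-plugin-for-csi PR #100). For orientation, a minimal sketch of what a plugin-side wait for a single VolumeSnapshot can look like, assuming a snapshot clientset; the function is illustrative, not the plugin's actual code:

package example

import (
    "context"
    "time"

    snapshotclient "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
)

// waitReadyToUse polls the VolumeSnapshot until it is bound to a
// VolumeSnapshotContent and reports ReadyToUse, or until the timeout elapses.
func waitReadyToUse(ctx context.Context, client snapshotclient.Interface, ns, name string, timeout time.Duration) error {
    return wait.PollImmediate(5*time.Second, timeout, func() (bool, error) {
        vs, err := client.SnapshotV1().VolumeSnapshots(ns).Get(ctx, name, metav1.GetOptions{})
        if err != nil {
            return false, err
        }
        // Not ready while Status is unset, the snapshot is unbound, or
        // ReadyToUse is not explicitly true.
        return vs.Status != nil &&
            vs.Status.BoundVolumeSnapshotContentName != nil &&
            vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse, nil
    })
}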

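The deleted cleanup path also carried a useful pattern: before deleting a backup's VolumeSnapshot, flip the bound VolumeSnapshotContent's DeletionPolicy to Retain so the provider-side snapshot survives the in-cluster delete. A sketch of just that step, assuming a controller-runtime client; retainVSC is an illustrative name, not a Velero API:

package example

import (
    "context"

    snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
    kbclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// retainVSC merge-patches DeletionPolicy to Retain so that deleting the bound
// VolumeSnapshot no longer deletes the snapshot on the storage provider.
func retainVSC(ctx context.Context, c kbclient.Client, vsc *snapshotv1api.VolumeSnapshotContent) error {
    original := vsc.DeepCopy()
    vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain
    return c.Patch(ctx, vsc, kbclient.MergeFrom(original))
}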