Skip to content

Commit

Permalink
Track the skipped PVC and print the summary in backup log
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Jiang <jiangd@vmware.com>
  • Loading branch information
reasonerjt committed Jul 14, 2023
1 parent e54a8af commit 6fd2c54
Show file tree
Hide file tree
Showing 12 changed files with 500 additions and 77 deletions.
2 changes: 1 addition & 1 deletion pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
if err := kube.PatchResource(backupRequest.Backup, updated, kb.kbClient); err != nil {
log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress")
}
log.Infof("Summary for skipped PVs: \n%s", backupRequest.SkippedPVTracker.Summary())
backupRequest.Status.Progress = &velerov1api.BackupProgress{TotalItems: len(backupRequest.BackedUpItems), ItemsBackedUp: len(backupRequest.BackedUpItems)}

log.WithField("progress", "").Infof("Backed up a total of %d items", len(backupRequest.BackedUpItems))

return nil
Expand Down
98 changes: 90 additions & 8 deletions pkg/backup/item_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ import (
const (
mustIncludeAdditionalItemAnnotation = "backup.velero.io/must-include-additional-items"
excludeFromBackupLabel = "velero.io/exclude-from-backup"
csiBIAPluginName = "velero.io/csi-pvc-backupper"
vsphereBIAPluginName = "velero.io/vsphere-pvc-backupper"
)

// itemBackupper can back up individual items to a tar writer.
Expand All @@ -69,6 +71,7 @@ type itemBackupper struct {
discoveryHelper discovery.Helper
podVolumeBackupper podvolume.Backupper
podVolumeSnapshotTracker *pvcSnapshotTracker
skippedPVTracker *skipPVTracker
volumeSnapshotterGetter VolumeSnapshotterGetter

itemHookHandler hook.ItemHookHandler
Expand Down Expand Up @@ -125,6 +128,7 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
} else {
if metadata.GetLabels()[excludeFromBackupLabel] == "true" {
log.Infof("Excluding item because it has label %s=true", excludeFromBackupLabel)
ib.trackSkippedPV(obj, groupResource, "", fmt.Sprintf("item has label %s=true", excludeFromBackupLabel), log)
return false, itemFiles, nil
}
// NOTE: we have to re-check namespace & resource includes/excludes because it's possible that
Expand Down Expand Up @@ -181,6 +185,9 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
if err := ib.itemHookHandler.HandleHooks(log, groupResource, obj, ib.backupRequest.ResourceHooks, hook.PhasePre); err != nil {
return false, itemFiles, err
}
if optedOut, podName := ib.podVolumeSnapshotTracker.OptedoutByPod(namespace, name); optedOut {
ib.trackSkippedPV(obj, groupResource, podVolumeApproach, fmt.Sprintf("opted out due to annotation in pod %s", podName), log)
}

if groupResource == kuberesource.Pods {
// pod needs to be initialized for the unstructured converter
Expand All @@ -193,7 +200,8 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
// Get the list of volumes to back up using pod volume backup from the pod's annotations. Remove from this list
// any volumes that use a PVC that we've already backed up (this would be in a read-write-many scenario,
// where it's been backed up from another pod), since we don't need >1 backup per PVC.
for _, volume := range podvolume.GetVolumesByPod(pod, boolptr.IsSetToTrue(ib.backupRequest.Spec.DefaultVolumesToFsBackup)) {
includedVolumes, optedOutVolumes := podvolume.GetVolumesByPod(pod, boolptr.IsSetToTrue(ib.backupRequest.Spec.DefaultVolumesToFsBackup))
for _, volume := range includedVolumes {
// track the volumes that are PVCs using the PVC snapshot tracker, so that when we backup PVCs/PVs
// via an item action in the next step, we don't snapshot PVs that will have their data backed up
// with pod volume backup.
Expand All @@ -208,6 +216,9 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
}
pvbVolumes = append(pvbVolumes, volume)
}
for _, optedOutVol := range optedOutVolumes {
ib.podVolumeSnapshotTracker.Optout(pod, optedOutVol)
}
}
}

Expand Down Expand Up @@ -245,7 +256,7 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
if groupResource == kuberesource.Pods && pod != nil {
// this function will return partial results, so process podVolumeBackups
// even if there are errors.
podVolumeBackups, errs := ib.backupPodVolumes(log, pod, pvbVolumes)
podVolumeBackups, podVolumePVCBackupSummary, errs := ib.backupPodVolumes(log, pod, pvbVolumes)

ib.backupRequest.PodVolumeBackups = append(ib.backupRequest.PodVolumeBackups, podVolumeBackups...)
backupErrs = append(backupErrs, errs...)
Expand All @@ -254,6 +265,23 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
for _, pvb := range podVolumeBackups {
ib.podVolumeSnapshotTracker.Take(pod, pvb.Spec.Volume)
}

// Track/Untrack the volumes based on podVolumePVCBackupSummary
for _, skippedPVC := range podVolumePVCBackupSummary.Skipped {
if obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(skippedPVC.PVC); err != nil {
backupErrs = append(backupErrs, errors.WithStack(err))
} else {
ib.trackSkippedPV(&unstructured.Unstructured{Object: obj}, kuberesource.PersistentVolumeClaims,
podVolumeApproach, skippedPVC.Reason, log)
}
}
for _, pvc := range podVolumePVCBackupSummary.Backedup {
if obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pvc); err != nil {
backupErrs = append(backupErrs, errors.WithStack(err))
} else {
ib.unTrackSkippedPV(&unstructured.Unstructured{Object: obj}, kuberesource.PersistentVolumeClaims, log)
}
}
}

log.Debug("Executing post hooks")
Expand Down Expand Up @@ -295,14 +323,14 @@ func getFileForArchive(namespace, name, groupResource, versionPath string, itemB

// backupPodVolumes triggers pod volume backups of the specified pod volumes, and returns a list of PodVolumeBackups
// for volumes that were successfully backed up, a PVCBackupSummary recording which PVCs were backed up or skipped, and a slice of any errors that were encountered.
func (ib *itemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, []error) {
func (ib *itemBackupper) backupPodVolumes(log logrus.FieldLogger, pod *corev1api.Pod, volumes []string) ([]*velerov1api.PodVolumeBackup, *podvolume.PVCBackupSummary, []error) {
if len(volumes) == 0 {
return nil, nil
return nil, nil, nil
}

if ib.podVolumeBackupper == nil {
log.Warn("No pod volume backupper, not backing up pod's volumes")
return nil, nil
return nil, nil, nil
}

return ib.podVolumeBackupper.BackupPodVolumes(ib.backupRequest.Backup, pod, volumes, ib.backupRequest.ResPolicies, log)
Expand All @@ -327,15 +355,23 @@ func (ib *itemBackupper) executeActions(
return nil, itemFiles, errors.WithStack(err)
} else if act != nil && act.Type == resourcepolicies.Skip {
log.Infof("Skip executing Backup Item Action: %s of resource %s: %s/%s for the matched resource policies", actionName, groupResource, namespace, name)
ib.trackSkippedPV(obj, groupResource, "", "skipped due to resource policy ", log)
continue
}

updatedItem, additionalItemIdentifiers, operationID, postOperationItems, err := action.Execute(obj, ib.backupRequest.Backup)

if err != nil {
if err != nil && err.Error() == NotCSIError.Error() {
// track the skipped PV and do not return the error
ib.trackSkippedPV(obj, groupResource, csiSnapshotApproach, "skipped due to not CSI volume", log)
} else if err != nil {
return nil, itemFiles, errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
}

if actionName == csiBIAPluginName || actionName == vsphereBIAPluginName {
// the snapshot has been taken
ib.unTrackSkippedPV(obj, groupResource, log)
}

u := &unstructured.Unstructured{Object: updatedItem.UnstructuredContent()}
mustInclude := u.GetAnnotations()[mustIncludeAdditionalItemAnnotation] == "true" || finalize
// remove the annotation as it's for communication between BIA and velero server,
Expand Down Expand Up @@ -493,6 +529,8 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
return nil
} else if action != nil && action.Type == resourcepolicies.Skip {
log.Infof("skip snapshot of pv %s for the matched resource policies", pv.Name)
// at this point we are sure this object is PV therefore we'll call the tracker directly
ib.skippedPVTracker.Track(pv.Name, volumeSnapshotApproach, "matched action is 'skip' in chosen resource policies")
return nil
}
}
Expand Down Expand Up @@ -547,6 +585,7 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
if volumeSnapshotter == nil {
// the PV may still have a chance to be snapshotted by the CSI plugin's `PVCBackupItemAction` in the PVC backup logic
log.Info("Persistent volume is not a supported volume type for Velero-native volumeSnapshotter snapshot, skipping.")
ib.skippedPVTracker.Track(pv.Name, volumeSnapshotApproach, "no applicable volumesnapshotter found")
return nil
}

Expand All @@ -570,6 +609,7 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
snapshot := volumeSnapshot(ib.backupRequest.Backup, pv.Name, volumeID, volumeType, pvFailureDomainZone, location, iops)

var errs []error
ib.skippedPVTracker.Untrack(pv.Name)
snapshotID, err := volumeSnapshotter.CreateSnapshot(snapshot.Spec.ProviderVolumeID, snapshot.Spec.VolumeAZ, tags)
if err != nil {
errs = append(errs, errors.Wrap(err, "error taking snapshot of volume"))
Expand All @@ -585,7 +625,7 @@ func (ib *itemBackupper) takePVSnapshot(obj runtime.Unstructured, log logrus.Fie
}

func (ib *itemBackupper) getMatchAction(obj runtime.Unstructured, groupResource schema.GroupResource, backupItemActionName string) (*resourcepolicies.Action, error) {
if ib.backupRequest.ResPolicies != nil && groupResource == kuberesource.PersistentVolumeClaims && (backupItemActionName == "velero.io/csi-pvc-backupper" || backupItemActionName == "velero.io/vsphere-pvc-backupper") {
if ib.backupRequest.ResPolicies != nil && groupResource == kuberesource.PersistentVolumeClaims && (backupItemActionName == csiBIAPluginName || backupItemActionName == vsphereBIAPluginName) {
pvc := corev1api.PersistentVolumeClaim{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &pvc); err != nil {
return nil, errors.WithStack(err)
Expand All @@ -605,6 +645,48 @@ func (ib *itemBackupper) getMatchAction(obj runtime.Unstructured, groupResource
return nil, nil
}

// trackSkippedPV records the PV underlying obj in the backup request's
// SkippedPVTracker, tagged with the given approach and reason.
// It is called throughout the backup for arbitrary objects: only PVs and
// bound PVCs resolve to a PV name; other resources are silently ignored.
func (ib *itemBackupper) trackSkippedPV(obj runtime.Unstructured, groupResource schema.GroupResource, approach string, reason string, log logrus.FieldLogger) {
	if name, err := getPVName(obj, groupResource); len(name) > 0 && err == nil {
		ib.backupRequest.SkippedPVTracker.Track(name, approach, reason)
	} else if err != nil {
		// Not fatal: the object simply won't appear in the skipped-PV summary.
		// Warn (not Warnf): the message has no format verbs.
		log.WithError(err).Warn("unable to get PV name, skip tracking.")
	}
}

// unTrackSkippedPV removes the PV underlying obj from the skipped-PV tracker,
// e.g. once a snapshot/backup of it has actually been arranged.
// It is called throughout the backup for arbitrary objects: only PVs and
// bound PVCs resolve to a PV name; other resources are silently ignored.
func (ib *itemBackupper) unTrackSkippedPV(obj runtime.Unstructured, groupResource schema.GroupResource, log logrus.FieldLogger) {
	if name, err := getPVName(obj, groupResource); len(name) > 0 && err == nil {
		ib.backupRequest.SkippedPVTracker.Untrack(name)
	} else if err != nil {
		// Not fatal: at worst a PV stays listed in the skipped-PV summary.
		// Warn (not Warnf): the message has no format verbs.
		log.WithError(err).Warn("unable to get PV name, skip untracking.")
	}
}

// getPVName resolves the name of the PersistentVolume represented by obj.
// For a PV it is the object's own name; for a PVC it is the bound volume's
// name, with an error when the PVC has no volume bound yet. Any other
// group/resource yields an empty name and a nil error.
func getPVName(obj runtime.Unstructured, groupResource schema.GroupResource) (string, error) {
	switch groupResource {
	case kuberesource.PersistentVolumes:
		pv := new(corev1api.PersistentVolume)
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pv); err != nil {
			return "", fmt.Errorf("failed to convert object to PV: %w", err)
		}
		return pv.Name, nil
	case kuberesource.PersistentVolumeClaims:
		pvc := new(corev1api.PersistentVolumeClaim)
		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pvc); err != nil {
			return "", fmt.Errorf("failed to convert object to PVC: %w", err)
		}
		if pvc.Spec.VolumeName == "" {
			return "", fmt.Errorf("PV name is not set in PVC")
		}
		return pvc.Spec.VolumeName, nil
	default:
		return "", nil
	}
}

func volumeSnapshot(backup *velerov1api.Backup, volumeName, volumeID, volumeType, az, location string, iops *int64) *volume.Snapshot {
return &volume.Snapshot{
Spec: volume.SnapshotSpec{
Expand Down
67 changes: 67 additions & 0 deletions pkg/backup/item_backupper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ package backup
import (
"testing"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/vmware-tanzu/velero/pkg/kuberesource"

"github.com/stretchr/testify/assert"
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -170,3 +175,65 @@ func Test_zoneFromPVNodeAffinity(t *testing.T) {
})
}
}

// TestGetPVName verifies that getPVName resolves the PV name from PV and PVC
// objects, errors on an unbound PVC, and returns an empty name for any other
// resource kind.
func TestGetPVName(t *testing.T) {
	testcases := []struct {
		name          string
		obj           metav1.Object
		groupResource schema.GroupResource
		pvName        string
		hasErr        bool
	}{
		{
			name:          "pv should return pv name",
			obj:           builder.ForPersistentVolume("test-pv").Result(),
			groupResource: kuberesource.PersistentVolumes,
			pvName:        "test-pv",
			hasErr:        false,
		},
		{
			name:          "pvc without volumeName should return error",
			obj:           builder.ForPersistentVolumeClaim("ns", "pvc-1").Result(),
			groupResource: kuberesource.PersistentVolumeClaims,
			pvName:        "",
			hasErr:        true,
		},
		{
			name:          "pvc with volumeName should return pv name",
			obj:           builder.ForPersistentVolumeClaim("ns", "pvc-1").VolumeName("test-pv-2").Result(),
			groupResource: kuberesource.PersistentVolumeClaims,
			pvName:        "test-pv-2",
			hasErr:        false,
		},
		{
			name:          "unsupported group resource should return empty pv name",
			obj:           builder.ForPod("ns", "pod1").Result(),
			groupResource: kuberesource.Pods,
			pvName:        "",
			hasErr:        false,
		},
	}
	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			o := &unstructured.Unstructured{Object: nil}
			if tc.obj != nil {
				data, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tc.obj)
				// Check the conversion error before using its result.
				require.NoError(t, err)
				o = &unstructured.Unstructured{Object: data}
			}
			name, err2 := getPVName(o, tc.groupResource)
			assert.Equal(t, tc.pvName, name)
			assert.Equal(t, tc.hasErr, err2 != nil)
		})
	}
}

// TestRandom converts a Pod into PV and PVC structs via FromUnstructured and
// only logs the resulting errors; it asserts nothing and always passes.
// NOTE(review): this looks like a leftover scratch test — presumably it
// demonstrates that FromUnstructured tolerates a mismatched kind (so getPVName
// cannot rely on a conversion error to detect wrong types). Confirm and either
// add assertions or remove this test.
func TestRandom(t *testing.T) {
	pv := new(corev1api.PersistentVolume)
	pvc := new(corev1api.PersistentVolumeClaim)
	obj := builder.ForPod("ns1", "pod1").ServiceAccount("sa").Result()
	o, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
	err1 := runtime.DefaultUnstructuredConverter.FromUnstructured(o, pv)
	err2 := runtime.DefaultUnstructuredConverter.FromUnstructured(o, pvc)
	t.Logf("err1: %v, err2: %v", err1, err2)
}
85 changes: 85 additions & 0 deletions pkg/backup/pv_skip_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package backup

import (
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// skipPVTracker keeps track of persistent volumes that have been skipped
// during backup, and the reason why each was skipped.
type skipPVTracker struct {
	// Guards all access to pvs. Embedded as a pointer, so the zero value is
	// unusable: construct via NewSkipPVTracker, which initializes the mutex.
	*sync.RWMutex
	// pvs maps a PV name to the reasons why it was skipped.
	// Each key of the inner map is a backup approach; each approach can hold
	// at most one reason (later Track calls overwrite it).
	pvs map[string]map[string]string
}

// Keys identifying the backup approach under which a PV skip reason is
// recorded in skipPVTracker.
const (
	// podVolumeApproach: file-system backup of pod volumes.
	podVolumeApproach = "podvolume"
	// csiSnapshotApproach: snapshot via the CSI plugin.
	csiSnapshotApproach = "csiSnapshot"
	// volumeSnapshotApproach: Velero-native volume snapshotter.
	volumeSnapshotApproach = "volumeSnapshot"
	// anyApproach: fallback key used when Track is called with an empty approach.
	anyApproach = "any"
)

// NotCSIError is the sentinel error the CSI plugin returns to tell velero that
// the volume is not a CSI volume and was therefore skipped.
// NOTE(review): Go convention (staticcheck ST1012) would name this ErrNotCSI;
// renaming is a breaking change for plugin callers, so it is kept as is.
var NotCSIError = errors.New("not a CSI volume")

// NewSkipPVTracker builds a ready-to-use skipPVTracker with an initialized
// mutex and an empty reason map.
func NewSkipPVTracker() *skipPVTracker {
	tracker := &skipPVTracker{
		RWMutex: new(sync.RWMutex),
		pvs:     map[string]map[string]string{},
	}
	return tracker
}

// Track records that the PV with the given name was skipped under the given
// backup approach, overwriting any reason previously stored for that approach.
// Calls with an empty name or reason are ignored; an empty approach is
// recorded under anyApproach.
func (pt *skipPVTracker) Track(name, approach, reason string) {
	// Validate the arguments before taking the lock; no shared state is read.
	if name == "" || reason == "" {
		return
	}
	pt.Lock()
	defer pt.Unlock()
	skipReasons := pt.pvs[name]
	if skipReasons == nil {
		skipReasons = make(map[string]string)
		pt.pvs[name] = skipReasons
	}
	if approach == "" {
		approach = anyApproach
	}
	skipReasons[approach] = reason
}

// Untrack removes the PV with the specified name from the tracker,
// discarding all skip reasons recorded for it.
func (pt *skipPVTracker) Untrack(name string) {
	pt.Lock()
	defer pt.Unlock()
	delete(pt.pvs, name)
}

// Summary returns a human-readable report of the tracked PVs and, per backup
// approach, the reason each one was skipped. PV names and approaches are
// sorted so the output is deterministic.
func (pt *skipPVTracker) Summary() string {
	pt.RLock()
	defer pt.RUnlock()
	keys := make([]string, 0, len(pt.pvs))
	for key := range pt.pvs {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	// Build the result with strings.Builder to avoid quadratic += growth.
	var sb strings.Builder
	for _, key := range keys {
		skipReasons := pt.pvs[key]
		// len of a nil map is 0, so a single emptiness check suffices.
		if len(skipReasons) == 0 {
			continue
		}
		sb.WriteString(fmt.Sprintf("PV: %s, skipped reasons: \n", key))
		approaches := make([]string, 0, len(skipReasons))
		for a := range skipReasons {
			approaches = append(approaches, a)
		}
		sort.Strings(approaches)
		for _, a := range approaches {
			sb.WriteString(fmt.Sprintf("\tapproach: %s, reason: %s\n", a, skipReasons[a]))
		}
	}
	return sb.String()
}
Loading

0 comments on commit 6fd2c54

Please sign in to comment.