Skip to content

Commit

Permalink
Fix snapshot leak for backup
Browse files Browse the repository at this point in the history
Signed-off-by: Ming Qiu <ming.qiu@broadcom.com>
  • Loading branch information
qiuming-best committed Mar 29, 2024
1 parent d640cc1 commit 5fbbe14
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 41 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7558-qiuming-best
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix snapshot leak for backup
4 changes: 2 additions & 2 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
}

if err := controller.UpdatePVBStatusToFailed(s.ctx, client, &pvbs.Items[i],
fmt.Sprintf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed),
time.Now(), s.logger); err != nil {
fmt.Errorf("get a podvolumebackup with status %q during the server starting, mark it as %q", velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed),
"", time.Now(), s.logger); err != nil {

Check warning on line 411 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L410-L411

Added lines #L410 - L411 were not covered by tests
s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName())
continue
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp
}
}

func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) {
func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) {
defer r.closeDataPath(ctx, duName)

log := r.logger.WithField("dataupload", duName)
Expand Down Expand Up @@ -691,6 +691,9 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}

if dataPathError, ok := err.(datapath.DataPathError); ok {
du.Status.SnapshotID = dataPathError.GetSnapshotID()
}

Check warning on line 696 in pkg/controller/data_upload_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_upload_controller.go#L695-L696

Added lines #L695 - L696 were not covered by tests
du.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
if patchErr := r.client.Patch(ctx, du, client.MergeFrom(original)); patchErr != nil {
log.WithError(patchErr).Error("error updating DataUpload status")
Expand Down
13 changes: 8 additions & 5 deletions pkg/controller/pod_volume_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, pVBRRequestor, ctx, r.Client, pvb.Namespace, callbacks, log)

if err != nil {
if err == datapath.ConcurrentLimitExceed {
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
Expand Down Expand Up @@ -225,7 +226,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam
log.Info("PodVolumeBackup completed")
}

func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvbName string, err error) {
func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace, pvbName string, err error) {

Check warning on line 229 in pkg/controller/pod_volume_backup_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/pod_volume_backup_controller.go#L229

Added line #L229 was not covered by tests
defer r.closeDataPath(ctx, pvbName)

log := r.logger.WithField("pvb", pvbName)
Expand Down Expand Up @@ -348,17 +349,19 @@ func (r *PodVolumeBackupReconciler) closeDataPath(ctx context.Context, pvbName s

func (r *PodVolumeBackupReconciler) errorOut(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
r.closeDataPath(ctx, pvb.Name)
_ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, errors.WithMessage(err, msg).Error(), r.clock.Now(), log)
_ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, err, msg, r.clock.Now(), log)

Check warning on line 352 in pkg/controller/pod_volume_backup_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/pod_volume_backup_controller.go#L352

Added line #L352 was not covered by tests

return ctrl.Result{}, err
}

func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errString string, time time.Time, log logrus.FieldLogger) error {
func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errOut error, msg string, time time.Time, log logrus.FieldLogger) error {

Check warning on line 357 in pkg/controller/pod_volume_backup_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/pod_volume_backup_controller.go#L357

Added line #L357 was not covered by tests
original := pvb.DeepCopy()
pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
pvb.Status.Message = errString
pvb.Status.CompletionTimestamp = &metav1.Time{Time: time}

if dataPathError, ok := errOut.(datapath.DataPathError); ok {
pvb.Status.SnapshotID = dataPathError.GetSnapshotID()
}
pvb.Status.Message = errors.WithMessage(errOut, msg).Error()

Check warning on line 364 in pkg/controller/pod_volume_backup_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/pod_volume_backup_controller.go#L361-L364

Added lines #L361 - L364 were not covered by tests
err := c.Patch(ctx, pvb, client.MergeFrom(original))
if err != nil {
log.WithError(err).Error("error updating PodVolumeBackup status")
Expand Down
33 changes: 33 additions & 0 deletions pkg/datapath/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datapath

// DataPathError represents an error that occurred during a backup or restore
// operation. It pairs the underlying failure with the ID of the snapshot that
// was involved, so a receiver of the error can still read the snapshot ID.
//
// All methods use value receivers so that a type assertion on the value type,
// err.(DataPathError), keeps matching.
type DataPathError struct {
	snapshotID string // ID of the snapshot associated with the failed operation
	err        error  // underlying cause; nil for the zero value
}

// Error implements error. It returns the message of the underlying error, or
// an empty string when no underlying error is set, so that calling Error on
// the zero value does not panic with a nil dereference.
func (e DataPathError) Error() string {
	if e.err == nil {
		return ""
	}
	return e.err.Error()
}

// Unwrap returns the underlying error (possibly nil) so that errors.Is and
// errors.As can inspect the cause through this wrapper.
func (e DataPathError) Unwrap() error {
	return e.err
}

// GetSnapshotID returns the snapshot ID for the error.
func (e DataPathError) GetSnapshotID() string {
	return e.snapshotID
}
45 changes: 45 additions & 0 deletions pkg/datapath/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright The Velero Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datapath

import (
"errors"
"testing"
)

// TestGetSnapshotID verifies that GetSnapshotID returns the snapshot ID the
// DataPathError was constructed with.
func TestGetSnapshotID(t *testing.T) {
	const want = "123"

	dpErr := DataPathError{snapshotID: want, err: errors.New("example error")}

	// The accessor must hand back the stored ID unchanged.
	if got := dpErr.GetSnapshotID(); got != want {
		t.Errorf("GetSnapshotID() returned unexpected snapshot ID: got %s, want %s", got, want)
	}
}

// TestError verifies that Error returns the message of the wrapped error.
func TestError(t *testing.T) {
	const want = "example error"

	dpErr := DataPathError{snapshotID: "123", err: errors.New("example error")}

	// The wrapper must surface the underlying message verbatim.
	if got := dpErr.Error(); got != want {
		t.Errorf("Error() returned unexpected error message: got %s, want %s", got, want)
	}
}
12 changes: 10 additions & 2 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
} else if err != nil {
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err)
dataPathErr := DataPathError{
snapshotID: snapshotID,
err: err,
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
}
Expand All @@ -161,7 +165,11 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo
if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
} else if err != nil {
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err)
dataPathErr := DataPathError{
snapshotID: snapshotID,
err: err,
}
fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, dataPathErr)
} else {
fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/datapath/file_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestAsyncBackup(t *testing.T) {
var asyncErr error
var asyncResult Result
finish := make(chan struct{})

var failErr = errors.New("fake-fail-error")
tests := []struct {
name string
uploaderProv provider.Provider
Expand All @@ -49,12 +49,12 @@ func TestAsyncBackup(t *testing.T) {
OnCompleted: nil,
OnCancelled: nil,
OnFailed: func(ctx context.Context, namespace string, job string, err error) {
asyncErr = err
asyncErr = failErr
asyncResult = Result{}
finish <- struct{}{}
},
},
err: errors.New("fake-error"),
err: failErr,
},
{
name: "async backup cancel",
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestAsyncRestore(t *testing.T) {
var asyncErr error
var asyncResult Result
finish := make(chan struct{})

var failErr = errors.New("fake-fail-error")
tests := []struct {
name string
uploaderProv provider.Provider
Expand All @@ -133,12 +133,12 @@ func TestAsyncRestore(t *testing.T) {
OnCompleted: nil,
OnCancelled: nil,
OnFailed: func(ctx context.Context, namespace string, job string, err error) {
asyncErr = err
asyncErr = failErr
asyncResult = Result{}
finish <- struct{}{}
},
},
err: errors.New("fake-error"),
err: failErr,
},
{
name: "async restore cancel",
Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/maintenance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestDeleteOldMaintenanceJobs(t *testing.T) {
assert.NoError(t, err)

// We expect the number of jobs to be equal to 'keep'
assert.Equal(t, keep, len(jobList.Items))
assert.Len(t, jobList.Items, keep)

// We expect that the oldest jobs were deleted
// Job3 should not be present in the remaining list
Expand Down
14 changes: 8 additions & 6 deletions pkg/uploader/kopia/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var filesystemEntryFunc = snapshotfs.FilesystemEntryFromIDWithPath
var restoreEntryFunc = restore.Entry

const UploaderConfigMultipartKey = "uploader-multipart"
const MaxErrorReported = 10

// SnapshotUploader is mainly used by unit tests so that the Upload interface can be overridden
type SnapshotUploader interface {
Expand Down Expand Up @@ -182,17 +183,14 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re
}

kopiaCtx := kopia.SetupKopiaLog(ctx, log)
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, uploaderCfg, log, "Kopia Uploader")
if err != nil {
return nil, false, err
}

snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, uploaderCfg, log, "Kopia Uploader")
snapshotInfo := &uploader.SnapshotInfo{
ID: snapID,
Size: snapshotSize,
}

return snapshotInfo, false, nil
return snapshotInfo, false, err
}

func getLocalFSEntry(path0 string) (fs.Entry, error) {
Expand Down Expand Up @@ -307,6 +305,10 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree)
var errs []string
if ds := manifest.RootEntry.DirSummary; ds != nil {
for _, ent := range ds.FailedEntries {
if len(errs) > MaxErrorReported {
errs = append(errs, "too many errors, ignored...")
break

Check warning on line 310 in pkg/uploader/kopia/snapshot.go

View check run for this annotation

Codecov / codecov/patch

pkg/uploader/kopia/snapshot.go#L309-L310

Added lines #L309 - L310 were not covered by tests
}
policy := policyTree.EffectivePolicy()
if !(policy != nil && bool(*policy.ErrorHandlingPolicy.IgnoreUnknownTypes) && strings.Contains(ent.Error, fs.ErrUnknown.Error())) {
errs = append(errs, fmt.Sprintf("Error when processing %v: %v", ent.EntryPath, ent.Error))
Expand All @@ -315,7 +317,7 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree)
}

if len(errs) != 0 {
return "", 0, errors.New(strings.Join(errs, "\n"))
return string(manifestID), snapSize, errors.New(strings.Join(errs, "\n"))
}

return string(manifestID), snapSize, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/uploader/kopia/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ func TestReportSnapshotStatus(t *testing.T) {
},
{
shouldError: true,
expectedResult: "",
expectedSize: 0,
expectedResult: "sample-manifest-id",
expectedSize: 1024,
directorySummary: &fs.DirectorySummary{
FailedEntries: []*fs.EntryWithError{
{
Expand Down
20 changes: 11 additions & 9 deletions pkg/uploader/provider/kopia.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (kp *kopiaProvider) RunBackup(
progress.Updater = updater
progress.Log = log
kpUploader.Progress = progress
kpUploader.FailFast = true
quit := make(chan struct{})
log.Info("Starting backup")
go kp.CheckContext(ctx, quit, nil, kpUploader)
Expand All @@ -167,19 +168,20 @@ func (kp *kopiaProvider) RunBackup(
uploaderCfg[kopia.UploaderConfigMultipartKey] = "true"
}

snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
snapshotInfo, _, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, uploaderCfg, tags, log)
if err != nil {
snapshotID := ""
if snapshotInfo != nil {
snapshotID = snapshotInfo.ID
} else {
log.Error("Kopia backup failed with get empty snapshot ID")
}

Check warning on line 178 in pkg/uploader/provider/kopia.go

View check run for this annotation

Codecov / codecov/patch

pkg/uploader/provider/kopia.go#L177-L178

Added lines #L177 - L178 were not covered by tests

if kpUploader.IsCanceled() {
log.Error("Kopia backup is canceled")
return "", false, ErrorCanceled
} else {
return "", false, errors.Wrapf(err, "Failed to run kopia backup")
return snapshotID, false, ErrorCanceled

Check warning on line 182 in pkg/uploader/provider/kopia.go

View check run for this annotation

Codecov / codecov/patch

pkg/uploader/provider/kopia.go#L182

Added line #L182 was not covered by tests
}
} else if isSnapshotEmpty {
log.Debugf("Kopia backup got empty dir with path %s", path)
return "", true, nil
} else if snapshotInfo == nil {
return "", false, fmt.Errorf("failed to get kopia backup snapshot info for path %v", path)
return snapshotID, false, errors.Wrapf(err, "Failed to run kopia backup")
}

// ensure that the TotalBytes statistic equals BytesDone when finished
Expand Down
7 changes: 0 additions & 7 deletions pkg/uploader/provider/kopia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,6 @@ func TestRunBackup(t *testing.T) {
},
notError: false,
},
{
name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, errors.New("snapshot is empty")
},
notError: false,
},
{
name: "success to backup block mode volume",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, uploaderCfg map[string]string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
Expand Down

0 comments on commit 5fbbe14

Please sign in to comment.