Skip to content

Commit

Permalink
ebs br: serial execution of volume backup gc (#5452) (#5473)
Browse files Browse the repository at this point in the history
Signed-off-by: BornChanger <dawn_catcher@126.com>
Co-authored-by: BornChanger <dawn_catcher@126.com>
  • Loading branch information
ti-chi-bot and BornChanger committed Dec 19, 2023
1 parent 29b6292 commit 2854806
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 31 deletions.
11 changes: 7 additions & 4 deletions pkg/backup/util/aws_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package util

import (
"context"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -147,10 +148,11 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) error {
lock sync.Mutex
)

eg := new(errgroup.Group)
eg, _ := errgroup.WithContext(context.Background())
workerPool := NewWorkerPool(e.concurrency, "delete snapshots")
for volID := range snapIDMap {
snapID := snapIDMap[volID]
eg.Go(func() error {
workerPool.ApplyOnErrorGroup(eg, func() error {
_, err := e.EC2.DeleteSnapshot(&ec2.DeleteSnapshotInput{
SnapshotId: &snapID,
})
Expand Down Expand Up @@ -187,7 +189,8 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) error {

func (e *EC2Session) AddTags(resourcesTags map[string]TagMap) error {

eg := new(errgroup.Group)
eg, _ := errgroup.WithContext(context.Background())
workerPool := NewWorkerPool(e.concurrency, "add tags")
for resourceID := range resourcesTags {
id := resourceID
tagMap := resourcesTags[resourceID]
Expand All @@ -204,7 +207,7 @@ func (e *EC2Session) AddTags(resourcesTags map[string]TagMap) error {
Tags: tags,
}

eg.Go(func() error {
workerPool.ApplyOnErrorGroup(eg, func() error {
_, err := e.EC2.CreateTags(input)
if err != nil {
klog.Errorf("failed to create tags for resource id=%s, %v", id, err)
Expand Down
60 changes: 35 additions & 25 deletions pkg/fedvolumebackup/backupschedule/backup_schedule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,18 +257,25 @@ func (bm *backupScheduleManager) backupGCByMaxReservedTime(vbs *v1alpha1.VolumeB
return
}

for _, backup := range expiredBackups {
// delete the expired backup
if err = bm.deps.FedVolumeBackupControl.DeleteVolumeBackup(backup); err != nil {
klog.Errorf("backup schedule %s/%s gc backup %s failed, err %v", ns, bsName, backup.GetName(), err)
// To avoid throttling, volume backups are deleted one at a time.
// Delete the oldest expired backup
if len(expiredBackups) > 0 {
backup := expiredBackups[0]
if backup.DeletionTimestamp != nil {
klog.Infof("Deletion is ongoing for backup schedule %s/%s, backup %s", ns, bsName, backup.GetName())
return
}
klog.Infof("backup schedule %s/%s gc backup %s success", ns, bsName, backup.GetName())
}
} else {
if err = bm.deps.FedVolumeBackupControl.DeleteVolumeBackup(backup); err != nil {
klog.Errorf("backup schedule %s/%s gc backup %s failed, err %v", ns, bsName, backup.GetName(), err)
return
}
klog.Infof("backup schedule %s/%s gc backup %s success", ns, bsName, backup.GetName())

if len(expiredBackups) == len(backupsList) && len(expiredBackups) > 0 {
// All backups have been deleted, so the last backup information in the backupSchedule should be reset
bm.resetLastBackup(vbs)
if len(expiredBackups) == 1 && len(backupsList) == 1 {
// All backups have been deleted, so the last backup information in the backupSchedule should be reset
bm.resetLastBackup(vbs)
}
}
}
}

Expand Down Expand Up @@ -345,23 +352,26 @@ func (bm *backupScheduleManager) backupGCByMaxBackups(vbs *v1alpha1.VolumeBackup
}

sort.Sort(byCreateTimeDesc(backupsList))
var deleteCount int
for i, backup := range backupsList {
if i < int(*vbs.Spec.MaxBackups) {
continue
}
// delete the backup
if err := bm.deps.FedVolumeBackupControl.DeleteVolumeBackup(backup); err != nil {
klog.Errorf("backup schedule %s/%s gc backup %s failed, err %v", ns, bsName, backup.GetName(), err)

// To avoid throttling, volume backups are deleted one at a time.
// Delete the first backup past the MaxBackups limit in the sorted list
if len(backupsList) > int(*vbs.Spec.MaxBackups) {
backup := backupsList[int(*vbs.Spec.MaxBackups)]
if backup.DeletionTimestamp != nil {
klog.Infof("Deletion is ongoing for backup schedule %s/%s, backup %s", ns, bsName, backup.GetName())
return
}
deleteCount += 1
klog.Infof("backup schedule %s/%s gc backup %s success", ns, bsName, backup.GetName())
}
} else {
if err = bm.deps.FedVolumeBackupControl.DeleteVolumeBackup(backup); err != nil {
klog.Errorf("backup schedule %s/%s gc backup %s failed, err %v", ns, bsName, backup.GetName(), err)
return
}
klog.Infof("backup schedule %s/%s gc backup %s success", ns, bsName, backup.GetName())

if deleteCount == len(backupsList) && deleteCount > 0 {
// All backups have been deleted, so the last backup information in the backupSchedule should be reset
bm.resetLastBackup(vbs)
if len(backupsList) == 1 {
// All backups have been deleted, so the last backup information in the backupSchedule should be reset
bm.resetLastBackup(vbs)
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ func TestManager(t *testing.T) {
bs.Spec.MaxBackups = pointer.Int32Ptr(5)
err = m.Sync(bs)
g.Expect(err).Should(BeNil())
helper.checkBacklist(bs.Namespace, 5)
helper.checkBacklist(bs.Namespace, 9)

t.Log("test setting MaxReservedTime")
bs.Spec.MaxBackups = nil
bs.Spec.MaxReservedTime = pointer.StringPtr("71h")
err = m.Sync(bs)
g.Expect(err).Should(BeNil())
helper.checkBacklist(bs.Namespace, 3)
helper.checkBacklist(bs.Namespace, 8)
}

func TestGetLastScheduledTime(t *testing.T) {
Expand Down

0 comments on commit 2854806

Please sign in to comment.