Skip to content

Commit

Permalink
br: ebs tags refactoring (#44381) (#44410)
Browse files Browse the repository at this point in the history
close #43934
  • Loading branch information
ti-chi-bot committed Jun 5, 2023
1 parent ca9ffcb commit e954f27
Showing 1 changed file with 11 additions and 99 deletions.
110 changes: 11 additions & 99 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,8 @@ import (
)

const (
AnnPodNameKey string = "tidb.pingcap.com/pod-name"
AnnTemporaryVolumeID string = "temporary/volume-id"
EC2K8SClusterNameKey string = "aws:eks:cluster-name"

pollingPendingSnapshotInterval = 30 * time.Second
errCodeTooManyPendingSnapshots = "PendingSnapshotLimitExceeded"

SourcePvcNameKey string = "source/pvcName"
SourceVolumeIdKey string = "source/VolumeId"
SourceTikvNameKey string = "source/TikvName"
SourceNamespaceKey string = "source/Namespace"
SourceContextKey string = "source/context"
)

type EC2Session struct {
Expand All @@ -47,14 +37,6 @@ type EC2Session struct {

type VolumeAZs map[string]string

type SnapshotTags struct {
sourcePVCName string
sourceTiKVName string
sourceNameSpace string
}

type VolumeSnapshotTags map[string]SnapshotTags

func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
// aws-sdk has builtin exponential backoff retry mechanism, see:
// https://github.com/aws/aws-sdk-go/blob/db4388e8b9b19d34dcde76c492b17607cd5651e2/aws/client/default_retryer.go#L12-L16
Expand All @@ -71,66 +53,20 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil
}

func GenerateVolumeSnapshotTags(backupInfo *config.EBSBasedBRMeta, pvVolumeMap map[string]string) (VolumeSnapshotTags, error) {
vst := make(VolumeSnapshotTags)
for j := range backupInfo.KubernetesMeta.PVCs {
pvc := backupInfo.KubernetesMeta.PVCs[j]
volID := pvVolumeMap[pvc.Spec.VolumeName]
if volID == "" {
return vst, errors.Errorf("No matching pv is found with name of [%s]", pvc.Spec.VolumeName)
}
vst[volID] = SnapshotTags{
pvc.GetName(),
pvc.GetLabels()[AnnPodNameKey],
pvc.GetNamespace(),
}
}
return vst, nil
}

// CreateSnapshots is the mainly steps to control the data volume snapshots.
func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[string]string, VolumeAZs, error) {
snapIDMap := make(map[string]string)
var volumeIDs []*string

var mutex sync.Mutex
eg, _ := errgroup.WithContext(context.Background())

pvVolumeMap := make(map[string]string)
for j := range backupInfo.KubernetesMeta.PVs {
pv := backupInfo.KubernetesMeta.PVs[j]
pvVolumeMap[pv.GetName()] = pv.GetAnnotations()[AnnTemporaryVolumeID]
}

vst, err := GenerateVolumeSnapshotTags(backupInfo, pvVolumeMap)
if err != nil {
return snapIDMap, nil, errors.Trace(err)
}
taggingAndFillResult := func(createOutput *ec2.CreateSnapshotsOutput, vst VolumeSnapshotTags, k8sClusterName *string) error {
fillResult := func(createOutput *ec2.CreateSnapshotsOutput) {
mutex.Lock()
defer mutex.Unlock()
for j := range createOutput.Snapshots {
snapshot := createOutput.Snapshots[j]
snapIDMap[aws.StringValue(snapshot.VolumeId)] = aws.StringValue(snapshot.SnapshotId)

createTagInput := &ec2.CreateTagsInput{
Resources: []*string{
snapshot.SnapshotId,
},
Tags: []*ec2.Tag{
ec2Tag(SourcePvcNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourcePVCName),
ec2Tag(SourceVolumeIdKey, aws.StringValue(snapshot.VolumeId)),
ec2Tag(SourceTikvNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourceTiKVName),
ec2Tag(SourceNamespaceKey, vst[aws.StringValue(snapshot.VolumeId)].sourceNameSpace),
ec2Tag(SourceContextKey, aws.StringValue(k8sClusterName)),
},
}
_, err := e.ec2.CreateTags(createTagInput)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

workerPool := utils.NewWorkerPool(e.concurrency, "create snapshots")
Expand Down Expand Up @@ -164,17 +100,6 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str
return snapIDMap, nil, errors.Trace(err)
}

// retrieve the k8s cluster name from EC2 instance tags
var k8sClusterName *string

for j := range resp1.Reservations[0].Instances[0].Tags {
tag := resp1.Reservations[0].Instances[0].Tags[j]
if aws.StringValue(tag.Key) == EC2K8SClusterNameKey {
k8sClusterName = tag.Value
break
}
}

for j := range resp1.Reservations[0].Instances[0].BlockDeviceMappings {
device := resp1.Reservations[0].Instances[0].BlockDeviceMappings[j]
// skip root volume
Expand Down Expand Up @@ -205,16 +130,14 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str
instanceSpecification.SetInstanceId(aws.StringValue(ec2InstanceId)).SetExcludeBootVolume(true).SetExcludeDataVolumeIds(excludedVolumeIDs)

createSnapshotInput.SetInstanceSpecification(&instanceSpecification)

// Copy tags from source volume
createSnapshotInput.SetCopyTagsFromSource("volume")
resp, err := e.createSnapshotsWithRetry(context.TODO(), &createSnapshotInput)

if err != nil {
return errors.Trace(err)
}
err = taggingAndFillResult(resp, vst, k8sClusterName)
if err != nil {
return errors.Trace(err)
}

fillResult(resp)
return nil
})
}
Expand Down Expand Up @@ -381,16 +304,6 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
newVolumeIDMap[oldVol.ID] = *newVol.VolumeId
}

fetchTagValue := func(tags []*ec2.Tag, key string) string {
for i := range tags {
tag := tags[i]
if aws.StringValue(tag.Key) == key {
return aws.StringValue(tag.Value)
}
}
return ""
}

workerPool := utils.NewWorkerPool(e.concurrency, "create volume")
for i := range meta.TiKVComponent.Stores {
store := meta.TiKVComponent.Stores[i]
Expand All @@ -413,6 +326,7 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
tags := []*ec2.Tag{
ec2Tag("TiDBCluster-BR", "new"),
ec2Tag("ebs.csi.aws.com/cluster", "true"),
ec2Tag("snapshot/createdFromSnapshotId", oldVol.SnapshotID),
}
snapshotIds := make([]*string, 0)

Expand All @@ -425,13 +339,11 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
return errors.Errorf("specified snapshot [%s] is not found", oldVol.SnapshotID)
}

snapshotTags := resp.Snapshots[0].Tags
tags = append(tags, ec2Tag("snapshot/createdFromSnapshotId", oldVol.SnapshotID),
ec2Tag("snapshot/"+SourcePvcNameKey, fetchTagValue(snapshotTags, SourcePvcNameKey)),
ec2Tag("snapshot/"+SourceVolumeIdKey, fetchTagValue(snapshotTags, SourceVolumeIdKey)),
ec2Tag("snapshot/"+SourceTikvNameKey, fetchTagValue(snapshotTags, SourceTikvNameKey)),
ec2Tag("snapshot/"+SourceNamespaceKey, fetchTagValue(snapshotTags, SourceNamespaceKey)),
ec2Tag("snapshot/"+SourceContextKey, fetchTagValue(snapshotTags, SourceContextKey)))
// Copy tags from source snapshots
for j := range resp.Snapshots[0].Tags {
tags = append(tags,
ec2Tag("snapshot/"+aws.StringValue(resp.Snapshots[0].Tags[j].Key), aws.StringValue(resp.Snapshots[0].Tags[j].Value)))
}

req.SetTagSpecifications([]*ec2.TagSpecification{
{
Expand Down

0 comments on commit e954f27

Please sign in to comment.