/
backup_size.go
415 lines (352 loc) · 12.6 KB
/
backup_size.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ebs"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/util"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)
// TODO#1: there shall be an abstract interface for different cloud platforms, e.g. GetBackupSize
// it may be better to do this work after gcp supports volume-snapshot
// for EBS Direct API permissions, refer to https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebsapi-permissions.html
// for EBS Direct API, refer to https://docs.aws.amazon.com/sdk-for-go/api/service/ebs/
// interface CalcVolSnapBackupSize called by backup and backup clean.
const (
// DescribeSnapMaxReturnResult can be between 5 and 1,000; if MaxResults is given a value larger than 1,000, only 1,000 results are returned.
DescribeSnapMaxReturnResult = 1000
// ListSnapMaxReturnResult can be between 100 and 10,000, and charge ~0.6$/1 million request
ListSnapMaxReturnResult = 10000
// EbsApiConcurrency can be between 1 and 50 due to aws service quota
EbsApiConcurrency = 40
// levels accepted by CalcVolSnapBackupSize / calcBackupSize:
// full snapshot size only, incremental (changed blocks) only, or both.
CalculateFullSize = "full"
CalculateIncremental = "incremental"
CalculateAll = "all"
)
// CalcVolSnapBackupSize gets snapshots from the backup meta and then calculates the
// backup size of those snapshots.
// level selects what is computed: CalculateFullSize, CalculateIncremental, or CalculateAll.
// It returns the full backup size and the incremental backup size, both in bytes.
func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvider, level string) (fullBackupSize int64, incrementalBackupSize int64, err error) {
	start := time.Now()
	// retrieves all snapshots from backup meta file
	volSnapshots, err := getSnapshotsFromBackupmeta(ctx, provider)
	if err != nil {
		// (the original code repeated this identical err check twice; once is enough)
		return 0, 0, err
	}
	fullBackupSize, incrementalBackupSize, err = calcBackupSize(ctx, volSnapshots, level)
	if err != nil {
		return 0, 0, err
	}
	klog.Infof("calculate volume-snapshot backup size takes %v", time.Since(start))
	return
}
// getSnapshotsFromBackupmeta reads all snapshots from the backupmeta file stored in
// external storage and returns a volume-id -> snapshot-id map.
// Reading the meta file is retried with exponential backoff; a "not exist" error is
// treated as permanent and not retried.
func getSnapshotsFromBackupmeta(ctx context.Context, provider v1alpha1.StorageProvider) (map[string]string, error) {
	volumeIDMap := make(map[string]string)
	// read backup meta
	s, err := util.NewStorageBackend(provider, &util.StorageCredential{})
	if err != nil {
		return volumeIDMap, err
	}
	defer s.Close()
	var contents []byte
	// use exponential backoff, every retry duration is duration * factor ^ (used_step - 1)
	backoff := wait.Backoff{
		Duration: time.Second,
		Steps:    6,
		Factor:   2.0,
		Cap:      time.Minute,
	}
	readBackupMeta := func() error {
		exist, err := s.Exists(ctx, constants.MetaFile)
		if err != nil {
			return err
		}
		if !exist {
			return fmt.Errorf("%s not exist", constants.MetaFile)
		}
		contents, err = s.ReadAll(ctx, constants.MetaFile)
		if err != nil {
			return err
		}
		return nil
	}
	// retry everything except a missing meta file
	isRetry := func(err error) bool {
		return !strings.Contains(err.Error(), "not exist")
	}
	err = retry.OnError(backoff, isRetry, readBackupMeta)
	if err != nil {
		return nil, errors.Annotatef(err, "read backup meta from bucket %s and prefix %s", s.GetBucket(), s.GetPrefix())
	}
	metaInfo := &util.EBSBasedBRMeta{}
	if err = json.Unmarshal(contents, metaInfo); err != nil {
		return volumeIDMap, errors.Annotatef(err, "read backup meta from bucket %s and prefix %s", s.GetBucket(), s.GetPrefix())
	}
	// guard against malformed meta: dereferencing a nil TiKVComponent below would panic
	if metaInfo.TiKVComponent == nil {
		return volumeIDMap, fmt.Errorf("backup meta from bucket %s and prefix %s has no TiKV component", s.GetBucket(), s.GetPrefix())
	}
	// get volume-snapshot map
	for i := range metaInfo.TiKVComponent.Stores {
		store := metaInfo.TiKVComponent.Stores[i]
		for j := range store.Volumes {
			vol := store.Volumes[j]
			volumeIDMap[vol.ID] = vol.SnapshotID
		}
	}
	return volumeIDMap, nil
}
// calcBackupSize gets a volume-snapshots backup size.
// fullBackupSize sums each snapshot's own listed blocks; incrementalBackupSize sums the
// blocks changed since each snapshot's predecessor (or the snapshot's full size when it
// is the volume's first snapshot). level selects which of the two are computed.
// Work is fanned out over a worker pool of EbsApiConcurrency goroutines.
func calcBackupSize(ctx context.Context, volumes map[string]string, level string) (fullBackupSize int64, incrementalBackupSize int64, err error) {
	var apiReqCount, incrementalApiReqCount uint64
	workerPool := util.NewWorkerPool(EbsApiConcurrency, "list snapshot size")
	eg, _ := errgroup.WithContext(ctx)
	for vid, sid := range volumes {
		// copy the loop variables so each closure captures its own pair
		snapshotId := sid
		volumeId := vid
		workerPool.ApplyOnErrorGroup(eg, func() error {
			var snapSize, apiReq uint64
			if level == CalculateAll || level == CalculateFullSize {
				// use a local err here: the previous code assigned the outer named
				// return `err` from multiple goroutines at once, which is a data race
				size, reqs, err := calculateSnapshotSize(volumeId, snapshotId)
				if err != nil {
					return err
				}
				snapSize, apiReq = size, reqs
				atomic.AddInt64(&fullBackupSize, int64(snapSize))
				atomic.AddUint64(&apiReqCount, apiReq)
			}
			if level == CalculateAll || level == CalculateIncremental {
				volSnapshots, err := getVolSnapshots(volumeId)
				if err != nil {
					return err
				}
				prevSnapshotId, existed := getPrevSnapshotId(snapshotId, volSnapshots)
				if !existed {
					// if there is no previous snapshot, means it's the first snapshot, uses its full size as incremental size
					// NOTE(review): when level == CalculateIncremental only, snapSize was never
					// computed and 0 is added here — behavior preserved from the original.
					atomic.AddInt64(&incrementalBackupSize, int64(snapSize))
					return nil
				}
				klog.Infof("get previous snapshot %s of snapshot %s, volume %s", prevSnapshotId, snapshotId, volumeId)
				incrementalSnapSize, incrementalApiReq, err := calculateChangedBlocksSize(volumeId, prevSnapshotId, snapshotId)
				if err != nil {
					return err
				}
				atomic.AddInt64(&incrementalBackupSize, int64(incrementalSnapSize))
				atomic.AddUint64(&incrementalApiReqCount, incrementalApiReq)
			}
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		klog.Errorf("failed to get snapshots size %d, number of api request %d", fullBackupSize, apiReqCount)
		return 0, 0, err
	}
	// currently, we do not count api request fees, since it is very few of cost, however, we print it in log in case "very few" is not correct
	klog.Infof("backup size %d bytes, number of api request %d, incremental backup size %d bytes, numbers of incremental size's api request %d",
		fullBackupSize, apiReqCount, incrementalBackupSize, incrementalApiReqCount)
	return
}
// calculateSnapshotSize calculate size of a snapshot in bytes by listing its blocks.
// It returns the snapshot size, the number of ListSnapshotBlocks API requests made,
// and any error encountered. Throttling errors are retried with backoff.
func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) {
	var totalSize uint64
	var requestCount uint64
	begin := time.Now()
	klog.Infof("start to calculate size for snapshot %s, volume %s",
		snapshotId, volumeId)
	ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
	if err != nil {
		klog.Errorf("new a ebs session failure.")
		return 0, requestCount, err
	}
	// only throttling errors are worth retrying
	throttled := func(err error) bool {
		return request.IsErrorThrottle(err)
	}
	var pageToken *string
	for done := false; !done; {
		// Each retry interval is around 1 second, and no more than 60 times retry (~1 minute)
		backoff := wait.Backoff{
			Duration: time.Second,
			Steps:    60,
			Factor:   1.0,
			Jitter:   0.1,
		}
		fetchPage := func() error {
			resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
				SnapshotId: aws.String(snapshotId),
				MaxResults: aws.Int64(ListSnapMaxReturnResult),
				NextToken:  pageToken,
			})
			requestCount++
			if err != nil {
				return err
			}
			// every block in a page shares one BlockSize
			if resp.BlockSize != nil {
				totalSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize)
			}
			// a nil NextToken marks the final page
			done = resp.NextToken == nil
			pageToken = resp.NextToken
			return nil
		}
		if err := retry.OnError(backoff, throttled, fetchPage); err != nil {
			return 0, requestCount, errors.Annotatef(err, "ListSnapshotBlocks() failed against snapshot id %s, volume id %s", snapshotId, volumeId)
		}
	}
	klog.Infof("full snapshot size %s, num of ListSnapshotBlocks request %d, snapshot id %s, volume id %s, takes %v",
		humanize.Bytes(totalSize), requestCount, snapshotId, volumeId, time.Since(begin))
	return totalSize, requestCount, nil
}
// calculateChangedBlocksSize calculates changed blocks total size in bytes between two snapshots with common ancestry.
// It returns the changed-blocks size, the number of ListChangedBlocks API requests made,
// and any error encountered. Throttling errors are retried with backoff.
func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uint64, uint64, error) {
	var snapshotSize uint64
	var numApiReq uint64
	start := time.Now()
	klog.Infof("start to calculate incremental snapshot size for %s, base on prev snapshot %s, volume id %s",
		snapshotId, preSnapshotId, volumeId)
	ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
	if err != nil {
		klog.Errorf("new a ebs session failure.")
		return 0, numApiReq, err
	}
	var nextToken *string
	for {
		// Each retry interval is around 1-1.5 seconds, and no more than 600 retries (~10 minutes)
		// (the previous comment claimed 60 retries / 1 minute, which did not match Steps: 600)
		backoff := wait.Backoff{
			Duration: time.Second,
			Steps:    600,
			Factor:   1.0,
			Jitter:   0.5,
		}
		isAllChangeListed := false
		listChangeBlocks := func() error {
			resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
				FirstSnapshotId:  aws.String(preSnapshotId),
				MaxResults:       aws.Int64(ListSnapMaxReturnResult),
				SecondSnapshotId: aws.String(snapshotId),
				NextToken:        nextToken,
			})
			numApiReq++
			if err != nil {
				return err
			}
			// retrieve only changed block and blocks only existed in current snapshot (new add blocks):
			// a block with a SecondBlockToken is present in the second snapshot
			// (the unused numBlocks counter from the previous version was removed)
			for _, block := range resp.ChangedBlocks {
				if block.SecondBlockToken != nil && resp.BlockSize != nil {
					snapshotSize += uint64(*resp.BlockSize)
				}
			}
			// check if there is more to retrieve
			if resp.NextToken == nil {
				isAllChangeListed = true
			}
			nextToken = resp.NextToken
			return nil
		}
		isRetry := func(err error) bool {
			return request.IsErrorThrottle(err)
		}
		err = retry.OnError(backoff, isRetry, listChangeBlocks)
		if err != nil {
			return 0, numApiReq, errors.Annotatef(err, "ListChangedBlocks() failed against volume id %s, preSnapshot id %s, snapshot id %s", volumeId, preSnapshotId, snapshotId)
		}
		if isAllChangeListed {
			break
		}
	}
	elapsed := time.Since(start)
	klog.Infof("incremental snapshot size %s, num of api ListChangedBlocks request %d, snapshot id %s, volume id %s, takes %v",
		humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId, elapsed)
	return snapshotSize, numApiReq, nil
}
// getVolSnapshots get a volume-snapshots map contains map[volumeId]{snapshot1, snapshot2, snapshot3}
// It lists every completed snapshot of the given volume owned by this account ("self")
// and returns them sorted by StartTime, oldest first.
func getVolSnapshots(volumeId string) ([]*ec2.Snapshot, error) {
	// read all snapshots from aws
	ec2Session, err := util.NewEC2Session(util.CloudAPIConcurrency)
	if err != nil {
		klog.Errorf("new a ec2 session failure.")
		return nil, err
	}
	// describe snapshot is heavy operator, try to call only once
	// api has limit with max 1000 snapshots
	// search with filter volume id the backupmeta contains
	volumeFilter := []*ec2.Filter{{Name: aws.String("volume-id"), Values: []*string{aws.String(volumeId)}}}
	var (
		pageToken *string
		collected []*ec2.Snapshot
	)
	for {
		page, err := ec2Session.EC2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{
			OwnerIds:   aws.StringSlice([]string{"self"}),
			MaxResults: aws.Int64(DescribeSnapMaxReturnResult),
			Filters:    volumeFilter,
			NextToken:  pageToken,
		})
		if err != nil {
			return nil, err
		}
		for _, snap := range page.Snapshots {
			// keep finished snapshots only; in-progress ones are skipped
			if *snap.State == ec2.SnapshotStateCompleted {
				klog.Infof("get the snapshot %s created for volume %s", *snap.SnapshotId, *snap.VolumeId)
				collected = append(collected, snap)
			} else {
				klog.Infof("the snapshot %s is creating... skip it, volume %s", *snap.SnapshotId, *snap.VolumeId)
			}
		}
		// check if there's more to retrieve
		if page.NextToken == nil {
			break
		}
		klog.Infof("the total number of snapshot is %d", len(page.Snapshots))
		pageToken = page.NextToken
	}
	// oldest first, so each snapshot's predecessor sits directly before it
	sort.Slice(collected, func(a, b int) bool {
		return collected[a].StartTime.Before(*collected[b].StartTime)
	})
	return collected, nil
}
// getPrevSnapshotId returns the snapshot id immediately preceding snapshotId in the
// time-sorted snapshot list, and whether such a predecessor exists. It reports false
// when snapshotId is first in the list or not present at all.
func getPrevSnapshotId(snapshotId string, sortedVolSnapshots []*ec2.Snapshot) (string, bool) {
	for idx, snap := range sortedVolSnapshots {
		if *snap.SnapshotId != snapshotId {
			continue
		}
		if idx == 0 {
			// first snapshot of the volume: no predecessor
			return "", false
		}
		return *sortedVolSnapshots[idx-1].SnapshotId, true
	}
	// snapshotId not found in the list
	return "", false
}