Skip to content

Commit

Permalink
br: add waitgroup for delete type files (#43751)
Browse files Browse the repository at this point in the history
close #43739
  • Loading branch information
Leavrth committed May 15, 2023
1 parent 5c368a3 commit e9a95da
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 2 deletions.
11 changes: 9 additions & 2 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,7 @@ func ApplyKVFilesWithBatchMethod(
batchCount int,
batchSize uint64,
applyFunc func(files []*LogDataFileInfo, kvCount int64, size uint64),
applyWg *sync.WaitGroup,
) error {
var (
tableMapFiles = make(map[int64]*FilesInTable)
Expand Down Expand Up @@ -2354,6 +2355,7 @@ func ApplyKVFilesWithBatchMethod(
}
}

applyWg.Wait()
for _, fwt := range tableMapFiles {
for _, fs := range fwt.regionMapFiles {
for _, d := range fs.deleteFiles {
Expand Down Expand Up @@ -2384,6 +2386,7 @@ func ApplyKVFilesWithSingelMethod(
ctx context.Context,
files LogIter,
applyFunc func(file []*LogDataFileInfo, kvCount int64, size uint64),
applyWg *sync.WaitGroup,
) error {
deleteKVFiles := make([]*LogDataFileInfo, 0)

Expand All @@ -2400,6 +2403,7 @@ func ApplyKVFilesWithSingelMethod(
applyFunc([]*LogDataFileInfo{f}, f.GetNumberOfEntries(), f.GetLength())
}

applyWg.Wait()
log.Info("restore delete files", zap.Int("count", len(deleteKVFiles)))
for _, file := range deleteKVFiles {
f := file
Expand Down Expand Up @@ -2441,6 +2445,7 @@ func (rc *Client) RestoreKVFiles(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

var applyWg sync.WaitGroup
eg, ectx := errgroup.WithContext(ctx)
applyFunc := func(files []*LogDataFileInfo, kvCount int64, size uint64) {
if len(files) == 0 {
Expand All @@ -2459,9 +2464,11 @@ func (rc *Client) RestoreKVFiles(
log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId))
skipFile += len(files)
} else {
applyWg.Add(1)
downstreamId := idrules[files[0].TableId]
rc.workerPool.ApplyOnErrorGroup(eg, func() (err error) {
fileStart := time.Now()
defer applyWg.Done()
defer func() {
onProgress(int64(len(files)))
updateStats(uint64(kvCount), size)
Expand Down Expand Up @@ -2494,9 +2501,9 @@ func (rc *Client) RestoreKVFiles(

rc.workerPool.ApplyOnErrorGroup(eg, func() error {
if supportBatch {
err = ApplyKVFilesWithBatchMethod(ectx, logIter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc)
err = ApplyKVFilesWithBatchMethod(ectx, logIter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc, &applyWg)
} else {
err = ApplyKVFilesWithSingelMethod(ectx, logIter, applyFunc)
err = ApplyKVFilesWithSingelMethod(ectx, logIter, applyFunc, &applyWg)
}
return errors.Trace(err)
})
Expand Down
96 changes: 96 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ func TestApplyKVFilesWithSingelMethod(t *testing.T) {
Type: backuppb.FileType_Put,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1185,6 +1186,7 @@ func TestApplyKVFilesWithSingelMethod(t *testing.T) {
context.TODO(),
toLogDataFileInfoIter(iter.FromSlice(ds)),
applyFunc,
&applyWg,
)

require.Equal(t, totalKVCount, int64(15))
Expand Down Expand Up @@ -1239,6 +1241,7 @@ func TestApplyKVFilesWithBatchMethod1(t *testing.T) {
RegionId: 1,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1259,6 +1262,7 @@ func TestApplyKVFilesWithBatchMethod1(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, runCount, 3)
Expand Down Expand Up @@ -1327,6 +1331,7 @@ func TestApplyKVFilesWithBatchMethod2(t *testing.T) {
RegionId: 1,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1347,6 +1352,7 @@ func TestApplyKVFilesWithBatchMethod2(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, runCount, 4)
Expand Down Expand Up @@ -1409,6 +1415,7 @@ func TestApplyKVFilesWithBatchMethod3(t *testing.T) {
RegionId: 3,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1429,6 +1436,7 @@ func TestApplyKVFilesWithBatchMethod3(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, totalKVCount, int64(25))
Expand Down Expand Up @@ -1489,6 +1497,7 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) {
TableId: 2,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1509,6 +1518,7 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, runCount, 4)
Expand All @@ -1524,6 +1534,92 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) {
)
}

func TestApplyKVFilesWithBatchMethod5(t *testing.T) {
var lock sync.Mutex
types := make([]backuppb.FileType, 0)
ds := []*backuppb.DataFileInfo{
{
Path: "log1",
NumberOfEntries: 5,
Length: 2000,
Cf: stream.WriteCF,
Type: backuppb.FileType_Delete,
TableId: 1,
}, {
Path: "log2",
NumberOfEntries: 5,
Length: 100,
Cf: stream.WriteCF,
Type: backuppb.FileType_Put,
TableId: 1,
}, {
Path: "log3",
NumberOfEntries: 5,
Length: 100,
Cf: stream.WriteCF,
Type: backuppb.FileType_Put,
TableId: 2,
}, {
Path: "log4",
NumberOfEntries: 5,
Length: 100,
Cf: stream.WriteCF,
Type: backuppb.FileType_Put,
TableId: 1,
}, {
Path: "log5",
NumberOfEntries: 5,
Length: 100,
Cf: stream.DefaultCF,
Type: backuppb.FileType_Put,
TableId: 2,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
size uint64,
) {
if len(files) == 0 {
return
}
applyWg.Add(1)
go func() {
defer applyWg.Done()
if files[0].Type == backuppb.FileType_Put {
time.Sleep(time.Second)
}
lock.Lock()
types = append(types, files[0].Type)
lock.Unlock()
}()
}

restore.ApplyKVFilesWithBatchMethod(
context.TODO(),
toLogDataFileInfoIter(iter.FromSlice(ds)),
2,
1500,
applyFunc,
&applyWg,
)

applyWg.Wait()
require.Equal(t, backuppb.FileType_Delete, types[len(types)-1])

types = make([]backuppb.FileType, 0)
restore.ApplyKVFilesWithSingelMethod(
context.TODO(),
toLogDataFileInfoIter(iter.FromSlice(ds)),
applyFunc,
&applyWg,
)

applyWg.Wait()
require.Equal(t, backuppb.FileType_Delete, types[len(types)-1])
}

func TestCheckNewCollationEnable(t *testing.T) {
caseList := []struct {
backupMeta *backuppb.BackupMeta
Expand Down

0 comments on commit e9a95da

Please sign in to comment.