Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add waitgroup for delete type files #43751

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 9 additions & 2 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,7 @@ func ApplyKVFilesWithBatchMethod(
batchCount int,
batchSize uint64,
applyFunc func(files []*LogDataFileInfo, kvCount int64, size uint64),
applyWg *sync.WaitGroup,
) error {
var (
tableMapFiles = make(map[int64]*FilesInTable)
Expand Down Expand Up @@ -2354,6 +2355,7 @@ func ApplyKVFilesWithBatchMethod(
}
}

applyWg.Wait()
for _, fwt := range tableMapFiles {
for _, fs := range fwt.regionMapFiles {
for _, d := range fs.deleteFiles {
Expand Down Expand Up @@ -2384,6 +2386,7 @@ func ApplyKVFilesWithSingelMethod(
ctx context.Context,
files LogIter,
applyFunc func(file []*LogDataFileInfo, kvCount int64, size uint64),
applyWg *sync.WaitGroup,
) error {
deleteKVFiles := make([]*LogDataFileInfo, 0)

Expand All @@ -2400,6 +2403,7 @@ func ApplyKVFilesWithSingelMethod(
applyFunc([]*LogDataFileInfo{f}, f.GetNumberOfEntries(), f.GetLength())
}

applyWg.Wait()
log.Info("restore delete files", zap.Int("count", len(deleteKVFiles)))
for _, file := range deleteKVFiles {
f := file
Expand Down Expand Up @@ -2441,6 +2445,7 @@ func (rc *Client) RestoreKVFiles(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

var applyWg sync.WaitGroup
eg, ectx := errgroup.WithContext(ctx)
applyFunc := func(files []*LogDataFileInfo, kvCount int64, size uint64) {
if len(files) == 0 {
Expand All @@ -2459,9 +2464,11 @@ func (rc *Client) RestoreKVFiles(
log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId))
skipFile += len(files)
} else {
applyWg.Add(1)
downstreamId := idrules[files[0].TableId]
rc.workerPool.ApplyOnErrorGroup(eg, func() (err error) {
fileStart := time.Now()
defer applyWg.Done()
defer func() {
onProgress(int64(len(files)))
updateStats(uint64(kvCount), size)
Expand Down Expand Up @@ -2494,9 +2501,9 @@ func (rc *Client) RestoreKVFiles(

rc.workerPool.ApplyOnErrorGroup(eg, func() error {
if supportBatch {
err = ApplyKVFilesWithBatchMethod(ectx, logIter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc)
err = ApplyKVFilesWithBatchMethod(ectx, logIter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc, &applyWg)
} else {
err = ApplyKVFilesWithSingelMethod(ectx, logIter, applyFunc)
err = ApplyKVFilesWithSingelMethod(ectx, logIter, applyFunc, &applyWg)
}
return errors.Trace(err)
})
Expand Down
96 changes: 96 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ func TestApplyKVFilesWithSingelMethod(t *testing.T) {
Type: backuppb.FileType_Put,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1185,6 +1186,7 @@ func TestApplyKVFilesWithSingelMethod(t *testing.T) {
context.TODO(),
toLogDataFileInfoIter(iter.FromSlice(ds)),
applyFunc,
&applyWg,
)

require.Equal(t, totalKVCount, int64(15))
Expand Down Expand Up @@ -1239,6 +1241,7 @@ func TestApplyKVFilesWithBatchMethod1(t *testing.T) {
RegionId: 1,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1259,6 +1262,7 @@ func TestApplyKVFilesWithBatchMethod1(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, runCount, 3)
Expand Down Expand Up @@ -1327,6 +1331,7 @@ func TestApplyKVFilesWithBatchMethod2(t *testing.T) {
RegionId: 1,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1347,6 +1352,7 @@ func TestApplyKVFilesWithBatchMethod2(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, runCount, 4)
Expand Down Expand Up @@ -1409,6 +1415,7 @@ func TestApplyKVFilesWithBatchMethod3(t *testing.T) {
RegionId: 3,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1429,6 +1436,7 @@ func TestApplyKVFilesWithBatchMethod3(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, totalKVCount, int64(25))
Expand Down Expand Up @@ -1489,6 +1497,7 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) {
TableId: 2,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
Expand All @@ -1509,6 +1518,7 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) {
batchCount,
batchSize,
applyFunc,
&applyWg,
)

require.Equal(t, runCount, 4)
Expand All @@ -1524,6 +1534,92 @@ func TestApplyKVFilesWithBatchMethod4(t *testing.T) {
)
}

func TestApplyKVFilesWithBatchMethod5(t *testing.T) {
var lock sync.Mutex
types := make([]backuppb.FileType, 0)
ds := []*backuppb.DataFileInfo{
{
Path: "log1",
NumberOfEntries: 5,
Length: 2000,
Cf: stream.WriteCF,
Type: backuppb.FileType_Delete,
TableId: 1,
}, {
Path: "log2",
NumberOfEntries: 5,
Length: 100,
Cf: stream.WriteCF,
Type: backuppb.FileType_Put,
TableId: 1,
}, {
Path: "log3",
NumberOfEntries: 5,
Length: 100,
Cf: stream.WriteCF,
Type: backuppb.FileType_Put,
TableId: 2,
}, {
Path: "log4",
NumberOfEntries: 5,
Length: 100,
Cf: stream.WriteCF,
Type: backuppb.FileType_Put,
TableId: 1,
}, {
Path: "log5",
NumberOfEntries: 5,
Length: 100,
Cf: stream.DefaultCF,
Type: backuppb.FileType_Put,
TableId: 2,
},
}
var applyWg sync.WaitGroup
applyFunc := func(
files []*restore.LogDataFileInfo,
kvCount int64,
size uint64,
) {
if len(files) == 0 {
return
}
applyWg.Add(1)
go func() {
defer applyWg.Done()
if files[0].Type == backuppb.FileType_Put {
time.Sleep(time.Second)
}
lock.Lock()
types = append(types, files[0].Type)
lock.Unlock()
}()
}

restore.ApplyKVFilesWithBatchMethod(
context.TODO(),
toLogDataFileInfoIter(iter.FromSlice(ds)),
2,
1500,
applyFunc,
&applyWg,
)

applyWg.Wait()
require.Equal(t, backuppb.FileType_Delete, types[len(types)-1])

types = make([]backuppb.FileType, 0)
restore.ApplyKVFilesWithSingelMethod(
context.TODO(),
toLogDataFileInfoIter(iter.FromSlice(ds)),
applyFunc,
&applyWg,
)

applyWg.Wait()
require.Equal(t, backuppb.FileType_Delete, types[len(types)-1])
}

func TestCheckNewCollationEnable(t *testing.T) {
caseList := []struct {
backupMeta *backuppb.BackupMeta
Expand Down