Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: Fix backupmeta division bug #37246

Merged
merged 5 commits into from Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 20 additions & 11 deletions br/pkg/metautil/metafile.go
Expand Up @@ -427,19 +427,21 @@ func (op AppendOp) name() string {
}

// appends item to MetaFile
func (op AppendOp) appendFile(a *backuppb.MetaFile, b interface{}) (size int, itemCount int) {
func (op AppendOp) appendFile(a *backuppb.MetaFile, b interface{}) (dataFileSize int, size int, itemCount int) {
switch op {
case AppendMetaFile:
a.MetaFiles = append(a.MetaFiles, b.(*backuppb.File))
size += int(b.(*backuppb.File).Size_)
metaFile := b.(*backuppb.File)
a.MetaFiles = append(a.MetaFiles, metaFile)
size += metaFile.Size()
itemCount++
case AppendDataFile:
// receive a batch of file because we need write and default sst are adjacent.
files := b.([]*backuppb.File)
a.DataFiles = append(a.DataFiles, files...)
for _, f := range files {
itemCount++
size += int(f.Size_)
size += f.Size()
dataFileSize += int(f.Size_)
}
case AppendSchema:
a.Schemas = append(a.Schemas, b.(*backuppb.Schema))
Expand All @@ -450,15 +452,16 @@ func (op AppendOp) appendFile(a *backuppb.MetaFile, b interface{}) (size int, it
itemCount++
size += len(b.([]byte))
}
return size, itemCount
return dataFileSize, size, itemCount
}

type sizedMetaFile struct {
// A stack like array, we always append to the last node.
root *backuppb.MetaFile
size int
itemNum int
sizeLimit int
root *backuppb.MetaFile
dataFileSize int
size int
itemNum int
sizeLimit int
}

// NewSizedMetaFile represents the sizedMetaFile.
Expand All @@ -476,9 +479,10 @@ func NewSizedMetaFile(sizeLimit int) *sizedMetaFile {
func (f *sizedMetaFile) append(file interface{}, op AppendOp) bool {
// append to root
// TODO maybe use multi level index
size, itemCount := op.appendFile(f.root, file)
dataFileSize, size, itemCount := op.appendFile(f.root, file)
f.itemNum += itemCount
f.size += size
f.dataFileSize += dataFileSize
// f.size would reset outside
return f.size > f.sizeLimit
}
Expand Down Expand Up @@ -510,6 +514,9 @@ type MetaWriter struct {
metaFileName string

cipher *backuppb.CipherInfo

// records the total datafile size
totalDataFileSize int
}

// NewMetaWriter creates MetaWriter.
Expand Down Expand Up @@ -716,6 +723,8 @@ func (writer *MetaWriter) flushMetasV2(ctx context.Context, op AppendOp) error {

name := op.name()
writer.metafileSizes[name] += writer.metafiles.size
writer.totalDataFileSize += writer.metafiles.dataFileSize

// Flush metafiles to external storage.
writer.metafileSeqNum["metafiles"]++
fname := fmt.Sprintf("backupmeta.%s.%09d", name, writer.metafileSeqNum["metafiles"])
Expand Down Expand Up @@ -748,7 +757,7 @@ func (writer *MetaWriter) ArchiveSize() uint64 {
for _, file := range writer.backupMeta.Files {
total += file.Size_
}
total += uint64(writer.metafileSizes["datafile"])
total += uint64(writer.totalDataFileSize)
return total
}

Expand Down
38 changes: 38 additions & 0 deletions br/pkg/metautil/metafile_test.go
Expand Up @@ -215,3 +215,41 @@ func TestEncryptAndDecrypt(t *testing.T) {
}
}
}

func TestMetaFileSize(t *testing.T) {
files := []*backuppb.File{
{Name: "f0", Size_: 99999}, // Size() is 8
{Name: "f1", Size_: 99999},
{Name: "f2", Size_: 99999},
{Name: "f3", Size_: 99999},
{Name: "f4", Size_: 99999},
{Name: "f5", Size_: 99999},
}
metafiles := NewSizedMetaFile(50) // >= 50, then flush

needFlush := metafiles.append(files, AppendDataFile)
t.Logf("needFlush: %v, %+v", needFlush, metafiles)
require.False(t, needFlush)

needFlush = metafiles.append([]*backuppb.File{
{Name: "f5", Size_: 99999},
}, AppendDataFile)
t.Logf("needFlush: %v, %+v", needFlush, metafiles)
require.True(t, needFlush)

metas := []*backuppb.File{
{Name: "meta0", Size_: 99999}, // Size() is 11
{Name: "meta1", Size_: 99999},
{Name: "meta2", Size_: 99999},
{Name: "meta3", Size_: 99999},
}
metafiles = NewSizedMetaFile(50)
for _, meta := range metas {
needFlush = metafiles.append(meta, AppendMetaFile)
t.Logf("needFlush: %v, %+v", needFlush, metafiles)
require.False(t, needFlush)
}
needFlush = metafiles.append(&backuppb.File{Name: "meta4", Size_: 99999}, AppendMetaFile)
t.Logf("needFlush: %v, %+v", needFlush, metafiles)
require.True(t, needFlush)
}