Skip to content

Commit

Permalink
global sort: bench merge step (#49376)
Browse files Browse the repository at this point in the history
ref #48952
  • Loading branch information
ywqzzy committed Dec 12, 2023
1 parent 35ab481 commit 1b199c2
Showing 1 changed file with 121 additions and 11 deletions.
132 changes: 121 additions & 11 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -574,11 +575,10 @@ func readFileConcurrently(t *testing.T, s *readTestSuite) {
}

func createEvenlyDistributedFiles(
t *testing.T,
store storage.ExternalStorage,
fileSize, fileCount int,
subDir string,
) (storage.ExternalStorage, int) {
store := openTestingStorage(t)
) int {
ctx := context.Background()

cleanOldFiles(ctx, store, "/"+subDir)
Expand Down Expand Up @@ -608,7 +608,7 @@ func createEvenlyDistributedFiles(
err := writer.Close(ctx)
intest.AssertNoError(err)
}
return store, kvCnt
return kvCnt
}

func readMergeIter(t *testing.T, s *readTestSuite) {
Expand Down Expand Up @@ -655,7 +655,9 @@ func TestCompareReaderEvenlyDistributedContent(t *testing.T) {
fileSize := 50 * 1024 * 1024
fileCnt := 24
subDir := "evenly_distributed"
store, kvCnt := createEvenlyDistributedFiles(t, fileSize, fileCnt, subDir)
store := openTestingStorage(t)

kvCnt := createEvenlyDistributedFiles(store, fileSize, fileCnt, subDir)
memoryLimit := 64 * 1024 * 1024
fileIdx := 0
var (
Expand Down Expand Up @@ -763,29 +765,32 @@ var (
)

func TestReadFileConcurrently(t *testing.T) {
testCompareReaderAscendingContent(t, readFileConcurrently)
testCompareReaderWithContent(t, createAscendingFiles, readFileConcurrently)
}

func TestReadFileSequential(t *testing.T) {
testCompareReaderAscendingContent(t, readFileSequential)
testCompareReaderWithContent(t, createAscendingFiles, readFileSequential)
}

func TestReadMergeIterCheckHotspot(t *testing.T) {
testCompareReaderAscendingContent(t, func(t *testing.T, suite *readTestSuite) {
testCompareReaderWithContent(t, createAscendingFiles, func(t *testing.T, suite *readTestSuite) {
suite.mergeIterHotspot = true
readMergeIter(t, suite)
})
}

func TestReadMergeIterWithoutCheckHotspot(t *testing.T) {
testCompareReaderAscendingContent(t, readMergeIter)
testCompareReaderWithContent(t, createAscendingFiles, readMergeIter)
}

func testCompareReaderAscendingContent(t *testing.T, fn func(t *testing.T, suite *readTestSuite)) {
func testCompareReaderWithContent(
t *testing.T,
createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) int,
fn func(t *testing.T, suite *readTestSuite)) {
store := openTestingStorage(t)
kvCnt := 0
if !*skipCreate {
kvCnt = createAscendingFiles(store, *fileSize, *fileCount, *objectPrefix)
kvCnt = createFn(store, *fileSize, *fileCount, *objectPrefix)
}
fileIdx := 0
var (
Expand Down Expand Up @@ -896,3 +901,108 @@ func TestPrepareLargeData(t *testing.T) {
t.Logf("total %d data files, first file size: %.2f MB, last file size: %.2f MB",
len(dataFiles), float64(firstFileSize)/1024/1024, float64(lastFileSize)/1024/1024)
}

// mergeTestSuite carries the configuration and hooks handed to a merge
// benchmark step such as mergeStep.
type mergeTestSuite struct {
	// store is the external storage that holds the input files; subDir is the
	// prefix (without the leading "/") under which they are listed.
	store  storage.ExternalStorage
	subDir string

	// totalKVCnt is the number of KV pairs reported by the file generator
	// (it stays 0 when file creation is skipped). concurrency is forwarded to
	// the merge call. memoryLimit is populated from the memoryLimit flag.
	totalKVCnt, concurrency, memoryLimit int

	// mergeIterHotspot is forwarded to the merge call.
	mergeIterHotspot bool

	// NOTE(review): minKey/maxKey are not read by mergeStep; presumably they
	// are meant to bound the key range of the generated data — confirm.
	minKey, maxKey kv.Key

	// beforeMerge and afterMerge are optional hooks invoked right before and
	// right after the merge; a nil hook is skipped.
	beforeMerge, afterMerge func()
}

// mergeStep lists every data file under "/"+s.subDir in s.store, merges them
// with MergeOverlappingFiles into the "merge_output" prefix, and logs the
// number of bytes written together with the achieved throughput.
//
// The optional s.beforeMerge and s.afterMerge hooks run immediately before and
// after the merge. Both are kept outside of the timed region so that the
// reported speed reflects the merge alone and not the cost of the hooks
// (for example flushing a CPU profile).
func mergeStep(t *testing.T, s *mergeTestSuite) {
	ctx := context.Background()
	datas, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
	intest.AssertNoError(err)

	mergeOutput := "merge_output"
	totalSize := atomic.NewUint64(0)
	// Accumulate the TotalSize of every WriterSummary handed to the callback.
	// The parameter is intentionally not called "s" to avoid shadowing the
	// suite.
	onClose := func(summary *WriterSummary) {
		totalSize.Add(summary.TotalSize)
	}
	if s.beforeMerge != nil {
		s.beforeMerge()
	}

	now := time.Now()
	err = MergeOverlappingFiles(
		ctx,
		datas,
		s.store,
		int64(5*size.MB),
		64*1024,
		mergeOutput,
		DefaultBlockSize,
		8*1024,
		1*size.MB,
		8*1024,
		onClose,
		s.concurrency,
		s.mergeIterHotspot,
	)
	// Stop the clock before the afterMerge hook runs: the hook is not part of
	// the work being measured, just like beforeMerge is not.
	elapsed := time.Since(now)

	intest.AssertNoError(err)
	if s.afterMerge != nil {
		s.afterMerge()
	}
	t.Logf(
		"merge speed for %d bytes in %s, speed: %.2f MB/s",
		totalSize.Load(),
		elapsed,
		float64(totalSize.Load())/elapsed.Seconds()/1024/1024,
	)
}

// testCompareMergeWithContent prepares input data and a mergeTestSuite, then
// hands the suite to fn.
//
// Unless the skipCreate flag is set, createFn is called to generate the input
// files in the testing storage using the fileSize, fileCount and objectPrefix
// flags; its return value becomes the suite's totalKVCnt. The suite's
// beforeMerge/afterMerge hooks start and stop a CPU profile that is written to
// "cpu-profile-<idx>.prof" in the current working directory.
//
// NOTE(review): the profile index starts at 0 on every invocation, so calling
// this helper more than once in the same process (as TestMergeBench does)
// overwrites the profile produced by the previous invocation.
func testCompareMergeWithContent(
	t *testing.T,
	createFn func(store storage.ExternalStorage, fileSize int, fileCount int, objectPrefix string) int,
	fn func(t *testing.T, suite *mergeTestSuite)) {
	store := openTestingStorage(t)
	kvCnt := 0
	// minKey and maxKey are never assigned here, so the suite receives nil keys.
	var minKey, maxKey kv.Key
	if !*skipCreate {
		kvCnt = createFn(store, *fileSize, *fileCount, *objectPrefix)
	}

	fileIdx := 0
	var (
		file *os.File
		err  error
	)
	beforeTest := func() {
		file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
		intest.AssertNoError(err)
		err = pprof.StartCPUProfile(file)
		intest.AssertNoError(err)
	}

	afterTest := func() {
		pprof.StopCPUProfile()
		// Release the profile file once profiling has stopped; previously the
		// handle was leaked.
		if file != nil {
			err = file.Close()
			file = nil
			intest.AssertNoError(err)
		}
		// Advance the index so a further before/after round within this
		// invocation writes to a fresh file instead of truncating this one.
		fileIdx++
	}

	suite := &mergeTestSuite{
		store:            store,
		totalKVCnt:       kvCnt,
		concurrency:      *concurrency,
		memoryLimit:      *memoryLimit,
		beforeMerge:      beforeTest,
		afterMerge:       afterTest,
		subDir:           *objectPrefix,
		minKey:           minKey,
		maxKey:           maxKey,
		mergeIterHotspot: true,
	}

	fn(t, suite)
}

func TestMergeBench(t *testing.T) {
testCompareMergeWithContent(t, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, createEvenlyDistributedFiles, mergeStep)
}

0 comments on commit 1b199c2

Please sign in to comment.