Skip to content

Commit

Permalink
executor: add index hash join benchmark (#12564)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzmhhh123 authored and sre-bot committed Oct 9, 2019
1 parent 0ec38f7 commit a177d46
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 4 deletions.
271 changes: 267 additions & 4 deletions executor/benchmark_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"math/rand"
"sort"
"strings"
"sync"
"testing"

"github.com/pingcap/log"
Expand Down Expand Up @@ -180,6 +181,14 @@ func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
return m
}

func buildMockDataSourceWithIndex(opt mockDataSourceParameters, index []int) *mockDataSource {
opt.orders = make([]bool, len(opt.schema.Columns))
for _, idx := range index {
opt.orders[idx] = true
}
return buildMockDataSource(opt)
}

type aggTestCase struct {
// The test table's schema is fixed (aggCol Double, groupBy LongLong).
execType string // "hash" or "stream"
Expand Down Expand Up @@ -544,11 +553,12 @@ func defaultHashJoinTestCase() *hashJoinTestCase {
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().IndexLookupJoinConcurrency = 4
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}}
return tc
}

func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
cols0 := testCase.columns()
cols1 := testCase.columns()
joinSchema := expression.NewSchema(cols0...)
Expand Down Expand Up @@ -579,7 +589,7 @@ func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *Ha
if testCase.disk {
memLimit = 1
}
t := memory.NewTracker(stringutil.StringerStr("root of prepare4Join"), memLimit)
t := memory.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), memLimit)
t.SetActionOnExceed(nil)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
return e
Expand Down Expand Up @@ -607,7 +617,7 @@ func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
exec := prepare4Join(casTest, dataSource1, dataSource2)
exec := prepare4HashJoin(casTest, dataSource1, dataSource2)
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource1.prepareChunks()
Expand Down Expand Up @@ -689,7 +699,7 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
exec := prepare4Join(casTest, dataSource1, dataSource2)
exec := prepare4HashJoin(casTest, dataSource1, dataSource2)
tmpCtx := context.Background()
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -740,3 +750,256 @@ func BenchmarkBuildHashTableForList(b *testing.B) {
}
}
}

type indexJoinTestCase struct {
outerRows int
innerRows int
concurrency int
ctx sessionctx.Context
outerJoinKeyIdx []int
innerJoinKeyIdx []int
innerIdx []int
needOuterSort bool
}

func (tc indexJoinTestCase) columns() []*expression.Column {
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeDouble)},
{Index: 2, RetType: types.NewFieldType(mysql.TypeVarString)},
}
}

func defaultIndexJoinTestCase() *indexJoinTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().SnapshotTS = 1
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
tc := &indexJoinTestCase{
outerRows: 100000,
innerRows: variable.DefMaxChunkSize * 100,
concurrency: 4,
ctx: ctx,
outerJoinKeyIdx: []int{0, 1},
innerJoinKeyIdx: []int{0, 1},
innerIdx: []int{0, 1},
}
return tc
}

func (tc indexJoinTestCase) String() string {
return fmt.Sprintf("(outerRows:%v, innerRows:%v, concurency:%v, outerJoinKeyIdx: %v, innerJoinKeyIdx: %v, NeedOuterSort:%v)",
tc.outerRows, tc.innerRows, tc.concurrency, tc.outerJoinKeyIdx, tc.innerJoinKeyIdx, tc.needOuterSort)
}
func (tc indexJoinTestCase) getMockDataSourceOptByRows(rows int) mockDataSourceParameters {
return mockDataSourceParameters{
schema: expression.NewSchema(tc.columns()...),
rows: rows,
ctx: tc.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeDouble:
return float64(row)
case mysql.TypeVarString:
return rawData
default:
panic("not implement")
}
},
}
}

func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) Executor {
outerCols, innerCols := tc.columns(), tc.columns()
joinSchema := expression.NewSchema(outerCols...)
joinSchema.Append(innerCols...)
leftTypes, rightTypes := retTypes(outerDS), retTypes(innerDS)
defaultValues := make([]types.Datum, len(innerCols))
colLens := make([]int, len(innerCols))
for i := range colLens {
colLens[i] = types.UnspecifiedLength
}
keyOff2IdxOff := make([]int, len(tc.outerJoinKeyIdx))
for i := range keyOff2IdxOff {
keyOff2IdxOff[i] = i
}
e := &IndexLookUpJoin{
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("IndexInnerHashJoin"), outerDS),
outerCtx: outerCtx{
rowTypes: leftTypes,
keyCols: tc.outerJoinKeyIdx,
},
innerCtx: innerCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil)},
rowTypes: rightTypes,
colLens: colLens,
keyCols: tc.innerJoinKeyIdx,
},
workerWg: new(sync.WaitGroup),
joiner: newJoiner(tc.ctx, 0, false, defaultValues, nil, leftTypes, rightTypes),
isOuterJoin: false,
keyOff2IdxOff: keyOff2IdxOff,
lastColHelper: nil,
}
e.joinResult = newFirstChunk(e)
return e
}

func prepare4IndexOuterHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) Executor {
e := prepare4IndexInnerHashJoin(tc, outerDS, innerDS).(*IndexLookUpJoin)
idxHash := &IndexNestedLoopHashJoin{IndexLookUpJoin: *e}
concurrency := tc.concurrency
idxHash.joiners = make([]joiner, concurrency)
for i := 0; i < concurrency; i++ {
idxHash.joiners[i] = e.joiner.Clone()
}
return idxHash
}

/*
func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) Executor {
outerCols, innerCols := tc.columns(), tc.columns()
joinSchema := expression.NewSchema(outerCols...)
joinSchema.Append(innerCols...)
outerJoinKeys := make([]*expression.Column, 0, len(tc.outerJoinKeyIdx))
innerJoinKeys := make([]*expression.Column, 0, len(tc.innerJoinKeyIdx))
for _, keyIdx := range tc.outerJoinKeyIdx {
outerJoinKeys = append(outerJoinKeys, outerCols[keyIdx])
}
for _, keyIdx := range tc.innerJoinKeyIdx {
innerJoinKeys = append(innerJoinKeys, innerCols[keyIdx])
}
leftTypes, rightTypes := retTypes(outerDS), retTypes(innerDS)
defaultValues := make([]types.Datum, len(innerCols))
colLens := make([]int, len(innerCols))
for i := range colLens {
colLens[i] = types.UnspecifiedLength
}
keyOff2IdxOff := make([]int, len(outerJoinKeys))
for i := range keyOff2IdxOff {
keyOff2IdxOff[i] = i
}
compareFuncs := make([]expression.CompareFunc, 0, len(outerJoinKeys))
outerCompareFuncs := make([]expression.CompareFunc, 0, len(outerJoinKeys))
for i := range outerJoinKeys {
compareFuncs = append(compareFuncs, expression.GetCmpFunction(outerJoinKeys[i], innerJoinKeys[i]))
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(outerJoinKeys[i], outerJoinKeys[i]))
}
e := &IndexLookUpMergeJoin{
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("IndexMergeJoin"), outerDS),
outerMergeCtx: outerMergeCtx{
rowTypes: leftTypes,
keyCols: tc.outerJoinKeyIdx,
joinKeys: outerJoinKeys,
needOuterSort: tc.needOuterSort,
compareFuncs: outerCompareFuncs,
},
innerMergeCtx: innerMergeCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil)},
rowTypes: rightTypes,
joinKeys: innerJoinKeys,
colLens: colLens,
keyCols: tc.innerJoinKeyIdx,
compareFuncs: compareFuncs,
},
workerWg: new(sync.WaitGroup),
isOuterJoin: false,
keyOff2IdxOff: keyOff2IdxOff,
lastColHelper: nil,
}
joiners := make([]joiner, e.ctx.GetSessionVars().IndexLookupJoinConcurrency)
for i := 0; i < e.ctx.GetSessionVars().IndexLookupJoinConcurrency; i++ {
joiners[i] = newJoiner(tc.ctx, 0, false, defaultValues, nil, leftTypes, rightTypes)
}
e.joiners = joiners
return e
}
*/

type indexJoinType int8

const (
indexInnerHashJoin indexJoinType = iota
indexOuterHashJoin
indexMergeJoin
)

func benchmarkIndexJoinExecWithCase(
b *testing.B,
tc *indexJoinTestCase,
outerDS *mockDataSource,
innerDS *mockDataSource,
execType indexJoinType,
) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
var exec Executor
switch execType {
case indexInnerHashJoin:
exec = prepare4IndexInnerHashJoin(tc, outerDS, innerDS)
case indexOuterHashJoin:
exec = prepare4IndexOuterHashJoin(tc, outerDS, innerDS)
case indexMergeJoin:
// exec = prepare4IndexMergeJoin(tc, outerDS, innerDS)
}

tmpCtx := context.Background()
chk := newFirstChunk(exec)
outerDS.prepareChunks()
innerDS.prepareChunks()

b.StartTimer()
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
for {
if err := exec.Next(tmpCtx, chk); err != nil {
b.Fatal(err)
}
if chk.NumRows() == 0 {
break
}
}

if err := exec.Close(); err != nil {
b.Fatal(err)
}
b.StopTimer()
}
}

func BenchmarkIndexJoinExec(b *testing.B) {
lvl := log.GetLevel()
log.SetLevel(zapcore.ErrorLevel)
defer log.SetLevel(lvl)

b.ReportAllocs()
tc := defaultIndexJoinTestCase()
outerOpt := tc.getMockDataSourceOptByRows(tc.outerRows)
innerOpt := tc.getMockDataSourceOptByRows(tc.innerRows)
outerDS := buildMockDataSourceWithIndex(outerOpt, tc.innerIdx)
innerDS := buildMockDataSourceWithIndex(innerOpt, tc.innerIdx)

// tc.needOuterSort = true
// b.Run(fmt.Sprintf("index merge join need outer sort %v", tc), func(b *testing.B) {
// benchmarkIndexJoinExecWithCase(b, tc, outerDS, innerDS, indexMergeJoin)
// })

// tc.needOuterSort = false
// b.Run(fmt.Sprintf("index merge join %v", tc), func(b *testing.B) {
// benchmarkIndexJoinExecWithCase(b, tc, outerDS, innerDS, indexMergeJoin)
// })

b.Run(fmt.Sprintf("index inner hash join %v", tc), func(b *testing.B) {
benchmarkIndexJoinExecWithCase(b, tc, outerDS, innerDS, indexInnerHashJoin)
})

b.Run(fmt.Sprintf("index outer hash join %v", tc), func(b *testing.B) {
benchmarkIndexJoinExecWithCase(b, tc, outerDS, innerDS, indexOuterHashJoin)
})
}
8 changes: 8 additions & 0 deletions executor/builder.go
Expand Up @@ -2066,6 +2066,8 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *mockPhysicalIndexReader:
return v.e, nil
}
return nil, errors.New("Wrong plan type for dataReaderBuilder")
}
Expand Down Expand Up @@ -2123,6 +2125,12 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
return e, nil
}

type mockPhysicalIndexReader struct {
plannercore.PhysicalPlan

e Executor
}

func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
e, err := buildNoRangeIndexReader(builder.executorBuilder, v)
Expand Down

0 comments on commit a177d46

Please sign in to comment.