Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add index hash join benchmark #12564

Merged
merged 3 commits into from Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
271 changes: 267 additions & 4 deletions executor/benchmark_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"math/rand"
"sort"
"strings"
"sync"
"testing"

"github.com/pingcap/log"
Expand Down Expand Up @@ -180,6 +181,14 @@ func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
return m
}

// buildMockDataSourceWithIndex builds a mock data source from opt after
// replacing opt.orders with a fresh slice holding one flag per schema column.
// Only the columns whose offsets appear in index are flagged true; every other
// column is left false. opt is received by value, so the caller's parameters
// are not modified.
//
// NOTE(review): how buildMockDataSource interprets opt.orders is not visible
// here — presumably a flagged column is generated in sorted order; confirm
// against buildMockDataSource.
func buildMockDataSourceWithIndex(opt mockDataSourceParameters, index []int) *mockDataSource {
	orders := make([]bool, len(opt.schema.Columns))
	for i := range index {
		orders[index[i]] = true
	}
	opt.orders = orders
	return buildMockDataSource(opt)
}

type aggTestCase struct {
// The test table's schema is fixed (aggCol Double, groupBy LongLong).
execType string // "hash" or "stream"
Expand Down Expand Up @@ -544,11 +553,12 @@ func defaultHashJoinTestCase() *hashJoinTestCase {
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().IndexLookupJoinConcurrency = 4
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}}
return tc
}

func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) *HashJoinExec {
cols0 := testCase.columns()
cols1 := testCase.columns()
joinSchema := expression.NewSchema(cols0...)
Expand Down Expand Up @@ -579,7 +589,7 @@ func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *Ha
if testCase.disk {
memLimit = 1
}
t := memory.NewTracker(stringutil.StringerStr("root of prepare4Join"), memLimit)
t := memory.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), memLimit)
t.SetActionOnExceed(nil)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
return e
Expand Down Expand Up @@ -607,7 +617,7 @@ func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
exec := prepare4Join(casTest, dataSource1, dataSource2)
exec := prepare4HashJoin(casTest, dataSource1, dataSource2)
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource1.prepareChunks()
Expand Down Expand Up @@ -689,7 +699,7 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
exec := prepare4Join(casTest, dataSource1, dataSource2)
exec := prepare4HashJoin(casTest, dataSource1, dataSource2)
tmpCtx := context.Background()
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -740,3 +750,256 @@ func BenchmarkBuildHashTableForList(b *testing.B) {
}
}
}

// indexJoinTestCase bundles the parameters shared by the index join
// benchmarks in this file.
type indexJoinTestCase struct {
	// outerRows and innerRows are the row counts requested for the outer and
	// inner mock data sources.
	outerRows, innerRows int
	// concurrency is the number of joiners cloned by
	// prepare4IndexOuterHashJoin.
	concurrency int
	// ctx is the session context handed to the executors and data sources.
	ctx sessionctx.Context
	// outerJoinKeyIdx and innerJoinKeyIdx are the key column offsets given to
	// the outer and the inner side of the join respectively.
	outerJoinKeyIdx, innerJoinKeyIdx []int
	// innerIdx lists the column offsets passed to
	// buildMockDataSourceWithIndex when the data sources are created.
	innerIdx []int
	// needOuterSort is reported by String; its only other use is in the
	// currently disabled index merge join setup.
	needOuterSort bool
}

// columns returns a freshly allocated three-column layout used by both sides
// of the index join benchmarks: a LONGLONG at offset 0, a DOUBLE at offset 1
// and a VARSTRING at offset 2.
func (tc indexJoinTestCase) columns() []*expression.Column {
	longlongCol := &expression.Column{
		Index:   0,
		RetType: types.NewFieldType(mysql.TypeLonglong),
	}
	doubleCol := &expression.Column{
		Index:   1,
		RetType: types.NewFieldType(mysql.TypeDouble),
	}
	varStringCol := &expression.Column{
		Index:   2,
		RetType: types.NewFieldType(mysql.TypeVarString),
	}
	return []*expression.Column{longlongCol, doubleCol, varStringCol}
}

// defaultIndexJoinTestCase returns the baseline index join benchmark
// configuration: a mock session context with default chunk sizes, snapshot
// timestamp 1 and a memory tracker created with limit -1; 100000 outer rows;
// variable.DefMaxChunkSize*100 inner rows; concurrency 4; and columns 0 and 1
// used both as join keys on each side and as the indexed columns.
func defaultIndexJoinTestCase() *indexJoinTestCase {
	ctx := mock.NewContext()

	sessVars := ctx.GetSessionVars()
	sessVars.InitChunkSize = variable.DefInitChunkSize
	sessVars.MaxChunkSize = variable.DefMaxChunkSize
	sessVars.SnapshotTS = 1
	sessVars.StmtCtx.MemTracker = memory.NewTracker(nil, -1)

	return &indexJoinTestCase{
		outerRows:       100000,
		innerRows:       variable.DefMaxChunkSize * 100,
		concurrency:     4,
		ctx:             ctx,
		outerJoinKeyIdx: []int{0, 1},
		innerJoinKeyIdx: []int{0, 1},
		innerIdx:        []int{0, 1},
	}
}

// String renders the test case parameters on a single line. The benchmarks in
// this file embed the result in their sub-benchmark names, so the text ends
// up in the benchmark output.
func (tc indexJoinTestCase) String() string {
	// The label previously read "concurency"; it is spelled correctly here so
	// the emitted benchmark names are searchable.
	return fmt.Sprintf("(outerRows:%v, innerRows:%v, concurrency:%v, outerJoinKeyIdx: %v, innerJoinKeyIdx: %v, NeedOuterSort:%v)",
		tc.outerRows, tc.innerRows, tc.concurrency, tc.outerJoinKeyIdx, tc.innerJoinKeyIdx, tc.needOuterSort)
}
// getMockDataSourceOptByRows returns mock data source parameters for a source
// of the given number of rows, using the test case's column layout and
// session context.
//
// The value generator derives each cell from the row number: integer columns
// (LONG / LONGLONG) get int64(row), DOUBLE columns get float64(row), and
// VARSTRING columns get the package-level rawData. Any other column type
// panics.
func (tc indexJoinTestCase) getMockDataSourceOptByRows(rows int) mockDataSourceParameters {
	genData := func(row int, typ *types.FieldType) interface{} {
		tp := typ.Tp
		if tp == mysql.TypeLong || tp == mysql.TypeLonglong {
			return int64(row)
		}
		if tp == mysql.TypeDouble {
			return float64(row)
		}
		if tp == mysql.TypeVarString {
			return rawData
		}
		panic("not implement")
	}

	return mockDataSourceParameters{
		schema:      expression.NewSchema(tc.columns()...),
		rows:        rows,
		ctx:         tc.ctx,
		genDataFunc: genData,
	}
}

// prepare4IndexInnerHashJoin assembles an IndexLookUpJoin executor over the
// two mock data sources for benchmarking.
//
// outerDS is attached as the child of the join; innerDS is wrapped in a
// mockPhysicalIndexReader and handed to the inner side's dataReaderBuilder.
// The join schema is the test case's column layout repeated twice (outer
// columns followed by inner columns). The returned executor has its
// joinResult chunk allocated but has not been opened.
func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) Executor {
	outerCols := tc.columns()
	innerCols := tc.columns()
	joinSchema := expression.NewSchema(outerCols...)
	joinSchema.Append(innerCols...)

	leftTypes := retTypes(outerDS)
	rightTypes := retTypes(innerDS)

	// One zero-valued default datum per inner column.
	defaultValues := make([]types.Datum, len(innerCols))

	// Every inner column uses an unspecified length.
	colLens := make([]int, 0, len(innerCols))
	for range innerCols {
		colLens = append(colLens, types.UnspecifiedLength)
	}

	// Identity mapping: the i-th join key maps to offset i.
	keyOff2IdxOff := make([]int, 0, len(tc.outerJoinKeyIdx))
	for i := range tc.outerJoinKeyIdx {
		keyOff2IdxOff = append(keyOff2IdxOff, i)
	}

	join := &IndexLookUpJoin{
		baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("IndexInnerHashJoin"), outerDS),
		outerCtx: outerCtx{
			rowTypes: leftTypes,
			keyCols:  tc.outerJoinKeyIdx,
		},
		innerCtx: innerCtx{
			readerBuilder: &dataReaderBuilder{
				Plan:            &mockPhysicalIndexReader{e: innerDS},
				executorBuilder: newExecutorBuilder(tc.ctx, nil),
			},
			rowTypes: rightTypes,
			colLens:  colLens,
			keyCols:  tc.innerJoinKeyIdx,
		},
		workerWg:      new(sync.WaitGroup),
		joiner:        newJoiner(tc.ctx, 0, false, defaultValues, nil, leftTypes, rightTypes),
		isOuterJoin:   false,
		keyOff2IdxOff: keyOff2IdxOff,
		lastColHelper: nil,
	}
	join.joinResult = newFirstChunk(join)
	return join
}

// prepare4IndexOuterHashJoin builds an IndexNestedLoopHashJoin for
// benchmarking. It starts from the executor produced by
// prepare4IndexInnerHashJoin, copies that IndexLookUpJoin value into the new
// executor, and fills joiners with tc.concurrency clones of its joiner.
func prepare4IndexOuterHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) Executor {
	lookUpJoin := prepare4IndexInnerHashJoin(tc, outerDS, innerDS).(*IndexLookUpJoin)

	hashJoin := &IndexNestedLoopHashJoin{IndexLookUpJoin: *lookUpJoin}
	hashJoin.joiners = make([]joiner, tc.concurrency)
	for i := range hashJoin.joiners {
		hashJoin.joiners[i] = lookUpJoin.joiner.Clone()
	}
	return hashJoin
}

/*
func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, innerDS *mockDataSource) Executor {
outerCols, innerCols := tc.columns(), tc.columns()
joinSchema := expression.NewSchema(outerCols...)
joinSchema.Append(innerCols...)
outerJoinKeys := make([]*expression.Column, 0, len(tc.outerJoinKeyIdx))
innerJoinKeys := make([]*expression.Column, 0, len(tc.innerJoinKeyIdx))
for _, keyIdx := range tc.outerJoinKeyIdx {
outerJoinKeys = append(outerJoinKeys, outerCols[keyIdx])
}
for _, keyIdx := range tc.innerJoinKeyIdx {
innerJoinKeys = append(innerJoinKeys, innerCols[keyIdx])
}
leftTypes, rightTypes := retTypes(outerDS), retTypes(innerDS)
defaultValues := make([]types.Datum, len(innerCols))
colLens := make([]int, len(innerCols))
for i := range colLens {
colLens[i] = types.UnspecifiedLength
}
keyOff2IdxOff := make([]int, len(outerJoinKeys))
for i := range keyOff2IdxOff {
keyOff2IdxOff[i] = i
}

compareFuncs := make([]expression.CompareFunc, 0, len(outerJoinKeys))
outerCompareFuncs := make([]expression.CompareFunc, 0, len(outerJoinKeys))
for i := range outerJoinKeys {
compareFuncs = append(compareFuncs, expression.GetCmpFunction(outerJoinKeys[i], innerJoinKeys[i]))
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(outerJoinKeys[i], outerJoinKeys[i]))
}
e := &IndexLookUpMergeJoin{
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("IndexMergeJoin"), outerDS),
outerMergeCtx: outerMergeCtx{
rowTypes: leftTypes,
keyCols: tc.outerJoinKeyIdx,
joinKeys: outerJoinKeys,
needOuterSort: tc.needOuterSort,
compareFuncs: outerCompareFuncs,
},
innerMergeCtx: innerMergeCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil)},
rowTypes: rightTypes,
joinKeys: innerJoinKeys,
colLens: colLens,
keyCols: tc.innerJoinKeyIdx,
compareFuncs: compareFuncs,
},
workerWg: new(sync.WaitGroup),
isOuterJoin: false,
keyOff2IdxOff: keyOff2IdxOff,
lastColHelper: nil,
}
joiners := make([]joiner, e.ctx.GetSessionVars().IndexLookupJoinConcurrency)
for i := 0; i < e.ctx.GetSessionVars().IndexLookupJoinConcurrency; i++ {
joiners[i] = newJoiner(tc.ctx, 0, false, defaultValues, nil, leftTypes, rightTypes)
}
e.joiners = joiners
return e
}
*/

// indexJoinType selects which index join executor a benchmark run builds.
type indexJoinType int8

// Index join flavours, numbered from zero in declaration order.
const (
	// indexInnerHashJoin selects the executor from prepare4IndexInnerHashJoin.
	indexInnerHashJoin indexJoinType = iota
	// indexOuterHashJoin selects the executor from prepare4IndexOuterHashJoin.
	indexOuterHashJoin
	// indexMergeJoin is reserved for the index merge join benchmark, whose
	// setup code is currently commented out.
	indexMergeJoin
)

// benchmarkIndexJoinExecWithCase measures one full Open / drain / Close cycle
// of the index join executor selected by execType, repeated b.N times.
//
// For every iteration a fresh executor is built and both mock data sources
// are re-prepared while the timer is stopped, so only executing the join is
// timed. Any error from Open, Next or Close fails the benchmark.
//
// execType must be indexInnerHashJoin or indexOuterHashJoin. The index merge
// join setup is commented out, so requesting it (or any unknown value) fails
// the benchmark with a clear message rather than panicking on a nil executor.
func benchmarkIndexJoinExecWithCase(
	b *testing.B,
	tc *indexJoinTestCase,
	outerDS *mockDataSource,
	innerDS *mockDataSource,
	execType indexJoinType,
) {
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Executor construction and data preparation are excluded from timing.
		b.StopTimer()
		var exec Executor
		switch execType {
		case indexInnerHashJoin:
			exec = prepare4IndexInnerHashJoin(tc, outerDS, innerDS)
		case indexOuterHashJoin:
			exec = prepare4IndexOuterHashJoin(tc, outerDS, innerDS)
		case indexMergeJoin:
			// exec = prepare4IndexMergeJoin(tc, outerDS, innerDS)
			b.Fatal("index merge join benchmark is disabled: prepare4IndexMergeJoin is commented out")
		default:
			b.Fatalf("unknown index join type: %d", execType)
		}

		tmpCtx := context.Background()
		chk := newFirstChunk(exec)
		outerDS.prepareChunks()
		innerDS.prepareChunks()

		b.StartTimer()
		if err := exec.Open(tmpCtx); err != nil {
			b.Fatal(err)
		}
		// Drain the executor; an empty chunk signals the end of the result.
		for {
			if err := exec.Next(tmpCtx, chk); err != nil {
				b.Fatal(err)
			}
			if chk.NumRows() == 0 {
				break
			}
		}

		if err := exec.Close(); err != nil {
			b.Fatal(err)
		}
		b.StopTimer()
	}
}

// BenchmarkIndexJoinExec runs the index join benchmarks against the default
// test case, one sub-benchmark per enabled join flavour.
//
// The log level is raised to Error for the duration of the benchmark and
// restored on return. Both mock data sources are built once, with
// tc.innerIdx as the indexed columns, and shared by every sub-benchmark.
func BenchmarkIndexJoinExec(b *testing.B) {
	prevLevel := log.GetLevel()
	log.SetLevel(zapcore.ErrorLevel)
	defer log.SetLevel(prevLevel)

	b.ReportAllocs()
	tc := defaultIndexJoinTestCase()
	outerDS := buildMockDataSourceWithIndex(tc.getMockDataSourceOptByRows(tc.outerRows), tc.innerIdx)
	innerDS := buildMockDataSourceWithIndex(tc.getMockDataSourceOptByRows(tc.innerRows), tc.innerIdx)

	// Disabled until the index merge join setup is re-enabled:
	//
	// tc.needOuterSort = true
	// b.Run(fmt.Sprintf("index merge join need outer sort %v", tc), func(b *testing.B) {
	// 	benchmarkIndexJoinExecWithCase(b, tc, outerDS, innerDS, indexMergeJoin)
	// })
	//
	// tc.needOuterSort = false
	// b.Run(fmt.Sprintf("index merge join %v", tc), func(b *testing.B) {
	// 	benchmarkIndexJoinExecWithCase(b, tc, outerDS, innerDS, indexMergeJoin)
	// })

	runs := []struct {
		label    string
		joinType indexJoinType
	}{
		{label: "index inner hash join", joinType: indexInnerHashJoin},
		{label: "index outer hash join", joinType: indexOuterHashJoin},
	}
	for _, run := range runs {
		run := run // keep a per-iteration copy for the closure
		b.Run(fmt.Sprintf("%s %v", run.label, tc), func(b *testing.B) {
			benchmarkIndexJoinExecWithCase(b, tc, outerDS, innerDS, run.joinType)
		})
	}
}
8 changes: 8 additions & 0 deletions executor/builder.go
Expand Up @@ -2066,6 +2066,8 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *mockPhysicalIndexReader:
return v.e, nil
}
return nil, errors.New("Wrong plan type for dataReaderBuilder")
}
Expand Down Expand Up @@ -2123,6 +2125,12 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
return e, nil
}

// mockPhysicalIndexReader is a stand-in physical plan that carries a
// ready-made executor. buildExecutorForIndexJoin recognises this type and
// returns e directly instead of building a reader from a real plan.
type mockPhysicalIndexReader struct {
	// PhysicalPlan is embedded so the mock can be used where a
	// plannercore.PhysicalPlan is expected; nothing visible here assigns it.
	plannercore.PhysicalPlan

	// e is the executor returned for this plan.
	e Executor
}

func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
e, err := buildNoRangeIndexReader(builder.executorBuilder, v)
Expand Down