executor: implement disk-based hash join #12067

Merged · 14 commits · Sep 24, 2019
config/config.go: 2 additions & 0 deletions

@@ -67,6 +67,7 @@ type Config struct {
 	RunDDL           bool   `toml:"run-ddl" json:"run-ddl"`
 	SplitTable       bool   `toml:"split-table" json:"split-table"`
 	TokenLimit       uint   `toml:"token-limit" json:"token-limit"`
+	OOMUseTmpStorage bool   `toml:"oom-use-tmp-storage" json:"oom-use-tmp-storage"`
 	OOMAction        string `toml:"oom-action" json:"oom-action"`
 	MemQuotaQuery    int64  `toml:"mem-quota-query" json:"mem-quota-query"`
 	EnableStreaming  bool   `toml:"enable-streaming" json:"enable-streaming"`

@@ -336,6 +337,7 @@ var defaultConf = Config{
 	SplitTable:       true,
 	Lease:            "45s",
 	TokenLimit:       1000,
+	OOMUseTmpStorage: true,
 	OOMAction:        "log",
 	MemQuotaQuery:    32 << 30,
 	EnableStreaming:  false,
config/config.toml.example: 7 additions & 4 deletions

@@ -31,13 +31,16 @@
 split-table = true
 
 # The limit of concurrent executed sessions.
 token-limit = 1000
 
-# Only print a log when out of memory quota.
-# Valid options: ["log", "cancel"]
-oom-action = "log"
-
 # Set the memory quota for a query in bytes. Default: 32GB
 mem-quota-query = 34359738368
 
+# Set to true to enable use of temporary disk for some executors when mem-quota-query is exceeded.
+oom-use-tmp-storage = true
+
+# What to do when mem-quota-query is exceeded and can not be spilled over to disk any more.
+# Valid options: ["log", "cancel"]
+oom-action = "log"
+
 # Enable coprocessor streaming.
 enable-streaming = false
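Taken together, the two settings make quota handling two-staged: when a query's tracked memory exceeds mem-quota-query and oom-use-tmp-storage is on, executors that support it spill intermediate rows to temporary disk first, and oom-action only applies once no further spilling is possible. A minimal sketch of that decision order, using a toy tracker (none of the names below are TiDB's real API):

package main

import "fmt"

// toyTracker models the two-stage policy: spill first, then oom-action.
// All names here are illustrative; they do not mirror TiDB internals.
type toyTracker struct {
	quota         int64
	used          int64
	useTmpStorage bool   // oom-use-tmp-storage
	oomAction     string // oom-action: "log" or "cancel"
	spilled       bool
}

func (t *toyTracker) consume(bytes int64) error {
	t.used += bytes
	if t.used <= t.quota {
		return nil
	}
	// Stage 1: spill to temporary disk, if enabled and not already done.
	if t.useTmpStorage && !t.spilled {
		t.spilled = true
		t.used = 0 // pretend the in-memory rows moved to disk
		return nil
	}
	// Stage 2: spilling is no longer possible; fall back to oom-action.
	if t.oomAction == "cancel" {
		return fmt.Errorf("memory quota exceeded and cannot spill further")
	}
	fmt.Println("warn: memory quota exceeded") // oom-action = "log"
	return nil
}

func main() {
	t := &toyTracker{quota: 64, useTmpStorage: true, oomAction: "cancel"}
	for i := 0; i < 4; i++ {
		if err := t.consume(40); err != nil {
			fmt.Println("query aborted:", err)
		}
	}
}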
executor/benchmark_test.go: 65 additions & 23 deletions

@@ -21,6 +21,7 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/pingcap/log"
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/expression"

@@ -34,6 +35,7 @@ import (
 	"github.com/pingcap/tidb/util/memory"
 	"github.com/pingcap/tidb/util/mock"
 	"github.com/pingcap/tidb/util/stringutil"
+	"go.uber.org/zap/zapcore"
 )
 
 var (
@@ -522,6 +524,7 @@ type hashJoinTestCase struct {
 	concurrency int
 	ctx         sessionctx.Context
 	keyIdx      []int
+	disk        bool
 }

@@ -532,8 +535,8 @@ func (tc hashJoinTestCase) columns() []*expression.Column {
 }
 
 func (tc hashJoinTestCase) String() string {
-	return fmt.Sprintf("(rows:%v, concurency:%v, joinKeyIdx: %v)",
-		tc.rows, tc.concurrency, tc.keyIdx)
+	return fmt.Sprintf("(rows:%v, concurency:%v, joinKeyIdx: %v, disk:%v)",
+		tc.rows, tc.concurrency, tc.keyIdx, tc.disk)
 }

func defaultHashJoinTestCase() *hashJoinTestCase {
Expand Down Expand Up @@ -572,6 +575,13 @@ func prepare4Join(testCase *hashJoinTestCase, innerExec, outerExec Executor) *Ha
e.joiners[i] = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes)
}
memLimit := int64(-1)
if testCase.disk {
memLimit = 1
}
t := memory.NewTracker(stringutil.StringerStr("root of prepare4Join"), memLimit)
t.SetActionOnExceed(nil)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
return e
}
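The benchmark toggles the new code path purely through the tracker limit: a negative limit leaves the query effectively unconstrained, while a 1-byte limit is exceeded by the very first tracked allocation, so a disk case must spill. As a standalone helper the rule would read as follows (illustrative only; the PR inlines this logic above):

package main

import "fmt"

// forceSpillLimit mirrors the memLimit logic in prepare4Join above: negative
// means effectively no quota (in-memory join), while 1 byte guarantees the
// first tracked allocation busts the quota, forcing the spill-to-disk path.
// Illustrative helper, not part of the PR.
func forceSpillLimit(disk bool) int64 {
	if disk {
		return 1
	}
	return -1
}

func main() {
	fmt.Println(forceSpillLimit(false)) // -1: stay in memory
	fmt.Println(forceSpillLimit(true))  // 1: spill immediately
}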

@@ -620,10 +630,17 @@ func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
 			b.Fatal(err)
 		}
 		b.StopTimer()
+		if exec.rowContainer.alreadySpilled() != casTest.disk {
+			b.Fatal("wrong usage with disk")
+		}
 	}
 }
 
 func BenchmarkHashJoinExec(b *testing.B) {
+	lvl := log.GetLevel()
+	log.SetLevel(zapcore.ErrorLevel)
+	defer log.SetLevel(lvl)
+
 	b.ReportAllocs()
 	cas := defaultHashJoinTestCase()
 	b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {

@@ -634,6 +651,19 @@ func BenchmarkHashJoinExec(b *testing.B) {
 	b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
 		benchmarkHashJoinExecWithCase(b, cas)
 	})
+
+	cas.keyIdx = []int{0}
+	cas.disk = true
+	b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
+		benchmarkHashJoinExecWithCase(b, cas)
+	})
+
+	cas.keyIdx = []int{0}
+	cas.disk = true
+	cas.rows = 1000
+	b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
+		benchmarkHashJoinExecWithCase(b, cas)
+	})
 }
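The post-run check guards the benchmark itself: a disk case that never spilled, or an in-memory case that did, would silently measure the wrong code path. Factored out, the assertion could read like this (hypothetical helper, assuming the test file's usual testing import; spilled would come from something like rowContainer.alreadySpilled()):

// assertSpillPath fails a benchmark iteration that exercised the wrong path:
// disk cases must have spilled, in-memory cases must not. Illustrative only.
func assertSpillPath(b *testing.B, spilled, wantDisk bool) {
	b.Helper()
	if spilled != wantDisk {
		b.Fatalf("wrong usage with disk: spilled=%v, want %v", spilled, wantDisk)
	}
}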

@@ -656,16 +686,16 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
 	dataSource2 := buildMockDataSource(opt)
 
 	dataSource1.prepareChunks()
-	exec := prepare4Join(casTest, dataSource1, dataSource2)
-	tmpCtx := context.Background()
-	if err := exec.Open(tmpCtx); err != nil {
-		b.Fatal(err)
-	}
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		b.StopTimer()
-		exec.rowContainer = nil
-		exec.memTracker = memory.NewTracker(exec.id, exec.ctx.GetSessionVars().MemQuotaHashJoin)
+		exec := prepare4Join(casTest, dataSource1, dataSource2)
+		tmpCtx := context.Background()
+		if err := exec.Open(tmpCtx); err != nil {
+			b.Fatal(err)
+		}
 		exec.prepared = true
+
 		innerResultCh := make(chan *chunk.Chunk, len(dataSource1.chunks))
 		for _, chk := range dataSource1.chunks {
 			innerResultCh <- chk

@@ -676,25 +706,37 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
 		if err := exec.buildHashTableForList(innerResultCh); err != nil {
 			b.Fatal(err)
 		}
+
+		if err := exec.Close(); err != nil {
+			b.Fatal(err)
+		}
+		b.StopTimer()
+		if exec.rowContainer.alreadySpilled() != casTest.disk {
+			b.Fatal("wrong usage with disk")
+		}
 	}
 }

 func BenchmarkBuildHashTableForList(b *testing.B) {
+	lvl := log.GetLevel()
+	log.SetLevel(zapcore.ErrorLevel)
+	defer log.SetLevel(lvl)
+
 	b.ReportAllocs()
 	cas := defaultHashJoinTestCase()
-	b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
-		benchmarkBuildHashTableForList(b, cas)
-	})
-
-	cas.keyIdx = []int{0}
-	b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
-		benchmarkBuildHashTableForList(b, cas)
-	})
-
-	cas.keyIdx = []int{0}
-	cas.rows = 10
-	b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
-		benchmarkBuildHashTableForList(b, cas)
-	})
+	rows := []int{10, 100000}
+	keyIdxs := [][]int{{0, 1}, {0}}
+	disks := []bool{false, true}
+	for _, row := range rows {
+		for _, keyIdx := range keyIdxs {
+			for _, disk := range disks {
+				cas.rows = row
+				cas.keyIdx = keyIdx
+				cas.disk = disk
+				b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
+					benchmarkBuildHashTableForList(b, cas)
+				})
+			}
+		}
+	}
 }
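The rewrite replaces three hand-picked cases with the full Cartesian product: 2 row counts x 2 key-index sets x 2 disk modes = 8 named sub-benchmarks, so in-memory and spilling builds are always measured side by side. A self-contained sketch of the same pattern (the package name, benchmark name, and empty loop body are illustrative), runnable with go test -run XX -bench BenchmarkMatrix:

package executor_test

import (
	"fmt"
	"testing"
)

// BenchmarkMatrix reproduces the Cartesian-product layout used above:
// every combination of row count, key-index set, and disk mode becomes
// its own named sub-benchmark.
func BenchmarkMatrix(b *testing.B) {
	rows := []int{10, 100000}
	keyIdxs := [][]int{{0, 1}, {0}}
	disks := []bool{false, true}
	for _, row := range rows {
		for _, keyIdx := range keyIdxs {
			for _, disk := range disks {
				name := fmt.Sprintf("(rows:%v, keyIdx:%v, disk:%v)", row, keyIdx, disk)
				b.Run(name, func(b *testing.B) {
					for i := 0; i < b.N; i++ {
						// real hash-join build/probe work would run here
					}
				})
			}
		}
	}
}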