Skip to content

Commit

Permalink
executor: add index_merge_intersection cases (#39323)
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge committed Nov 30, 2022
1 parent 328aef8 commit 40f0bd5
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 0 deletions.
212 changes: 212 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testutil"
"github.com/pingcap/tidb/util"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -242,32 +244,52 @@ func TestIndexMergeInTransaction(t *testing.T) {
"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
"└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)",
" └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo"))
tk.MustQuery("explain select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where c1 < 10 and c2 < 10 and c3 < 10;").Check(testkit.Rows(
"IndexMerge_9 367.05 root type: intersection",
"├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo",
"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
"├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo",
"└─TableRowIDScan_8(Probe) 367.05 cop[tikv] table:t1 keep order:false, stats:pseudo"))

// Test with normal key.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < -1 and c2 < 10) and c3 < 10;").Check(testkit.Rows())

tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows())

tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1"))

tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows())

// Test with primary key, so the partialPlan is TableScan.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < -1 and c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1"))
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows())

tk.MustExec("commit;")
if i == 1 {
Expand Down Expand Up @@ -566,3 +588,193 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) {

// TODO: add support for index merge reader in dynamic tidb_partition_prune_mode
}

func TestIndexMergeIntersectionConcurrency(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows()
require.Contains(t, res[1][0], "IndexMerge")

// Default is tidb_executor_concurrency.
res = tk.MustQuery("select @@tidb_executor_concurrency;").Sort().Rows()
defExecCon := res[0][0].(string)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", fmt.Sprintf("return(%s)", defExecCon)))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency"))
}()
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

tk.MustExec("set tidb_executor_concurrency = 10")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
// workerCnt = min(part_num, concurrency)
tk.MustExec("set tidb_executor_concurrency = 20")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
tk.MustExec("set tidb_executor_concurrency = 2")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(2)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(9)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
tk.MustExec("set tidb_index_merge_intersection_concurrency = 21")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
tk.MustExec("set tidb_index_merge_intersection_concurrency = 3")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(3)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

// Concurrency only works for dynamic pruning partition table, so real concurrency is 1.
tk.MustExec("set tidb_partition_prune_mode = 'static'")
tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

// Concurrency only works for dynamic pruning partition table. so real concurrency is 1.
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")
tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
}

func TestIntersectionWithDifferentConcurrency(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

var execCon []int
tblSchemas := []string{
// partition table
"create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;",
// non-partition table
"create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));",
}

for tblIdx, tblSchema := range tblSchemas {
if tblIdx == 0 {
// Test different intersectionProcessWorker with partition table(10 partitions).
execCon = []int{1, 3, 10, 11, 20}
} else {
// Default concurrency.
execCon = []int{5}
}
tk.MustExec("use test")
tk.MustExec("drop table if exists t1;")
tk.MustExec(tblSchema)

const queryCnt int = 10
const rowCnt int = 1000
curRowCnt := 0
insertStr := "insert into t1 values"
for i := 0; i < rowCnt; i++ {
if i != 0 {
insertStr += ", "
}
insertStr += fmt.Sprintf("(%d, %d, %d)", i, rand.Int(), rand.Int())
curRowCnt++
}
tk.MustExec(insertStr)
tk.MustExec("analyze table t1")

for _, concurrency := range execCon {
tk.MustExec(fmt.Sprintf("set tidb_executor_concurrency = %d", concurrency))
for i := 0; i < 2; i++ {
if i == 0 {
// Dynamic mode.
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024")
require.Contains(t, res.Rows()[1][0], "IndexMerge")
} else {
tk.MustExec("set tidb_partition_prune_mode = 'static'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024")
if tblIdx == 0 {
// partition table
require.Contains(t, res.Rows()[1][0], "PartitionUnion")
require.Contains(t, res.Rows()[2][0], "IndexMerge")
} else {
require.Contains(t, res.Rows()[1][0], "IndexMerge")
}
}
for i := 0; i < queryCnt; i++ {
c3 := rand.Intn(1024)
res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows()
tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res)
}

// In tranaction
for i := 0; i < queryCnt; i++ {
tk.MustExec("begin;")
r := rand.Intn(3)
if r == 0 {
tk.MustExec(fmt.Sprintf("update t1 set c3 = %d where c1 = %d", rand.Int(), rand.Intn(rowCnt)))
} else if r == 1 {
tk.MustExec(fmt.Sprintf("delete from t1 where c1 = %d", rand.Intn(rowCnt)))
} else if r == 2 {
tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d)", curRowCnt, rand.Int(), rand.Int()))
curRowCnt++
}
c3 := rand.Intn(1024)
res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows()
tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res)
tk.MustExec("commit;")
}
}
}
tk.MustExec("drop table t1")
}
}

func TestIntersectionWorkerPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows()
require.Contains(t, res[1][0], "IndexMerge")

// Test panic in intersection.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic", "panic"))
err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024")
require.Contains(t, err.Error(), "IndexMergeReaderExecutor")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic"))
}

func TestIntersectionMemQuota(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(pk varchar(100) primary key, c1 int, c2 int, index idx1(c1), index idx2(c2))")

insertStr := "insert into t1 values"
for i := 0; i < 20; i++ {
if i != 0 {
insertStr += ", "
}
insertStr += fmt.Sprintf("('%s', %d, %d)", testutil.RandStringRunes(100), 1, 1)
}
tk.MustExec(insertStr)
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024").Rows()
require.Contains(t, res[1][0], "IndexMerge")

tk.MustExec("set global tidb_mem_oom_action='CANCEL'")
defer tk.MustExec("set global tidb_mem_oom_action = DEFAULT")
tk.MustExec("set @@tidb_mem_quota_query = 4000")
err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024")
require.Contains(t, err.Error(), "Out Of Memory Quota!")
}
12 changes: 12 additions & 0 deletions testkit/testutil/require.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package testutil

import (
"math/rand"
"testing"

"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -75,3 +76,14 @@ func CompareUnorderedStringSlice(a []string, b []string) bool {
}
return len(m) == 0
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// RandStringRunes generate random string of length n.
func RandStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}

0 comments on commit 40f0bd5

Please sign in to comment.