Commit b6465f0

# This is a combination of 6 commits.

# This is the 1st commit message:

fixup

# This is the commit message #2:

fixup

# This is the commit message #3:

fixup

# This is the commit message #4:

fixup

# This is the commit message #5:

fixup

# This is the commit message #6:

fixup
qw4990 committed Jun 21, 2023
1 parent ce99d5d commit b6465f0
Showing 6 changed files with 238 additions and 47 deletions.
22 changes: 1 addition & 21 deletions planner/core/find_best_task.go
@@ -1129,7 +1129,7 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter

// Batch/PointGet plans may be over-optimized, like `a>=1(?) and a<=1(?)` --> `a=1` --> PointGet(a=1).
// For safety, prevent these plans from the plan cache here.
if !pointGetTask.invalid() && expression.MaybeOverOptimized4PlanCache(ds.ctx, candidate.path.AccessConds) && !ds.isSafePointGetPlan4PlanCache(candidate.path) {
if !pointGetTask.invalid() && expression.MaybeOverOptimized4PlanCache(ds.ctx, candidate.path.AccessConds) && !isSafePointGetPath4PlanCache(candidate.path) {
ds.ctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.New("Batch/PointGet plans may be over-optimized"))
}

@@ -1212,26 +1212,6 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
return
}

func (*DataSource) isSafePointGetPlan4PlanCache(path *util.AccessPath) bool {
// PointGet might contain some over-optimized assumptions, like `a>=1 and a<=1` --> `a=1`, but
// these assumptions may be broken after parameters change.

// safe scenario 1: each column corresponds to a single EQ, `a=1 and b=2 and c=3` --> `[1, 2, 3]`
if len(path.Ranges) > 0 && path.Ranges[0].Width() == len(path.AccessConds) {
for _, accessCond := range path.AccessConds {
f, ok := accessCond.(*expression.ScalarFunction)
if !ok {
return false
}
if f.FuncName.L != ast.EQ {
return false
}
}
return true
}
return false
}
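
The check added to findBestTask above guards against ranges that collapse into a point only for the parameter values seen when the plan was built; the helper just above (isSafePointGetPlan4PlanCache) is removed from this file and reintroduced in plan_cache.go as isSafePointGetPath4PlanCache. A minimal sketch of the hazard, in the testkit style used by plan_cache_test.go (hypothetical test name; the expected @@last_plan_from_cache value follows the intent of the comments in this diff rather than verified output):

func TestOverOptimizedPointGetNotCached(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec(`use test`)
	tk.MustExec(`create table t (a int, primary key(a))`)
	tk.MustExec(`insert into t values (1), (2), (3)`)
	tk.MustExec(`set @x=1, @y=1, @z=3`)
	tk.MustExec(`prepare st from 'select * from t where a>=? and a<=?'`)
	// With @x=@y=1 the optimizer may fold `a>=1 and a<=1` into `a=1` and choose PointGet(a=1).
	tk.MustQuery(`execute st using @x, @y`).Check(testkit.Rows("1"))
	// Reusing that PointGet for @x=1, @z=3 would return one row instead of three, so
	// MaybeOverOptimized4PlanCache plus the safety check above keep the plan out of the cache.
	tk.MustQuery(`execute st using @x, @z`).Sort().Check(testkit.Rows("1", "2", "3"))
	tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0"))
}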

func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) {
if prop.IsFlashProp() || prop.TaskTp == property.CopSingleReadTaskType || !prop.IsSortItemEmpty() {
return invalidTask, nil
84 changes: 81 additions & 3 deletions planner/core/plan_cache.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
core_metrics "github.com/pingcap/tidb/planner/core/metrics"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/planner/util/debugtrace"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
@@ -442,7 +443,7 @@ func rebuildRange(p Plan) error {
if err != nil {
return err
}
if !isSafeRange(x.AccessConditions, ranges, false, nil) {
if len(ranges.Ranges) != 1 || !isSafeRange(x.AccessConditions, ranges, false, nil) {
return errors.New("rebuild to get an unsafe range")
}
for i := range x.IndexValues {
@@ -464,7 +465,7 @@ func rebuildRange(p Plan) error {
if err != nil {
return err
}
if !isSafeRange(x.AccessConditions, &ranger.DetachRangeResult{
if len(ranges) != 1 || !isSafeRange(x.AccessConditions, &ranger.DetachRangeResult{
Ranges: ranges,
AccessConds: accessConds,
RemainedConds: remainingConds,
@@ -533,7 +534,7 @@ func rebuildRange(p Plan) error {
if err != nil {
return err
}
if len(ranges) != len(x.Handles) && !isSafeRange(x.AccessConditions, &ranger.DetachRangeResult{
if len(ranges) != len(x.Handles) || !isSafeRange(x.AccessConditions, &ranger.DetachRangeResult{
Ranges: ranges,
AccessConds: accessConds,
RemainedConds: remainingConds,
@@ -847,3 +848,80 @@ func IsPointPlanShortPathOK(sctx sessionctx.Context, is infoschema.InfoSchema, s
}
return ok, err
}

func isSafePointGetPath4PlanCache(path *util.AccessPath) bool {
// PointGet might contain some over-optimized assumptions, like `a>=1 and a<=1` --> `a=1`, but
// these assumptions may be broken after parameters change.

return isSafePointGetPath4PlanCacheScenario1(path) ||
isSafePointGetPath4PlanCacheScenario2(path) ||
isSafePointGetPath4PlanCacheScenario3(path)
}

func isSafePointGetPath4PlanCacheScenario1(path *util.AccessPath) bool {
// safe scenario 1: each column corresponds to a single EQ, `a=1 and b=2 and c=3` --> `[1, 2, 3]`
if len(path.Ranges) <= 0 || path.Ranges[0].Width() != len(path.AccessConds) {
return false
}
for _, accessCond := range path.AccessConds {
f, ok := accessCond.(*expression.ScalarFunction)
if !ok || f.FuncName.L != ast.EQ { // column = constant
return false
}
}
return true
}
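
Scenario 1 accepts an access path only when every key column is pinned by exactly one equality condition, so Ranges[0].Width() equals len(AccessConds). A hedged fragment showing the accepted shape, assuming the usual store/tk setup from the tests in this commit (table and variable names are illustrative):

// a=? and b=? and c=? over a three-column primary key --> one point range of width 3.
tk.MustExec(`create table t3 (a int, b int, c int, primary key(a, b, c))`)
tk.MustExec(`set @a=1, @b=2, @c=3`)
tk.MustExec(`prepare st from 'select * from t3 where a=? and b=? and c=?'`)
tk.MustExec(`execute st using @a, @b, @c`)
// Every AccessCond is an EQ scalar function, so this PointGet stays eligible for the plan cache.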

func isSafePointGetPath4PlanCacheScenario2(path *util.AccessPath) bool {
// safe scenario 2: this Batch or PointGet is simply from a single IN predicate, `key in (...)`
if len(path.Ranges) <= 0 || len(path.AccessConds) != 1 {
return false
}
f, ok := path.AccessConds[0].(*expression.ScalarFunction)
if !ok || f.FuncName.L != ast.In {
return false
}
return len(path.Ranges) == len(f.GetArgs())-1 // no duplicated values in this in-list for safety.
}
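
Scenario 2 covers a Batch/PointGet produced by a single IN predicate over the key; GetArgs() includes the column as its first argument, so len(GetArgs())-1 is the number of in-list items, and the comparison rejects executions whose parameters collapse into fewer ranges. TestIssue44830 below exercises exactly this case; a condensed fragment, assuming the same setup (illustrative names):

tk.MustExec(`create table t4 (a int, primary key(a))`)
tk.MustExec(`set @a=1, @b=2, @c=3`)
tk.MustExec(`prepare st from 'select * from t4 where a in (?, ?, ?)'`)
tk.MustExec(`execute st using @a, @b, @c`) // three distinct values --> three ranges, safe
tk.MustExec(`execute st using @a, @b, @b`) // duplicated value --> two ranges, reported as unsafe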

func isSafePointGetPath4PlanCacheScenario3(path *util.AccessPath) bool {
// safe scenario 3: this Batch or PointGet is simply from a simple DNF like `key=? or key=? or key=?`
if len(path.Ranges) <= 0 || len(path.AccessConds) != 1 {
return false
}
f, ok := path.AccessConds[0].(*expression.ScalarFunction)
if !ok || f.FuncName.L != ast.LogicOr {
return false
}

dnfExprs := expression.FlattenDNFConditions(f)
if len(path.Ranges) != len(dnfExprs) {
// no duplicated values in this DNF for safety.
// e.g. `k=1 or k=2 or k=1` --> [[1, 1], [2, 2]]
return false
}

for _, expr := range dnfExprs {
f, ok := expr.(*expression.ScalarFunction)
if !ok {
return false
}
switch f.FuncName.L {
case ast.EQ: // (k=1 or k=2) --> [k=1, k=2]
case ast.LogicAnd: // ((k1=1 and k2=1) or (k1=2 and k2=2)) --> [k1=1 and k2=1, k1=2 and k2=2]
cnfExprs := expression.FlattenCNFConditions(f)
if path.Ranges[0].Width() != len(cnfExprs) { // not all key columns are specified
return false
}
for _, expr := range cnfExprs { // k1=1 and k2=1
f, ok := expr.(*expression.ScalarFunction)
if !ok || f.FuncName.L != ast.EQ {
return false
}
}
default:
return false
}
}
return true
}
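
Scenario 3 handles simple DNF predicates and is the only scenario not directly exercised by a test in this diff. A hedged fragment of the two accepted shapes, again assuming the usual store/tk setup (illustrative names; the cache behavior noted in the comments is inferred from the checks above, not verified output):

// Shape 1: pure EQ disjuncts over a single-column key.
tk.MustExec(`create table t6 (k int, primary key(k))`)
tk.MustExec(`set @a=1, @b=2`)
tk.MustExec(`prepare st1 from 'select * from t6 where k=? or k=?'`)
tk.MustExec(`execute st1 using @a, @b`) // two disjuncts --> two ranges, safe
tk.MustExec(`execute st1 using @a, @a`) // duplicated value --> one range, rejected
// Shape 2: CNF disjuncts that pin every key column.
tk.MustExec(`create table t5 (k1 int, k2 int, primary key(k1, k2))`)
tk.MustExec(`prepare st2 from 'select * from t5 where (k1=? and k2=?) or (k1=? and k2=?)'`)
tk.MustExec(`execute st2 using @a, @a, @b, @b`) // both disjuncts cover (k1, k2), safe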
70 changes: 57 additions & 13 deletions planner/core/plan_cache_test.go
@@ -72,6 +72,49 @@ func TestIssue43311(t *testing.T) {
tk.MustQuery(`execute st using @a, @b`).Check(testkit.Rows()) // empty
}

func TestIssue44830(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)
tk.MustExec(`create table t (a int, primary key(a))`)
tk.MustExec(`create table t1 (a int, b int, primary key(a, b))`) // multiple-column primary key
tk.MustExec(`insert into t values (1), (2), (3)`)
tk.MustExec(`insert into t1 values (1, 1), (2, 2), (3, 3)`)
tk.MustExec(`set @a=1, @b=2, @c=3`)

// single-column primary key cases
tk.MustExec(`prepare st from 'select * from t where 1=1 and a in (?, ?, ?)'`)
tk.MustQuery(`execute st using @a, @b, @c`).Sort().Check(testkit.Rows("1", "2", "3"))
tk.MustQuery(`execute st using @a, @b, @c`).Sort().Check(testkit.Rows("1", "2", "3"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
tk.MustQuery(`execute st using @a, @b, @b`).Sort().Check(testkit.Rows("1", "2"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // range length changed
tk.MustQuery(`execute st using @b, @b, @b`).Sort().Check(testkit.Rows("2"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // range length changed
tk.MustQuery(`execute st using @a, @b, @c`).Sort().Check(testkit.Rows("1", "2", "3"))
tk.MustQuery(`execute st using @a, @b, @b`).Sort().Check(testkit.Rows("1", "2"))
tk.MustQuery(`execute st using @a, @b, @b`).Sort().Check(testkit.Rows("1", "2"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // contain duplicated values in the in-list

// multi-column primary key cases
tk.MustExec(`prepare st from 'select * from t1 where 1=1 and (a, b) in ((?, ?), (?, ?), (?, ?))'`)
tk.MustQuery(`execute st using @a, @a, @b, @b, @c, @c`).Sort().Check(testkit.Rows("1 1", "2 2", "3 3"))
tk.MustQuery(`execute st using @a, @a, @b, @b, @c, @c`).Sort().Check(testkit.Rows("1 1", "2 2", "3 3"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
tk.MustQuery(`execute st using @a, @a, @b, @b, @b, @b`).Sort().Check(testkit.Rows("1 1", "2 2"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // range length changed
tk.MustQuery(`execute st using @b, @b, @b, @b, @b, @b`).Sort().Check(testkit.Rows("2 2"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // range length changed
tk.MustQuery(`execute st using @b, @b, @b, @b, @c, @c`).Sort().Check(testkit.Rows("2 2", "3 3"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // range length changed
tk.MustQuery(`execute st using @a, @a, @a, @a, @a, @a`).Sort().Check(testkit.Rows("1 1"))
tk.MustQuery(`execute st using @a, @a, @a, @a, @a, @a`).Sort().Check(testkit.Rows("1 1"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // contain duplicated values in the in-list
tk.MustQuery(`execute st using @a, @a, @b, @b, @b, @b`).Sort().Check(testkit.Rows("1 1", "2 2"))
tk.MustQuery(`execute st using @a, @a, @b, @b, @b, @b`).Sort().Check(testkit.Rows("1 1", "2 2"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // contain duplicated values in the in-list
}

func TestPlanCacheSizeSwitch(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
@@ -638,24 +681,25 @@ func TestPreparedPlanCacheLongInList(t *testing.T) {
return "(" + strings.Join(elements, ",") + ")"
}

tk.MustExec(fmt.Sprintf(`prepare st_99 from 'select * from t where a in %v'`, genInList(99)))
tk.MustExec(`execute st_99`)
tk.MustExec(`execute st_99`)
// the limitation is 200
tk.MustExec(fmt.Sprintf(`prepare st_199 from 'select * from t where a in %v'`, genInList(199)))
tk.MustExec(`execute st_199`)
tk.MustExec(`execute st_199`)
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))

tk.MustExec(fmt.Sprintf(`prepare st_101 from 'select * from t where a in %v'`, genInList(101)))
tk.MustExec(`execute st_101`)
tk.MustExec(`execute st_101`)
tk.MustExec(fmt.Sprintf(`prepare st_201 from 'select * from t where a in %v'`, genInList(201)))
tk.MustExec(`execute st_201`)
tk.MustExec(`execute st_201`)
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0"))

tk.MustExec(fmt.Sprintf(`prepare st_49_50 from 'select * from t where a in %v and b in %v'`, genInList(49), genInList(50)))
tk.MustExec(`execute st_49_50`)
tk.MustExec(`execute st_49_50`)
tk.MustExec(fmt.Sprintf(`prepare st_99_100 from 'select * from t where a in %v and b in %v'`, genInList(99), genInList(100)))
tk.MustExec(`execute st_99_100`)
tk.MustExec(`execute st_99_100`)
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))

tk.MustExec(fmt.Sprintf(`prepare st_49_52 from 'select * from t where a in %v and b in %v'`, genInList(49), genInList(52)))
tk.MustExec(`execute st_49_52`)
tk.MustExec(`execute st_49_52`)
tk.MustExec(fmt.Sprintf(`prepare st_100_101 from 'select * from t where a in %v and b in %v'`, genInList(100), genInList(101)))
tk.MustExec(`execute st_100_101`)
tk.MustExec(`execute st_100_101`)
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0"))
}

@@ -1210,7 +1254,7 @@ func TestLongInsertStmt(t *testing.T) {
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))

tk.MustExec(`prepare inert201 from 'insert into t values (1)` + strings.Repeat(", (1)", 200) + "'")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip prepared plan-cache: too many values (more than 200) in the insert statement"))
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip prepared plan-cache: too many values in the insert statement"))
tk.MustExec(`execute inert201`)
tk.MustExec(`execute inert201`)
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0"))
49 changes: 39 additions & 10 deletions planner/core/plan_cacheable_checker.go
@@ -16,6 +16,8 @@ package core

import (
"fmt"
"math"
"strconv"
"sync"

"github.com/pingcap/tidb/expression"
@@ -26,6 +28,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
core_metrics "github.com/pingcap/tidb/planner/core/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/filter"
@@ -56,6 +59,7 @@ func CacheableWithCtx(sctx sessionctx.Context, node ast.Node, is infoschema.Info
cacheable: true,
schema: is,
sumInListLen: 0,
maxNumParam: getMaxParamLimit(sctx),
}
node.Accept(&checker)
return checker.cacheable, checker.reason
@@ -69,6 +73,7 @@ type cacheableChecker struct {
reason string // reason why cannot use plan-cache

sumInListLen int // the accumulated number of elements in all in-lists
maxNumParam int
}

// Enter implements Visitor interface.
@@ -105,9 +110,9 @@ func (checker *cacheableChecker) Enter(in ast.Node) (out ast.Node, skipChildren
if len(node.Lists) > 0 { // avoid index-out-of-range
nCols = len(node.Lists[0])
}
if nRows*nCols > 200 { // to save memory
if nRows*nCols > checker.maxNumParam { // to save memory
checker.cacheable = false
checker.reason = "too many values (more than 200) in the insert statement"
checker.reason = "too many values in the insert statement"
return in, true
}
}
@@ -120,9 +125,9 @@ func (checker *cacheableChecker) Enter(in ast.Node) (out ast.Node, skipChildren
}
case *ast.PatternInExpr:
checker.sumInListLen += len(node.List)
if checker.sumInListLen > 100 { // to save memory
if checker.sumInListLen > checker.maxNumParam { // to save memory
checker.cacheable = false
checker.reason = "too many values in in-list (more than 100)"
checker.reason = "too many values in in-list"
return in, true
}
case *ast.VariableExpr:
@@ -223,6 +228,7 @@ func NonPreparedPlanCacheableWithCtx(sctx sessionctx.Context, node ast.Node, is
return false, "not a SELECT statement"
}

maxNumParam := getMaxParamLimit(sctx)
var tableNames []*ast.TableName
switch x := node.(type) {
case *ast.SelectStmt:
@@ -251,8 +257,8 @@ func NonPreparedPlanCacheableWithCtx(sctx sessionctx.Context, node ast.Node, is
if len(x.Lists) > 0 { // avoid index-out-of-range
nCols = len(x.Lists[0])
}
if nRows*nCols > 200 { // to save memory
return false, "too many values (more than 200) in the insert statement"
if nRows*nCols > maxNumParam { // to save memory
return false, "too many values in the insert statement"
}
tableNames, ok, reason = extractTableNames(x.Table.TableRefs, tableNames)
if !ok {
@@ -289,7 +295,7 @@ func NonPreparedPlanCacheableWithCtx(sctx sessionctx.Context, node ast.Node, is

// allocate and init the checker
checker := nonPrepCacheCheckerPool.Get().(*nonPreparedPlanCacheableChecker)
checker.reset(sctx, is, tableNames)
checker.reset(sctx, is, tableNames, maxNumParam)

node.Accept(checker)
cacheable, reason := checker.cacheable, checker.reason
@@ -382,16 +388,19 @@ type nonPreparedPlanCacheableChecker struct {

constCnt int // the number of constants/parameters in this query
filterCnt int // the number of filters in the current node

maxNumberParam int // the maximum number of parameters for a query to be cached.
}

func (checker *nonPreparedPlanCacheableChecker) reset(sctx sessionctx.Context, schema infoschema.InfoSchema, tableNodes []*ast.TableName) {
func (checker *nonPreparedPlanCacheableChecker) reset(sctx sessionctx.Context, schema infoschema.InfoSchema, tableNodes []*ast.TableName, maxNumberParam int) {
checker.sctx = sctx
checker.cacheable = true
checker.schema = schema
checker.reason = ""
checker.tableNodes = tableNodes
checker.constCnt = 0
checker.filterCnt = 0
checker.maxNumberParam = maxNumberParam
}

// Enter implements Visitor interface.
Expand Down Expand Up @@ -458,9 +467,9 @@ func (checker *nonPreparedPlanCacheableChecker) Enter(in ast.Node) (out ast.Node
checker.reason = "query has null constants"
}
checker.constCnt++
if checker.constCnt > 200 { // just for safety and reduce memory cost
if checker.maxNumberParam > 0 && checker.constCnt > checker.maxNumberParam { // just for safety and reduce memory cost
checker.cacheable = false
checker.reason = "query has more than 200 constants"
checker.reason = "query has too many constants"
}
return in, !checker.cacheable
case *ast.GroupByClause:
@@ -672,3 +681,23 @@ func isPhysicalPlanCacheable(sctx sessionctx.Context, p PhysicalPlan, paramNum,
}
return true, ""
}

// getMaxParamLimit returns the maximum number of parameters for a query that can be cached in the Plan Cache.
func getMaxParamLimit(sctx sessionctx.Context) int {
v := 200
if sctx == nil || sctx.GetSessionVars() == nil || sctx.GetSessionVars().OptimizerFixControl == nil {
return v
}
if sctx.GetSessionVars().OptimizerFixControl[variable.TiDBOptFixControl44823] != "" {
n, err := strconv.Atoi(sctx.GetSessionVars().OptimizerFixControl[variable.TiDBOptFixControl44823])
if err != nil {
return v
}
if n == 0 {
v = math.MaxInt32 // no limitation
} else if n > 0 {
v = n
}
}
return v
}
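
getMaxParamLimit replaces the hard-coded 200/100 thresholds used by the checkers above: the default stays at 200, fix control 44823 can raise it, and a value of 0 removes the limit (math.MaxInt32). A hedged usage sketch, assuming the tidb_opt_fix_control session variable is the user-facing way to populate OptimizerFixControl, as the TiDBOptFixControl44823 constant suggests:

// Raise the per-query parameter limit for plan-cache eligibility to 1000.
tk.MustExec(`set @@tidb_opt_fix_control = '44823:1000'`)
// Or remove the limit entirely (0 maps to math.MaxInt32 in getMaxParamLimit above).
tk.MustExec(`set @@tidb_opt_fix_control = '44823:0'`)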
