Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: introduce the general plan cache #37150

Merged
merged 8 commits into from Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/prepared.go
Expand Up @@ -319,7 +319,7 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
if !vars.IgnorePreparedCacheCloseStmt { // keep the plan in cache
e.ctx.PreparedPlanCache().Delete(cacheKey)
e.ctx.GetPlanCache(false).Delete(cacheKey)
}
}
vars.RemovePreparedStmt(id)
Expand Down
12 changes: 6 additions & 6 deletions executor/seqtest/prepared_test.go
Expand Up @@ -622,7 +622,7 @@ func TestPrepareDealloc(t *testing.T) {
tk.MustExec("drop table if exists prepare_test")
tk.MustExec("create table prepare_test (id int PRIMARY KEY, c1 int)")

require.Equal(t, 0, tk.Session().PreparedPlanCache().Size())
require.Equal(t, 0, tk.Session().GetPlanCache(false).Size())
tk.MustExec(`prepare stmt1 from 'select id from prepare_test'`)
tk.MustExec("execute stmt1")
tk.MustExec(`prepare stmt2 from 'select c1 from prepare_test'`)
Expand All @@ -631,28 +631,28 @@ func TestPrepareDealloc(t *testing.T) {
tk.MustExec("execute stmt3")
tk.MustExec(`prepare stmt4 from 'select * from prepare_test'`)
tk.MustExec("execute stmt4")
require.Equal(t, 3, tk.Session().PreparedPlanCache().Size())
require.Equal(t, 3, tk.Session().GetPlanCache(false).Size())

tk.MustExec("deallocate prepare stmt1")
require.Equal(t, 3, tk.Session().PreparedPlanCache().Size())
require.Equal(t, 3, tk.Session().GetPlanCache(false).Size())
tk.MustExec("deallocate prepare stmt2")
tk.MustExec("deallocate prepare stmt3")
tk.MustExec("deallocate prepare stmt4")
require.Equal(t, 0, tk.Session().PreparedPlanCache().Size())
require.Equal(t, 0, tk.Session().GetPlanCache(false).Size())

tk.MustExec(`prepare stmt1 from 'select * from prepare_test'`)
tk.MustExec(`execute stmt1`)
tk.MustExec(`prepare stmt2 from 'select * from prepare_test'`)
tk.MustExec(`execute stmt2`)
require.Equal(t, 1, tk.Session().PreparedPlanCache().Size()) // use the same cached plan since they have the same statement
require.Equal(t, 1, tk.Session().GetPlanCache(false).Size()) // use the same cached plan since they have the same statement

tk.MustExec(`drop database if exists plan_cache`)
tk.MustExec(`create database plan_cache`)
tk.MustExec(`use plan_cache`)
tk.MustExec(`create table prepare_test (id int PRIMARY KEY, c1 int)`)
tk.MustExec(`prepare stmt3 from 'select * from prepare_test'`)
tk.MustExec(`execute stmt3`)
require.Equal(t, 2, tk.Session().PreparedPlanCache().Size()) // stmt3 has different DB
require.Equal(t, 2, tk.Session().GetPlanCache(false).Size()) // stmt3 has different DB
}

func TestPreparedIssue8153(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion executor/simple.go
Expand Up @@ -1739,7 +1739,7 @@ func (e *SimpleExec) executeAdminFlushPlanCache(s *ast.AdminStmt) error {
}
now := types.NewTime(types.FromGoTime(time.Now().In(e.ctx.GetSessionVars().StmtCtx.TimeZone)), mysql.TypeTimestamp, 3)
e.ctx.GetSessionVars().LastUpdateTime4PC = now
e.ctx.PreparedPlanCache().DeleteAll()
e.ctx.GetPlanCache(false).DeleteAll()
if s.StatementScope == ast.StatementScopeInstance {
// Record the timestamp. When other sessions want to use the plan cache,
// it will check the timestamp first to decide whether the plan cache should be flushed.
Expand Down
4 changes: 4 additions & 0 deletions parser/ast/misc.go
Expand Up @@ -508,6 +508,10 @@ type ExecuteStmt struct {
BinaryArgs interface{}
PrepStmt interface{} // the corresponding prepared statement
IdxInMulti int

// FromGeneralStmt indicates whether this execute-stmt is converted from a general query.
// e.g. select * from t where a>2 --> execute 'select * from t where a>?' using 2
FromGeneralStmt bool
}

// Restore implements Node interface.
Expand Down
24 changes: 13 additions & 11 deletions planner/core/plan_cache.go
Expand Up @@ -42,7 +42,7 @@ import (
"go.uber.org/zap"
)

func planCachePreprocess(sctx sessionctx.Context, is infoschema.InfoSchema,
func planCachePreprocess(sctx sessionctx.Context, isGeneralPlanCache bool, is infoschema.InfoSchema,
stmt *PlanCacheStmt, params []expression.Expression) error {
vars := sctx.GetSessionVars()
stmtAst := stmt.PreparedAst
Expand Down Expand Up @@ -102,7 +102,7 @@ func planCachePreprocess(sctx sessionctx.Context, is infoschema.InfoSchema,
// And update lastUpdateTime to the newest one.
expiredTimeStamp4PC := domain.GetDomain(sctx).ExpiredTimeStamp4PC()
if stmtAst.UseCache && expiredTimeStamp4PC.Compare(vars.LastUpdateTime4PC) > 0 {
sctx.PreparedPlanCache().DeleteAll()
sctx.GetPlanCache(isGeneralPlanCache).DeleteAll()
stmtAst.CachedPlan = nil
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}
Expand All @@ -112,9 +112,11 @@ func planCachePreprocess(sctx sessionctx.Context, is infoschema.InfoSchema,
// GetPlanFromSessionPlanCache is the entry point of Plan Cache.
// It tries to get a valid cached plan from this session's plan cache.
// If there is no such a plan, it'll call the optimizer to generate a new one.
func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt,
// isGeneralPlanCache indicates whether to use the general plan cache or the prepared plan cache.
func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
isGeneralPlanCache bool, is infoschema.InfoSchema, stmt *PlanCacheStmt,
params []expression.Expression) (plan Plan, names []*types.FieldName, err error) {
if err := planCachePreprocess(sctx, is, stmt, params); err != nil {
if err := planCachePreprocess(sctx, isGeneralPlanCache, is, stmt, params); err != nil {
return nil, nil, err
}

Expand Down Expand Up @@ -154,13 +156,13 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context, i
}

if stmtAst.UseCache && !ignorePlanCache { // for general plans
if plan, names, ok, err := getGeneralPlan(sctx, cacheKey, bindSQL, is, stmt,
if plan, names, ok, err := getGeneralPlan(sctx, isGeneralPlanCache, cacheKey, bindSQL, is, stmt,
paramTypes); err != nil || ok {
return plan, names, err
}
}

return generateNewPlan(ctx, sctx, is, stmt, ignorePlanCache, cacheKey,
return generateNewPlan(ctx, sctx, isGeneralPlanCache, is, stmt, ignorePlanCache, cacheKey,
latestSchemaVersion, paramNum, paramTypes, bindSQL)
}

Expand Down Expand Up @@ -208,13 +210,13 @@ func getPointQueryPlan(stmt *ast.Prepared, sessVars *variable.SessionVars, stmtC
return plan, names, true, nil
}

func getGeneralPlan(sctx sessionctx.Context, cacheKey kvcache.Key, bindSQL string,
func getGeneralPlan(sctx sessionctx.Context, isGeneralPlanCache bool, cacheKey kvcache.Key, bindSQL string,
is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType) (Plan,
[]*types.FieldName, bool, error) {
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx

cachedVal, exist := getValidPlanFromCache(sctx, cacheKey, paramTypes)
cachedVal, exist := getValidPlanFromCache(sctx, isGeneralPlanCache, cacheKey, paramTypes)
if !exist {
return nil, nil, false, nil
}
Expand All @@ -225,7 +227,7 @@ func getGeneralPlan(sctx sessionctx.Context, cacheKey kvcache.Key, bindSQL strin
if !unionScan && tableHasDirtyContent(sctx, tblInfo) {
// TODO we can inject UnionScan into cached plan to avoid invalidating it, though
// rebuilding the filters in UnionScan is pretty trivial.
sctx.PreparedPlanCache().Delete(cacheKey)
sctx.GetPlanCache(isGeneralPlanCache).Delete(cacheKey)
return nil, nil, false, nil
}
}
Expand All @@ -251,7 +253,7 @@ func getGeneralPlan(sctx sessionctx.Context, cacheKey kvcache.Key, bindSQL strin

// generateNewPlan call the optimizer to generate a new plan for current statement
// and try to add it to cache
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt,
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isGeneralPlanCache bool, is infoschema.InfoSchema, stmt *PlanCacheStmt,
ignorePlanCache bool, cacheKey kvcache.Key, latestSchemaVersion int64, paramNum int,
paramTypes []*types.FieldType, bindSQL string) (Plan, []*types.FieldName, error) {
stmtAst := stmt.PreparedAst
Expand Down Expand Up @@ -286,7 +288,7 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, is infoschema
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlan(p)
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
putPlanIntoCache(sctx, cacheKey, cached)
putPlanIntoCache(sctx, isGeneralPlanCache, cacheKey, cached)
}
sessVars.FoundInPlanCache = false
return p, names, err
Expand Down
8 changes: 4 additions & 4 deletions planner/core/plan_cache_utils.go
Expand Up @@ -39,8 +39,8 @@ var (
PreparedPlanCacheMaxMemory = *atomic2.NewUint64(math.MaxUint64)
)

func getValidPlanFromCache(sctx sessionctx.Context, key kvcache.Key, paramTypes []*types.FieldType) (*PlanCacheValue, bool) {
cache := sctx.PreparedPlanCache()
func getValidPlanFromCache(sctx sessionctx.Context, isGeneralPlanCache bool, key kvcache.Key, paramTypes []*types.FieldType) (*PlanCacheValue, bool) {
cache := sctx.GetPlanCache(isGeneralPlanCache)
val, exist := cache.Get(key)
if !exist {
return nil, exist
Expand All @@ -54,8 +54,8 @@ func getValidPlanFromCache(sctx sessionctx.Context, key kvcache.Key, paramTypes
return nil, false
}

func putPlanIntoCache(sctx sessionctx.Context, key kvcache.Key, plan *PlanCacheValue) {
cache := sctx.PreparedPlanCache()
func putPlanIntoCache(sctx sessionctx.Context, isGeneralPlanCache bool, key kvcache.Key, plan *PlanCacheValue) {
cache := sctx.GetPlanCache(isGeneralPlanCache)
val, exist := cache.Get(key)
if !exist {
cache.Put(key, []*PlanCacheValue{plan})
Expand Down
2 changes: 1 addition & 1 deletion planner/core/prepare_test.go
Expand Up @@ -690,7 +690,7 @@ func TestPrepareCacheDeferredFunction(t *testing.T) {
require.True(t, ok)
err = executor.ResetContextOfStmt(tk.Session(), stmt)
require.NoError(t, err)
plan, _, err := core.GetPlanFromSessionPlanCache(ctx, tk.Session(), is, execPlan.PrepStmt, execPlan.Params)
plan, _, err := core.GetPlanFromSessionPlanCache(ctx, tk.Session(), false, is, execPlan.PrepStmt, execPlan.Params)
require.NoError(t, err)
planStr[i] = core.ToString(plan)
require.Regexpf(t, expectedPattern, planStr[i], "for %dth %s", i, sql1)
Expand Down
2 changes: 1 addition & 1 deletion planner/optimize.go
Expand Up @@ -374,7 +374,7 @@ func OptimizeExecStmt(ctx context.Context, sctx sessionctx.Context,
if !ok {
return nil, nil, errors.Errorf("invalid result plan type, should be Execute")
}
plan, names, err := core.GetPlanFromSessionPlanCache(ctx, sctx, is, exec.PrepStmt, exec.Params)
plan, names, err := core.GetPlanFromSessionPlanCache(ctx, sctx, execAst.FromGeneralStmt, is, exec.PrepStmt, exec.Params)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/driver_tidb.go
Expand Up @@ -176,7 +176,7 @@ func (ts *TiDBStatement) Close() error {
return err
}
if !ts.ctx.GetSessionVars().IgnorePreparedCacheCloseStmt { // keep the plan in cache
ts.ctx.PreparedPlanCache().Delete(cacheKey)
ts.ctx.GetPlanCache(false).Delete(cacheKey)
}
}
ts.ctx.GetSessionVars().RemovePreparedStmt(ts.id)
Expand Down
17 changes: 15 additions & 2 deletions session/session.go
Expand Up @@ -232,6 +232,7 @@ type session struct {
store kv.Storage

preparedPlanCache *kvcache.SimpleLRUCache
generalPlanCache *kvcache.SimpleLRUCache

sessionVars *variable.SessionVars
sessionManager util.SessionManager
Expand Down Expand Up @@ -371,7 +372,7 @@ func (s *session) cleanRetryInfo() {
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtText, preparedAst.SchemaVersion, s.sessionVars.IsolationReadEngines)
}
if !s.sessionVars.IgnorePreparedCacheCloseStmt { // keep the plan in cache
s.PreparedPlanCache().Delete(cacheKey)
s.GetPlanCache(false).Delete(cacheKey)
}
}
s.sessionVars.RemovePreparedStmt(stmtID)
Expand Down Expand Up @@ -430,7 +431,19 @@ func (s *session) SetCollation(coID int) error {
return s.sessionVars.SetSystemVarWithoutValidation(variable.CollationConnection, co)
}

func (s *session) PreparedPlanCache() *kvcache.SimpleLRUCache {
func (s *session) GetPlanCache(isGeneralPlanCache bool) *kvcache.SimpleLRUCache {
if isGeneralPlanCache { // use the general plan cache
if !s.GetSessionVars().EnableGeneralPlanCache {
return nil
}
if s.generalPlanCache == nil { // lazy construction
s.generalPlanCache = kvcache.NewSimpleLRUCache(uint(s.GetSessionVars().GeneralPlanCacheSize),
variable.PreparedPlanCacheMemoryGuardRatio.Load(), plannercore.PreparedPlanCacheMaxMemory.Load())
}
return s.generalPlanCache
}

// use the prepared plan cache
if !s.GetSessionVars().EnablePreparedPlanCache {
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions sessionctx/context.go
Expand Up @@ -106,8 +106,9 @@ type Context interface {
// GetStore returns the store of session.
GetStore() kv.Storage

// PreparedPlanCache returns the cache of the physical plan
PreparedPlanCache() *kvcache.SimpleLRUCache
// GetPlanCache returns the cache of the physical plan.
// generalPlanCache indicates to return the general plan cache or the prepared plan cache.
GetPlanCache(isGeneralPlanCache bool) *kvcache.SimpleLRUCache

// StoreQueryFeedback stores the query feedback.
StoreQueryFeedback(feedback interface{})
Expand Down
3 changes: 1 addition & 2 deletions sessionctx/variable/session.go
Expand Up @@ -1866,8 +1866,7 @@ func (k planCacheStmtKey) Hash() []byte {
// AddGeneralPlanCacheStmt adds this PlanCacheStmt into general-plan-cache-stmt cache
func (s *SessionVars) AddGeneralPlanCacheStmt(sql string, stmt interface{}) {
if s.generalPlanCacheStmts == nil {
// TODO: make it configurable
s.generalPlanCacheStmts = kvcache.NewSimpleLRUCache(100, 0, 0)
s.generalPlanCacheStmts = kvcache.NewSimpleLRUCache(uint(s.GeneralPlanCacheSize), 0, 0)
}
s.generalPlanCacheStmts.Put(planCacheStmtKey(sql), stmt)
}
Expand Down
4 changes: 2 additions & 2 deletions util/mock/context.go
Expand Up @@ -247,8 +247,8 @@ func (*Context) SetGlobalSysVar(_ sessionctx.Context, name string, value string)
return nil
}

// PreparedPlanCache implements the sessionctx.Context interface.
func (c *Context) PreparedPlanCache() *kvcache.SimpleLRUCache {
// GetPlanCache implements the sessionctx.Context interface.
func (c *Context) GetPlanCache(_ bool) *kvcache.SimpleLRUCache {
return c.pcache
}

Expand Down