Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: refactor plan replayer exec #38786

Merged
merged 6 commits into from
Nov 2, 2022
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
234 changes: 178 additions & 56 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ import (

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
Expand Down Expand Up @@ -154,7 +157,7 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.endFlag {
return nil
}
err := e.createFile(domain.GetPlanReplayerDirName())
err := e.createFile()
if err != nil {
return err
}
Expand All @@ -180,33 +183,78 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *PlanReplayerExec) createFile(path string) error {
// Create path
func (e *PlanReplayerExec) createFile() error {
var err error
e.DumpInfo.File, e.DumpInfo.FileName, err = GeneratePlanReplayerFile()
if err != nil {
return err
}
return nil
}

// GeneratePlanReplayerFile generates plan replayer file
func GeneratePlanReplayerFile() (*os.File, string, error) {
path := domain.GetPlanReplayerDirName()
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return errors.AddStack(err)
return nil, "", errors.AddStack(err)
}
fileName, err := generatePlanReplayerFileName()
if err != nil {
return nil, "", errors.AddStack(err)
}
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return nil, "", errors.AddStack(err)
}
return zf, fileName, err
}

func generatePlanReplayerFileName() (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err = rand.Read(b)
_, err := rand.Read(b)
if err != nil {
return err
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
fileName := fmt.Sprintf("replayer_%v_%v.zip", key, time)
zf, err := os.Create(filepath.Join(path, fileName))
return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil
}

func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
fileName := e.FileName
zf := e.File
task := &PlanReplayerDumpTask{
FileName: fileName,
Zf: zf,
SessionVars: e.ctx.GetSessionVars(),
TblStats: nil,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we init PlanReplayerDumpTask.TblStats?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be used in future to support #38779

ExecStmts: e.ExecStmts,
Analyze: e.Analyze,
}
err = DumpPlanReplayerInfo(ctx, e.ctx, task)
if err != nil {
return errors.AddStack(err)
return err
}
e.DumpInfo.File = zf
e.DumpInfo.FileName = fileName
e.ctx.GetSessionVars().LastPlanReplayerToken = e.FileName
return nil
}

// dumpSingle will dump the information about sqls.
// PlanReplayerDumpTask wrap the params for plan replayer dump
type PlanReplayerDumpTask struct {
SessionBindings []*bindinfo.BindRecord
EncodedPlan string
FileName string
Zf *os.File
SessionVars *variable.SessionVars
TblStats map[int64]*handle.JSONTable
ExecStmts []ast.StmtNode
Analyze bool
}

// DumpPlanReplayerInfo will dump the information about sqls.
// The files will be organized into the following format:
/*
|-meta.txt
Expand Down Expand Up @@ -235,10 +283,12 @@ func (e *PlanReplayerExec) createFile(path string) error {
|-explain2.txt
|-....
*/
func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
fileName := e.FileName
zf := e.File
// Create zip writer
func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
task *PlanReplayerDumpTask) (err error) {
zf := task.Zf
fileName := task.FileName
sessionVars := task.SessionVars
execStmts := task.ExecStmts
zw := zip.NewWriter(zf)
defer func() {
err = zw.Close()
Expand All @@ -250,7 +300,6 @@ func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
logutil.BgLogger().Error("Closing zip file failed", zap.Error(err), zap.String("filename", fileName))
}
}()

// Dump config
if err = dumpConfig(zw); err != nil {
return err
Expand All @@ -260,60 +309,62 @@ func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
if err = dumpMeta(zw); err != nil {
return err
}

// Retrieve current DB
sessionVars := e.ctx.GetSessionVars()
dbName := model.NewCIStr(sessionVars.CurrentDB)
do := domain.GetDomain(e.ctx)
do := domain.GetDomain(sctx)

// Retrieve all tables
pairs, err := e.extractTableNames(ctx, e.ExecStmts, dbName)
pairs, err := extractTableNames(ctx, sctx, execStmts, dbName)
if err != nil {
return errors.AddStack(fmt.Errorf("plan replayer: invalid SQL text, err: %v", err))
}

// Dump Schema and View
if err = dumpSchemas(e.ctx, zw, pairs); err != nil {
if err = dumpSchemas(sctx, zw, pairs); err != nil {
return err
}

// Dump tables tiflash replicas
if err = dumpTiFlashReplica(e.ctx, zw, pairs); err != nil {
if err = dumpTiFlashReplica(sctx, zw, pairs); err != nil {
return err
}

// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
if err = dumpStats(zw, pairs, task.TblStats, do); err != nil {
return err
}

// Dump variables
if err = dumpVariables(e.ctx, zw); err != nil {
if err = dumpVariables(sctx, sessionVars, zw); err != nil {
return err
}

// Dump sql
if err = dumpSQLs(e.ExecStmts, zw); err != nil {
if err = dumpSQLs(execStmts, zw); err != nil {
return err
}

// Dump session bindings
if err = dumpSessionBindings(e.ctx, zw); err != nil {
return err
if len(task.SessionBindings) > 0 {
if err = dumpSessionBindRecords(sctx, task.SessionBindings, zw); err != nil {
return err
}
} else {
if err = dumpSessionBindings(sctx, zw); err != nil {
return err
}
}

// Dump global bindings
if err = dumpGlobalBindings(e.ctx, zw); err != nil {
if err = dumpGlobalBindings(sctx, zw); err != nil {
return err
}

// Dump explain
if err = dumpExplain(e.ctx, zw, e.ExecStmts, e.Analyze); err != nil {
return err
if len(task.EncodedPlan) > 0 {
return dumpEncodedPlan(sctx, zw, task.EncodedPlan)
}

e.ctx.GetSessionVars().LastPlanReplayerToken = e.FileName
return nil
// Dump explain
return dumpExplain(sctx, zw, execStmts, task.Analyze)
}

func dumpConfig(zw *zip.Writer) error {
Expand Down Expand Up @@ -374,12 +425,12 @@ func dumpSchemas(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair
return nil
}

func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *domain.Domain) error {
func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, tblJSONStats map[int64]*handle.JSONTable, do *domain.Domain) error {
for pair := range pairs {
if pair.IsView {
continue
}
jsonTbl, err := getStatsForTable(do, pair)
jsonTbl, err := getStatsForTable(do, tblJSONStats, pair)
if err != nil {
return err
}
Expand Down Expand Up @@ -413,31 +464,72 @@ func dumpSQLs(execStmts []ast.StmtNode, zw *zip.Writer) error {
return nil
}

func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
func dumpVariables(sctx sessionctx.Context, sessionVars *variable.SessionVars, zw *zip.Writer) error {
varMap := make(map[string]string)
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show variables")
if err != nil {
return err
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0], false)
if err != nil {
return err
for _, v := range variable.GetSysVars() {
if v.IsNoop && !variable.EnableNoopVariables.Load() {
continue
}
if infoschema.SysVarHiddenForSem(sctx, v.Name) {
continue
}
value, err := sessionVars.GetSessionOrGlobalSystemVar(context.Background(), v.Name)
if err != nil {
return errors.Trace(err)
}
varMap[v.Name] = value
}
vf, err := zw.Create(variablesFile)
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
varMap[row[0]] = row[1]
}
if err := toml.NewEncoder(vf).Encode(varMap); err != nil {
return errors.AddStack(err)
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
return nil
}

func dumpSessionBindRecords(ctx sessionctx.Context, records []*bindinfo.BindRecord, zw *zip.Writer) error {
sRows := make([][]string, 0)
is := domain.GetDomain(ctx).InfoSchema()
parser := parser.New()
for _, bindData := range records {
for _, hint := range bindData.Bindings {
stmt, err := parser.ParseOneStmt(hint.BindSQL, hint.Charset, hint.Collation)
if err != nil {
return err
}
checker := visibleChecker{
defaultDB: bindData.Db,
ctx: ctx,
is: is,
manager: privilege.GetPrivilegeManager(ctx),
ok: true,
}
stmt.Accept(&checker)
if !checker.ok {
continue
}
sRows = append(sRows, []string{
bindData.OriginalSQL,
hint.BindSQL,
bindData.Db,
hint.Status,
hint.CreateTime.String(),
hint.UpdateTime.String(),
hint.Charset,
hint.Collation,
hint.Source,
})
}
}
bf, err := zw.Create(sessionBindingFile)
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(bf, "%s\n", strings.Join(row, "\t"))
}
return nil
}

Expand Down Expand Up @@ -489,6 +581,32 @@ func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error {
return nil
}

func dumpEncodedPlan(ctx sessionctx.Context, zw *zip.Writer, encodedPlan string) error {
var recordSets []sqlexec.RecordSet
var err error
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("select tidb_decode_plan('%s')", encodedPlan))
if err != nil {
return err
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0], false)
if err != nil {
return err
}
fw, err := zw.Create("explain/sql.txt")
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
}
}
return nil
}

func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNode, isAnalyze bool) error {
for i, stmtExec := range execStmts {
sql := stmtExec.Text()
Expand Down Expand Up @@ -527,12 +645,12 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNod
return nil
}

func (e *PlanReplayerDumpInfo) extractTableNames(ctx context.Context,
func extractTableNames(ctx context.Context, sctx sessionctx.Context,
ExecStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
ctx: ctx,
executor: e.ctx.(sqlexec.RestrictedSQLExecutor),
is: domain.GetDomain(e.ctx).InfoSchema(),
executor: sctx.(sqlexec.RestrictedSQLExecutor),
is: domain.GetDomain(sctx).InfoSchema(),
curDB: curDB,
names: make(map[tableNamePair]struct{}),
cteNames: make(map[string]struct{}),
Expand Down Expand Up @@ -586,14 +704,18 @@ func (e *PlanReplayerDumpInfo) DumpSQLsFromFile(ctx context.Context, b []byte) e
return e.dump(ctx)
}

func getStatsForTable(do *domain.Domain, pair tableNamePair) (*handle.JSONTable, error) {
func getStatsForTable(do *domain.Domain, tblJSONStats map[int64]*handle.JSONTable, pair tableNamePair) (*handle.JSONTable, error) {
is := do.InfoSchema()
h := do.StatsHandle()
tbl, err := is.TableByName(model.NewCIStr(pair.DBName), model.NewCIStr(pair.TableName))
if err != nil {
return nil, err
}
js, err := h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true)
js, ok := tblJSONStats[tbl.Meta().ID]
if ok && js != nil {
return js, nil
}
js, err = h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true)
return js, err
}

Expand Down