Skip to content

Commit

Permalink
executor: refactor plan replayer exec (#38786)
Browse files Browse the repository at this point in the history
ref #38779
  • Loading branch information
Yisaer committed Nov 2, 2022
1 parent 64ca5d4 commit 07a0ef2
Showing 1 changed file with 178 additions and 56 deletions.
234 changes: 178 additions & 56 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ import (

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
Expand Down Expand Up @@ -154,7 +157,7 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.endFlag {
return nil
}
err := e.createFile(domain.GetPlanReplayerDirName())
err := e.createFile()
if err != nil {
return err
}
Expand All @@ -180,33 +183,78 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *PlanReplayerExec) createFile(path string) error {
// Create path
func (e *PlanReplayerExec) createFile() error {
var err error
e.DumpInfo.File, e.DumpInfo.FileName, err = GeneratePlanReplayerFile()
if err != nil {
return err
}
return nil
}

// GeneratePlanReplayerFile generates plan replayer file
func GeneratePlanReplayerFile() (*os.File, string, error) {
path := domain.GetPlanReplayerDirName()
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return errors.AddStack(err)
return nil, "", errors.AddStack(err)
}
fileName, err := generatePlanReplayerFileName()
if err != nil {
return nil, "", errors.AddStack(err)
}
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return nil, "", errors.AddStack(err)
}
return zf, fileName, err
}

func generatePlanReplayerFileName() (string, error) {
// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err = rand.Read(b)
_, err := rand.Read(b)
if err != nil {
return err
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
fileName := fmt.Sprintf("replayer_%v_%v.zip", key, time)
zf, err := os.Create(filepath.Join(path, fileName))
return fmt.Sprintf("replayer_%v_%v.zip", key, time), nil
}

func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
fileName := e.FileName
zf := e.File
task := &PlanReplayerDumpTask{
FileName: fileName,
Zf: zf,
SessionVars: e.ctx.GetSessionVars(),
TblStats: nil,
ExecStmts: e.ExecStmts,
Analyze: e.Analyze,
}
err = DumpPlanReplayerInfo(ctx, e.ctx, task)
if err != nil {
return errors.AddStack(err)
return err
}
e.DumpInfo.File = zf
e.DumpInfo.FileName = fileName
e.ctx.GetSessionVars().LastPlanReplayerToken = e.FileName
return nil
}

// dumpSingle will dump the information about sqls.
// PlanReplayerDumpTask wrap the params for plan replayer dump
type PlanReplayerDumpTask struct {
SessionBindings []*bindinfo.BindRecord
EncodedPlan string
FileName string
Zf *os.File
SessionVars *variable.SessionVars
TblStats map[int64]*handle.JSONTable
ExecStmts []ast.StmtNode
Analyze bool
}

// DumpPlanReplayerInfo will dump the information about sqls.
// The files will be organized into the following format:
/*
|-meta.txt
Expand Down Expand Up @@ -235,10 +283,12 @@ func (e *PlanReplayerExec) createFile(path string) error {
|-explain2.txt
|-....
*/
func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
fileName := e.FileName
zf := e.File
// Create zip writer
func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
task *PlanReplayerDumpTask) (err error) {
zf := task.Zf
fileName := task.FileName
sessionVars := task.SessionVars
execStmts := task.ExecStmts
zw := zip.NewWriter(zf)
defer func() {
err = zw.Close()
Expand All @@ -250,7 +300,6 @@ func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
logutil.BgLogger().Error("Closing zip file failed", zap.Error(err), zap.String("filename", fileName))
}
}()

// Dump config
if err = dumpConfig(zw); err != nil {
return err
Expand All @@ -260,60 +309,62 @@ func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
if err = dumpMeta(zw); err != nil {
return err
}

// Retrieve current DB
sessionVars := e.ctx.GetSessionVars()
dbName := model.NewCIStr(sessionVars.CurrentDB)
do := domain.GetDomain(e.ctx)
do := domain.GetDomain(sctx)

// Retrieve all tables
pairs, err := e.extractTableNames(ctx, e.ExecStmts, dbName)
pairs, err := extractTableNames(ctx, sctx, execStmts, dbName)
if err != nil {
return errors.AddStack(fmt.Errorf("plan replayer: invalid SQL text, err: %v", err))
}

// Dump Schema and View
if err = dumpSchemas(e.ctx, zw, pairs); err != nil {
if err = dumpSchemas(sctx, zw, pairs); err != nil {
return err
}

// Dump tables tiflash replicas
if err = dumpTiFlashReplica(e.ctx, zw, pairs); err != nil {
if err = dumpTiFlashReplica(sctx, zw, pairs); err != nil {
return err
}

// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
if err = dumpStats(zw, pairs, task.TblStats, do); err != nil {
return err
}

// Dump variables
if err = dumpVariables(e.ctx, zw); err != nil {
if err = dumpVariables(sctx, sessionVars, zw); err != nil {
return err
}

// Dump sql
if err = dumpSQLs(e.ExecStmts, zw); err != nil {
if err = dumpSQLs(execStmts, zw); err != nil {
return err
}

// Dump session bindings
if err = dumpSessionBindings(e.ctx, zw); err != nil {
return err
if len(task.SessionBindings) > 0 {
if err = dumpSessionBindRecords(sctx, task.SessionBindings, zw); err != nil {
return err
}
} else {
if err = dumpSessionBindings(sctx, zw); err != nil {
return err
}
}

// Dump global bindings
if err = dumpGlobalBindings(e.ctx, zw); err != nil {
if err = dumpGlobalBindings(sctx, zw); err != nil {
return err
}

// Dump explain
if err = dumpExplain(e.ctx, zw, e.ExecStmts, e.Analyze); err != nil {
return err
if len(task.EncodedPlan) > 0 {
return dumpEncodedPlan(sctx, zw, task.EncodedPlan)
}

e.ctx.GetSessionVars().LastPlanReplayerToken = e.FileName
return nil
// Dump explain
return dumpExplain(sctx, zw, execStmts, task.Analyze)
}

func dumpConfig(zw *zip.Writer) error {
Expand Down Expand Up @@ -374,12 +425,12 @@ func dumpSchemas(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair
return nil
}

func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *domain.Domain) error {
func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, tblJSONStats map[int64]*handle.JSONTable, do *domain.Domain) error {
for pair := range pairs {
if pair.IsView {
continue
}
jsonTbl, err := getStatsForTable(do, pair)
jsonTbl, err := getStatsForTable(do, tblJSONStats, pair)
if err != nil {
return err
}
Expand Down Expand Up @@ -413,31 +464,72 @@ func dumpSQLs(execStmts []ast.StmtNode, zw *zip.Writer) error {
return nil
}

func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
func dumpVariables(sctx sessionctx.Context, sessionVars *variable.SessionVars, zw *zip.Writer) error {
varMap := make(map[string]string)
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show variables")
if err != nil {
return err
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0], false)
if err != nil {
return err
for _, v := range variable.GetSysVars() {
if v.IsNoop && !variable.EnableNoopVariables.Load() {
continue
}
if infoschema.SysVarHiddenForSem(sctx, v.Name) {
continue
}
value, err := sessionVars.GetSessionOrGlobalSystemVar(context.Background(), v.Name)
if err != nil {
return errors.Trace(err)
}
varMap[v.Name] = value
}
vf, err := zw.Create(variablesFile)
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
varMap[row[0]] = row[1]
}
if err := toml.NewEncoder(vf).Encode(varMap); err != nil {
return errors.AddStack(err)
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
return nil
}

func dumpSessionBindRecords(ctx sessionctx.Context, records []*bindinfo.BindRecord, zw *zip.Writer) error {
sRows := make([][]string, 0)
is := domain.GetDomain(ctx).InfoSchema()
parser := parser.New()
for _, bindData := range records {
for _, hint := range bindData.Bindings {
stmt, err := parser.ParseOneStmt(hint.BindSQL, hint.Charset, hint.Collation)
if err != nil {
return err
}
checker := visibleChecker{
defaultDB: bindData.Db,
ctx: ctx,
is: is,
manager: privilege.GetPrivilegeManager(ctx),
ok: true,
}
stmt.Accept(&checker)
if !checker.ok {
continue
}
sRows = append(sRows, []string{
bindData.OriginalSQL,
hint.BindSQL,
bindData.Db,
hint.Status,
hint.CreateTime.String(),
hint.UpdateTime.String(),
hint.Charset,
hint.Collation,
hint.Source,
})
}
}
bf, err := zw.Create(sessionBindingFile)
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(bf, "%s\n", strings.Join(row, "\t"))
}
return nil
}

Expand Down Expand Up @@ -489,6 +581,32 @@ func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error {
return nil
}

func dumpEncodedPlan(ctx sessionctx.Context, zw *zip.Writer, encodedPlan string) error {
var recordSets []sqlexec.RecordSet
var err error
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("select tidb_decode_plan('%s')", encodedPlan))
if err != nil {
return err
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0], false)
if err != nil {
return err
}
fw, err := zw.Create("explain/sql.txt")
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
}
}
return nil
}

func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNode, isAnalyze bool) error {
for i, stmtExec := range execStmts {
sql := stmtExec.Text()
Expand Down Expand Up @@ -527,12 +645,12 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNod
return nil
}

func (e *PlanReplayerDumpInfo) extractTableNames(ctx context.Context,
func extractTableNames(ctx context.Context, sctx sessionctx.Context,
ExecStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
ctx: ctx,
executor: e.ctx.(sqlexec.RestrictedSQLExecutor),
is: domain.GetDomain(e.ctx).InfoSchema(),
executor: sctx.(sqlexec.RestrictedSQLExecutor),
is: domain.GetDomain(sctx).InfoSchema(),
curDB: curDB,
names: make(map[tableNamePair]struct{}),
cteNames: make(map[string]struct{}),
Expand Down Expand Up @@ -586,14 +704,18 @@ func (e *PlanReplayerDumpInfo) DumpSQLsFromFile(ctx context.Context, b []byte) e
return e.dump(ctx)
}

func getStatsForTable(do *domain.Domain, pair tableNamePair) (*handle.JSONTable, error) {
func getStatsForTable(do *domain.Domain, tblJSONStats map[int64]*handle.JSONTable, pair tableNamePair) (*handle.JSONTable, error) {
is := do.InfoSchema()
h := do.StatsHandle()
tbl, err := is.TableByName(model.NewCIStr(pair.DBName), model.NewCIStr(pair.TableName))
if err != nil {
return nil, err
}
js, err := h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true)
js, ok := tblJSONStats[tbl.Meta().ID]
if ok && js != nil {
return js, nil
}
js, err = h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil, true)
return js, err
}

Expand Down

0 comments on commit 07a0ef2

Please sign in to comment.