domain: revise extract plan package format #41876

Merged
2 commits merged on Mar 6, 2023
domain/extract.go (233 changes: 153 additions & 80 deletions)
@@ -18,6 +18,7 @@ import (
"archive/zip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"math/rand"
"os"
@@ -41,7 +42,7 @@ import (

const (
// ExtractMetaFile indicates meta file for extract
ExtractMetaFile = "meta.txt"
ExtractMetaFile = "extract_meta.txt"
)

const (
@@ -147,55 +148,33 @@ func (w *extractWorker) extractPlanTask(ctx context.Context, task *ExtractTask)
return w.dumpExtractPlanPackage(p)
}

func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (map[stmtSummaryHistoryKey]stmtSummaryHistoryRecord, error) {
func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, error) {
w.Lock()
defer w.Unlock()
exec := w.sctx.(sqlexec.RestrictedSQLExecutor)
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT STMT_TYPE, TABLE_NAMES, DIGEST, PLAN_DIGEST,QUERY_SAMPLE_TEXT, BINARY_PLAN FROM INFORMATION_SCHEMA.STATEMENTS_SUMMARY_HISTORY WHERE SUMMARY_END_TIME > '%s' OR SUMMARY_BEGIN_TIME < '%s'",
rows, _, err := exec.ExecRestrictedSQL(ctx1, nil, fmt.Sprintf("SELECT STMT_TYPE, DIGEST, PLAN_DIGEST,QUERY_SAMPLE_TEXT, BINARY_PLAN, TABLE_NAMES FROM INFORMATION_SCHEMA.STATEMENTS_SUMMARY_HISTORY WHERE SUMMARY_END_TIME > '%s' OR SUMMARY_BEGIN_TIME < '%s'",
task.Begin.Format(types.TimeFormat), task.End.Format(types.TimeFormat)))
if err != nil {
return nil, err
}
collectMap := make(map[stmtSummaryHistoryKey]stmtSummaryHistoryRecord, 0)
is := GetDomain(w.sctx).InfoSchema()
collectMap := make(map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, 0)
for _, row := range rows {
record := stmtSummaryHistoryRecord{}
record := &stmtSummaryHistoryRecord{}
record.stmtType = row.GetString(0)
record.digest = row.GetString(2)
record.planDigest = row.GetString(3)
record.sql = row.GetString(4)
record.binaryPlan = row.GetString(5)
record.digest = row.GetString(1)
record.planDigest = row.GetString(2)
record.sql = row.GetString(3)
record.binaryPlan = row.GetString(4)
tableNames := row.GetString(5)
key := stmtSummaryHistoryKey{
digest: record.digest,
planDigest: record.planDigest,
}
record.tables = make([]tableNamePair, 0)
tables := row.GetString(1)
setRecord := true

for _, t := range strings.Split(tables, ",") {
names := strings.Split(t, ".")
if len(names) != 2 {
setRecord = false
break
}
dbName := names[0]
tblName := names[1]
t, err := is.TableByName(model.NewCIStr(dbName), model.NewCIStr(tblName))
if err != nil {
return nil, err
}
record.schemaName = dbName
// skip internal schema record
switch strings.ToLower(record.schemaName) {
case util.PerformanceSchemaName.L, util.InformationSchemaName.L, util.MetricSchemaName.L, "mysql":
setRecord = false
}
if !setRecord {
break
}
record.tables = append(record.tables, tableNamePair{DBName: dbName, TableName: tblName, IsView: t.Meta().IsView()})
setRecord, err := w.handleTableNames(tableNames, record)
if err != nil {
return nil, err
}
if setRecord && checkRecordValid(record) {
collectMap[key] = record
@@ -204,7 +183,38 @@ func (w *extractWorker) collectRecords(ctx context.Context, task *ExtractTask) (
return collectMap, nil
}

func checkRecordValid(r stmtSummaryHistoryRecord) bool {
func (w *extractWorker) handleTableNames(tableNames string, record *stmtSummaryHistoryRecord) (bool, error) {
is := GetDomain(w.sctx).InfoSchema()
for _, t := range strings.Split(tableNames, ",") {
names := strings.Split(t, ".")
if len(names) != 2 {
return false, nil
}
dbName := names[0]
tblName := names[1]
record.schemaName = dbName
// skip internal schema record
switch strings.ToLower(record.schemaName) {
case util.PerformanceSchemaName.L, util.InformationSchemaName.L, util.MetricSchemaName.L, "mysql":
return false, nil
}
exists := is.TableExists(model.NewCIStr(dbName), model.NewCIStr(tblName))
if !exists {
return false, nil
}
t, err := is.TableByName(model.NewCIStr(dbName), model.NewCIStr(tblName))
if err != nil {
return false, err
}
record.tables = append(record.tables, tableNamePair{DBName: dbName, TableName: tblName, IsView: t.Meta().IsView()})
}
return true, nil
}
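
For illustration, a minimal standalone sketch of the parsing convention handleTableNames relies on: TABLE_NAMES is a comma-separated list of db.table pairs, and a record that touches an internal schema or contains a malformed pair is skipped entirely. The helper name and the schema-name literals below are hypothetical simplifications; the real code also consults the info schema for existence and view checks.

package main

import (
	"fmt"
	"strings"
)

// splitTableNames mimics the skip rules of handleTableNames without an info schema.
func splitTableNames(tableNames string) (pairs [][2]string, keep bool) {
	for _, t := range strings.Split(tableNames, ",") {
		names := strings.Split(t, ".")
		if len(names) != 2 {
			return nil, false // malformed pair: skip the whole record
		}
		switch strings.ToLower(names[0]) {
		case "performance_schema", "information_schema", "metrics_schema", "mysql":
			return nil, false // internal schema: skip the whole record
		}
		pairs = append(pairs, [2]string{names[0], names[1]})
	}
	return pairs, true
}

func main() {
	fmt.Println(splitTableNames("test.t1,test.t2")) // [[test t1] [test t2]] true
	fmt.Println(splitTableNames("mysql.user"))      // [] false
}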

func checkRecordValid(r *stmtSummaryHistoryRecord) bool {
if r.stmtType != "Select" {
return false
}
if r.schemaName == "" {
return false
}
@@ -214,31 +224,65 @@ func checkRecordValid(r stmtSummaryHistoryRecord) bool {
return true
}

func (w *extractWorker) packageExtractPlanRecords(ctx context.Context, records map[stmtSummaryHistoryKey]stmtSummaryHistoryRecord) (*extractPlanPackage, error) {
func (w *extractWorker) packageExtractPlanRecords(ctx context.Context, records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord) (*extractPlanPackage, error) {
p := &extractPlanPackage{}
p.sqls = make([]string, 0)
p.plans = make([]string, 0)
p.skippedSQLs = make([]string, 0)
p.records = records
p.tables = make(map[tableNamePair]struct{}, 0)
for _, record := range records {
// skip the sql which has been cut off
if strings.Contains(record.sql, "(len:") {
p.skippedSQLs = append(p.skippedSQLs, record.sql)
record.skip = true
continue
}
p.sqls = append(p.sqls, record.sql)
plan, err := w.decodeBinaryPlan(ctx, record.binaryPlan)
if err != nil {
return nil, err
}
p.plans = append(p.plans, plan)
record.plan = plan
for _, tbl := range record.tables {
p.tables[tbl] = struct{}{}
}
}
if err := w.handleIsView(ctx, p); err != nil {
return nil, err
}
return p, nil
}

func (w *extractWorker) handleIsView(ctx context.Context, p *extractPlanPackage) error {
is := GetDomain(w.sctx).InfoSchema()
tne := &tableNameExtractor{
ctx: ctx,
executor: w.sctx.(sqlexec.RestrictedSQLExecutor),
is: is,
curDB: model.NewCIStr(""),
names: make(map[tableNamePair]struct{}),
cteNames: make(map[string]struct{}),
}
for v := range p.tables {
if v.IsView {
v, err := is.TableByName(model.NewCIStr(v.DBName), model.NewCIStr(v.TableName))
if err != nil {
return err
}
sql := v.Meta().View.SelectStmt
node, err := tne.executor.ParseWithParams(tne.ctx, sql)
if err != nil {
return err
}
node.Accept(tne)
}
}
if tne.err != nil {
return tne.err
}
r := tne.getTablesAndViews()
for t := range r {
p.tables[t] = struct{}{}
}
return nil
}

func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (string, error) {
exec := w.sctx.(sqlexec.RestrictedSQLExecutor)
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
@@ -253,7 +297,11 @@ func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (str
// dumpExtractPlanPackage will dump the information about sqls collected in stmt_summary_history
// The files will be organized into the following format:
/*
|-extract_meta.txt
|-meta.txt
|-config.toml
|-variables.toml
|-bindings.sql
|-schema
| |-schema_meta.txt
| |-db1.table1.schema.txt
@@ -269,8 +317,12 @@ func (w *extractWorker) decodeBinaryPlan(ctx context.Context, bPlan string) (str
| |-....
|-table_tiflash_replica.txt
|-sql
| |-sqls.sql
| |-skippedSQLs.sql
| |-digest1.sql
| |-digest2.sql
| |-....
|-skippedSQLs
| |-digest1.sql
| |-...
*/
func (w *extractWorker) dumpExtractPlanPackage(p *extractPlanPackage) (name string, err error) {
f, name, err := GenerateExtractFile()
@@ -290,6 +342,14 @@ func (w *extractWorker) dumpExtractPlanPackage(p *extractPlanPackage) (name stri
}
}()

// Dump config
if err = dumpConfig(zw); err != nil {
return "", err
}
// Dump meta
if err = dumpMeta(zw); err != nil {
return "", err
}
// dump extract plan task meta
if err = dumpExtractMeta(ExtractPlanType, zw); err != nil {
return "", err
@@ -302,40 +362,34 @@ func (w *extractWorker) dumpExtractPlanPackage(p *extractPlanPackage) (name stri
if err = dumpTiFlashReplica(w.sctx, zw, p.tables); err != nil {
return "", err
}
// Dump stats
if err = dumpStats(zw, p.tables, GetDomain(w.sctx)); err != nil {
// Dump variables
if err = dumpVariables(w.sctx, w.sctx.GetSessionVars(), zw); err != nil {
return "", err
}
// Dump sqls
if err = dumpExtractPlanSQLs(p.sqls, p.skippedSQLs, zw); err != nil {
// Dump global bindings
if err = dumpGlobalBindings(w.sctx, zw); err != nil {
return "", err
}
// dump plans
if err = dumpExtractPlans(p.plans, zw); err != nil {
// Dump stats
if err = dumpStats(zw, p.tables, GetDomain(w.sctx)); err != nil {
return "", err
}
return name, nil
}

func dumpExtractPlanSQLs(sqls, skippedSQLs []string, zw *zip.Writer) error {
if err := dumpTargetSQLs(sqls, "sql/sqls.sql", zw); err != nil {
return err
// Dump sqls and plan
if err = dumpSQLRecords(p.records, zw); err != nil {
return "", err
}
return dumpTargetSQLs(skippedSQLs, "sql/skippedSQLs.sql", zw)
return name, nil
}

func dumpExtractPlans(plans []string, zw *zip.Writer) error {
zf, err := zw.Create("plans.txt")
if err != nil {
return err
}
for i, plan := range plans {
_, err = zf.Write([]byte(plan))
if err != nil {
return err
}
if i < len(plans)-1 {
_, err = zf.Write([]byte("\n<--------->\n"))
func dumpSQLRecords(records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord, zw *zip.Writer) error {
for key, record := range records {
if record.skip {
err := dumpSQLRecord(record, fmt.Sprintf("skippedSQLs/%v.json", key.digest), zw)
if err != nil {
return err
}
} else {
err := dumpSQLRecord(record, fmt.Sprintf("SQLs/%v.json", key.digest), zw)
if err != nil {
return err
}
@@ -344,16 +398,34 @@ func dumpExtractPlans(plans []string, zw *zip.Writer) error {
return nil
}

func dumpTargetSQLs(sqls []string, path string, zw *zip.Writer) error {
type singleSQLRecord struct {
Schema string `json:"schema"`
Plan string `json:"plan"`
SQL string `json:"sql"`
Digest string `json:"digest"`
BinaryPlan string `json:"binaryPlan"`
}

// dumpSQLRecord dumps sql records into one file for each record, the format is in json.
func dumpSQLRecord(record *stmtSummaryHistoryRecord, path string, zw *zip.Writer) error {
zf, err := zw.Create(path)
if err != nil {
return err
}
for _, sql := range sqls {
_, err = zf.Write([]byte(fmt.Sprintf("%s;\n", sql)))
if err != nil {
return err
}
singleSQLRecord := &singleSQLRecord{
Schema: record.schemaName,
Plan: record.plan,
SQL: record.sql,
Digest: record.digest,
BinaryPlan: record.binaryPlan,
}
content, err := json.Marshal(singleSQLRecord)
if err != nil {
return err
}
_, err = zf.Write(content)
if err != nil {
return err
}
return nil
}
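
As a usage note, a minimal sketch of reading back one of the per-digest JSON files written above; the struct mirrors the JSON tags of singleSQLRecord, while the struct name and the file path are hypothetical placeholders.

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// extractedSQLRecord mirrors the JSON layout produced by dumpSQLRecord.
type extractedSQLRecord struct {
	Schema     string `json:"schema"`
	Plan       string `json:"plan"`
	SQL        string `json:"sql"`
	Digest     string `json:"digest"`
	BinaryPlan string `json:"binaryPlan"`
}

func main() {
	// Hypothetical path: one entry unpacked from the generated package.
	data, err := os.ReadFile("SQLs/digest1.json")
	if err != nil {
		panic(err)
	}
	var rec extractedSQLRecord
	if err := json.Unmarshal(data, &rec); err != nil {
		panic(err)
	}
	fmt.Printf("schema=%s digest=%s\nsql=%s\n", rec.Schema, rec.Digest, rec.SQL)
}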
@@ -372,10 +444,8 @@ func dumpExtractMeta(t ExtractType, zw *zip.Writer) error {
}

type extractPlanPackage struct {
sqls []string
plans []string
skippedSQLs []string
tables map[tableNamePair]struct{}
tables map[tableNamePair]struct{}
records map[stmtSummaryHistoryKey]*stmtSummaryHistoryRecord
}

type stmtSummaryHistoryKey struct {
@@ -391,6 +461,9 @@ type stmtSummaryHistoryRecord struct {
planDigest string
sql string
binaryPlan string

plan string
skip bool
}

// GenerateExtractFile generates extract stmt file
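
To tie the pieces together, a minimal, hypothetical sketch that lists the entries of a generated package and separates dumped records from skipped ones, following the layout documented on dumpExtractPlanPackage; the archive name below is an assumption, since GenerateExtractFile chooses the real one.

package main

import (
	"archive/zip"
	"fmt"
	"strings"
)

func main() {
	// Hypothetical archive name; the real name comes from GenerateExtractFile.
	r, err := zip.OpenReader("extract_plan_package.zip")
	if err != nil {
		panic(err)
	}
	defer r.Close()

	for _, f := range r.File {
		switch {
		case strings.HasPrefix(f.Name, "SQLs/"):
			fmt.Println("dumped record:", f.Name)
		case strings.HasPrefix(f.Name, "skippedSQLs/"):
			fmt.Println("skipped record:", f.Name)
		case f.Name == "extract_meta.txt":
			fmt.Println("extract task meta:", f.Name)
		default:
			fmt.Println("other file:", f.Name) // config, variables, bindings, schema, stats, ...
		}
	}
}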