Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#49620
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hi-rustin authored and ti-chi-bot committed Dec 21, 2023
1 parent aab2da0 commit 6b0830b
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 11 deletions.
43 changes: 32 additions & 11 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,33 +157,54 @@ func insertPlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, reco

func insertPlanReplayerErrorStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(ctx, nil, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, fail_reason, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.FailedReason, instance))
_, _, err := exec.ExecRestrictedSQL(
ctx, nil,
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, fail_reason, instance) values (%?,%?,%?,%?,%?)",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.FailedReason, instance,
)
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sqlDigest", record.SQLDigest),
zap.String("planDigest", record.PlanDigest),
zap.String("sql", record.OriginSQL),
zap.String("failReason", record.FailedReason),
zap.String("instance", instance),
zap.Error(err))
}
}

func insertPlanReplayerSuccessStatusRecord(ctx context.Context, sctx sessionctx.Context, instance string, record PlanReplayerStatusRecord) {
exec := sctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(ctx, nil, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, token, instance) values ('%s','%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance))
_, _, err := exec.ExecRestrictedSQL(
ctx,
nil,
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, origin_sql, token, instance) values (%?,%?,%?,%?,%?)",
record.SQLDigest, record.PlanDigest, record.OriginSQL, record.Token, instance,
)
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sqlDigest", record.SQLDigest),
zap.String("planDigest", record.PlanDigest),
zap.String("sql", record.OriginSQL),
zap.Error(err))
zap.String("token", record.Token),
zap.String("instance", instance),
zap.Error(err),
)
// try insert record without original sql
_, _, err = exec.ExecRestrictedSQL(ctx, nil, fmt.Sprintf(
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, token, instance) values ('%s','%s','%s','%s')",
record.SQLDigest, record.PlanDigest, record.Token, instance))
_, _, err = exec.ExecRestrictedSQL(
ctx,
nil,
"insert into mysql.plan_replayer_status (sql_digest, plan_digest, token, instance) values (%?,%?,%?,%?)",
record.SQLDigest, record.PlanDigest, record.Token, instance,
)
if err != nil {
logutil.BgLogger().Warn("insert mysql.plan_replayer_status record failed",
zap.String("sqlDigest", record.SQLDigest),
zap.String("planDigest", record.PlanDigest),
zap.Error(err))
zap.String("token", record.Token),
zap.String("instance", instance),
zap.Error(err),
)
}
}
}
Expand Down
56 changes: 56 additions & 0 deletions domain/plan_replayer_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,59 @@ func TestPlanReplayerGC(t *testing.T) {
require.NotNil(t, err)
require.True(t, os.IsNotExist(err))
}

func TestInsertPlanReplayerStatus(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
prHandle := dom.GetPlanReplayerHandle()
tk.MustExec("use test")
tk.MustExec(`
CREATE TABLE tableA (
columnA VARCHAR(255),
columnB DATETIME,
columnC VARCHAR(255)
)`)

// This is a single quote in the sql.
// We should escape it correctly.
sql := `
SELECT * from tableA where SUBSTRING_INDEX(tableA.columnC, '_', 1) = tableA.columnA
`

tk.MustQuery(sql)
_, d := tk.Session().GetSessionVars().StmtCtx.SQLDigest()
_, pd := tk.Session().GetSessionVars().StmtCtx.GetPlanDigest()
sqlDigest := d.String()
planDigest := pd.String()

// Register task
tk.MustExec("delete from mysql.plan_replayer_task")
tk.MustExec("delete from mysql.plan_replayer_status")
tk.MustExec(fmt.Sprintf("insert into mysql.plan_replayer_task (sql_digest, plan_digest) values ('%v','%v');", sqlDigest, planDigest))
err := prHandle.CollectPlanReplayerTask()
require.NoError(t, err)
require.Len(t, prHandle.GetTasks(), 1)

tk.MustExec("SET @@tidb_enable_plan_replayer_capture = ON;")

// Capture task and dump
tk.MustQuery(sql)
task := prHandle.DrainTask()
require.NotNil(t, task)
worker := prHandle.GetWorker()
success := worker.HandleTask(task)
defer os.RemoveAll(replayer.GetPlanReplayerDirName())
require.True(t, success)
require.Equal(t, prHandle.GetTaskStatus().GetRunningTaskStatusLen(), 0)
// assert memory task consumed
require.Len(t, prHandle.GetTasks(), 0)

// Check the plan_replayer_status.
// We should store the origin sql correctly.
rows := tk.MustQuery(
"select * from mysql.plan_replayer_status where sql_digest = ? and plan_digest = ? and origin_sql is not null",
sqlDigest,
planDigest,
).Rows()
require.Len(t, rows, 1)
}
169 changes: 169 additions & 0 deletions pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "domain",
srcs = [
"domain.go",
"domain_sysvars.go",
"domainctx.go",
"extract.go",
"historical_stats.go",
"optimize_trace.go",
"plan_replayer.go",
"plan_replayer_dump.go",
"runaway.go",
"schema_checker.go",
"schema_validator.go",
"sysvar_cache.go",
"test_helper.go",
"topn_slow_query.go",
],
importpath = "github.com/pingcap/tidb/pkg/domain",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/streamhelper",
"//br/pkg/streamhelper/daemon",
"//pkg/bindinfo",
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/placement",
"//pkg/ddl/schematracker",
"//pkg/ddl/util",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/taskexecutor",
"//pkg/domain/globalconfigsync",
"//pkg/domain/infosync",
"//pkg/domain/metrics",
"//pkg/domain/resourcegroup",
"//pkg/errno",
"//pkg/infoschema",
"//pkg/infoschema/metrics",
"//pkg/infoschema/perfschema",
"//pkg/keyspace",
"//pkg/kv",
"//pkg/meta",
"//pkg/meta/autoid",
"//pkg/metrics",
"//pkg/owner",
"//pkg/parser",
"//pkg/parser/ast",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/privilege/privileges",
"//pkg/sessionctx",
"//pkg/sessionctx/sessionstates",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/util",
"//pkg/store/helper",
"//pkg/telemetry",
"//pkg/ttl/cache",
"//pkg/ttl/sqlbuilder",
"//pkg/ttl/ttlworker",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/dbterror",
"//pkg/util/disttask",
"//pkg/util/domainutil",
"//pkg/util/engine",
"//pkg/util/etcd",
"//pkg/util/execdetails",
"//pkg/util/expensivequery",
"//pkg/util/gctuner",
"//pkg/util/globalconn",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/memoryusagealarm",
"//pkg/util/printer",
"//pkg/util/replayer",
"//pkg/util/servermemorylimit",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/syncutil",
"@com_github_burntsushi_toml//:toml",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/transaction",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@com_github_tikv_pd_client//resource_group/controller",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
"@org_golang_google_grpc//keepalive",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "domain_test",
timeout = "short",
srcs = [
"db_test.go",
"domain_test.go",
"domain_utils_test.go",
"domainctx_test.go",
"extract_test.go",
"main_test.go",
"plan_replayer_handle_test.go",
"plan_replayer_test.go",
"schema_checker_test.go",
"schema_validator_test.go",
"session_pool_test.go",
"topn_slow_query_test.go",
],
embed = [":domain"],
flaky = True,
shard_count = 25,
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/domain/infosync",
"//pkg/errno",
"//pkg/keyspace",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/ast",
"//pkg/parser/auth",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/server",
"//pkg/session",
"//pkg/sessionctx/variable",
"//pkg/store/mockstore",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/types",
"//pkg/util",
"//pkg/util/mock",
"//pkg/util/replayer",
"//pkg/util/stmtsummary/v2:stmtsummary",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//txnkv/transaction",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_goleak//:goleak",
],
)

0 comments on commit 6b0830b

Please sign in to comment.