Skip to content

Commit

Permalink
Merge branch 'master' into dumpling-compress
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu committed Nov 14, 2022
2 parents bdc2814 + 86d3b46 commit 5ba6b50
Show file tree
Hide file tree
Showing 60 changed files with 10,604 additions and 9,137 deletions.
6 changes: 4 additions & 2 deletions bindinfo/bind_test.go
Expand Up @@ -36,6 +36,7 @@ func TestPrepareCacheWithBinding(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`set tidb_enable_prepared_plan_cache=1`)
tk.MustExec("set tidb_cost_model_version=2")
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(a int, b int, c int, key idx_b(b), key idx_c(c))")
Expand Down Expand Up @@ -240,7 +241,7 @@ func TestPrepareCacheWithBinding(t *testing.T) {
ps = []*util.ProcessInfo{tkProcess}
tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps})
res = tk.MustQuery("explain for connection " + strconv.FormatUint(tkProcess.ID, 10))
require.False(t, tk.HasPlan4ExplainFor(res, "IndexReader"))
require.True(t, tk.HasPlan4ExplainFor(res, "IndexReader"))
tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

Expand Down Expand Up @@ -297,6 +298,7 @@ func TestExplain(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set tidb_cost_model_version=2")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1(id int)")
Expand All @@ -313,7 +315,7 @@ func TestExplain(t *testing.T) {

// Add test for SetOprStmt
tk.MustExec("create index index_id on t1(id)")
require.False(t, tk.HasPlan("SELECT * from t1 union SELECT * from t1", "IndexReader"))
require.True(t, tk.HasPlan("SELECT * from t1 union SELECT * from t1", "IndexReader"))
require.True(t, tk.HasPlan("SELECT * from t1 use index(index_id) union SELECT * from t1", "IndexReader"))

tk.MustExec("create global binding for SELECT * from t1 union SELECT * from t1 using SELECT * from t1 use index(index_id) union SELECT * from t1")
Expand Down
4 changes: 3 additions & 1 deletion ddl/cluster_test.go
Expand Up @@ -49,7 +49,8 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
// 3: (stats_extended)
// 4: (stats_fm_sketch)
// 5: (stats_history, stats_meta_history)
require.Len(t, kvRanges, 6)
// 6: (stats_table_locked)
require.Len(t, kvRanges, 7)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE employees (" +
Expand All @@ -73,6 +74,7 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
tk.MustExec("truncate table mysql.stats_fm_sketch")
tk.MustExec("truncate table mysql.stats_history")
tk.MustExec("truncate table mysql.stats_meta_history")
tk.MustExec("truncate table mysql.stats_table_locked")
kvRanges, err = ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
require.Len(t, kvRanges, 2)
Expand Down
5 changes: 1 addition & 4 deletions ddl/ddl_api.go
Expand Up @@ -6542,10 +6542,7 @@ func CheckIsDropPrimaryKey(indexName model.CIStr, indexInfo *model.IndexInfo, t
if indexInfo == nil && !t.Meta().PKIsHandle {
return isPK, dbterror.ErrCantDropFieldOrKey.GenWithStackByArgs("PRIMARY")
}
if t.Meta().PKIsHandle {
return isPK, dbterror.ErrUnsupportedModifyPrimaryKey.GenWithStack("Unsupported drop primary key when the table's pkIsHandle is true")
}
if t.Meta().IsCommonHandle {
if t.Meta().IsCommonHandle || t.Meta().PKIsHandle {
return isPK, dbterror.ErrUnsupportedModifyPrimaryKey.GenWithStack("Unsupported drop primary key when the table is using clustered index")
}
}
Expand Down
6 changes: 6 additions & 0 deletions domain/domain.go
Expand Up @@ -1811,6 +1811,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
gcStatsTicker := time.NewTicker(100 * lease)
dumpFeedbackTicker := time.NewTicker(200 * lease)
loadFeedbackTicker := time.NewTicker(5 * lease)
loadLockedTablesTicker := time.NewTicker(5 * lease)
dumpColStatsUsageTicker := time.NewTicker(100 * lease)
readMemTricker := time.NewTicker(memory.ReadMemInterval)
statsHandle := do.StatsHandle()
Expand Down Expand Up @@ -1851,6 +1852,11 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
if err != nil {
logutil.BgLogger().Debug("update stats using feedback failed", zap.Error(err))
}
case <-loadLockedTablesTicker.C:
err := statsHandle.LoadLockedTables()
if err != nil {
logutil.BgLogger().Debug("load locked table failed", zap.Error(err))
}
case <-dumpFeedbackTicker.C:
err := statsHandle.DumpStatsFeedbackToKV()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Expand Up @@ -56,6 +56,7 @@ go_library(
"joiner.go",
"load_data.go",
"load_stats.go",
"lock_stats.go",
"mem_reader.go",
"memtable_reader.go",
"merge_join.go",
Expand Down
85 changes: 77 additions & 8 deletions executor/analyze.go
Expand Up @@ -82,27 +82,97 @@ const (

// Next implements the Executor Next interface.
func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
var tasks []*analyzeTask
tids := make([]int64, 0)
skipedTables := make([]string, 0)
is := e.ctx.GetInfoSchema().(infoschema.InfoSchema)
for _, task := range e.tasks {
var tableID statistics.AnalyzeTableID
switch task.taskType {
case colTask:
tableID = task.colExec.tableID
case idxTask:
tableID = task.idxExec.tableID
case fastTask:
tableID = task.fastExec.tableID
case pkIncrementalTask:
tableID = task.colIncrementalExec.tableID
case idxIncrementalTask:
tableID = task.idxIncrementalExec.tableID
}
// skip locked tables
if !statsHandle.IsTableLocked(tableID.TableID) {
tasks = append(tasks, task)
}
// generate warning message
dup := false
for _, id := range tids {
if id == tableID.TableID {
dup = true
break
}
}
//avoid generate duplicate tables
if !dup {
if statsHandle.IsTableLocked(tableID.TableID) {
tbl, ok := is.TableByID(tableID.TableID)
if !ok {
return nil
}
skipedTables = append(skipedTables, tbl.Meta().Name.L)
}
tids = append(tids, tableID.TableID)
}
}

if len(skipedTables) > 0 {
tables := skipedTables[0]
for i, table := range skipedTables {
if i == 0 {
continue
}
tables += ", " + table
}
var msg string
if len(tids) > 1 {
if len(tids) > len(skipedTables) {
msg = "skip analyze locked tables: " + tables + ", other tables will be analyzed"
} else {
msg = "skip analyze locked tables: " + tables
}
} else {
msg = "skip analyze locked table: " + tables
}

e.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
}

if len(tasks) == 0 {
return nil
}

concurrency, err := getBuildStatsConcurrency(e.ctx)
if err != nil {
return err
}
taskCh := make(chan *analyzeTask, len(e.tasks))
resultsCh := make(chan *statistics.AnalyzeResults, len(e.tasks))
if len(e.tasks) < concurrency {
concurrency = len(e.tasks)
taskCh := make(chan *analyzeTask, len(tasks))
resultsCh := make(chan *statistics.AnalyzeResults, len(tasks))
if len(tasks) < concurrency {
concurrency = len(tasks)
}
for i := 0; i < concurrency; i++ {
e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) })
}
for _, task := range e.tasks {
for _, task := range tasks {
prepareV2AnalyzeJobInfo(task.colExec, false)
AddNewAnalyzeJob(e.ctx, task.job)
}
failpoint.Inject("mockKillPendingAnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
})
for _, task := range e.tasks {
for _, task := range tasks {
taskCh <- task
}
close(taskCh)
Expand All @@ -113,7 +183,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
needGlobalStats := pruneMode == variable.Dynamic
globalStatsMap := make(map[globalStatsKey]globalStatsInfo)
err = e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh)
for _, task := range e.tasks {
for _, task := range tasks {
if task.colExec != nil && task.colExec.memTracker != nil {
task.colExec.memTracker.Detach()
}
Expand All @@ -135,7 +205,6 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if err != nil {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
}
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
if e.ctx.GetSessionVars().InRestrictedSQL {
return statsHandle.Update(e.ctx.GetInfoSchema().(infoschema.InfoSchema))
}
Expand Down
12 changes: 11 additions & 1 deletion executor/analyze_global_stats.go
Expand Up @@ -83,7 +83,17 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
// fms for global stats doesn't need to dump to kv.
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, info.statsVersion, 1, true)
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID,
globalStats.Count,
globalStats.ModifyCount,
info.isIndex,
hg,
cms,
topN,
info.statsVersion,
1,
true,
)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
Expand Down
20 changes: 20 additions & 0 deletions executor/builder.go
Expand Up @@ -201,6 +201,10 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildLoadData(v)
case *plannercore.LoadStats:
return b.buildLoadStats(v)
case *plannercore.LockStats:
return b.buildLockStats(v)
case *plannercore.UnlockStats:
return b.buildUnlockStats(v)
case *plannercore.IndexAdvise:
return b.buildIndexAdvise(v)
case *plannercore.PlanReplayer:
Expand Down Expand Up @@ -960,6 +964,22 @@ func (b *executorBuilder) buildLoadStats(v *plannercore.LoadStats) Executor {
return e
}

func (b *executorBuilder) buildLockStats(v *plannercore.LockStats) Executor {
e := &LockStatsExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
Tables: v.Tables,
}
return e
}

func (b *executorBuilder) buildUnlockStats(v *plannercore.UnlockStats) Executor {
e := &UnlockStatsExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
Tables: v.Tables,
}
return e
}

func (b *executorBuilder) buildIndexAdvise(v *plannercore.IndexAdvise) Executor {
e := &IndexAdviseExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
Expand Down
5 changes: 4 additions & 1 deletion executor/executor_test.go
Expand Up @@ -1452,6 +1452,7 @@ func TestSetOperation(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)
tk.MustExec("set tidb_cost_model_version=2")
tk.MustExec(`drop table if exists t1, t2, t3`)
tk.MustExec(`create table t1(a int)`)
tk.MustExec(`create table t2 like t1`)
Expand Down Expand Up @@ -1513,6 +1514,7 @@ func TestIndexScanWithYearCol(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("set tidb_cost_model_version=2")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (c1 year(4), c2 int, key(c1));")
tk.MustExec("insert into t values(2001, 1);")
Expand Down Expand Up @@ -3332,6 +3334,7 @@ func TestUnreasonablyClose(t *testing.T) {
is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()})
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set tidb_cost_model_version=2")
// To enable the shuffleExec operator.
tk.MustExec("set @@tidb_merge_join_concurrency=4")

Expand Down Expand Up @@ -3372,7 +3375,7 @@ func TestUnreasonablyClose(t *testing.T) {
"select /*+ inl_hash_join(t1) */ * from t t1 join t t2 on t1.f=t2.f",
"SELECT count(1) FROM (SELECT (SELECT min(a) FROM t as t2 WHERE t2.a > t1.a) AS a from t as t1) t",
"select /*+ hash_agg() */ count(f) from t group by a",
"select /*+ stream_agg() */ count(f) from t group by a",
"select /*+ stream_agg() */ count(f) from t",
"select * from t order by a, f",
"select * from t order by a, f limit 1",
"select * from t limit 1",
Expand Down
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 39
result := 40
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
50 changes: 47 additions & 3 deletions executor/infoschema_reader_test.go
Expand Up @@ -672,9 +672,9 @@ func TestTiFlashSystemTableWithTiFlashV620(t *testing.T) {
tk.MustQuery("show warnings").Check(testkit.Rows())

tk.MustQuery("select * from information_schema.TIFLASH_TABLES;").Check(testkit.Rows(
"db_1 t_10 mysql tables_priv 10 0 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_1 t_8 mysql db 8 0 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_2 t_70 test segment 70 0 1 102000 169873868 0 0 0 <nil> 0 <nil> <nil> 0 102000 169873868 0 0 0 <nil> <nil> <nil> 1 102000 169873868 43867622 102000 169873868 0 <nil> <nil> <nil> 13 13 7846.153846153846 13067220.615384616 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_1 t_10 mysql tables_priv 10 0 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_1 t_8 mysql db 8 0 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_2 t_70 test segment 70 0 1 102000 169873868 0 0 0 <nil> 0 <nil> <nil> 0 102000 169873868 0 0 0 <nil> <nil> <nil> 1 102000 169873868 43867622 102000 169873868 0 <nil> <nil> <nil> <nil> 13 13 7846.153846153846 13067220.615384616 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
))
tk.MustQuery("show warnings").Check(testkit.Rows())
}
Expand Down Expand Up @@ -723,6 +723,50 @@ func TestTiFlashSystemTableWithTiFlashV630(t *testing.T) {
tk.MustQuery("show warnings").Check(testkit.Rows())
}

func TestTiFlashSystemTableWithTiFlashV640(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()

instances := []string{
"tiflash,127.0.0.1:3933,127.0.0.1:7777,,",
"tikv,127.0.0.1:11080,127.0.0.1:10080,,",
}
fpName := "github.com/pingcap/tidb/infoschema/mockStoreServerInfo"
fpExpr := `return("` + strings.Join(instances, ";") + `")`
require.NoError(t, failpoint.Enable(fpName, fpExpr))
defer func() { require.NoError(t, failpoint.Disable(fpName)) }()

httpmock.RegisterResponder("GET", "http://127.0.0.1:7777/config",
httpmock.NewStringResponder(200, `
{
"raftstore-proxy": {},
"engine-store":{
"http_port":8123,
"tcp_port":9000
}
}
`))

data, err := os.ReadFile("testdata/tiflash_v640_dt_tables.json")
require.NoError(t, err)
httpmock.RegisterResponder("GET", "http://127.0.0.1:8123?default_format=JSONCompact&query=SELECT+%2A+FROM+system.dt_tables+LIMIT+0%2C+1024", httpmock.NewBytesResponder(200, data))

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select * from information_schema.TIFLASH_TABLES;").Check(testkit.Rows(
"db_70 t_135 tpcc customer 135 0 4 3528714 2464079200 0 0.002329177144988231 1 0 929227 0.16169850346757514 0 8128 882178.5 616019800 4 8219 5747810 2054.75 1436952.5 0 4 3520495 2458331390 1601563417 880123.75 614582847.5 24 8 6 342.4583333333333 239492.08333333334 482 120.5 7303.9315352697095 5100272.593360996 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_70 t_137 tpcc district 137 0 1 7993 1346259 0 0.8748905292130614 1 0.8055198055198055 252168 0.21407121407121407 0 147272 7993 1346259 1 6993 1178050 6993 1178050 0 1 1000 168209 91344 1000 168209 6 6 6 1165.5 196341.66666666666 10 10 100 16820.9 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_70 t_139 tpcc history 139 0 19 19379697 1629276978 0 0.0006053758219233252 0.5789473684210527 0.4626662120695534 253640 0.25434708489601093 0 293544 1019984.052631579 85751419.89473684 11 11732 997220 1066.5454545454545 90656.36363636363 0 19 19367965 1628279758 625147717 1019366.5789473684 85698934.63157895 15 4 1.3636363636363635 782.1333333333333 66481.33333333333 2378 125.15789473684211 8144.644659377628 684726.559293524 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_70 t_141 tpcc item 141 0 1 100000 10799081 0 0 0 <nil> 0 <nil> <nil> 0 100000 10799081 0 0 0 <nil> <nil> <nil> 1 100000 10799081 7357726 100000 10799081 0 0 <nil> <nil> <nil> 13 13 7692.307692307692 830698.5384615385 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_70 t_143 tpcc new_order 143 0 4 2717707 78813503 0 0.02266763856442214 1 0.9678592299201351 52809 0.029559768846178818 0 1434208 679426.75 19703375.75 4 61604 1786516 15401 446629 0 3 2656103 77026987 40906492 885367.6666666666 25675662.333333332 37 24 9.25 1664.972972972973 48284.21621621621 380 126.66666666666667 6989.744736842105 202702.59736842106 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_70 t_145 tpcc order_line 145 0 203 210607202 20007684190 0 0.0054566462546708164 0.5862068965517241 0.7810067620424135 620065 0.005679558722564825 0 22607144 1037473.9014778325 98560020.64039409 119 1149209 109174855 9657.218487394957 917435.756302521 0 203 209457993 19898509335 8724002804 1031812.7733990147 98022213.47290641 893 39 7.504201680672269 1286.9081746920492 122256.27659574468 31507 155.20689655172413 6647.982765734599 631558.3627447869 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_70 t_147 tpcc orders 147 0 22 21903301 1270391458 0 0.02021357420052804 0.7272727272727273 0.9239944527763222 260536 0.010145817899282655 0 10025264 995604.5909090909 57745066.27272727 16 442744 25679152 27671.5 1604947 0 22 21460557 1244712306 452173775 975479.8636363636 56577832.09090909 242 34 15.125 1829.5206611570247 106112.19834710743 2973 135.13636363636363 7218.485368314833 418672.15136226034 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_70 t_149 tpcc stock 149 0 42 11112720 4811805131 0 0.028085203262567582 0.9761904761904762 0.8463391893060944 10227093 0.07567373591410528 0 6719064 264588.5714285714 114566788.83333333 41 312103 135131097 7612.268292682927 3295880.4146341463 0 42 10800617 4676674034 3231872509 257157.54761904763 111349381.76190476 238 26 5.804878048780488 1311.357142857143 567777.718487395 1644 39.142857142857146 6569.718369829684 2844692.234793187 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_70 t_151 tpcc warehouse 151 0 1 5842 923615 0 0.9828825744608011 1 0.9669104841518634 70220 0.07732497387669801 0 133048 5842 923615 1 5742 907807 5742 907807 0 1 100 15808 11642 100 15808 5 5 5 1148.4 181561.4 5 5 20 3161.6 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
))
tk.MustQuery("show warnings").Check(testkit.Rows())
}

func TestTablesPKType(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit 5ba6b50

Please sign in to comment.