Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: skip empty partition table to merge global stats #53162

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 20 additions & 4 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -64,6 +65,7 @@ type AnalyzeExec struct {
gp *gp.Pool
// errExitCh is used to notice the worker that the whole analyze task is finished when to meet error.
errExitCh chan struct{}
notEmpty atomic.Bool
}

var (
Expand Down Expand Up @@ -160,9 +162,18 @@ TASKLOOP:
})
// If we enabled dynamic prune mode, then we need to generate global stats here for partition tables.
if needGlobalStats {
err = e.handleGlobalStats(ctx, globalStatsMap)
if err != nil {
return err
if e.notEmpty.Load() {
err = e.handleGlobalStats(ctx, globalStatsMap)
if err != nil {
return err
}
} else {
for globalStatsID := range globalStatsMap {
err = statsHandle.SaveMetaToStorage(globalStatsID.tableID, 0, 0, handleutil.StatsMetaHistorySourceAnalyze)
if err != nil {
return err
}
}
}
}

Expand Down Expand Up @@ -423,7 +434,9 @@ func (e *AnalyzeExec) handleResultsError(
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
tableIDs[results.TableID.GetStatisticsID()] = struct{}{}

if results.Job.Progress.GetProcessRow() != 0 {
e.notEmpty.Store(true)
}
if err1 := statsHandle.SaveTableStatsToStorage(results, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot, handleutil.StatsMetaHistorySourceAnalyze); err1 != nil {
tableID := results.TableID.TableID
err = err1
Expand Down Expand Up @@ -488,6 +501,9 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
tableIDs[results.TableID.GetStatisticsID()] = struct{}{}
if results.Job.Progress.GetProcessRow() != 0 {
e.notEmpty.Store(true)
}
saveResultsCh <- results
}
close(saveResultsCh)
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/test/analyzetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 48,
shard_count = 49,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
18 changes: 18 additions & 0 deletions pkg/executor/test/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3127,3 +3127,21 @@ func TestAnalyzePartitionVerify(t *testing.T) {
}
}
}

func TestAnalyzeEmptryPartitionTable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
sql := "create table t(a int,b varchar(100),c int,INDEX idx_c(c)) PARTITION BY RANGE ( a ) ("
for n := 100; n < 1000; n = n + 100 {
sql += "PARTITION p" + fmt.Sprint(n) + " VALUES LESS THAN (" + fmt.Sprint(n) + "),"
}
sql += "PARTITION p" + fmt.Sprint(1000) + " VALUES LESS THAN MAXVALUE)"
tk.MustExec(sql)
tk.MustExec("set @@tidb_analyze_partition_concurrency=1")
tk.MustExec("analyze table t")
tk.MustQuery("select * from mysql.analyze_jobs where job_info like \"%merge%\"").Check(testkit.Rows())
tk.MustExec("set @@tidb_analyze_partition_concurrency=2")
tk.MustExec("analyze table t")
tk.MustQuery("select * from mysql.analyze_jobs where job_info like \"%merge%\"").Check(testkit.Rows())
}
7 changes: 7 additions & 0 deletions pkg/statistics/analyze_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ type AnalyzeProgress struct {

// deltaCount is the newly processed rows after the last time mysql.analyze_jobs.processed_rows is updated.
deltaCount atomic.Int64
count atomic.Int64
}

// Update adds rowCount to the delta count. If the updated delta count reaches threshold, it returns the delta count for
// dumping it into mysql.analyze_jobs and resets the delta count to 0. Otherwise, it returns 0.
func (p *AnalyzeProgress) Update(rowCount int64) int64 {
dumpCount := int64(0)
newCount := p.deltaCount.Add(rowCount)
p.count.Add(rowCount)

t := time.Now()
p.lastDumpTimeMu.Lock()
Expand Down Expand Up @@ -99,3 +101,8 @@ func (p *AnalyzeProgress) GetLastDumpTime() time.Time {
defer p.lastDumpTimeMu.RUnlock()
return p.lastDumpTime
}

// GetProcessRow returns the total processed rows.
func (p *AnalyzeProgress) GetProcessRow() int64 {
return p.count.Load()
}
21 changes: 12 additions & 9 deletions pkg/statistics/handle/globalstats/global_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,22 @@ partition by range (a) (
tk.MustExec("set @@tidb_analyze_version=2")
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec("analyze table t")
checkModifyAndCount(0, 0, 0, 0, 0, 0)
checkHealthy(100, 100, 100)
rs := tk.MustQuery("show stats_meta").Rows()
require.Equal(t, "0", rs[0][4].(string)) // p0.modify_count
require.Equal(t, "0", rs[0][5].(string)) // p0.row_count
require.Equal(t, "0", rs[1][4].(string)) // p1.modify_count
require.Equal(t, "0", rs[1][5].(string)) // p1.row_count
tk.MustQuery("show stats_healthy").Check(testkit.Rows("test t p0 100", "test t p1 100"))

tk.MustExec("insert into t values (1), (2)") // update p0
require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true))
require.NoError(t, dom.StatsHandle().Update(dom.InfoSchema()))
checkModifyAndCount(2, 2, 2, 2, 0, 0)
checkHealthy(0, 0, 100)
tk.MustExec("analyze table t")
checkModifyAndCount(0, 2, 0, 2, 0, 0)
checkHealthy(100, 100, 100)

tk.MustExec("insert into t values (11), (12), (13), (14)") // update p1
require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true))
require.NoError(t, dom.StatsHandle().Update(dom.InfoSchema()))
checkModifyAndCount(6, 6, 2, 2, 4, 4)
checkModifyAndCount(6, 8, 2, 4, 4, 4)
checkHealthy(0, 0, 0)

tk.MustExec("analyze table t")
Expand Down Expand Up @@ -626,14 +629,14 @@ func TestGlobalStatsNDV(t *testing.T) {
checkNDV := func(ndvs ...int) { // g, p0, ..., p3
tk.MustExec("analyze table t")
rs := tk.MustQuery(`show stats_histograms where is_index=1`).Rows()
require.Len(t, rs, 5)
require.Len(t, rs, len(ndvs))
for i, ndv := range ndvs {
require.Equal(t, fmt.Sprintf("%v", ndv), rs[i][6].(string))
}
}

// all partitions are empty
checkNDV(0, 0, 0, 0, 0)
checkNDV(0, 0, 0, 0)

// p0 has data while others are empty
tk.MustExec("insert into t values (1), (2), (3)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func TestUnlockSomePartitionsWouldUpdateGlobalCountCorrectly(t *testing.T) {
tk.MustExec("insert into t(a, b) values(2,'b')")
tk.MustExec("analyze table test.t partition p0, p1")
tblStats := h.GetTableStats(tbl)
require.Equal(t, int64(0), tblStats.RealtimeCount)
require.Equal(t, int64(10000), tblStats.RealtimeCount)

// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(true))
Expand All @@ -499,11 +499,12 @@ func TestUnlockSomePartitionsWouldUpdateGlobalCountCorrectly(t *testing.T) {

// Unlock partition p0 and p1.
tk.MustExec("unlock stats t partition p0, p1")
tk.MustExec("analyze table test.t partition p0, p1")
// Check the global count is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count, table_id from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
require.Equal(t, "0", rows[0][1])
}

func setupTestEnvironmentWithPartitionedTableT(t *testing.T) (kv.Storage, *domain.Domain, *testkit.TestKit, *model.TableInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func TestUnlockPartitionedTableWouldUpdateGlobalCountCorrectly(t *testing.T) {
tk.MustExec("insert into t(a, b) values(2,'b')")
tk.MustExec("analyze table test.t")
tblStats := h.GetTableStats(tbl)
require.Equal(t, int64(0), tblStats.RealtimeCount)
require.Equal(t, int64(10000), tblStats.RealtimeCount)

// Dump stats delta to KV.
require.Nil(t, h.DumpStatsDeltaToKV(true))
Expand All @@ -297,11 +297,12 @@ func TestUnlockPartitionedTableWouldUpdateGlobalCountCorrectly(t *testing.T) {

// Unlock the table.
tk.MustExec("unlock stats t")
tk.MustExec("analyze table test.t")
// Check the global count is updated correctly.
rows = tk.MustQuery(fmt.Sprint("select count, modify_count from mysql.stats_meta where table_id = ", tbl.ID)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "2", rows[0][0])
require.Equal(t, "2", rows[0][1])
require.Equal(t, "0", rows[0][1])
}

func TestDeltaInLockInfoCanBeNegative(t *testing.T) {
Expand Down