executor: add remain time for showAnalyzeStatus #43866

Merged
22 commits merged on May 25, 2023
1 change: 1 addition & 0 deletions executor/BUILD.bazel
@@ -187,6 +187,7 @@ go_library(
"//util/gcutil",
"//util/hack",
"//util/hint",
"//util/intest",
"//util/keydecoder",
"//util/kvcache",
"//util/logutil",
53 changes: 7 additions & 46 deletions executor/builder.go
@@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/executor/internal/builder"
internalutil "github.com/pingcap/tidb/executor/internal/util"
executor_metrics "github.com/pingcap/tidb/executor/metrics"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
@@ -52,7 +53,6 @@ import (
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/table/temptable"
@@ -67,7 +67,6 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
clientkv "github.com/tikv/client-go/v2/kv"
@@ -2615,7 +2614,7 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(task plannercore.AnalyzeC
if opts[ast.AnalyzeOptNumSamples] == 0 {
*sampleRate = math.Float64frombits(opts[ast.AnalyzeOptSampleRate])
if *sampleRate < 0 {
*sampleRate = b.getAdjustedSampleRate(b.ctx, task)
*sampleRate = b.getAdjustedSampleRate(task)
if task.PartitionName != "" {
sc.AppendNote(errors.Errorf(
"Analyze use auto adjusted sample rate %f for table %s.%s's partition %s",
@@ -2694,8 +2693,8 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(task plannercore.AnalyzeC
// If we take n = 1e12, a 300*k sample still gives <= 0.66 bin size error with probability 0.99.
// So if we don't consider the top-n values, we can keep the sample size at 300*256.
// But we may take some top-n before building the histogram, so we increase the sample a little.
func (b *executorBuilder) getAdjustedSampleRate(sctx sessionctx.Context, task plannercore.AnalyzeColumnsTask) float64 {
statsHandle := domain.GetDomain(sctx).StatsHandle()
func (b *executorBuilder) getAdjustedSampleRate(task plannercore.AnalyzeColumnsTask) float64 {
statsHandle := domain.GetDomain(b.ctx).StatsHandle()
defaultRate := 0.001
if statsHandle == nil {
return defaultRate
@@ -2707,7 +2706,7 @@ func (b *executorBuilder) getAdjustedSampleRate(sctx sessionctx.Context, task pl
} else {
statsTbl = statsHandle.GetPartitionStats(task.TblInfo, tid)
}
approxiCount, hasPD := b.getApproximateTableCountFromStorage(sctx, tid, task)
approxiCount, hasPD := b.getApproximateTableCountFromStorage(tid, task)
// If there's no stats meta and no pd, return the default rate.
if statsTbl == nil && !hasPD {
return defaultRate
@@ -2734,46 +2733,8 @@ func (b *executorBuilder) getAdjustedSampleRate(sctx sessionctx.Context, task pl
return math.Min(1, config.DefRowsForSampleRate/float64(statsTbl.RealtimeCount))
}
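
For readers skimming the hunk above, here is a minimal standalone sketch (not part of this diff) of the adjusted-sample-rate rule: aim for roughly config.DefRowsForSampleRate sampled rows and cap the rate at 1 so small tables get a full scan. The 110000 target and the 50-million-row table are assumed numbers for illustration only.

package main

import (
	"fmt"
	"math"
)

// adjustedSampleRate mirrors the rule above: target roughly defRowsForSample
// sampled rows, never exceeding a full scan (rate 1).
func adjustedSampleRate(defRowsForSample, realtimeCount float64) float64 {
	if realtimeCount <= 0 {
		return 0.001 // default rate when the row count is unknown
	}
	return math.Min(1, defRowsForSample/realtimeCount)
}

func main() {
	// Assumed values: 110000 target sample rows, a 50-million-row table.
	fmt.Println(adjustedSampleRate(110000, 50_000_000)) // 0.0022
}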

func (b *executorBuilder) getApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, task plannercore.AnalyzeColumnsTask) (float64, bool) {
tikvStore, ok := sctx.GetStore().(helper.Storage)
if !ok {
return 0, false
}
regionStats := &helper.PDRegionStats{}
pdHelper := helper.NewHelper(tikvStore)
err := pdHelper.GetPDRegionStats(tid, regionStats, true)
failpoint.Inject("calcSampleRateByStorageCount", func() {
// Force the TiDB thinking that there's PD and the count of region is small.
err = nil
regionStats.Count = 1
// Set a very large approximate count.
regionStats.StorageKeys = 1000000
})
if err != nil {
return 0, false
}
// If this table is not small, we directly use the count from PD,
// since for a small table, it's possible that it's data is in the same region with part of another large table.
// Thus, we use the number of the regions of the table's table KV to decide whether the table is small.
if regionStats.Count > 2 {
return float64(regionStats.StorageKeys), true
}
// Otherwise, we use count(*) to calc it's size, since it's very small, the table data can be filled in no more than 2 regions.
sql := new(strings.Builder)
sqlexec.MustFormatSQL(sql, "select count(*) from %n.%n", task.DBName, task.TableName)
if task.PartitionName != "" {
sqlexec.MustFormatSQL(sql, " partition(%n)", task.PartitionName)
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
rows, _, err := b.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, sql.String())
if err != nil {
return 0, false
}
// If the record set is nil, there's something wrong with the execution. The COUNT(*) would always return one row.
if len(rows) == 0 || rows[0].Len() == 0 {
return 0, false
}
return float64(rows[0].GetInt64(0)), true
func (b *executorBuilder) getApproximateTableCountFromStorage(tid int64, task plannercore.AnalyzeColumnsTask) (float64, bool) {
return internalutil.GetApproximateTableCountFromStorage(b.ctx, tid, task.DBName, task.TableName, task.PartitionName)
}

func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string, schemaForVirtualColEval *expression.Schema) *analyzeTask {
156 changes: 137 additions & 19 deletions executor/infoschema_reader.go
@@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/errno"
internalutil "github.com/pingcap/tidb/executor/internal/util"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
@@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
@@ -58,7 +60,9 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/deadlockhistory"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/keydecoder"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
@@ -120,7 +124,7 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
case infoschema.TableClusterInfo:
err = e.dataForTiDBClusterInfo(sctx)
case infoschema.TableAnalyzeStatus:
err = e.setDataForAnalyzeStatus(sctx)
err = e.setDataForAnalyzeStatus(ctx, sctx)
case infoschema.TableTiDBIndexes:
e.setDataFromIndexes(sctx, dbs)
case infoschema.TableViews:
@@ -2130,16 +2134,17 @@ func (e *tableStorageStatsRetriever) setDataForTableStorageStats(ctx sessionctx.
}

// dataForAnalyzeStatusHelper is a helper function which can be used in show_stats.go
func dataForAnalyzeStatusHelper(sctx sessionctx.Context) (rows [][]types.Datum, err error) {
func dataForAnalyzeStatusHelper(ctx context.Context, sctx sessionctx.Context, isShow bool) (rows [][]types.Datum, err error) {
const maxAnalyzeJobs = 30
const sql = "SELECT table_schema, table_name, partition_name, job_info, processed_rows, CONVERT_TZ(start_time, @@TIME_ZONE, '+00:00'), CONVERT_TZ(end_time, @@TIME_ZONE, '+00:00'), state, fail_reason, instance, process_id FROM mysql.analyze_jobs ORDER BY update_time DESC LIMIT %?"
exec := sctx.(sqlexec.RestrictedSQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
chunkRows, _, err := exec.ExecRestrictedSQL(ctx, nil, sql, maxAnalyzeJobs)
kctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
chunkRows, _, err := exec.ExecRestrictedSQL(kctx, nil, sql, maxAnalyzeJobs)
if err != nil {
return nil, err
}
checker := privilege.GetPrivilegeManager(sctx)

for _, chunkRow := range chunkRows {
dbName := chunkRow.GetString(0)
tableName := chunkRow.GetString(1)
@@ -2164,6 +2169,7 @@ func dataForAnalyzeStatusHelper(sctx sessionctx.Context) (rows [][]types.Datum,
}
endTime = types.NewTime(types.FromGoTime(t.In(sctx.GetSessionVars().TimeZone)), mysql.TypeDatetime, 0)
}

state := chunkRow.GetEnum(7).String()
var failReason interface{}
if !chunkRow.IsNull(8) {
@@ -2174,26 +2180,138 @@ func dataForAnalyzeStatusHelper(sctx sessionctx.Context) (rows [][]types.Datum,
if !chunkRow.IsNull(10) {
procID = chunkRow.GetUint64(10)
}
rows = append(rows, types.MakeDatums(
dbName, // TABLE_SCHEMA
tableName, // TABLE_NAME
partitionName, // PARTITION_NAME
jobInfo, // JOB_INFO
processedRows, // ROW_COUNT
startTime, // START_TIME
endTime, // END_TIME
state, // STATE
failReason, // FAIL_REASON
instance, // INSTANCE
procID, // PROCESS_ID
))

var remainDurationStr, progressStr, estimatedRowCntStr interface{}
if isShow && state == statistics.AnalyzeRunning {
startTime, ok := startTime.(types.Time)
if !ok {
return nil, errors.New("invalid start time")
}
RemainingDuration, progress, estimatedRowCnt, RemainDurationErr :=
getRemainDurationForAnalyzeStatusHelper(ctx, sctx, &startTime,
dbName, tableName, partitionName, processedRows)
if RemainDurationErr != nil {
logutil.BgLogger().Warn("get remaining duration failed", zap.Error(RemainDurationErr))
}
if RemainingDuration != nil {
remainDurationStr = execdetails.FormatDuration(*RemainingDuration)
}
progressStr = progress
estimatedRowCntStr = int64(estimatedRowCnt)
}
var row []types.Datum
if isShow {
row = types.MakeDatums(
dbName, // TABLE_SCHEMA
tableName, // TABLE_NAME
partitionName, // PARTITION_NAME
jobInfo, // JOB_INFO
processedRows, // ROW_COUNT
startTime, // START_TIME
endTime, // END_TIME
state, // STATE
failReason, // FAIL_REASON
instance, // INSTANCE
procID, // PROCESS_ID
remainDurationStr, // REMAINING_SECONDS
progressStr, // PROGRESS
Review comment from @chrysan (Contributor, Jun 15, 2023): It's better to be a numeric type for UI to easily parse and aggregate.

estimatedRowCntStr, // ESTIMATED_TOTAL_ROWS
)
} else {
row = types.MakeDatums(
dbName, // TABLE_SCHEMA
tableName, // TABLE_NAME
partitionName, // PARTITION_NAME
jobInfo, // JOB_INFO
processedRows, // ROW_COUNT
startTime, // START_TIME
endTime, // END_TIME
state, // STATE
failReason, // FAIL_REASON
instance, // INSTANCE
procID, // PROCESS_ID
)
}
rows = append(rows, row)
}
return
}
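
A hedged usage sketch, not taken from this PR: with SHOW ANALYZE STATUS (the isShow path above) the three extra columns are appended after PROCESS_ID, while the information_schema.analyze_status memtable keeps the original layout. The test name and setup below are assumptions modeled on this package's existing tests.

package executor_test

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
)

func TestShowAnalyzeStatusRemainTime(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	// With an ANALYZE job still running in another session, the new columns
	// (REMAINING_SECONDS, PROGRESS, ESTIMATED_TOTAL_ROWS) are populated.
	rows := tk.MustQuery("show analyze status where state = 'running'").Rows()
	for _, row := range rows {
		// Assumed column positions 11..13, matching the datums built above.
		t.Log(row[11], row[12], row[13])
	}
}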

func getRemainDurationForAnalyzeStatusHelper(
ctx context.Context,
sctx sessionctx.Context, startTime *types.Time,
dbName, tableName, partitionName string, processedRows int64) (*time.Duration, float64, float64, error) {
var RemainingDuration = time.Duration(0)
var percentage = 0.0
var totalCnt = float64(0)
if startTime != nil {
start, err := startTime.GoTime(time.UTC)
if err != nil {
return nil, percentage, totalCnt, err
}
duration := time.Now().UTC().Sub(start)
if intest.InTest {
if val := ctx.Value(AnalyzeProgressTest); val != nil {
RemainingDuration, percentage = calRemainInfoForAnalyzeStatus(ctx, int64(totalCnt), processedRows, duration)
return &RemainingDuration, percentage, totalCnt, nil
}
}
var tid int64
is := sessiontxn.GetTxnManager(sctx).GetTxnInfoSchema()
tb, err := is.TableByName(model.NewCIStr(dbName), model.NewCIStr(tableName))
if err != nil {
return nil, percentage, totalCnt, err
}
statsHandle := domain.GetDomain(sctx).StatsHandle()
if statsHandle != nil {
var statsTbl *statistics.Table
meta := tb.Meta()
if partitionName != "" {
pt := meta.GetPartitionInfo()
tid = pt.GetPartitionIDByName(partitionName)
statsTbl = statsHandle.GetPartitionStats(meta, tid)
} else {
statsTbl = statsHandle.GetTableStats(meta)
tid = meta.ID
}
Review comment (Contributor): It seems that we need to exchange the two branches.
Reply from the author (Member): Done

if statsTbl != nil && statsTbl.RealtimeCount != 0 {
totalCnt = float64(statsTbl.RealtimeCount)
}
}
if tid > 0 && totalCnt == 0 {
totalCnt, _ = internalutil.GetApproximateTableCountFromStorage(sctx, tid, dbName, tableName, partitionName)
Review comment (Contributor): This func will be called at getAdjustedSampleRate when we do the first analyze, and called again when show analyze status, right? Can we reuse, like persisting the ApproximateTableCount somewhere?
Review comment (Contributor): UI will call showAnalyzeStatus many times to get fresh status. I'm afraid there will be frequent calls to internalutil.GetApproximateTableCountFromStorage.

}
RemainingDuration, percentage = calRemainInfoForAnalyzeStatus(ctx, int64(totalCnt), processedRows, duration)
}
return &RemainingDuration, percentage, totalCnt, nil
}
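
To make the fallback order explicit, a tiny sketch (an assumption-laden illustration, not part of the diff): the realtime count from the stats cache is preferred, and the storage-based approximation is only consulted when that count is missing or zero.

// estimateTotalRows mirrors the priority used above. Either input may be zero
// when the corresponding source is unavailable.
func estimateTotalRows(realtimeCount int64, approxFromStorage float64) float64 {
	if realtimeCount != 0 {
		return float64(realtimeCount)
	}
	return approxFromStorage
}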

func calRemainInfoForAnalyzeStatus(ctx context.Context, totalCnt int64, processedRows int64, duration time.Duration) (time.Duration, float64) {
if intest.InTest {
if val := ctx.Value(AnalyzeProgressTest); val != nil {
totalCnt = 100 // But in final result, it is still 0.
processedRows = 10
duration = 1 * time.Minute
}
}
if totalCnt == 0 {
return 0, 100.0
}
remainLine := totalCnt - processedRows
if processedRows == 0 {
processedRows = 1
}
if duration == 0 {
duration = 1 * time.Second
}
i := float64(remainLine) * duration.Seconds() / float64(processedRows)
persentage := float64(processedRows) / float64(totalCnt)
Review comment (Contributor): typo: persentage -> percentage

return time.Duration(i) * time.Second, persentage
}
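
A worked example with hypothetical numbers (not from the PR) of the remaining-time arithmetic implemented above:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical ANALYZE job: 1,000,000 estimated rows, 250,000 processed after 2 minutes.
	totalCnt, processedRows := int64(1_000_000), int64(250_000)
	elapsed := 2 * time.Minute

	remainRows := totalCnt - processedRows // 750,000
	remainSecs := float64(remainRows) * elapsed.Seconds() / float64(processedRows)
	progress := float64(processedRows) / float64(totalCnt)

	fmt.Println(time.Duration(remainSecs) * time.Second) // 6m0s
	fmt.Println(progress)                                // 0.25
}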

// setDataForAnalyzeStatus gets all the analyze jobs.
func (e *memtableRetriever) setDataForAnalyzeStatus(sctx sessionctx.Context) (err error) {
e.rows, err = dataForAnalyzeStatusHelper(sctx)
func (e *memtableRetriever) setDataForAnalyzeStatus(ctx context.Context, sctx sessionctx.Context) (err error) {
e.rows, err = dataForAnalyzeStatusHelper(ctx, sctx, false)
return
}

6 changes: 4 additions & 2 deletions executor/infoschema_reader_test.go
@@ -616,8 +616,10 @@ func TestForAnalyzeStatus(t *testing.T) {
}
rows2 := tk.MustQuery("show analyze status where TABLE_NAME='t1'").Sort().Rows()
require.Equal(t, len(rows), len(rows2))
for i, row2 := range rows2 {
require.Equal(t, rows[i], row2)
for i, row := range rows {
for j, r := range row {
require.Equal(t, r, rows2[i][j])
}
}
}

10 changes: 9 additions & 1 deletion executor/internal/util/BUILD.bazel
@@ -2,11 +2,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "util",
srcs = ["partition_table.go"],
srcs = [
"partition_table.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/executor/internal/util",
visibility = ["//executor:__subpackages__"],
deps = [
"//kv",
"//sessionctx",
"//store/helper",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-tipb",
],
)