Commit
statistics: add refresher (#50845)
ref #50132
hi-rustin committed Feb 2, 2024
1 parent 8491da1 commit 35a7c9e
Showing 7 changed files with 453 additions and 16 deletions.
4 changes: 3 additions & 1 deletion pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel
@@ -13,8 +13,10 @@ go_library(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"@org_uber_go_zap//:zap",
],
)

@@ -29,7 +31,7 @@ go_test(
],
embed = [":priorityqueue"],
flaky = True,
shard_count = 10,
shard_count = 13,
deps = [
"//pkg/parser/model",
"//pkg/session",
42 changes: 29 additions & 13 deletions pkg/statistics/handle/autoanalyze/priorityqueue/interval.go
@@ -21,6 +21,12 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/util"
)

// noRecord is used to indicate that there is no related record in mysql.analyze_jobs.
const noRecord = -1

// justFailed is used to indicate that the last analysis has just failed.
const justFailed = 0

const avgDurationQueryForTable = `
SELECT AVG(TIMESTAMPDIFF(SECOND, start_time, end_time)) AS avg_duration
FROM (
@@ -57,14 +63,21 @@ const lastFailedDurationQueryForTable = `
// We pick the minimum duration (i.e. the time since the most recent failure) across the latest failed analysis of each partition because we want to be conservative.
const lastFailedDurationQueryForPartition = `
SELECT
MIN(TIMESTAMPDIFF(SECOND, start_time, CURRENT_TIMESTAMP)) AS min_duration
FROM
mysql.analyze_jobs
WHERE
table_schema = %? AND
table_name = %? AND
state = 'failed' AND
partition_name IN (%?);
MIN(TIMESTAMPDIFF(SECOND, aj.start_time, CURRENT_TIMESTAMP)) AS min_duration
FROM (
SELECT
MAX(id) AS max_id
FROM
mysql.analyze_jobs
WHERE
table_schema = %?
AND table_name = %?
AND state = 'failed'
AND partition_name IN (%?)
GROUP BY
partition_name
) AS latest_failures
JOIN mysql.analyze_jobs aj ON aj.id = latest_failures.max_id;
`
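// Illustrative walkthrough of the query above (hypothetical data, not part of
// this commit): suppose partition p0 has failed jobs with ids 3 and 8 (started
// at 10:00 and 10:30) and p1 has a failed job with id 5 (started at 10:10).
// The inner query keeps only the latest failure per partition (ids 8 and 5);
// at 11:00 the outer MIN(TIMESTAMPDIFF(...)) yields 30 minutes, i.e. the time
// elapsed since the most recent of those failures, which is the conservative choice.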

// getAverageAnalysisDuration returns the average duration of the last 5 successful analyses for each specified partition.
@@ -86,17 +99,17 @@ func getAverageAnalysisDuration(

rows, _, err := util.ExecRows(sctx, query, params...)
if err != nil {
return 0, err
return noRecord, err
}

// NOTE: if there are no successful analyses, we return noRecord.
if len(rows) == 0 || rows[0].IsNull(0) {
return 0, nil
return noRecord, nil
}
avgDuration := rows[0].GetMyDecimal(0)
duration, err := avgDuration.ToFloat64()
if err != nil {
return 0, err
return noRecord, err
}

return time.Duration(duration) * time.Second, nil
@@ -121,14 +134,17 @@ func getLastFailedAnalysisDuration(

rows, _, err := util.ExecRows(sctx, query, params...)
if err != nil {
return 0, err
return noRecord, err
}

// NOTE: if there are no failed analyses, we return noRecord.
if len(rows) == 0 || rows[0].IsNull(0) {
return 0, nil
return noRecord, nil
}
lastFailedDuration := rows[0].GetUint64(0)
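// A duration of 0 seconds means the most recent failure started just now;
// report it as justFailed so callers can skip re-analyzing immediately.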
if lastFailedDuration == 0 {
return justFailed, nil
}

return time.Duration(lastFailedDuration) * time.Second, nil
}
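
Taken together, the two helpers above return noRecord when mysql.analyze_jobs has no matching row and justFailed when the most recent failure is zero seconds old. A rough sketch of the contract the priority queue relies on is shown below; the shouldSkip helper is hypothetical and only mirrors the checks that isValidToAnalyze in job.go (further down in this diff) performs:

// Hypothetical sketch, not part of this commit: summarizes how the sentinel
// values returned above are interpreted by the auto-analyze priority queue.
func shouldSkip(lastFailed, avg time.Duration) (bool, string) {
	switch {
	case lastFailed == justFailed:
		return true, "last analysis just failed"
	case lastFailed != noRecord && avg == noRecord && lastFailed < defaultFailedAnalysisWaitTime:
		return true, "recent failure and no successful analysis to compare against"
	case lastFailed != noRecord && lastFailed < 2*avg:
		return true, "failed less than 2 times the average analysis duration ago"
	default:
		return false, ""
	}
}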
69 changes: 67 additions & 2 deletions pkg/statistics/handle/autoanalyze/priorityqueue/interval_test.go
@@ -37,7 +37,7 @@ func TestGetAverageAnalysisDuration(t *testing.T) {
"example_schema", "example_table", "example_partition",
)
require.NoError(t, err)
require.Equal(t, time.Duration(0), avgDuration)
require.Equal(t, time.Duration(noRecord), avgDuration)

initJobs(tk)

Expand Down Expand Up @@ -102,7 +102,7 @@ func TestGetLastFailedAnalysisDuration(t *testing.T) {
"example_schema", "example_table", "example_partition",
)
require.NoError(t, err)
require.Equal(t, time.Duration(0), lastFailedDuration)
require.Equal(t, time.Duration(noRecord), lastFailedDuration)
initJobs(tk)

// Partitioned table.
Expand Down Expand Up @@ -320,3 +320,68 @@ func insertFailedJob(
)
}
}

func insertFailedJobWithStartTime(
tk *testkit.TestKit,
dbName string,
tableName string,
partitionName string,
startTime string,
) {
if partitionName == "" {
tk.MustExec(`
INSERT INTO mysql.analyze_jobs (
table_schema,
table_name,
job_info,
start_time,
end_time,
state,
fail_reason,
instance
) VALUES (
?,
?,
'Job information for failed job',
?,
'2024-01-01 10:00:00',
'failed',
'Some reason for failure',
'example_instance'
);
`,
dbName,
tableName,
startTime,
)
} else {
tk.MustExec(`
INSERT INTO mysql.analyze_jobs (
table_schema,
table_name,
partition_name,
job_info,
start_time,
end_time,
state,
fail_reason,
instance
) VALUES (
?,
?,
?,
'Job information for failed job',
?,
'2024-01-01 10:00:00',
'failed',
'Some reason for failure',
'example_instance'
);
`,
dbName,
tableName,
partitionName,
startTime,
)
}
}
131 changes: 131 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/job.go
@@ -15,16 +15,25 @@
package priorityqueue

import (
"fmt"
"strings"
"time"

"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"go.uber.org/zap"
)

// defaultFailedAnalysisWaitTime is the default wait time for the next analysis after a failed analysis.
// NOTE: this is only used when the average analysis duration is not available (no successful analysis before).
const defaultFailedAnalysisWaitTime = 30 * time.Minute

// TableAnalysisJob defines the structure for table analysis job information.
// TODO: add stringer for TableAnalysisJob.
type TableAnalysisJob struct {
// Only set when the partitions' indexes need to be analyzed.
PartitionIndexes map[string][]string
@@ -40,6 +49,128 @@ type TableAnalysisJob struct {
Weight float64
}

// IsValidToAnalyze checks whether the table is valid to analyze.
// It checks the last failed analysis duration and the average analysis duration.
// If the last failed analysis duration is less than 2 times the average analysis duration,
// we skip this table to avoid too many failed analyses.
func (j *TableAnalysisJob) IsValidToAnalyze(
sctx sessionctx.Context,
) (bool, string) {
// No need to analyze this table.
// TODO: Usually, we should not put this kind of table into the queue.
if j.Weight == 0 {
return false, "weight is 0"
}

// Check whether the table or partition is valid to analyze.
if len(j.Partitions) > 0 || len(j.PartitionIndexes) > 0 {
// If any partition is invalid to analyze, the whole table is invalid to analyze,
// because we need to analyze partitions in batch mode.
partitions := append(j.Partitions, getPartitionNames(j.PartitionIndexes)...)
if valid, failReason := isValidToAnalyze(
sctx,
j.TableSchema,
j.TableName,
partitions...,
); !valid {
return false, failReason
}
} else {
if valid, failReason := isValidToAnalyze(
sctx,
j.TableSchema,
j.TableName,
); !valid {
return false, failReason
}
}

return true, ""
}

func getPartitionNames(partitionIndexes map[string][]string) []string {
names := make([]string, 0, len(partitionIndexes))
for _, partitionNames := range partitionIndexes {
names = append(names, partitionNames...)
}
return names
}

func isValidToAnalyze(
sctx sessionctx.Context,
schema, table string,
partitionNames ...string,
) (bool, string) {
lastFailedAnalysisDuration, err :=
getLastFailedAnalysisDuration(sctx, schema, table, partitionNames...)
if err != nil {
statslogutil.StatsLogger().Warn(
"Fail to get last failed analysis duration",
zap.String("schema", schema),
zap.String("table", table),
zap.Strings("partitions", partitionNames),
zap.Error(err),
)
return false, fmt.Sprintf("fail to get last failed analysis duration: %v", err)
}

averageAnalysisDuration, err :=
getAverageAnalysisDuration(sctx, schema, table, partitionNames...)
if err != nil {
statslogutil.StatsLogger().Warn(
"Fail to get average analysis duration",
zap.String("schema", schema),
zap.String("table", table),
zap.Strings("partitions", partitionNames),
zap.Error(err),
)
return false, fmt.Sprintf("fail to get average analysis duration: %v", err)
}

// The last analysis just failed, so we should not analyze it again.
if lastFailedAnalysisDuration == justFailed {
statslogutil.StatsLogger().Info(
"Skip analysis because the last analysis just failed",
zap.String("schema", schema),
zap.String("table", table),
zap.Strings("partitions", partitionNames),
)
return false, "last analysis just failed"
}

// There is no successful analysis on record, and the last failed analysis
// happened within the default wait time.
// Skip this table to avoid too many failed analyses.
onlyFailedAnalysis := lastFailedAnalysisDuration != noRecord && averageAnalysisDuration == noRecord
if onlyFailedAnalysis && lastFailedAnalysisDuration < defaultFailedAnalysisWaitTime {
statslogutil.StatsLogger().Info(
fmt.Sprintf("Skip analysis because the last failed analysis duration is less than %v", defaultFailedAnalysisWaitTime),
zap.String("schema", schema),
zap.String("table", table),
zap.Strings("partitions", partitionNames),
zap.Duration("lastFailedAnalysisDuration", lastFailedAnalysisDuration),
zap.Duration("averageAnalysisDuration", averageAnalysisDuration),
)
return false, fmt.Sprintf("last failed analysis duration is less than %v", defaultFailedAnalysisWaitTime)
}
// Skip if the last failed analysis happened less than 2 times the average analysis duration ago.
meetSkipCondition := lastFailedAnalysisDuration != noRecord &&
lastFailedAnalysisDuration < 2*averageAnalysisDuration
if meetSkipCondition {
statslogutil.StatsLogger().Info(
"Skip analysis because the last failed analysis duration is less than 2 times the average analysis duration",
zap.String("schema", schema),
zap.String("table", table),
zap.Strings("partitions", partitionNames),
zap.Duration("lastFailedAnalysisDuration", lastFailedAnalysisDuration),
zap.Duration("averageAnalysisDuration", averageAnalysisDuration),
)
return false, "last failed analysis duration is less than 2 times the average analysis duration"
}

return true, ""
}
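// Illustrative numbers, not part of this commit: with an average analysis
// duration of 10 minutes, a failure that started 15 minutes ago causes the
// table to be skipped (15m < 2*10m), while a failure 25 minutes ago does not.
// If there has never been a successful analysis, the table is skipped until
// defaultFailedAnalysisWaitTime (30 minutes) has passed since the last failure.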

// Execute executes the analyze statement.
func (j *TableAnalysisJob) Execute(
statsHandle statstypes.StatsHandle,
