Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: incremental analyze for index with feedback updates #10355

Merged
merged 8 commits into from May 8, 2019
@@ -1075,13 +1075,13 @@ func (e *AnalyzeTestFastExec) TestFastSample() error {

type analyzeIndexIncrementalExec struct {
AnalyzeIndexExec
index *statistics.Index
oldHist *statistics.Histogram
oldCMS *statistics.CMSketch
}

func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult {
idx := idxExec.index
highBound := idx.Histogram.GetUpper(idx.Len() - 1)
values, err := codec.Decode(highBound.GetBytes(), len(idxExec.idxInfo.Columns))
startPos := idxExec.oldHist.GetUpper(idxExec.oldHist.Len() - 1)
This conversation was marked as resolved by lamxTyler

This comment has been minimized.

Copy link
@winoros

winoros May 6, 2019

Member

do we need to check idxExec.oldHist.Len() > 1?

This comment has been minimized.

values, err := codec.DecodeRange(startPos.GetBytes(), len(idxExec.idxInfo.Columns))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
@@ -1090,16 +1090,12 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
oldHist, oldCMS, err := idx.RemoveUpperBound(idxExec.ctx.GetSessionVars().StmtCtx, values)
hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, idxExec.oldHist, hist, int(idxExec.maxNumBuckets))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, oldHist, hist, int(idxExec.maxNumBuckets))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
if oldCMS != nil && cms != nil {
err = cms.MergeCMSketch(oldCMS)
if idxExec.oldCMS != nil && cms != nil {
err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS)
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
@@ -1120,26 +1116,24 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult

type analyzePKIncrementalExec struct {
AnalyzeColumnsExec
pkStats *statistics.Column
oldHist *statistics.Histogram
}

func analyzePKIncremental(colExec *analyzePKIncrementalExec) analyzeResult {
This conversation was marked as resolved by eurekaka

This comment has been minimized.

Copy link
@eurekaka

eurekaka May 6, 2019

Contributor

Can we merge analyzePKIncremental with analyzeIndexIncremental now since they are almost same?

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler May 6, 2019

Author Member

There are still many tiny details that differ.

pkStats := colExec.pkStats
high := pkStats.GetUpper(pkStats.Len() - 1)
var maxVal types.Datum
if mysql.HasUnsignedFlag(colExec.pkInfo.Flag) {
maxVal = types.NewUintDatum(math.MaxUint64)
} else {
maxVal = types.NewIntDatum(math.MaxInt64)
}
ran := ranger.Range{LowVal: []types.Datum{*high}, LowExclude: true, HighVal: []types.Datum{maxVal}}
startPos := *colExec.oldHist.GetUpper(colExec.oldHist.Len() - 1)
ran := ranger.Range{LowVal: []types.Datum{startPos}, LowExclude: true, HighVal: []types.Datum{maxVal}}
hists, _, err := colExec.buildStats([]*ranger.Range{&ran})
if err != nil {
return analyzeResult{Err: err, job: colExec.job}
}
hist := hists[0]
oldHist := pkStats.Histogram.Copy()
hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, oldHist, hist, int(colExec.maxNumBuckets))
hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, colExec.oldHist, hist, int(colExec.maxNumBuckets))
if err != nil {
return analyzeResult{Err: err, job: colExec.job}
}
@@ -24,9 +24,13 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/testkit"
)

@@ -303,4 +307,36 @@ func (s *testSuite1) TestAnalyzeIncremental(c *C) {
tk.MustExec("analyze incremental table t index")
// Result should not change.
tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2"))

// Test analyze incremental with feedback.
tk.MustExec("insert into t values (3,3)")
oriProbability := statistics.FeedbackProbability.Load()
defer func() {
statistics.FeedbackProbability.Store(oriProbability)
}()
statistics.FeedbackProbability.Store(1)
is := s.dom.InfoSchema()
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tblInfo := table.Meta()
tk.MustQuery("select * from t use index(idx) where b = 3")
tk.MustQuery("select * from t where a > 1")
h := s.dom.StatsHandle()
c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
c.Assert(h.DumpStatsFeedbackToKV(), IsNil)
c.Assert(h.HandleUpdateStats(is), IsNil)
h.Update(is)
This conversation was marked as resolved by lamxTyler

This comment has been minimized.

Copy link
@qw4990

qw4990 May 7, 2019

Contributor

Check this error?

tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 3 0 2 2147483647", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2"))
tblStats := h.GetTableStats(tblInfo)
val, err := codec.EncodeKey(tk.Se.GetSessionVars().StmtCtx, nil, types.NewIntDatum(3))
c.Assert(err, IsNil)
c.Assert(tblStats.Indices[tblInfo.Indices[0].ID].CMSketch.QueryBytes(val), Equals, uint64(1))
c.Assert(statistics.IsAnalyzed(tblStats.Indices[tblInfo.Indices[0].ID].Flag), IsFalse)
c.Assert(statistics.IsAnalyzed(tblStats.Columns[tblInfo.Columns[0].ID].Flag), IsFalse)

tk.MustExec("analyze incremental table t index")
tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t a 0 2 3 1 3 3",
"test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2", "test t idx 1 2 3 1 3 3"))
tblStats = h.GetTableStats(tblInfo)
c.Assert(tblStats.Indices[tblInfo.Indices[0].ID].CMSketch.QueryBytes(val), Equals, uint64(1))
}
@@ -1398,18 +1398,28 @@ func (b *executorBuilder) buildAnalyzeIndexIncremental(task plannercore.AnalyzeI
return analyzeTask
}
idx, ok := statsTbl.Indices[task.IndexInfo.ID]
// TODO: If the index contains feedback, we may use other strategy.
if !ok || idx.Len() == 0 || idx.ContainsFeedback() {
if !ok || idx.Len() == 0 || idx.LastAnalyzePos.IsNull() {
return analyzeTask
}
exec := analyzeTask.idxExec
if idx.CMSketch != nil {
width, depth := idx.CMSketch.GetWidthAndDepth()
exec.analyzePB.IdxReq.CmsketchWidth = &width
exec.analyzePB.IdxReq.CmsketchDepth = &depth
var oldHist *statistics.Histogram
if statistics.IsAnalyzed(idx.Flag) {
exec := analyzeTask.idxExec
if idx.CMSketch != nil {
width, depth := idx.CMSketch.GetWidthAndDepth()
exec.analyzePB.IdxReq.CmsketchWidth = &width
exec.analyzePB.IdxReq.CmsketchDepth = &depth
}
oldHist = idx.Histogram.Copy()
} else {
_, bktID := idx.LessRowCountWithBktIdx(idx.LastAnalyzePos)
if bktID == 0 {
return analyzeTask
}
oldHist = idx.TruncateHistogram(bktID)
}
oldHist = oldHist.RemoveUpperBound()
This conversation was marked as resolved by lamxTyler

This comment has been minimized.

Copy link
@eurekaka

eurekaka May 6, 2019

Contributor

Why we don't remove upper bound from CMSketch now?

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler May 6, 2019

Author Member

Since we use max to merge cm sketch now, it does not matter if we removed it or not.

analyzeTask.taskType = idxIncrementalTask
analyzeTask.idxIncrementalExec = &analyzeIndexIncrementalExec{AnalyzeIndexExec: *analyzeTask.idxExec, index: idx}
analyzeTask.idxIncrementalExec = &analyzeIndexIncrementalExec{AnalyzeIndexExec: *analyzeTask.idxExec, oldHist: oldHist, oldCMS: idx.CMSketch}
analyzeTask.job = &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "analyze incremental index " + task.IndexInfo.Name.O}
return analyzeTask
}
@@ -1458,13 +1468,28 @@ func (b *executorBuilder) buildAnalyzePKIncremental(task plannercore.AnalyzeColu
return analyzeTask
}
col, ok := statsTbl.Columns[task.PKInfo.ID]
// TODO: If the primary key contains feedback, we may use other strategy.
if !ok || col.Len() == 0 || col.ContainsFeedback() {
if !ok || col.Len() == 0 || col.LastAnalyzePos.IsNull() {
return analyzeTask
}
var oldHist *statistics.Histogram
if statistics.IsAnalyzed(col.Flag) {
oldHist = col.Histogram.Copy()
} else {
d, err := col.LastAnalyzePos.ConvertTo(b.ctx.GetSessionVars().StmtCtx, col.Tp)
if err != nil {
b.err = err
return nil
}
_, bktID := col.LessRowCountWithBktIdx(d)
if bktID == 0 {
return analyzeTask
}
oldHist = col.TruncateHistogram(bktID)
oldHist.NDV = int64(oldHist.TotalRowCount())
}
exec := analyzeTask.colExec
analyzeTask.taskType = pkIncrementalTask
analyzeTask.colIncrementalExec = &analyzePKIncrementalExec{AnalyzeColumnsExec: *exec, pkStats: col}
analyzeTask.colIncrementalExec = &analyzePKIncrementalExec{AnalyzeColumnsExec: *exec, oldHist: oldHist}
analyzeTask.job = &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "analyze incremental primary key"}
return analyzeTask
}
@@ -2514,6 +2514,7 @@ func (s *testSuite1) SetUpSuite(c *C) {
c.Assert(err, IsNil)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.dom.SetStatsUpdating(true)
}

func (s *testSuite1) TearDownSuite(c *C) {
@@ -171,6 +171,7 @@ const (
stats_ver bigint(64) NOT NULL DEFAULT 0,
flag bigint(64) NOT NULL DEFAULT 0,
correlation double NOT NULL DEFAULT 0,
last_analyze_pos blob DEFAULT NULL,
unique index tbl(table_id, is_index, hist_id)
);`

@@ -328,6 +329,7 @@ const (
version28 = 28
version29 = 29
version30 = 30
version31 = 31
)

func checkBootstrapped(s Session) (bool, error) {
@@ -507,6 +509,10 @@ func upgrade(s Session) {
upgradeToVer30(s)
}

if ver < version31 {
upgradeToVer31(s)
}

updateBootstrapVer(s)
_, err = s.Execute(context.Background(), "COMMIT")

@@ -799,6 +805,10 @@ func upgradeToVer30(s Session) {
mustExecute(s, CreateStatsTopNTable)
}

func upgradeToVer31(s Session) {
doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms ADD COLUMN `last_analyze_pos` blob default null", infoschema.ErrColumnExists)
}

// updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
@@ -1559,7 +1559,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er

const (
notBootstrapped = 0
currentBootstrapVersion = 30
currentBootstrapVersion = 31
)

func getStoreBootstrapVersion(store kv.Storage) int64 {
@@ -298,6 +298,31 @@ func (c *CMSketch) MergeCMSketch(rc *CMSketch) error {
return nil
}

// MergeCMSketch4IncrementalAnalyze merges two CM Sketch for incremental analyze. Since there is no value
// that appears partially in `c` and `rc` for incremental analyze, it uses `max` to merge them.
// Here is a simple proof: when we query from the CM sketch, we use the `min` to get the answer:
// (1): For values that only appears in `c, using `max` to merge them affects the `min` query result less than using `sum`;
// (2): For values that only appears in `rc`, it is the same as condition (1);
// (3): For values that appears both in `c` and `rc`, if they do not appear partially in `c` and `rc`, for example,
// if `v` appears 5 times in the table, it can appears 3 times in `c` and 5 times in `rc`, then `max` also gives the correct answer.
This conversation was marked as resolved by qw4990

This comment has been minimized.

Copy link
@qw4990

qw4990 May 7, 2019

Contributor

Since rc is the old CMSketch and c is the new one and this column is append only and increase only, appear times of v in c should be always equal or large than rc, so it should be it can appears 5 times in `c` and 3 times in `rc` ?

// So in fact, if we can know the number of appearances of each value in the first place, it is better to use `max` to construct the CM sketch rather than `sum`.
func (c *CMSketch) MergeCMSketch4IncrementalAnalyze(rc *CMSketch) error {
This conversation was marked as resolved by zz-jason

This comment has been minimized.

Copy link
@eurekaka

eurekaka May 7, 2019

Contributor

Could you explain more on the rationale behind this? We don't remove upper bound from old CMSketch now, so there can be value that appears partially in c and rc now? Even if there is no value that appears partially in c and rc, why we can use max operation to merge them?

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler May 7, 2019

Author Member

When we query from the cm sketch, we use the min to get the answer

  • For values that only appears in c, using max to merge them affects the min query result less than using sum.
  • For values that only appears in rc, it is the same;
  • For values that appears both in c and rc, if they do not appear partially in c and rc, for example, if the v appears 5 times in the table, it can appears 3 times in c and 5 times in rc, then max is also gives the correct answer.

So in fact, if we can know the number of appearances of each value in the first place, it is better to use max to construct the cm sketch rather than sum.

This comment has been minimized.

Copy link
@zz-jason

zz-jason May 7, 2019

Member

from the first intuition, it should be the sum of these two CM-Sketches?

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler May 7, 2019

Author Member

It would be wrong in the third case.

This comment has been minimized.

Copy link
@eurekaka

eurekaka May 7, 2019

Contributor

Got it, please add this explanation into the comment also.

if c.depth != rc.depth || c.width != rc.width {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
return errors.New("CMSketch with Top-N does not support merge")
}
for i := range c.table {
c.count = 0
for j := range c.table[i] {
c.table[i][j] = mathutil.MaxUint32(c.table[i][j], rc.table[i][j])
c.count += uint64(c.table[i][j])
}
}
return nil
}

// CMSketchToProto converts CMSketch to its protobuf representation.
func CMSketchToProto(c *CMSketch) *tipb.CMSketch {
protoSketch := &tipb.CMSketch{Rows: make([]*tipb.CMSketchRow, c.depth)}
@@ -109,7 +109,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
terror.Log(errors.Trace(err))
}
hist := statistics.NewHistogram(id, ndv, nullCount, version, types.NewFieldType(mysql.TypeBlob), chunk.InitialCapacity, 0)
table.Indices[hist.ID] = &statistics.Index{Histogram: *hist, CMSketch: cms, Info: idxInfo, StatsVer: row.GetInt64(8)}
table.Indices[hist.ID] = &statistics.Index{Histogram: *hist, CMSketch: cms, Info: idxInfo, StatsVer: row.GetInt64(8), Flag: row.GetInt64(10), LastAnalyzePos: row.GetDatum(11, types.NewFieldType(mysql.TypeBlob))}
} else {
var colInfo *model.ColumnInfo
for _, col := range tbl.Meta().Columns {
@@ -124,11 +124,13 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
hist := statistics.NewHistogram(id, ndv, nullCount, version, &colInfo.FieldType, 0, totColSize)
hist.Correlation = row.GetFloat64(9)
table.Columns[hist.ID] = &statistics.Column{
Histogram: *hist,
PhysicalID: table.PhysicalID,
Info: colInfo,
Count: nullCount,
IsHandle: tbl.Meta().PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag),
Histogram: *hist,
PhysicalID: table.PhysicalID,
Info: colInfo,
Count: nullCount,
IsHandle: tbl.Meta().PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag),
Flag: row.GetInt64(10),
LastAnalyzePos: row.GetDatum(11, types.NewFieldType(mysql.TypeBlob)),
}
}
}
@@ -137,7 +139,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables StatsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation from mysql.stats_histograms"
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.