From 86ad1a96c3a626dd69d8769cd5f8f7196c0c7a56 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 13 Aug 2019 14:06:11 +0800 Subject: [PATCH] =?UTF-8?q?*:=20speed=20up=20the=20operation=20of=20"admin?= =?UTF-8?q?=20check=20table"=20(#8572,=20#1156=E2=80=A6=20(#11676)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ddl/db_test.go | 16 +++ ddl/index.go | 4 +- executor/admin_test.go | 148 +++++++++++++++++--- executor/builder.go | 55 +++++++- executor/distsql.go | 189 +++++++++++++++++++------ executor/executor.go | 149 ++++++++++++++------ executor/executor_test.go | 2 +- planner/core/common_plans.go | 7 +- planner/core/physical_plans.go | 5 + planner/core/planbuilder.go | 243 ++++++++++++++++++++++++--------- util/admin/admin.go | 47 +++++-- util/admin/admin_test.go | 8 +- util/rowDecoder/decoder.go | 55 ++++++-- util/testkit/testkit.go | 9 ++ 14 files changed, 729 insertions(+), 208 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index dd6d30e05138..e98bcb54aec7 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -4616,6 +4616,22 @@ func (s *testDBSuite) TestAddIndexForGeneratedColumn(c *C) { s.mustExec(c, "delete from t where y = 2155") s.mustExec(c, "alter table t add index idx_y(y1)") s.mustExec(c, "alter table t drop index idx_y") + + // Fix issue 9311. + s.tk.MustExec("create table gcai_table (id int primary key);") + s.tk.MustExec("insert into gcai_table values(1);") + s.tk.MustExec("ALTER TABLE gcai_table ADD COLUMN d date DEFAULT '9999-12-31';") + s.tk.MustExec("ALTER TABLE gcai_table ADD COLUMN d1 date as (DATE_SUB(d, INTERVAL 31 DAY));") + s.tk.MustExec("ALTER TABLE gcai_table ADD INDEX idx(d1);") + s.tk.MustQuery("select * from gcai_table").Check(testkit.Rows("1 9999-12-31 9999-11-30")) + s.tk.MustQuery("select d1 from gcai_table use index(idx)").Check(testkit.Rows("9999-11-30")) + s.tk.MustExec("admin check table gcai_table") + // The column is PKIsHandle in generated column expression. + s.tk.MustExec("ALTER TABLE gcai_table ADD COLUMN id1 int as (id+5);") + s.tk.MustExec("ALTER TABLE gcai_table ADD INDEX idx1(id1);") + s.tk.MustQuery("select * from gcai_table").Check(testkit.Rows("1 9999-12-31 9999-11-30 6")) + s.tk.MustQuery("select id1 from gcai_table use index(idx1)").Check(testkit.Rows("6")) + s.tk.MustExec("admin check table gcai_table") } func (s *testDBSuite) TestModifyColumnCharset(c *C) { diff --git a/ddl/index.go b/ddl/index.go index 4bb413bf8d6a..ce5755249441 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -519,7 +519,7 @@ func mergeAddIndexCtxToResult(taskCtx *addIndexTaskContext, result *addIndexResu func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, decodeColMap map[int64]decoder.Column) *addIndexWorker { index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo) - rowDecoder := decoder.NewRowDecoder(t.Cols(), decodeColMap) + rowDecoder := decoder.NewRowDecoder(t, decodeColMap) return &addIndexWorker{ id: id, ddlWorker: worker, @@ -549,7 +549,7 @@ func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecor cols := t.Cols() idxInfo := w.index.Meta() sysZone := timeutil.SystemLocation() - _, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, rawRecord, time.UTC, sysZone, w.rowMap) + _, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, handle, rawRecord, time.UTC, sysZone, w.rowMap) if err != nil { return nil, errors.Trace(errCantDecodeIndex.GenWithStackByArgs(err)) } diff --git a/executor/admin_test.go b/executor/admin_test.go index bf6b7ed30bde..bab9e433ba43 100644 --- a/executor/admin_test.go +++ b/executor/admin_test.go @@ -101,10 +101,10 @@ func (s *testSuite) TestAdminRecoverIndex(c *C) { c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)") @@ -125,7 +125,7 @@ func (s *testSuite) TestAdminRecoverIndex(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r = tk.MustQuery("admin recover index admin_test c2") r.Check(testkit.Rows("1 5")) @@ -147,9 +147,9 @@ func (s *testSuite) TestAdminRecoverIndex(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)") @@ -271,9 +271,9 @@ func (s *testSuite) TestAdminCleanupIndex(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c2)") r.Check(testkit.Rows("11")) @@ -283,9 +283,9 @@ func (s *testSuite) TestAdminCleanupIndex(c *C) { r.Check(testkit.Rows("6")) tk.MustExec("admin check index admin_test c2") - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c3") + err = tk.ExecToErr("admin check index admin_test c3") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c3)") r.Check(testkit.Rows("9")) @@ -332,9 +332,9 @@ func (s *testSuite) TestAdminCleanupIndexPKNotHandle(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test `primary`") + err = tk.ExecToErr("admin check index admin_test `primary`") c.Assert(err, NotNil) r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(`primary`)") r.Check(testkit.Rows("6")) @@ -384,11 +384,11 @@ func (s *testSuite) TestAdminCleanupIndexMore(c *C) { err = txn.Commit(context.Background()) c.Assert(err, IsNil) - _, err = tk.Exec("admin check table admin_test") + err = tk.ExecToErr("admin check table admin_test") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c1") + err = tk.ExecToErr("admin check index admin_test c1") c.Assert(err, NotNil) - _, err = tk.Exec("admin check index admin_test c2") + err = tk.ExecToErr("admin check index admin_test c2") c.Assert(err, NotNil) r := tk.MustQuery("SELECT COUNT(*) FROM admin_test") r.Check(testkit.Rows("3")) @@ -409,6 +409,112 @@ func (s *testSuite) TestAdminCleanupIndexMore(c *C) { tk.MustExec("admin check table admin_test") } +func (s *testSuite) TestAdminCheckTableFailed(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists admin_test") + tk.MustExec("create table admin_test (c1 int, c2 int, c3 varchar(255) default '1', primary key(c1), key(c3), unique key(c2), key(c2, c3))") + tk.MustExec("insert admin_test (c1, c2, c3) values (-10, -20, 'y'), (-1, -10, 'z'), (1, 11, 'a'), (2, 12, 'b'), (5, 15, 'c'), (10, 20, 'd'), (20, 30, 'e')") + + // Make some corrupted index. Build the index information. + s.ctx = mock.NewContext() + s.ctx.Store = s.store + is := s.domain.InfoSchema() + dbName := model.NewCIStr("test") + tblName := model.NewCIStr("admin_test") + tbl, err := is.TableByName(dbName, tblName) + c.Assert(err, IsNil) + tblInfo := tbl.Meta() + idxInfo := tblInfo.Indices[1] + indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo) + sc := s.ctx.GetSessionVars().StmtCtx + tk.Se.GetSessionVars().IndexLookupSize = 3 + tk.Se.GetSessionVars().MaxChunkSize = 3 + + // Reduce one row of index. + // Table count > index count. + // Index c2 is missing 11. + txn, err := s.store.Begin() + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(-10), -1) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + err = tk.ExecToErr("admin check table admin_test") + c.Assert(err.Error(), Equals, + "[executor:8003]admin_test err:[admin:1]index: != record:&admin.RecordData{Handle:-1, Values:[]types.Datum{types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:-10, b:[]uint8(nil), x:interface {}(nil)}}}") + c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue) + r := tk.MustQuery("admin recover index admin_test c2") + r.Check(testkit.Rows("1 7")) + tk.MustExec("admin check table admin_test") + + // Add one row of index. + // Table count < index count. + // Index c2 has one more values ​​than table data: 0, and the handle 0 hasn't correlative record. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(0), 0) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + err = tk.ExecToErr("admin check table admin_test") + c.Assert(err.Error(), Equals, "handle 0, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:0, b:[]uint8(nil), x:interface {}(nil)} != record:") + + // Add one row of index. + // Table count < index count. + // Index c2 has two more values ​​than table data: 10, 13, and these handles have correlative record. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(0), 0) + c.Assert(err, IsNil) + // Make sure the index value "19" is smaller "21". Then we scan to "19" before "21". + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(19), 10) + c.Assert(err, IsNil) + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(13), 2) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + err = tk.ExecToErr("admin check table admin_test") + c.Assert(err.Error(), Equals, "col c2, handle 2, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:13, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:12, b:[]uint8(nil), x:interface {}(nil)}") + + // Table count = index count. + // Two indices have the same handle. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(13), 2) + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(12), 2) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + err = tk.ExecToErr("admin check table admin_test") + c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}") + + // Table count = index count. + // Index c2 has one line of data is 19, the corresponding table data is 20. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(12), 2) + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(20), 10) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + err = tk.ExecToErr("admin check table admin_test") + c.Assert(err.Error(), Equals, "col c2, handle 10, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:19, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:20, b:[]uint8(nil), x:interface {}(nil)}") + + // Recover records. + txn, err = s.store.Begin() + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(19), 10) + c.Assert(err, IsNil) + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(20), 10) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + tk.MustExec("admin check table admin_test") +} + func (s *testSuite) TestAdminCheckTable(c *C) { // test NULL value. tk := testkit.NewTestKit(c, s.store) @@ -466,22 +572,22 @@ func (s *testSuite) TestAdminCheckTable(c *C) { // Test index in virtual generated column. tk.MustExec(`drop table if exists test`) - tk.MustExec(`create table test ( b json , c int as (JSON_EXTRACT(b,'$.d')) , index idxc(c));`) + tk.MustExec(`create table test ( b json , c int as (JSON_EXTRACT(b,'$.d')), index idxc(c));`) tk.MustExec(`INSERT INTO test set b='{"d": 100}';`) tk.MustExec(`admin check table test;`) // Test prefix index. tk.MustExec(`drop table if exists t`) tk.MustExec(`CREATE TABLE t ( - ID CHAR(32) NOT NULL, - name CHAR(32) NOT NULL, - value CHAR(255), - INDEX indexIDname (ID(8),name(8)));`) + ID CHAR(32) NOT NULL, + name CHAR(32) NOT NULL, + value CHAR(255), + INDEX indexIDname (ID(8),name(8)));`) tk.MustExec(`INSERT INTO t VALUES ('keyword','urlprefix','text/ /text');`) tk.MustExec(`admin check table t;`) tk.MustExec("use mysql") tk.MustExec(`admin check table test.t;`) - _, err := tk.Exec("admin check table t") + err := tk.ExecToErr("admin check table t") c.Assert(err, NotNil) // test add index on time type column which have default value @@ -521,7 +627,7 @@ func (s *testSuite) TestAdminCheckTable(c *C) { tk.MustExec(`drop table if exists t1`) tk.MustExec(`create table t1 (a decimal(2,1), index(a))`) tk.MustExec(`insert into t1 set a='1.9'`) - _, err = tk.Exec(`alter table t1 modify column a decimal(3,2);`) + err = tk.ExecToErr(`alter table t1 modify column a decimal(3,2);`) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "[ddl:203]unsupported modify decimal column precision") tk.MustExec(`delete from t1;`) diff --git a/executor/builder.go b/executor/builder.go index a3c0c697c33c..8bd2cf170f23 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -271,8 +271,8 @@ func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor { b.err = errors.Trace(err) return nil } - readerExec.ranges = ranger.FullRange() - readerExec.isCheckOp = true + + buildIndexLookUpChecker(b, v.IndexLookUpReader, readerExec) e := &CheckIndexExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), @@ -285,12 +285,59 @@ func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor { return e } +// buildIndexLookUpChecker builds check information to IndexLookUpReader. +func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.PhysicalIndexLookUpReader, + readerExec *IndexLookUpExecutor) { + is := readerPlan.IndexPlans[0].(*plannercore.PhysicalIndexScan) + readerExec.dagPB.OutputOffsets = make([]uint32, 0, len(is.Index.Columns)) + for i := 0; i <= len(is.Index.Columns); i++ { + readerExec.dagPB.OutputOffsets = append(readerExec.dagPB.OutputOffsets, uint32(i)) + } + readerExec.ranges = ranger.FullRange() + ts := readerPlan.TablePlans[0].(*plannercore.PhysicalTableScan) + readerExec.handleIdx = ts.HandleIdx + + tps := make([]*types.FieldType, 0, len(is.Columns)+1) + for _, col := range is.Columns { + tps = append(tps, &col.FieldType) + } + tps = append(tps, types.NewFieldType(mysql.TypeLonglong)) + readerExec.checkIndexValue = &checkIndexValue{genExprs: is.GenExprs, idxColTps: tps} + + colNames := make([]string, 0, len(is.Columns)) + for _, col := range is.Columns { + colNames = append(colNames, col.Name.O) + } + var err error + readerExec.idxTblCols, err = table.FindCols(readerExec.table.Cols(), colNames, true) + if err != nil { + b.err = errors.Trace(err) + return + } +} + func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor { + readerExecs := make([]*IndexLookUpExecutor, 0, len(v.IndexLookUpReaders)) + for _, readerPlan := range v.IndexLookUpReaders { + readerExec, err := buildNoRangeIndexLookUpReader(b, readerPlan) + if err != nil { + b.err = errors.Trace(err) + return nil + } + buildIndexLookUpChecker(b, readerPlan, readerExec) + + readerExecs = append(readerExecs, readerExec) + } + e := &CheckTableExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), - tables: v.Tables, + dbName: v.DBName, + tblInfo: v.TblInfo, + indices: v.Indices, is: b.is, - genExprs: v.GenExprs, + srcs: readerExecs, + exitCh: make(chan struct{}), + retCh: make(chan error, len(v.Indices)), } return e } diff --git a/executor/distsql.go b/executor/distsql.go index ffd61c15036e..49d1078f9951 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" @@ -59,6 +60,7 @@ type lookupTableTask struct { handles []int64 rowIdx []int // rowIdx represents the handle index for every row. Only used when keep order. rows []chunk.Row + idxRows *chunk.Chunk cursor int doneCh chan error @@ -68,6 +70,9 @@ type lookupTableTask struct { // The handles fetched from index is originally ordered by index, but we need handles to be ordered by itself // to do table request. indexOrder map[int64]int + // duplicatedIndexOrder map likes indexOrder. But it's used when checkIndexValue isn't nil and + // the same handle of index has multiple values. + duplicatedIndexOrder map[int64]int // memUsage records the memory usage of this task calculated by table worker. // memTracker is used to release memUsage after task is done and unused. @@ -351,8 +356,8 @@ type IndexLookUpExecutor struct { // memTracker is used to track the memory usage of this executor. memTracker *memory.Tracker - // isCheckOp is used to determine whether we need to check the consistency of the index data. - isCheckOp bool + // checkIndexValue is used to check the consistency of the index data. + *checkIndexValue corColInIdxSide bool idxPlans []plannercore.PhysicalPlan @@ -363,6 +368,12 @@ type IndexLookUpExecutor struct { colLens []int } +type checkIndexValue struct { + idxColTps []*types.FieldType + idxTblCols []*table.Column + genExprs map[model.TableColumnID]expression.Expression +} + // Open implements the Executor Open interface. func (e *IndexLookUpExecutor) Open(ctx context.Context) error { var err error @@ -437,21 +448,26 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k if err != nil { return errors.Trace(err) } + tps := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + if e.checkIndexValue != nil { + tps = e.idxColTps + } // Since the first read only need handle information. So its returned col is only 1. - result, err := distsql.Select(ctx, e.ctx, kvReq, []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, e.feedback) + result, err := distsql.Select(ctx, e.ctx, kvReq, tps, e.feedback) if err != nil { return errors.Trace(err) } result.Fetch(ctx) worker := &indexWorker{ - idxLookup: e, - workCh: workCh, - finished: e.finished, - resultCh: e.resultCh, - keepOrder: e.keepOrder, - batchSize: initBatchSize, - maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, - maxChunkSize: e.maxChunkSize, + idxLookup: e, + workCh: workCh, + finished: e.finished, + resultCh: e.resultCh, + keepOrder: e.keepOrder, + batchSize: initBatchSize, + checkIndexValue: e.checkIndexValue, + maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize, + maxChunkSize: e.maxChunkSize, } if worker.batchSize > worker.maxBatchSize { worker.batchSize = worker.maxBatchSize @@ -481,13 +497,14 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha e.tblWorkerWg.Add(lookupConcurrencyLimit) for i := 0; i < lookupConcurrencyLimit; i++ { worker := &tableWorker{ - workCh: workCh, - finished: e.finished, - buildTblReader: e.buildTableReader, - keepOrder: e.keepOrder, - handleIdx: e.handleIdx, - isCheckOp: e.isCheckOp, - memTracker: memory.NewTracker("tableWorker", -1), + idxLookup: e, + workCh: workCh, + finished: e.finished, + buildTblReader: e.buildTableReader, + keepOrder: e.keepOrder, + handleIdx: e.handleIdx, + checkIndexValue: e.checkIndexValue, + memTracker: memory.NewTracker("tableWorker", -1), } worker.memTracker.AttachTo(e.memTracker) ctx1, cancel := context.WithCancel(ctx) @@ -603,6 +620,9 @@ type indexWorker struct { batchSize int maxBatchSize int maxChunkSize int + + // checkIndexValue is used to check the consistency of the index data. + *checkIndexValue } // fetchHandles fetches a batch of handles from index data and builds the index lookup tasks. @@ -626,9 +646,14 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes } } }() - chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) + var chk *chunk.Chunk + if w.checkIndexValue != nil { + chk = chunk.NewChunkWithCapacity(w.idxColTps, w.maxChunkSize) + } else { + chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) + } for { - handles, err := w.extractTaskHandles(ctx, chk, result) + handles, retChunk, err := w.extractTaskHandles(ctx, chk, result) if err != nil { doneCh := make(chan error, 1) doneCh <- errors.Trace(err) @@ -640,7 +665,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes if len(handles) == 0 { return nil } - task := w.buildTableTask(handles) + task := w.buildTableTask(handles, retChunk) select { case <-ctx.Done(): return nil @@ -652,30 +677,40 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes } } -func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (handles []int64, err error) { +func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) ( + handles []int64, retChk *chunk.Chunk, err error) { + handleOffset := chk.NumCols() - 1 handles = make([]int64, 0, w.batchSize) for len(handles) < w.batchSize { chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize) err = errors.Trace(idxResult.Next(ctx, chk)) if err != nil { - return handles, err + return handles, nil, err } if chk.NumRows() == 0 { - return handles, nil + return handles, retChk, nil } for i := 0; i < chk.NumRows(); i++ { - handles = append(handles, chk.GetRow(i).GetInt64(0)) + h := chk.GetRow(i).GetInt64(handleOffset) + handles = append(handles, h) + } + if w.checkIndexValue != nil { + if retChk == nil { + retChk = chunk.NewChunkWithCapacity(w.idxColTps, w.batchSize) + } + retChk.Append(chk, 0, chk.NumRows()) } } w.batchSize *= 2 if w.batchSize > w.maxBatchSize { w.batchSize = w.maxBatchSize } - return handles, nil + return handles, retChk, nil } -func (w *indexWorker) buildTableTask(handles []int64) *lookupTableTask { +func (w *indexWorker) buildTableTask(handles []int64, retChk *chunk.Chunk) *lookupTableTask { var indexOrder map[int64]int + var duplicatedIndexOrder map[int64]int if w.keepOrder { // Save the index order. indexOrder = make(map[int64]int, len(handles)) @@ -683,16 +718,34 @@ func (w *indexWorker) buildTableTask(handles []int64) *lookupTableTask { indexOrder[h] = i } } + + if w.checkIndexValue != nil { + // Save the index order. + indexOrder = make(map[int64]int, len(handles)) + duplicatedIndexOrder = make(map[int64]int) + for i, h := range handles { + if _, ok := indexOrder[h]; ok { + duplicatedIndexOrder[h] = i + } else { + indexOrder[h] = i + } + } + } + task := &lookupTableTask{ - handles: handles, - indexOrder: indexOrder, + handles: handles, + indexOrder: indexOrder, + duplicatedIndexOrder: duplicatedIndexOrder, + idxRows: retChk, } + task.doneCh = make(chan error, 1) return task } // tableWorker is used by IndexLookUpExecutor to maintain table lookup background goroutines. type tableWorker struct { + idxLookup *IndexLookUpExecutor workCh <-chan *lookupTableTask finished <-chan struct{} buildTblReader func(ctx context.Context, handles []int64) (Executor, error) @@ -702,8 +755,8 @@ type tableWorker struct { // memTracker is used to track the memory usage of this executor. memTracker *memory.Tracker - // isCheckOp is used to determine whether we need to check the consistency of the index data. - isCheckOp bool + // checkIndexValue is used to check the consistency of the index data. + *checkIndexValue } // pickAndExecTask picks tasks from workCh, and execute them. @@ -736,6 +789,66 @@ func (w *tableWorker) pickAndExecTask(ctx context.Context) { } } +func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, tableReader Executor) error { + chk := tableReader.newFirstChunk() + tblInfo := w.idxLookup.table.Meta() + vals := make([]types.Datum, 0, len(w.idxTblCols)) + for { + err := tableReader.Next(ctx, chk) + if err != nil { + return errors.Trace(err) + } + if chk.NumRows() == 0 { + for h := range task.indexOrder { + idxRow := task.idxRows.GetRow(task.indexOrder[h]) + return errors.Errorf("handle %#v, index:%#v != record:%#v", h, idxRow.GetDatum(0, w.idxColTps[0]), nil) + } + break + } + + tblReaderExec := tableReader.(*TableReaderExecutor) + iter := chunk.NewIterator4Chunk(chk) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + handle := row.GetInt64(w.handleIdx) + offset, ok := task.indexOrder[handle] + if !ok { + offset = task.duplicatedIndexOrder[handle] + } + delete(task.indexOrder, handle) + idxRow := task.idxRows.GetRow(offset) + vals = vals[:0] + for i, col := range w.idxTblCols { + if col.IsGenerated() && !col.GeneratedStored { + expr := w.genExprs[model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ID}] + // Eval the column value + val, err := expr.Eval(row) + if err != nil { + return errors.Trace(err) + } + val, err = table.CastValue(tblReaderExec.ctx, val, col.ColumnInfo) + if err != nil { + return errors.Trace(err) + } + vals = append(vals, val) + } else { + vals = append(vals, row.GetDatum(i, &col.FieldType)) + } + } + vals = tables.TruncateIndexValuesIfNeeded(tblInfo, w.idxLookup.index, vals) + for i, val := range vals { + col := w.idxTblCols[i] + tp := &col.FieldType + ret := chunk.Compare(idxRow, i, &val) + if ret != 0 { + return errors.Errorf("col %s, handle %#v, index:%#v != record:%#v", col.Name, handle, idxRow.GetDatum(i, tp), val) + } + } + } + } + + return nil +} + // executeTask executes the table look up tasks. We will construct a table reader and send request by handles. // Then we hold the returning rows and finish this task. func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error { @@ -746,6 +859,10 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er } defer terror.Call(tableReader.Close) + if w.checkIndexValue != nil { + return w.compareData(ctx, task, tableReader) + } + task.memTracker = w.memTracker memUsage := int64(cap(task.handles) * 8) task.memUsage = memUsage @@ -785,16 +902,6 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er sort.Sort(task) } - if w.isCheckOp && handleCnt != len(task.rows) { - obtainedHandlesMap := make(map[int64]struct{}, len(task.rows)) - for _, row := range task.rows { - handle := row.GetInt64(w.handleIdx) - obtainedHandlesMap[handle] = struct{}{} - } - return errors.Errorf("handle count %d isn't equal to value count %d, missing handles %v in a batch", - handleCnt, len(task.rows), GetLackHandles(task.handles, obtainedHandlesMap)) - } - return nil } diff --git a/executor/executor.go b/executor/executor.go index 3fc58f632be8..5d0a6e415f24 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" @@ -430,11 +431,14 @@ func getTableName(is infoschema.InfoSchema, id int64) string { type CheckTableExec struct { baseExecutor - tables []*ast.TableName - done bool - is infoschema.InfoSchema - - genExprs map[model.TableColumnID]expression.Expression + dbName string + tblInfo *model.TableInfo + indices []table.Index + srcs []*IndexLookUpExecutor + done bool + is infoschema.InfoSchema + exitCh chan struct{} + retCh chan error } // Open implements the Executor Open interface. @@ -442,62 +446,131 @@ func (e *CheckTableExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } + for _, src := range e.srcs { + if err := src.Open(ctx); err != nil { + return errors.Trace(err) + } + } e.done = false return nil } +// Close implements the Executor Close interface. +func (e *CheckTableExec) Close() error { + var firstErr error + for _, src := range e.srcs { + if err := src.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} + +func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *IndexLookUpExecutor) error { + cols := src.schema.Columns + retFieldTypes := make([]*types.FieldType, len(cols)) + for i := range cols { + retFieldTypes[i] = cols[i].RetType + } + chk := chunk.New(retFieldTypes, e.initCap, e.maxChunkSize) + + var err error + for { + err = src.Next(ctx, chk) + if err != nil { + break + } + if chk.NumRows() == 0 { + break + } + + select { + case <-e.exitCh: + return nil + default: + } + } + e.retCh <- errors.Trace(err) + return errors.Trace(err) +} + +func (e *CheckTableExec) handlePanic(r interface{}) { + if r != nil { + e.retCh <- errors.Errorf("%v", r) + } +} + // Next implements the Executor Next interface. -func (e *CheckTableExec) Next(ctx context.Context, chk *chunk.Chunk) error { - if e.done { +func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error { + if e.done || len(e.srcs) == 0 { return nil } defer func() { e.done = true }() - for _, t := range e.tables { - dbName := t.DBInfo.Name - tb, err := e.is.TableByName(dbName, t.Name) - if err != nil { - return errors.Trace(err) + + idxNames := make([]string, 0, len(e.indices)) + for _, idx := range e.indices { + idxNames = append(idxNames, idx.Meta().Name.O) + } + greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tblInfo.Name.O, idxNames) + if err != nil { + tbl := e.srcs[idxOffset].table + if greater == admin.IdxCntGreater { + err = e.checkIndexHandle(ctx, idxOffset, e.srcs[idxOffset]) + } else if greater == admin.TblCntGreater { + err = e.checkTableRecord(tbl, idxOffset) } - if tb.Meta().GetPartitionInfo() != nil { - err = e.doCheckPartitionedTable(tb.(table.PartitionedTable)) - } else { - err = e.doCheckTable(tb) + if err != nil && admin.ErrDataInConsistent.Equal(err) { + return ErrAdminCheckTable.GenWithStack("%v err:%v", tbl.Meta().Name, err) } - if err != nil { - logutil.Logger(ctx).Warn("check table failed", zap.String("tableName", t.Name.O), zap.Error(err)) - if admin.ErrDataInConsistent.Equal(err) { - return ErrAdminCheckTable.GenWithStack("%v err:%v", t.Name, err) - } + return errors.Trace(err) + } - return errors.Errorf("%v err:%v", t.Name, err) + // The number of table rows is equal to the number of index rows. + // TODO: Make the value of concurrency adjustable. And we can consider the number of records. + concurrency := 3 + wg := sync.WaitGroup{} + for i := range e.srcs { + wg.Add(1) + go func(num int) { + defer wg.Done() + util.WithRecovery(func() { + err1 := e.checkIndexHandle(ctx, num, e.srcs[num]) + if err1 != nil { + logutil.Logger(ctx).Info("check index handle failed", zap.Error(err)) + } + }, e.handlePanic) + }(i) + + if (i+1)%concurrency == 0 { + wg.Wait() } } - return nil -} -func (e *CheckTableExec) doCheckPartitionedTable(tbl table.PartitionedTable) error { - info := tbl.Meta().GetPartitionInfo() - for _, def := range info.Definitions { - pid := def.ID - partition := tbl.GetPartition(pid) - if err := e.doCheckTable(partition); err != nil { + for i := 0; i < len(e.srcs); i++ { + err = <-e.retCh + if err != nil { return errors.Trace(err) } } return nil } -func (e *CheckTableExec) doCheckTable(tbl table.Table) error { +func (e *CheckTableExec) checkTableRecord(tbl table.Table, idxOffset int) error { + idx := e.indices[idxOffset] + genExprs := e.srcs[idxOffset].genExprs txn, err := e.ctx.Txn(true) if err != nil { return errors.Trace(err) } - for _, idx := range tbl.Indices() { - if idx.Meta().State != model.StatePublic { - continue - } - err := admin.CompareIndexData(e.ctx, txn, tbl, idx, e.genExprs) - if err != nil { + if tbl.Meta().GetPartitionInfo() == nil { + return admin.CheckRecordAndIndex(e.ctx, txn, tbl, idx, genExprs) + } + + info := tbl.Meta().GetPartitionInfo() + for _, def := range info.Definitions { + pid := def.ID + partition := tbl.(table.PartitionedTable).GetPartition(pid) + if err := admin.CheckRecordAndIndex(e.ctx, txn, partition, idx, genExprs); err != nil { return errors.Trace(err) } } @@ -542,7 +615,7 @@ func (e *CheckIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error { } defer func() { e.done = true }() - err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tableName, []string{e.idxName}) + _, _, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tableName, []string{e.idxName}) if err != nil { return errors.Trace(err) } diff --git a/executor/executor_test.go b/executor/executor_test.go index 249be76f89ce..5f5add10cf87 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -3056,7 +3056,7 @@ func (s *testSuite) TestCheckIndex(c *C) { c.Assert(err, IsNil) _, err = se.Execute(context.Background(), "admin check index t c") c.Assert(err, NotNil) - c.Assert(strings.Contains(err.Error(), "isn't equal to value count"), IsTrue) + c.Assert(err.Error(), Equals, "handle 3, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:30, b:[]uint8(nil), x:interface {}(nil)} != record:") // set data to: // index data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 0d0157dece97..27eb2c3307d1 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -72,9 +72,10 @@ type ShowNextRowID struct { type CheckTable struct { baseSchemaProducer - Tables []*ast.TableName - - GenExprs map[model.TableColumnID]expression.Expression + DBName string + TblInfo *model.TableInfo + Indices []table.Index + IndexLookUpReaders []*PhysicalIndexLookUpReader } // RecoverIndex is used for backfilling corrupted index data. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index ca32af5651f2..a345719aac6a 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -118,6 +118,8 @@ type PhysicalIndexScan struct { // The index scan may be on a partition. isPartition bool physicalTableID int64 + + GenExprs map[model.TableColumnID]expression.Expression } // PhysicalMemTable reads memory table. @@ -159,6 +161,9 @@ type PhysicalTableScan struct { physicalTableID int64 rangeDecidedBy []*expression.Column + + // HandleIdx is the index of handle, which is only used for admin check table. + HandleIdx int } // IsPartition returns true and partition ID if it's actually a partition. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 83aacdcec018..107035efc2e9 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -15,8 +15,8 @@ package core import ( "bytes" + "context" "fmt" - "github.com/pingcap/tidb/util/chunk" "strings" "github.com/cznic/mathutil" @@ -35,8 +35,11 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/parser_driver" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/schemautil" + "go.uber.org/zap" ) type visitInfo struct { @@ -477,42 +480,7 @@ func (b *planBuilder) buildCheckIndex(dbName model.CIStr, as *ast.AdminStmt) (Pl return nil, errors.Errorf("index %s state %s isn't public", as.Index, idx.State) } - id := 1 - columns := make([]*model.ColumnInfo, 0, len(idx.Columns)) - schema := expression.NewSchema(make([]*expression.Column, 0, len(idx.Columns))...) - for _, idxCol := range idx.Columns { - for _, col := range tblInfo.Columns { - if idxCol.Name.L == col.Name.L { - columns = append(columns, col) - schema.Append(&expression.Column{ - ColName: col.Name, - UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), - RetType: &col.FieldType, - }) - } - } - } - is := PhysicalIndexScan{ - Table: tblInfo, - TableAsName: &tblName.Name, - DBName: dbName, - Columns: columns, - Index: idx, - dataSourceSchema: schema, - Ranges: ranger.FullRange(), - KeepOrder: false, - }.init(b.ctx) - is.stats = &property.StatsInfo{} - cop := &copTask{indexPlan: is} - // It's double read case. - ts := PhysicalTableScan{Columns: columns, Table: is.Table}.init(b.ctx) - ts.SetSchema(is.dataSourceSchema) - cop.tablePlan = ts - is.initSchema(id, idx, true) - t := finishCopTask(b.ctx, cop) - - rootT := t.(*rootTask) - return rootT.p, nil + return b.buildPhysicalIndexLookUpReader(dbName, tbl, idx, 1) } func (b *planBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) { @@ -596,43 +564,188 @@ func (b *planBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) { return ret, nil } -func (b *planBuilder) buildAdminCheckTable(as *ast.AdminStmt) (*CheckTable, error) { - p := &CheckTable{Tables: as.Tables} - p.GenExprs = make(map[model.TableColumnID]expression.Expression, len(p.Tables)) - +// getGenExprs gets generated expressions map. +func (b *planBuilder) getGenExprs(dbName model.CIStr, tbl table.Table, idx *model.IndexInfo) ( + map[model.TableColumnID]expression.Expression, error) { + tblInfo := tbl.Meta() + genExprsMap := make(map[model.TableColumnID]expression.Expression) + exprs := make([]expression.Expression, 0, len(tbl.Cols())) + genExprIdxs := make([]model.TableColumnID, len(tbl.Cols())) mockTablePlan := LogicalTableDual{}.init(b.ctx) - for _, tbl := range p.Tables { - tableInfo := tbl.TableInfo - schema := expression.TableInfo2SchemaWithDBName(b.ctx, tbl.Schema, tableInfo) - table, ok := b.is.TableByID(tableInfo.ID) - if !ok { - return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tableInfo.Name.O) + mockTablePlan.SetSchema(expression.TableInfo2SchemaWithDBName(b.ctx, dbName, tblInfo)) + for i, colExpr := range mockTablePlan.Schema().Columns { + col := tbl.Cols()[i] + var expr expression.Expression + expr = colExpr + if col.IsGenerated() && !col.GeneratedStored { + var err error + expr, _, err = b.rewrite(col.GeneratedExpr, mockTablePlan, nil, true) + if err != nil { + return nil, errors.Trace(err) + } + expr = expression.BuildCastFunction(b.ctx, expr, colExpr.GetType()) + found := false + for _, column := range idx.Columns { + if strings.EqualFold(col.Name.L, column.Name.L) { + found = true + break + } + } + if found { + genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ColumnInfo.ID} + genExprsMap[genColumnID] = expr + genExprIdxs[i] = genColumnID + } } + exprs = append(exprs, expr) + } + // Re-iterate expressions to handle those virtual generated columns that refers to the other generated columns. + for i, expr := range exprs { + exprs[i] = expression.ColumnSubstitute(expr, mockTablePlan.Schema(), exprs) + if _, ok := genExprsMap[genExprIdxs[i]]; ok { + genExprsMap[genExprIdxs[i]] = exprs[i] + } + } + return genExprsMap, nil +} - mockTablePlan.SetSchema(schema) +func (b *planBuilder) buildPhysicalIndexLookUpReader(dbName model.CIStr, tbl table.Table, idx *model.IndexInfo, id int) (Plan, error) { + genExprsMap, err := b.getGenExprs(dbName, tbl, idx) + if err != nil { + return nil, errors.Trace(err) + } - // Calculate generated columns. - columns := table.Cols() - for _, column := range columns { - if !column.IsGenerated() { - continue + // Get generated columns. + var genCols []*expression.Column + pkOffset := -1 + tblInfo := tbl.Meta() + colsMap := make(map[int64]struct{}) + schema := expression.NewSchema(make([]*expression.Column, 0, len(idx.Columns))...) + idxReaderCols := make([]*model.ColumnInfo, 0, len(idx.Columns)) + tblReaderCols := make([]*model.ColumnInfo, 0, len(tbl.Cols())) + for _, idxCol := range idx.Columns { + for _, col := range tblInfo.Columns { + if idxCol.Name.L == col.Name.L { + idxReaderCols = append(idxReaderCols, col) + tblReaderCols = append(tblReaderCols, col) + schema.Append(&expression.Column{ + ColName: col.Name, + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), + RetType: &col.FieldType}) + colsMap[col.ID] = struct{}{} + if mysql.HasPriKeyFlag(col.Flag) { + pkOffset = len(tblReaderCols) - 1 + } } - columnName := &ast.ColumnName{Name: column.Name} - columnName.SetText(column.Name.O) - - colExpr, _, err := mockTablePlan.findColumn(columnName) - if err != nil { - return nil, errors.Trace(err) + genColumnID := model.TableColumnID{TableID: tblInfo.ID, ColumnID: col.ID} + if expr, ok := genExprsMap[genColumnID]; ok { + cols := expression.ExtractColumns(expr) + genCols = append(genCols, cols...) } - - expr, _, err := b.rewrite(column.GeneratedExpr, mockTablePlan, nil, true) - if err != nil { - return nil, errors.Trace(err) + } + } + // Add generated columns to tblSchema and tblReaderCols. + tblSchema := schema.Clone() + for _, col := range genCols { + if _, ok := colsMap[col.ID]; !ok { + c := table.FindCol(tbl.Cols(), col.ColName.O) + if c != nil { + col.Index = len(tblReaderCols) + tblReaderCols = append(tblReaderCols, c.ColumnInfo) + tblSchema.Append(&expression.Column{ + ColName: c.Name, + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), + RetType: &c.FieldType}) + colsMap[c.ID] = struct{}{} + if mysql.HasPriKeyFlag(c.Flag) { + pkOffset = len(tblReaderCols) - 1 + } } - expr = expression.BuildCastFunction(b.ctx, expr, colExpr.GetType()) - p.GenExprs[model.TableColumnID{TableID: tableInfo.ID, ColumnID: column.ColumnInfo.ID}] = expr } } + if !tbl.Meta().PKIsHandle || pkOffset == -1 { + tblReaderCols = append(tblReaderCols, model.NewExtraHandleColInfo()) + handleCol := &expression.Column{ + DBName: dbName, + TblName: tblInfo.Name, + ColName: model.ExtraHandleName, + RetType: types.NewFieldType(mysql.TypeLonglong), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), + ID: model.ExtraHandleID, + } + tblSchema.Append(handleCol) + pkOffset = len(tblReaderCols) - 1 + } + + is := PhysicalIndexScan{ + Table: tblInfo, + TableAsName: &tblInfo.Name, + DBName: dbName, + Columns: idxReaderCols, + Index: idx, + dataSourceSchema: schema, + Ranges: ranger.FullRange(), + GenExprs: genExprsMap, + }.init(b.ctx) + is.stats = property.NewSimpleStats(0) + // It's double read case. + ts := PhysicalTableScan{Columns: tblReaderCols, Table: is.Table}.init(b.ctx) + ts.SetSchema(tblSchema) + cop := &copTask{indexPlan: is, tablePlan: ts} + ts.HandleIdx = pkOffset + is.initSchema(id, idx, true) + rootT := finishCopTask(b.ctx, cop).(*rootTask) + return rootT.p, nil +} + +func (b *planBuilder) buildPhysicalIndexLookUpReaders(dbName model.CIStr, tbl table.Table) ([]Plan, []table.Index, error) { + tblInfo := tbl.Meta() + // get index information + indices := make([]table.Index, 0, len(tblInfo.Indices)) + indexLookUpReaders := make([]Plan, 0, len(tblInfo.Indices)) + for i, idx := range tbl.Indices() { + idxInfo := idx.Meta() + if idxInfo.State != model.StatePublic { + logutil.Logger(context.Background()).Info("build physical index lookup reader, the index isn't public", + zap.String("index", idxInfo.Name.O), zap.Stringer("state", idxInfo.State), zap.String("table", tblInfo.Name.O)) + continue + } + indices = append(indices, idx) + reader, err := b.buildPhysicalIndexLookUpReader(dbName, tbl, idxInfo, i) + if err != nil { + return nil, nil, err + } + indexLookUpReaders = append(indexLookUpReaders, reader) + } + if len(indexLookUpReaders) == 0 { + return nil, nil, nil + } + return indexLookUpReaders, indices, nil +} + +func (b *planBuilder) buildAdminCheckTable(as *ast.AdminStmt) (*CheckTable, error) { + tbl := as.Tables[0] + p := &CheckTable{ + DBName: tbl.Schema.O, + TblInfo: tbl.TableInfo, + } + + tableInfo := as.Tables[0].TableInfo + table, ok := b.is.TableByID(tableInfo.ID) + if !ok { + return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tableInfo.Name.O) + } + + readerPlans, indices, err := b.buildPhysicalIndexLookUpReaders(tbl.Schema, table) + if err != nil { + return nil, errors.Trace(err) + } + readers := make([]*PhysicalIndexLookUpReader, 0, len(readerPlans)) + for _, plan := range readerPlans { + readers = append(readers, plan.(*PhysicalIndexLookUpReader)) + } + p.Indices = indices + p.IndexLookUpReaders = readers return p, nil } diff --git a/util/admin/admin.go b/util/admin/admin.go index 7bb2feeddbe7..8657c16554b4 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -14,8 +14,10 @@ package admin import ( + "context" "fmt" "io" + "math" "sort" "time" @@ -37,7 +39,6 @@ import ( "github.com/pingcap/tidb/util/rowDecoder" "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" - "golang.org/x/net/context" ) // DDLInfo is for DDL information. @@ -272,28 +273,46 @@ func getCount(ctx sessionctx.Context, sql string) (int64, error) { return rows[0].GetInt64(0), nil } +// Count greater Types +const ( + // TblCntGreater means that the number of table rows is more than the number of index rows. + TblCntGreater byte = 1 + // IdxCntGreater means that the number of index rows is more than the number of table rows. + IdxCntGreater byte = 2 +) + // CheckIndicesCount compares indices count with table count. +// It returns the count greater type, the index offset and an error. // It returns nil if the count from the index is equal to the count from the table columns, -// otherwise it returns an error with a different information. -func CheckIndicesCount(ctx sessionctx.Context, dbName, tableName string, indices []string) error { +// otherwise it returns an error and the corresponding index's offset. +func CheckIndicesCount(ctx sessionctx.Context, dbName, tableName string, indices []string) (byte, int, error) { // Add `` for some names like `table name`. sql := fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`", dbName, tableName) tblCnt, err := getCount(ctx, sql) if err != nil { - return errors.Trace(err) + return 0, 0, errors.Trace(err) } - for _, idx := range indices { + for i, idx := range indices { sql = fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s` USE INDEX(`%s`)", dbName, tableName, idx) idxCnt, err := getCount(ctx, sql) if err != nil { - return errors.Trace(err) + return 0, i, errors.Trace(err) } - if tblCnt != idxCnt { - return errors.Errorf("table count %d != index(%s) count %d", tblCnt, idx, idxCnt) + logutil.Logger(context.Background()).Info("check indices count", + zap.String("table", tableName), zap.Int64("cnt", tblCnt), zap.Reflect("index", idx), zap.Int64("cnt", idxCnt)) + if tblCnt == idxCnt { + continue } - } - return nil + var ret byte + if tblCnt > idxCnt { + ret = TblCntGreater + } else if idxCnt > tblCnt { + ret = IdxCntGreater + } + return ret, i, errors.Errorf("table count %d != index(%s) count %d", tblCnt, idx, idxCnt) + } + return 0, 0, nil } // ScanIndexData scans the index handles and values in a limited number, according to the index information. @@ -449,7 +468,7 @@ func CheckRecordAndIndex(sessCtx sessionctx.Context, txn kv.Transaction, t table cols[i] = t.Cols()[col.Offset] } - startKey := t.RecordKey(0) + startKey := t.RecordKey(math.MinInt64) filterFunc := func(h1 int64, vals1 []types.Datum, cols []*table.Column) (bool, error) { for i, val := range vals1 { col := cols[i] @@ -606,7 +625,7 @@ func makeRowDecoder(t table.Table, decodeCol []*table.Column, genExpr map[model. decoder.SubstituteGenColsInDecodeColMap(decodeColsMap) decoder.RemoveUnusedVirtualCols(decodeColsMap, decodeCol) } - return decoder.NewRowDecoder(t.Cols(), decodeColsMap) + return decoder.NewRowDecoder(t, decodeColsMap) } // genExprs use to calculate generated column value. @@ -634,7 +653,7 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h } } - rowMap, err := rowDecoder.DecodeAndEvalRowWithMap(sessCtx, value, sessCtx.GetSessionVars().Location(), time.UTC, nil) + rowMap, err := rowDecoder.DecodeAndEvalRowWithMap(sessCtx, h, value, sessCtx.GetSessionVars().Location(), time.UTC, nil) if err != nil { return nil, errors.Trace(err) } @@ -698,7 +717,7 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab return errors.Trace(err) } - rowMap, err := rowDecoder.DecodeAndEvalRowWithMap(sessCtx, it.Value(), sessCtx.GetSessionVars().Location(), time.UTC, nil) + rowMap, err := rowDecoder.DecodeAndEvalRowWithMap(sessCtx, handle, it.Value(), sessCtx.GetSessionVars().Location(), time.UTC, nil) if err != nil { return errors.Trace(err) } diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go index d9d1a6a21e78..b30d63aa9ff3 100644 --- a/util/admin/admin_test.go +++ b/util/admin/admin_test.go @@ -424,7 +424,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta c.Assert(err, IsNil) idxNames := []string{idx.Meta().Name.L} - err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) + _, _, err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) c.Assert(err, IsNil) mockCtx := mock.NewContext() @@ -446,7 +446,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta diffMsg := newDiffRetError("index", record1, nil) c.Assert(err.Error(), DeepEquals, diffMsg) - err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) + _, _, err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) c.Assert(err, IsNil) // set data to: @@ -505,7 +505,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta diffMsg = newDiffRetError("index", record1, nil) c.Assert(err.Error(), DeepEquals, diffMsg) - err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) + _, _, err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) c.Assert(err.Error(), Equals, "table count 3 != index(c) count 4") // set data to: @@ -525,7 +525,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta diffMsg = newDiffRetError("index", nil, record1) c.Assert(err.Error(), DeepEquals, diffMsg) - err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) + _, _, err = CheckIndicesCount(ctx, dbName, tb.Meta().Name.L, idxNames) c.Assert(err.Error(), Equals, "table count 4 != index(c) count 3") } diff --git a/util/rowDecoder/decoder.go b/util/rowDecoder/decoder.go index 09146ae10f0c..e42d5d3852b5 100644 --- a/util/rowDecoder/decoder.go +++ b/util/rowDecoder/decoder.go @@ -18,11 +18,11 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -30,24 +30,26 @@ import ( // Column contains the info and generated expr of column. type Column struct { - Info *model.ColumnInfo + Col *table.Column GenExpr expression.Expression } // RowDecoder decodes a byte slice into datums and eval the generated column value. type RowDecoder struct { + tbl table.Table mutRow chunk.MutRow columns map[int64]Column colTypes map[int64]*types.FieldType haveGenColumn bool + defaultVals []types.Datum } // NewRowDecoder returns a new RowDecoder. -func NewRowDecoder(cols []*table.Column, decodeColMap map[int64]Column) *RowDecoder { +func NewRowDecoder(tbl table.Table, decodeColMap map[int64]Column) *RowDecoder { colFieldMap := make(map[int64]*types.FieldType, len(decodeColMap)) haveGenCol := false for id, col := range decodeColMap { - colFieldMap[id] = &col.Info.FieldType + colFieldMap[id] = &col.Col.ColumnInfo.FieldType if col.GenExpr != nil { haveGenCol = true } @@ -58,20 +60,23 @@ func NewRowDecoder(cols []*table.Column, decodeColMap map[int64]Column) *RowDeco } } + cols := tbl.Cols() tps := make([]*types.FieldType, len(cols)) for _, col := range cols { tps[col.Offset] = &col.FieldType } return &RowDecoder{ + tbl: tbl, mutRow: chunk.MutRowFromTypes(tps), columns: decodeColMap, colTypes: colFieldMap, haveGenColumn: haveGenCol, + defaultVals: make([]types.Datum, len(cols)), } } // DecodeAndEvalRowWithMap decodes a byte slice into datums and evaluates the generated column value. -func (rd *RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, b []byte, decodeLoc, sysLoc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) { +func (rd *RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, handle int64, b []byte, decodeLoc, sysLoc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) { row, err := tablecodec.DecodeRowWithMap(b, rd.colTypes, decodeLoc, row) if err != nil { return nil, errors.Trace(err) @@ -80,8 +85,28 @@ func (rd *RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, b []byte, return row, nil } - for id, v := range row { - rd.mutRow.SetValue(rd.columns[id].Info.Offset, v.GetValue()) + for _, dCol := range rd.columns { + colInfo := dCol.Col.ColumnInfo + val, ok := row[colInfo.ID] + if ok || dCol.GenExpr != nil { + rd.mutRow.SetValue(colInfo.Offset, val.GetValue()) + continue + } + + // Get the default value of the column in the generated column expression. + if dCol.Col.IsPKHandleColumn(rd.tbl.Meta()) { + if mysql.HasUnsignedFlag(colInfo.Flag) { + val.SetUint64(uint64(handle)) + } else { + val.SetInt64(handle) + } + } else { + val, err = tables.GetColDefaultValue(ctx, dCol.Col, rd.defaultVals) + if err != nil { + return nil, errors.Trace(err) + } + } + rd.mutRow.SetValue(colInfo.Offset, val.GetValue()) } for id, col := range rd.columns { if col.GenExpr == nil { @@ -92,7 +117,7 @@ func (rd *RowDecoder) DecodeAndEvalRowWithMap(ctx sessionctx.Context, b []byte, if err != nil { return nil, errors.Trace(err) } - val, err = table.CastValue(ctx, val, col.Info) + val, err = table.CastValue(ctx, val, col.Col.ColumnInfo) if err != nil { return nil, errors.Trace(err) } @@ -139,12 +164,12 @@ func BuildFullDecodeColMap(indexedCols []*table.Column, t table.Table, genExprPr return nil, errors.Trace(err) } decodeColMap[col.ID] = Column{ - Info: col.ColumnInfo, + Col: col, GenExpr: e, } } else { decodeColMap[col.ID] = Column{ - Info: col.ColumnInfo, + Col: col, } } } @@ -161,7 +186,7 @@ func SubstituteGenColsInDecodeColMap(decodeColMap map[int64]Column) { } var orderedCols []Pair for colID, col := range decodeColMap { - orderedCols = append(orderedCols, Pair{colID, col.Info.Offset}) + orderedCols = append(orderedCols, Pair{colID, col.Col.Offset}) } sort.Slice(orderedCols, func(i, j int) bool { return orderedCols[i].colOffset < orderedCols[j].colOffset }) @@ -172,12 +197,12 @@ func SubstituteGenColsInDecodeColMap(decodeColMap map[int64]Column) { decCol := decodeColMap[colID] if decCol.GenExpr != nil { decodeColMap[colID] = Column{ - Info: decCol.Info, + Col: decCol.Col, GenExpr: substituteGeneratedColumn(decCol.GenExpr, decodeColMap), } } else { decodeColMap[colID] = Column{ - Info: decCol.Info, + Col: decCol.Col, } } } @@ -204,7 +229,7 @@ func substituteGeneratedColumn(expr expression.Expression, decodeColMap map[int6 // RemoveUnusedVirtualCols removes all virtual columns in decodeColMap that cannot found in indexedCols. func RemoveUnusedVirtualCols(decodeColMap map[int64]Column, indexedCols []*table.Column) { for colID, decCol := range decodeColMap { - col := decCol.Info + col := decCol.Col if !col.IsGenerated() || col.GeneratedStored { continue } diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index 68d5ca79c022..fa7d22ae0921 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -214,6 +214,15 @@ func (tk *TestKit) QueryToErr(sql string, args ...interface{}) error { return resErr } +// ExecToErr executes a sql statement and discard results. +func (tk *TestKit) ExecToErr(sql string, args ...interface{}) error { + res, err := tk.Exec(sql, args...) + if res != nil { + tk.c.Assert(res.Close(), check.IsNil) + } + return err +} + // ResultSetToResult converts sqlexec.RecordSet to testkit.Result. // It is used to check results of execute statement in binary mode. func (tk *TestKit) ResultSetToResult(rs sqlexec.RecordSet, comment check.CommentInterface) *Result {