Skip to content

Commit

Permalink
util/admin: support admin check index on partition table (#17183) (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot committed Jun 1, 2020
1 parent 8e59b33 commit c6a93de
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 135 deletions.
26 changes: 26 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,32 @@ func (s *testSuite1) TestAdminCheckIndexRange(c *C) {
result.Check(testkit.Rows("-1 hi 4", "2 cd 2"))
}

func (s *testSuite2) TestAdminCheckIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
check := func() {
tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (5, 5), (10, 10), (11, 11), (NULL, NULL)")
tk.MustExec("admin check index admin_test c1")
tk.MustExec("admin check index admin_test c2")
}
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, index (c1), unique key(c2))")
check()

// Test for hash partition table.
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, index (c1), unique key(c2)) partition by hash(c2) partitions 5;")
check()

// Test for range partition table.
tk.MustExec("drop table if exists admin_test")
tk.MustExec(`create table admin_test (c1 int, c2 int, c3 int default 1, index (c1), unique key(c2)) PARTITION BY RANGE ( c2 ) (
PARTITION p0 VALUES LESS THAN (5),
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (MAXVALUE))`)
check()
}

func (s *testSuite2) TestAdminRecoverIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
23 changes: 1 addition & 22 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildChange(v)
case *plannercore.CheckTable:
return b.buildCheckTable(v)
case *plannercore.CheckIndex:
return b.buildCheckIndex(v)
case *plannercore.RecoverIndex:
return b.buildRecoverIndex(v)
case *plannercore.CleanupIndex:
Expand Down Expand Up @@ -304,26 +302,6 @@ func (b *executorBuilder) buildShowSlow(v *plannercore.ShowSlow) Executor {
return e
}

func (b *executorBuilder) buildCheckIndex(v *plannercore.CheckIndex) Executor {
readerExec, err := buildNoRangeIndexLookUpReader(b, v.IndexLookUpReader)
if err != nil {
b.err = err
return nil
}

buildIndexLookUpChecker(b, v.IndexLookUpReader, readerExec)

e := &CheckIndexExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dbName: v.DBName,
tableName: readerExec.table.Meta().Name.L,
idxName: v.IdxName,
is: b.is,
src: readerExec,
}
return e
}

// buildIndexLookUpChecker builds check information to IndexLookUpReader.
func buildIndexLookUpChecker(b *executorBuilder, readerPlan *plannercore.PhysicalIndexLookUpReader,
readerExec *IndexLookUpExecutor) {
Expand Down Expand Up @@ -377,6 +355,7 @@ func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor {
srcs: readerExecs,
exitCh: make(chan struct{}),
retCh: make(chan error, len(readerExecs)),
checkIndex: v.CheckIndex,
}
return e
}
Expand Down
63 changes: 6 additions & 57 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ var (
_ Executor = &TableScanExec{}
_ Executor = &TopNExec{}
_ Executor = &UnionExec{}
_ Executor = &CheckIndexExec{}
_ Executor = &HashJoinExec{}
_ Executor = &IndexLookUpExecutor{}
_ Executor = &MergeJoinExec{}
Expand Down Expand Up @@ -481,6 +480,7 @@ type CheckTableExec struct {
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
checkIndex bool
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -568,6 +568,10 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames)
if err != nil {
// For admin check index statement, for speed up and compatibility, doesn't do below checks.
if e.checkIndex {
return errors.Trace(err)
}
if greater == admin.IdxCntGreater {
err = e.checkTableIndexHandle(ctx, e.indexInfos[idxOffset])
} else if greater == admin.TblCntGreater {
Expand All @@ -590,7 +594,7 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
util.WithRecovery(func() {
err1 := e.checkIndexHandle(ctx, e.srcs[num])
if err1 != nil {
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err))
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err1))
}
}, e.handlePanic)
}(i)
Expand Down Expand Up @@ -634,61 +638,6 @@ func (e *CheckTableExec) checkTableRecord(idxOffset int) error {
return nil
}

// CheckIndexExec represents the executor of checking an index.
// It is built from the "admin check index" statement, and it checks
// the consistency of the index data with the records of the table.
type CheckIndexExec struct {
baseExecutor

dbName string
tableName string
idxName string
src *IndexLookUpExecutor
done bool
is infoschema.InfoSchema
}

// Open implements the Executor Open interface.
func (e *CheckIndexExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
if err := e.src.Open(ctx); err != nil {
return err
}
e.done = false
return nil
}

// Close implements the Executor Close interface.
func (e *CheckIndexExec) Close() error {
return e.src.Close()
}

// Next implements the Executor Next interface.
func (e *CheckIndexExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.done {
return nil
}
defer func() { e.done = true }()

_, _, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tableName, []string{e.idxName})
if err != nil {
return err
}
chk := newFirstChunk(e.src)
for {
err := Next(ctx, e.src, chk)
if err != nil {
return err
}
if chk.NumRows() == 0 {
break
}
}
return nil
}

// ShowSlowExec represents the executor of showing the slow queries.
// It is build from the "admin show slow" statement:
// admin show slow top [internal | all] N
Expand Down
10 changes: 1 addition & 9 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type CheckTable struct {
Table table.Table
IndexInfos []*model.IndexInfo
IndexLookUpReaders []*PhysicalIndexLookUpReader
CheckIndex bool
}

// RecoverIndex is used for backfilling corrupted index data.
Expand All @@ -98,15 +99,6 @@ type CleanupIndex struct {
IndexName string
}

// CheckIndex is used for checking index data, built from the 'admin check index' statement.
type CheckIndex struct {
baseSchemaProducer

IndexLookUpReader *PhysicalIndexLookUpReader
DBName string
IdxName string
}

// CheckIndexRange is used for checking index data, output the index values that handle within begin and end.
type CheckIndexRange struct {
baseSchemaProducer
Expand Down
105 changes: 58 additions & 47 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,53 +605,15 @@ func (b *PlanBuilder) buildPrepare(x *ast.PrepareStmt) Plan {
return p
}

func (b *PlanBuilder) buildCheckIndex(ctx context.Context, dbName model.CIStr, as *ast.AdminStmt) (Plan, error) {
tblName := as.Tables[0]
tbl, err := b.is.TableByName(dbName, tblName.Name)
if err != nil {
return nil, err
}
tblInfo := tbl.Meta()

// get index information
var idx *model.IndexInfo
for _, index := range tblInfo.Indices {
if index.Name.L == strings.ToLower(as.Index) {
idx = index
break
}
}
if idx == nil {
return nil, errors.Errorf("index %s do not exist", as.Index)
}
if idx.State != model.StatePublic {
return nil, errors.Errorf("index %s state %s isn't public", as.Index, idx.State)
}

return b.buildPhysicalIndexLookUpReader(ctx, dbName, tbl, idx)
}

func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan, error) {
var ret Plan
var err error
switch as.Tp {
case ast.AdminCheckTable:
case ast.AdminCheckTable, ast.AdminCheckIndex:
ret, err = b.buildAdminCheckTable(ctx, as)
if err != nil {
return ret, err
}
case ast.AdminCheckIndex:
dbName := as.Tables[0].Schema
readerPlan, err := b.buildCheckIndex(ctx, dbName, as)
if err != nil {
return ret, err
}

ret = &CheckIndex{
DBName: dbName.L,
IdxName: as.Index,
IndexLookUpReader: readerPlan.(*PhysicalIndexLookUpReader),
}
case ast.AdminRecoverIndex:
p := &RecoverIndex{Table: as.Tables[0], IndexName: as.Index}
p.SetSchema(buildRecoverIndexFields())
Expand Down Expand Up @@ -857,12 +819,12 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReader(ctx context.Context, dbName
return rootT.p, nil
}

func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbName model.CIStr, tbl table.Table) ([]Plan, []*model.IndexInfo, error) {
func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbName model.CIStr, tbl table.Table, indices []table.Index) ([]Plan, []*model.IndexInfo, error) {
tblInfo := tbl.Meta()
// get index information
indexInfos := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
indexLookUpReaders := make([]Plan, 0, len(tblInfo.Indices))
for _, idx := range tbl.Indices() {
for _, idx := range indices {
idxInfo := idx.Meta()
if idxInfo.State != model.StatePublic {
logutil.Logger(context.Background()).Info("build physical index lookup reader, the index isn't public",
Expand Down Expand Up @@ -896,17 +858,40 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam
}

func (b *PlanBuilder) buildAdminCheckTable(ctx context.Context, as *ast.AdminStmt) (*CheckTable, error) {
tbl := as.Tables[0]
tblName := as.Tables[0]
tableInfo := as.Tables[0].TableInfo
table, ok := b.is.TableByID(tableInfo.ID)
tbl, ok := b.is.TableByID(tableInfo.ID)
if !ok {
return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tableInfo.Name.O)
return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tblName.DBInfo.Name.O, tableInfo.Name.O)
}
p := &CheckTable{
DBName: tbl.Schema.O,
Table: table,
DBName: tblName.Schema.O,
Table: tbl,
}
var readerPlans []Plan
var indexInfos []*model.IndexInfo
var err error
if as.Tp == ast.AdminCheckIndex {
// get index information
var idx table.Index
idxName := strings.ToLower(as.Index)
for _, index := range tbl.Indices() {
if index.Meta().Name.L == idxName {
idx = index
break
}
}
if idx == nil {
return nil, errors.Errorf("index %s do not exist", as.Index)
}
if idx.Meta().State != model.StatePublic {
return nil, errors.Errorf("index %s state %s isn't public", as.Index, idx.Meta().State)
}
p.CheckIndex = true
readerPlans, indexInfos, err = b.buildPhysicalIndexLookUpReaders(ctx, tblName.Schema, tbl, []table.Index{idx})
} else {
readerPlans, indexInfos, err = b.buildPhysicalIndexLookUpReaders(ctx, tblName.Schema, tbl, tbl.Indices())
}
readerPlans, indexInfos, err := b.buildPhysicalIndexLookUpReaders(ctx, tbl.Schema, table)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -919,6 +904,32 @@ func (b *PlanBuilder) buildAdminCheckTable(ctx context.Context, as *ast.AdminStm
return p, nil
}

func (b *PlanBuilder) buildCheckIndex(ctx context.Context, dbName model.CIStr, as *ast.AdminStmt) (Plan, error) {
tblName := as.Tables[0]
tbl, err := b.is.TableByName(dbName, tblName.Name)
if err != nil {
return nil, err
}
tblInfo := tbl.Meta()

// get index information
var idx *model.IndexInfo
for _, index := range tblInfo.Indices {
if index.Name.L == strings.ToLower(as.Index) {
idx = index
break
}
}
if idx == nil {
return nil, errors.Errorf("index %s do not exist", as.Index)
}
if idx.State != model.StatePublic {
return nil, errors.Errorf("index %s state %s isn't public", as.Index, idx.State)
}

return b.buildPhysicalIndexLookUpReader(ctx, dbName, tbl, idx)
}

func (b *PlanBuilder) buildCheckIndexSchema(tn *ast.TableName, indexName string) (*expression.Schema, error) {
schema := expression.NewSchema()
indexName = strings.ToLower(indexName)
Expand Down

0 comments on commit c6a93de

Please sign in to comment.