Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: index refactor for table partition #7062

Merged
merged 6 commits into from Jul 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions ddl/db_test.go
Expand Up @@ -485,7 +485,7 @@ LOOP:
}

ctx := s.s.(sessionctx.Context)
idx := tables.NewIndex(t.Meta(), c3IdxInfo)
idx := tables.NewIndex(t.Meta().ID, c3IdxInfo)
checkDelRangeDone(c, ctx, idx)

s.mustExec(c, "drop table t1")
Expand Down Expand Up @@ -795,7 +795,7 @@ LOOP:
}
c.Assert(nidx, IsNil)

idx := tables.NewIndex(t.Meta(), c3idx.Meta())
idx := tables.NewIndex(t.Meta().ID, c3idx.Meta())
checkDelRangeDone(c, ctx, idx)
s.tk.MustExec("drop table test_drop_index")
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Expand Up @@ -487,7 +487,7 @@ type addIndexResult struct {
}

func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.Table, indexInfo *model.IndexInfo, colFieldMap map[int64]*types.FieldType) *addIndexWorker {
index := tables.NewIndex(t.Meta(), indexInfo)
index := tables.NewIndex(t.Meta().ID, indexInfo)
return &addIndexWorker{
id: id,
ddlWorker: worker,
Expand Down
14 changes: 7 additions & 7 deletions executor/admin_test.go
Expand Up @@ -88,7 +88,7 @@ func (s *testSuite) TestAdminRecoverIndex(c *C) {

tblInfo := tbl.Meta()
idxInfo := findIndexByName("c2", tblInfo.Indices)
indexOpr := tables.NewIndex(tblInfo, idxInfo)
indexOpr := tables.NewIndex(tblInfo.ID, idxInfo)
sc := s.ctx.GetSessionVars().StmtCtx
txn, err := s.store.Begin()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -184,7 +184,7 @@ func (s *testSuite) TestAdminRecoverIndex1(c *C) {
tblInfo := tbl.Meta()
idxInfo := findIndexByName("primary", tblInfo.Indices)
c.Assert(idxInfo, NotNil)
indexOpr := tables.NewIndex(tblInfo, idxInfo)
indexOpr := tables.NewIndex(tblInfo.ID, idxInfo)

txn, err := s.store.Begin()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -240,9 +240,9 @@ func (s *testSuite) TestAdminCleanupIndex(c *C) {

tblInfo := tbl.Meta()
idxInfo2 := findIndexByName("c2", tblInfo.Indices)
indexOpr2 := tables.NewIndex(tblInfo, idxInfo2)
indexOpr2 := tables.NewIndex(tblInfo.ID, idxInfo2)
idxInfo3 := findIndexByName("c3", tblInfo.Indices)
indexOpr3 := tables.NewIndex(tblInfo, idxInfo3)
indexOpr3 := tables.NewIndex(tblInfo.ID, idxInfo3)

txn, err := s.store.Begin()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -307,7 +307,7 @@ func (s *testSuite) TestAdminCleanupIndexPKNotHandle(c *C) {

tblInfo := tbl.Meta()
idxInfo := findIndexByName("primary", tblInfo.Indices)
indexOpr := tables.NewIndex(tblInfo, idxInfo)
indexOpr := tables.NewIndex(tblInfo.ID, idxInfo)

txn, err := s.store.Begin()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -355,9 +355,9 @@ func (s *testSuite) TestAdminCleanupIndexMore(c *C) {

tblInfo := tbl.Meta()
idxInfo1 := findIndexByName("c1", tblInfo.Indices)
indexOpr1 := tables.NewIndex(tblInfo, idxInfo1)
indexOpr1 := tables.NewIndex(tblInfo.ID, idxInfo1)
idxInfo2 := findIndexByName("c2", tblInfo.Indices)
indexOpr2 := tables.NewIndex(tblInfo, idxInfo2)
indexOpr2 := tables.NewIndex(tblInfo.ID, idxInfo2)

txn, err := s.store.Begin()
c.Assert(err, IsNil)
Expand Down
3 changes: 3 additions & 0 deletions executor/builder.go
Expand Up @@ -1531,6 +1531,9 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plan.PhysicalIndexReader) (*
colLens: is.IdxColLens,
plans: v.IndexPlans,
}
if isPartition, partitionID := is.IsPartition(); isPartition {
e.tableID = partitionID
}
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
} else {
Expand Down
4 changes: 4 additions & 0 deletions plan/find_best_task.go
Expand Up @@ -312,6 +312,8 @@ func (ds *DataSource) forceToIndexScan(idx *model.IndexInfo, remainedConds []exp
dataSourceSchema: ds.schema,
Ranges: ranger.FullRange(),
KeepOrder: false,
isPartition: ds.isPartition,
partitionID: ds.partitionID,
}.init(ds.ctx)
is.filterCondition = remainedConds
is.stats = newSimpleStats(float64(ds.statisticTable.Count))
Expand Down Expand Up @@ -359,6 +361,8 @@ func (ds *DataSource) convertToIndexScan(prop *requiredProp, path *accessPath) (
Ranges: path.ranges,
filterCondition: path.indexFilters,
dataSourceSchema: ds.schema,
isPartition: ds.isPartition,
partitionID: ds.partitionID,
}.init(ds.ctx)
statsTbl := ds.statisticTable
if statsTbl.Indices[idx.ID] != nil {
Expand Down
9 changes: 9 additions & 0 deletions plan/physical_plans.go
Expand Up @@ -113,6 +113,10 @@ type PhysicalIndexScan struct {
// Hist is the histogram when the query was issued.
// It is used for query feedback.
Hist *statistics.Histogram

// The index scan may be on a partition.
isPartition bool
partitionID int64
}

// PhysicalMemTable reads memory table.
Expand Down Expand Up @@ -334,6 +338,11 @@ type PhysicalUnionScan struct {
Conditions []expression.Expression
}

// IsPartition returns true and partition ID if it works on a partition.
func (p *PhysicalIndexScan) IsPartition() (bool, int64) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about s/IsPartition/GetPartitionID/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think IsPartition maybe better.

succ, _ :=  IsPartition()
if succ {
    doSomething()
}

It's also flexible when we need the partition id:

if succ, pid := IsPartition(); succ {
    doSomthingWith(pid)
}

GetPartitionID looks unnatural if we define:

GetPartitionID(pid int64, bool) // wow, it maybe not a partition

If we use partition id == 0 to represent it's not a partition:

GetPartitionID(pid int64)

This code introduce an implicit assumption with the caller.

return p.isPartition, p.partitionID
}

// IsPointGetByUniqueKey checks whether is a point get by unique key.
func (p *PhysicalIndexScan) IsPointGetByUniqueKey(sc *stmtctx.StatementContext) bool {
return len(p.Ranges) == 1 &&
Expand Down
5 changes: 5 additions & 0 deletions plan/rule_partition_processor.go
Expand Up @@ -105,6 +105,11 @@ func (s *partitionProcessor) prune(ds *DataSource) (LogicalPlan, error) {
newDataSource.baseLogicalPlan = newBaseLogicalPlan(ds.context(), TypeTableScan, &newDataSource)
newDataSource.isPartition = true
newDataSource.partitionID = pi.Definitions[i].ID
// There are many expression nodes in the plan tree use the original datasource
// id as FromID. So we set the id of the newDataSource with the original one to
// avoid traversing the whole plan tree to update the references.
newDataSource.id = ds.id

children = append(children, &newDataSource)
}
if len(children) == 0 {
Expand Down
8 changes: 4 additions & 4 deletions table/tables/index.go
Expand Up @@ -97,17 +97,16 @@ func (c *indexIter) Next() (val []types.Datum, h int64, err error) {

// index is the data structure for index data in the KV store.
type index struct {
tblInfo *model.TableInfo
idxInfo *model.IndexInfo
prefix kv.Key
}

// NewIndex builds a new Index object.
func NewIndex(tableInfo *model.TableInfo, indexInfo *model.IndexInfo) table.Index {
// id may be partition or table ID, depends on whether the table is a PartitionedTable.
func NewIndex(id int64, indexInfo *model.IndexInfo) table.Index {
index := &index{
tblInfo: tableInfo,
idxInfo: indexInfo,
prefix: tablecodec.EncodeTableIndexPrefix(tableInfo.ID, indexInfo.ID),
prefix: tablecodec.EncodeTableIndexPrefix(id, indexInfo.ID),
}
return index
}
Expand Down Expand Up @@ -242,6 +241,7 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues
if err != nil {
return nil, false, errors.Trace(err)
}

it, err := r.Seek(key)
if err != nil {
return nil, false, errors.Trace(err)
Expand Down
8 changes: 4 additions & 4 deletions table/tables/index_test.go
Expand Up @@ -63,7 +63,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
},
},
}
index := tables.NewIndex(tblInfo, tblInfo.Indices[0])
index := tables.NewIndex(tblInfo.ID, tblInfo.Indices[0])

// Test ununiq index.
txn, err := s.s.Begin()
Expand Down Expand Up @@ -148,7 +148,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
},
},
}
index = tables.NewIndex(tblInfo, tblInfo.Indices[0])
index = tables.NewIndex(tblInfo.ID, tblInfo.Indices[0])

// Test uniq index.
txn, err = s.s.Begin()
Expand Down Expand Up @@ -216,7 +216,7 @@ func (s *testIndexSuite) TestCombineIndexSeek(c *C) {
},
},
}
index := tables.NewIndex(tblInfo, tblInfo.Indices[0])
index := tables.NewIndex(tblInfo.ID, tblInfo.Indices[0])

txn, err := s.s.Begin()
c.Assert(err, IsNil)
Expand All @@ -226,7 +226,7 @@ func (s *testIndexSuite) TestCombineIndexSeek(c *C) {
_, err = index.Create(mockCtx, txn, values, 1)
c.Assert(err, IsNil)

index2 := tables.NewIndex(tblInfo, tblInfo.Indices[0])
index2 := tables.NewIndex(tblInfo.ID, tblInfo.Indices[0])
sc := &stmtctx.StatementContext{TimeZone: time.Local}
iter, hit, err := index2.Seek(sc, txn, types.MakeDatums("abc", nil))
c.Assert(err, IsNil)
Expand Down
33 changes: 26 additions & 7 deletions table/tables/partition.go
Expand Up @@ -53,6 +53,31 @@ type Partition struct {
type PartitionedTable struct {
Table
partitionExpr *PartitionExpr
partitions map[int64]*Partition
}

func newPartitionedTable(tbl *Table, tblInfo *model.TableInfo) (table.Table, error) {
partitionExpr, err := generatePartitionExpr(tblInfo)
if err != nil {
return nil, errors.Trace(err)
}

partitions := make(map[int64]*Partition)
pi := tblInfo.GetPartitionInfo()
for _, p := range pi.Definitions {
var t Partition
err = initTableCommonWithIndices(&t.tableCommon, tblInfo, p.ID, tbl.Columns, tbl.alloc)
if err != nil {
return nil, errors.Trace(err)
}
partitions[p.ID] = &t
}

return &PartitionedTable{
Table: *tbl,
partitionExpr: partitionExpr,
partitions: partitions,
}, nil
}

// PartitionExpr is the partition definition expressions.
Expand Down Expand Up @@ -146,13 +171,7 @@ func (t *PartitionedTable) locatePartition(ctx sessionctx.Context, pi *model.Par

// GetPartition returns a Table, which is actually a Partition.
func (t *PartitionedTable) GetPartition(pid int64) table.Table {
var ret Partition
// Make a shallow copy, change ID to partition ID.
ret.tableCommon = t.tableCommon
ret.partitionID = pid
ret.recordPrefix = tablecodec.GenTableRecordPrefix(pid)
ret.indexPrefix = tablecodec.GenTableIndexPrefix(pid)
return &ret
return t.partitions[pid]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If GetPartition function create a Partition and Index everytime it's called, it's bad for performance.
So I store those information when PartitionedTable is created, and just get it here.

}

// AddRecord implements the AddRecord method for the table.Table interface.
Expand Down
73 changes: 39 additions & 34 deletions table/tables/tables.go
Expand Up @@ -72,27 +72,27 @@ type Table struct {
var _ table.Table = &Table{}

// MockTableFromMeta only serves for test.
func MockTableFromMeta(tableInfo *model.TableInfo) table.Table {
columns := make([]*table.Column, 0, len(tableInfo.Columns))
for _, colInfo := range tableInfo.Columns {
func MockTableFromMeta(tblInfo *model.TableInfo) table.Table {
columns := make([]*table.Column, 0, len(tblInfo.Columns))
for _, colInfo := range tblInfo.Columns {
col := table.ToColumn(colInfo)
columns = append(columns, col)
}

var t Table
initTableCommon(&t.tableCommon, tableInfo.ID, tableInfo.ID, columns, nil)
t.meta = tableInfo
if tableInfo.GetPartitionInfo() == nil {
initTableCommon(&t.tableCommon, tblInfo, tblInfo.ID, columns, nil)
if tblInfo.GetPartitionInfo() == nil {
if err := initTableIndices(&t.tableCommon); err != nil {
return nil
}
return &t
}

partitionExpr, err := generatePartitionExpr(tableInfo)
ret, err := newPartitionedTable(&t, tblInfo)
if err != nil {
return nil
}
return &PartitionedTable{
Table: t,
partitionExpr: partitionExpr,
}
return ret
}

// TableFromMeta creates a Table instance from model.TableInfo.
Expand Down Expand Up @@ -129,44 +129,49 @@ func TableFromMeta(alloc autoid.Allocator, tblInfo *model.TableInfo) (table.Tabl
}

var t Table
initTableCommon(&t.tableCommon, tblInfo.ID, tblInfo.ID, columns, alloc)

for _, idxInfo := range tblInfo.Indices {
if idxInfo.State == model.StateNone {
return nil, table.ErrIndexStateCantNone.Gen("index %s can't be in none state", idxInfo.Name)
}

idx := NewIndex(tblInfo, idxInfo)
t.indices = append(t.indices, idx)
}

t.meta = tblInfo
initTableCommon(&t.tableCommon, tblInfo, tblInfo.ID, columns, alloc)
if tblInfo.GetPartitionInfo() == nil {
if err := initTableIndices(&t.tableCommon); err != nil {
return nil, errors.Trace(err)
}
return &t, nil
}

partitionExpr, err := generatePartitionExpr(tblInfo)
if err != nil {
return nil, errors.Trace(err)
}
return &PartitionedTable{
Table: t,
partitionExpr: partitionExpr,
}, nil
return newPartitionedTable(&t, tblInfo)
}

// initTableCommon initializes a tableCommon struct.
func initTableCommon(t *tableCommon, tableID, partitionID int64, cols []*table.Column, alloc autoid.Allocator) {
t.tableID = tableID
func initTableCommon(t *tableCommon, tblInfo *model.TableInfo, partitionID int64, cols []*table.Column, alloc autoid.Allocator) {
t.tableID = tblInfo.ID
t.partitionID = partitionID
t.alloc = alloc
t.meta = tblInfo
t.Columns = cols
t.publicColumns = t.Cols()
t.writableColumns = t.WritableCols()
t.writableIndices = t.WritableIndices()
t.recordPrefix = tablecodec.GenTableRecordPrefix(partitionID)
t.indexPrefix = tablecodec.GenTableIndexPrefix(partitionID)
return
}

// initTableIndices initializes the indices of the tableCommon.
func initTableIndices(t *tableCommon) error {
tblInfo := t.meta
for _, idxInfo := range tblInfo.Indices {
if idxInfo.State == model.StateNone {
return table.ErrIndexStateCantNone.Gen("index %s can't be in none state", idxInfo.Name)
}

// Use partition ID for index, because tableCommon may be table or partition.
idx := NewIndex(t.partitionID, idxInfo)
t.indices = append(t.indices, idx)
}
return nil
}

func initTableCommonWithIndices(t *tableCommon, tblInfo *model.TableInfo, partitionID int64, cols []*table.Column, alloc autoid.Allocator) error {
initTableCommon(t, tblInfo, partitionID, cols, alloc)
return errors.Trace(initTableIndices(t))
}

// Indices implements table.Table Indices interface.
Expand Down