Skip to content

Commit

Permalink
Merge branch 'release-5.1' into dev-fix-compress-maxrecv
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu committed Jun 18, 2021
2 parents 04af474 + 38175b9 commit 88f6421
Show file tree
Hide file tree
Showing 25 changed files with 661 additions and 99 deletions.
2 changes: 1 addition & 1 deletion executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
if p, ok := t.(table.PartitionedTable); ok {
t, err = p.GetPartitionByRow(ctx, row)
if err != nil {
if terr, ok := errors.Cause(err).(*terror.Error); ctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && terr.Code() == errno.ErrNoPartitionForGivenValue {
if terr, ok := errors.Cause(err).(*terror.Error); ctx.GetSessionVars().StmtCtx.IgnoreNoPartition && ok && (terr.Code() == errno.ErrNoPartitionForGivenValue || terr.Code() == errno.ErrRowDoesNotMatchGivenPartitionSet) {
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
result = append(result, toBeCheckedRow{ignored: true})
return result, nil
Expand Down
37 changes: 37 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,16 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
}
if len(e.partitionedTable) > 0 {
schema := v.Schema()
e.tblID2PIDColumnIndex = make(map[int64]int)
for i := 0; i < len(v.ExtraPIDInfo.Columns); i++ {
col := v.ExtraPIDInfo.Columns[i]
tblID := v.ExtraPIDInfo.TblIDs[i]
offset := schema.ColumnIndex(col)
e.tblID2PIDColumnIndex[tblID] = offset
}
}
return e
}

Expand Down Expand Up @@ -2673,6 +2683,9 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
storeType: v.StoreType,
batchCop: v.BatchCop,
}
if tbl.Meta().Partition != nil {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
Expand Down Expand Up @@ -2703,6 +2716,15 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return e, nil
}

func extraPIDColumnIndex(schema *expression.Schema) offsetOptional {
for idx, col := range schema.Columns {
if col.ID == model.ExtraPidColID {
return newOffset(idx)
}
}
return 0
}

func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Executor {
startTs, err := b.getSnapshotTS()
if err != nil {
Expand Down Expand Up @@ -2748,6 +2770,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return ret
}
// When isPartition is set, it means the union rewriting is done, so a partition reader is prefered.
if ok, _ := ts.IsPartition(); ok {
return ret
}

pi := ts.Table.GetPartitionInfo()
if pi == nil {
Expand Down Expand Up @@ -2966,6 +2992,10 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) E
if !b.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return ret
}
// When isPartition is set, it means the union rewriting is done, so a partition reader is prefered.
if ok, _ := is.IsPartition(); ok {
return ret
}

pi := is.Table.GetPartitionInfo()
if pi == nil {
Expand Down Expand Up @@ -3067,6 +3097,9 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
tblPlans: v.TablePlans,
PushedLimit: v.PushedLimit,
}
if ok, _ := ts.IsPartition(); ok {
e.extraPIDColumnIndex = extraPIDColumnIndex(v.Schema())
}

if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
Expand Down Expand Up @@ -3126,6 +3159,10 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
if is.Index.Global {
return ret
}
if ok, _ := is.IsPartition(); ok {
// Already pruned when translated to logical union.
return ret
}

tmp, _ := b.is.TableByID(is.Table.ID)
tbl := tmp.(table.PartitionedTable)
Expand Down
22 changes: 13 additions & 9 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ type IndexLookUpExecutor struct {
PushedLimit *plannercore.PushedDownLimit

stats *IndexLookUpRunTimeStats

// extraPIDColumnIndex is used for partition reader to add an extra partition ID column, default -1
extraPIDColumnIndex offsetOptional
}

type getHandleType int8
Expand Down Expand Up @@ -638,15 +641,16 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
table = task.partitionTable
}
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
columns: e.columns,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
extraPIDColumnIndex: e.extraPIDColumnIndex,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)
Expand Down
42 changes: 16 additions & 26 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,31 +890,22 @@ type SelectLockExec struct {
Lock *ast.SelectLockInfo
keys []kv.Key

tblID2Handle map[int64][]plannercore.HandleCols
tblID2Handle map[int64][]plannercore.HandleCols

// All the partition tables in the children of this executor.
partitionedTable []table.PartitionedTable

// tblID2Table is cached to reduce cost.
tblID2Table map[int64]table.PartitionedTable
// When SelectLock work on the partition table, we need the partition ID
// instead of table ID to calculate the lock KV. In that case, partition ID is store as an
// extra column in the chunk row.
// tblID2PIDColumnIndex stores the column index in the chunk row. The children may be join
// of multiple tables, so the map struct is used.
tblID2PIDColumnIndex map[int64]int
}

// Open implements the Executor Open interface.
func (e *SelectLockExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}

if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 {
e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable))
for id := range e.tblID2Handle {
for _, p := range e.partitionedTable {
if id == p.Meta().ID {
e.tblID2Table[id] = p
}
}
}
}

return nil
return e.baseExecutor.Open(ctx)
}

// Next implements the Executor Next interface.
Expand All @@ -932,15 +923,14 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if req.NumRows() > 0 {
iter := chunk.NewIterator4Chunk(req)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {

for id, cols := range e.tblID2Handle {
physicalID := id
if pt, ok := e.tblID2Table[id]; ok {
// On a partitioned table, we have to use physical ID to encode the lock key!
p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes))
if err != nil {
return err
}
physicalID = p.GetPhysicalID()
if len(e.partitionedTable) > 0 {
// Replace the table ID with partition ID.
// The partition ID is returned as an extra column from the table reader.
offset := e.tblID2PIDColumnIndex[id]
physicalID = row.GetInt64(offset)
}

for _, col := range cols {
Expand Down
6 changes: 6 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func (s *testSuiteP1) TestShow(c *C) {
"Usage Server Admin No privileges - allow connect only",
"BACKUP_ADMIN Server Admin ",
"RESTORE_ADMIN Server Admin ",
"SYSTEM_USER Server Admin ",
"SYSTEM_VARIABLES_ADMIN Server Admin ",
"ROLE_ADMIN Server Admin ",
"CONNECTION_ADMIN Server Admin ",
Expand Down Expand Up @@ -7590,6 +7591,11 @@ func issue20975PreparePartitionTable(c *C, store kv.Storage) (*testkit.TestKit,

func (s *testSuite) TestIssue20975UpdateNoChangeWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)

// Set projection concurrency to avoid data race here.
// TODO: remove this line after fixing https://github.com/pingcap/tidb/issues/25496
tk1.Se.GetSessionVars().Concurrency.SetProjectionConcurrency(0)

tk1.MustExec("begin pessimistic")
tk1.MustExec("update t1 set c=c")
tk2.MustExec("create table t2(a int)")
Expand Down
Loading

0 comments on commit 88f6421

Please sign in to comment.