Skip to content

Commit

Permalink
Allocate continuous row id for single INSERT statement (#13648)
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored and sre-bot committed Nov 22, 2019
1 parent 2418958 commit 71e19a7
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 9 deletions.
39 changes: 39 additions & 0 deletions executor/insert_test.go
Expand Up @@ -15,7 +15,9 @@ package executor_test

import (
"fmt"
"strconv"
"strings"
"sync"

. "github.com/pingcap/check"
"github.com/pingcap/parser/terror"
Expand Down Expand Up @@ -763,6 +765,43 @@ func (s *testSuite3) TestBit(c *C) {

}

func (s *testSuite3) TestAllocateContinuousRowID(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec(`create table t1 (a int,b int, key I_a(a));`)
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
tk := testkit.NewTestKitWithInit(c, s.store)
for j := 0; j < 10; j++ {
k := strconv.Itoa(idx*100 + j)
sql := "insert into t1(a,b) values (" + k + ", 2)"
for t := 0; t < 20; t++ {
sql += ",(" + k + ",2)"
}
tk.MustExec(sql)
q := "select _tidb_rowid from t1 where a=" + k
fmt.Printf("query: %v\n", q)
rows := tk.MustQuery(q).Rows()
c.Assert(len(rows), Equals, 21)
last := 0
for _, r := range rows {
c.Assert(len(r), Equals, 1)
v, err := strconv.Atoi(r[0].(string))
c.Assert(err, Equals, nil)
if last > 0 {
c.Assert(last+1, Equals, v)
}
last = v
}
}
}(i)
}
wg.Wait()
}

func (s *testSuite3) TestJiraIssue5366(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
Expand Down
10 changes: 10 additions & 0 deletions infoschema/tables.go
Expand Up @@ -2424,6 +2424,11 @@ func (it *infoschemaTable) AllocHandle(ctx sessionctx.Context) (int64, error) {
return 0, table.ErrUnsupportedOp
}

// AllocHandleIDs implements table.Table AllocHandleIDs interface.
func (it *infoschemaTable) AllocHandleIDs(ctx sessionctx.Context, n uint64) (int64, int64, error) {
return 0, 0, table.ErrUnsupportedOp
}

// Allocator implements table.Table Allocator interface.
func (it *infoschemaTable) Allocator(ctx sessionctx.Context) autoid.Allocator {
return nil
Expand Down Expand Up @@ -2541,6 +2546,11 @@ func (vt *VirtualTable) AllocHandle(ctx sessionctx.Context) (int64, error) {
return 0, table.ErrUnsupportedOp
}

// AllocHandleIDs implements table.Table AllocHandleIDs interface.
func (vt *VirtualTable) AllocHandleIDs(ctx sessionctx.Context, n uint64) (int64, int64, error) {
return 0, 0, table.ErrUnsupportedOp
}

// Allocator implements table.Table Allocator interface.
func (vt *VirtualTable) Allocator(ctx sessionctx.Context) autoid.Allocator {
return nil
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Expand Up @@ -118,6 +118,9 @@ type StatementContext struct {
// InsertID is the given insert ID of an auto_increment column.
InsertID uint64

BaseRowID int64
MaxRowID int64

// Copied from SessionVars.TimeZone.
TimeZone *time.Location
Priority mysql.PriorityEnum
Expand Down Expand Up @@ -428,6 +431,8 @@ func (sc *StatementContext) ResetForRetry() {
sc.mu.execDetails = execdetails.ExecDetails{}
sc.mu.allExecDetails = make([]*execdetails.ExecDetails, 0, 4)
sc.mu.Unlock()
sc.MaxRowID = 0
sc.BaseRowID = 0
sc.TableIDs = sc.TableIDs[:0]
sc.IndexNames = sc.IndexNames[:0]
}
Expand Down
3 changes: 3 additions & 0 deletions table/table.go
Expand Up @@ -168,6 +168,9 @@ type Table interface {
// AllocHandle allocates a handle for a new row.
AllocHandle(ctx sessionctx.Context) (int64, error)

// AllocHandleIds allocates multiple handle for rows.
AllocHandleIDs(ctx sessionctx.Context, n uint64) (int64, int64, error)

// Allocator returns Allocator.
Allocator(ctx sessionctx.Context) autoid.Allocator

Expand Down
38 changes: 29 additions & 9 deletions table/tables/tables.go
Expand Up @@ -455,9 +455,22 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ..
}
}
if !hasRecordID {
recordID, err = t.AllocHandle(ctx)
if err != nil {
return 0, err
stmtCtx := ctx.GetSessionVars().StmtCtx
rows := stmtCtx.RecordRows()
if rows > 1 {
if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = t.AllocHandleIDs(ctx, rows)
if err != nil {
return 0, err
}
}
stmtCtx.BaseRowID += 1
recordID = stmtCtx.BaseRowID
} else {
recordID, err = t.AllocHandle(ctx)
if err != nil {
return 0, err
}
}
}

Expand Down Expand Up @@ -947,30 +960,37 @@ func GetColDefaultValue(ctx sessionctx.Context, col *table.Column, defaultVals [

// AllocHandle implements table.Table AllocHandle interface.
func (t *tableCommon) AllocHandle(ctx sessionctx.Context) (int64, error) {
_, rowID, err := t.Allocator(ctx).Alloc(t.tableID, 1)
_, rowID, err := t.AllocHandleIDs(ctx, 1)
return rowID, err
}

// AllocHandle implements table.Table AllocHandle interface.
func (t *tableCommon) AllocHandleIDs(ctx sessionctx.Context, n uint64) (int64, int64, error) {
base, maxID, err := t.Allocator(ctx).Alloc(t.tableID, n)
if err != nil {
return 0, err
return 0, 0, err
}
if t.meta.ShardRowIDBits > 0 {
// Use max record ShardRowIDBits to check overflow.
if OverflowShardBits(rowID, t.meta.MaxShardRowIDBits) {
if OverflowShardBits(maxID, t.meta.MaxShardRowIDBits) {
// If overflow, the rowID may be duplicated. For examples,
// t.meta.ShardRowIDBits = 4
// rowID = 0010111111111111111111111111111111111111111111111111111111111111
// shard = 01000000000000000000000000000000000000000000000000000000000000000
// will be duplicated with:
// rowID = 0100111111111111111111111111111111111111111111111111111111111111
// shard = 0010000000000000000000000000000000000000000000000000000000000000
return 0, autoid.ErrAutoincReadFailed
return 0, 0, autoid.ErrAutoincReadFailed
}
txnCtx := ctx.GetSessionVars().TxnCtx
if txnCtx.Shard == nil {
shard := t.calcShard(txnCtx.StartTS)
txnCtx.Shard = &shard
}
rowID |= *txnCtx.Shard
base |= *txnCtx.Shard
maxID |= *txnCtx.Shard
}
return rowID, nil
return base, maxID, nil
}

// OverflowShardBits checks whether the rowID overflow `1<<(64-shardRowIDBits-1) -1`.
Expand Down

0 comments on commit 71e19a7

Please sign in to comment.