Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv,executor: redesign the latch scheduler #7711

Merged
merged 13 commits into from Oct 9, 2018
3 changes: 1 addition & 2 deletions executor/write_test.go
Expand Up @@ -1815,8 +1815,7 @@ func (s *testBypassSuite) TestBypassLatch(c *C) {

// txn1 and txn2 data range do not overlap, but using latches result in txn conflict.
fn()
_, err = tk1.Exec("commit")
c.Assert(err, NotNil)
tk1.MustExec("commit")

tk1.MustExec("truncate table t")
fn()
Expand Down
206 changes: 128 additions & 78 deletions store/tikv/latch/latch.go
Expand Up @@ -14,6 +14,7 @@
package latch

import (
"bytes"
"math/bits"
"sort"
"sync"
Expand All @@ -22,32 +23,26 @@ import (
"github.com/spaolacci/murmur3"
)

// latch stores a key's waiting transactions information.
type latch struct {
// Whether there is any transaction in waitingQueue except head.
hasMoreWaiting bool
// The startTS of the transaction which is the head of waiting transactions.
waitingQueueHead uint64
maxCommitTS uint64
sync.Mutex
}
type node struct {
slotID int
key []byte
maxCommitTS uint64
value *Lock

func (l *latch) isEmpty() bool {
return l.waitingQueueHead == 0 && !l.hasMoreWaiting
next *node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use list.List?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. list.List will make unnecessary allocation.

Use

type Element struct {

    // The value stored with this element.
    Value interface{}
    // contains filtered or unexported fields
}

is similar to

type node struct {
    Value *nodeValue
}
type nodeValue {
    slotID int
    key []byte
    maxCommitTS uint64
    value *Lock
}
  1. list.List is a doubly linked list, while a single linked list is sufficient here.
  2. list data struct is simple and common enough to implement

}

func (l *latch) free() {
l.waitingQueueHead = 0
}

func (l *latch) refreshCommitTS(commitTS uint64) {
l.Lock()
defer l.Unlock()
l.maxCommitTS = mathutil.MaxUint64(commitTS, l.maxCommitTS)
// latch stores a key's waiting transactions information.
type latch struct {
queue *node
count int
waiting []*Lock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not each node has a waiting queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting queue is moved from each node to the latch for those reasons:

  1. nodes in the queue is inserted every now and then, if each node has is waiting queue, the queue would be created and destroyed. There will be more allocations, and it's less memory efficient.
  2. I have an assumption that the waiting queue would not be large, when a list is small enough, an array is very efficient.
  3. You may still remember the "first waiting one automatically become running" problem and the old code is complex enough to handle different states. If each node doesn't have the waiting queue, the problem could be avoid.
    @zhangjinpeng1987

sync.Mutex
}

// Lock is the locks' information required for a transaction.
type Lock struct {
keys [][]byte
// The slot IDs of the latches(keys) that a startTS must acquire before being able to processed.
requiredSlots []int
// The number of latches that the transaction has acquired. For status is stale, it include the
Expand Down Expand Up @@ -96,9 +91,20 @@ func (l *Lock) SetCommitTS(commitTS uint64) {
// but conceptually a latch is a queue, and a slot is an index to the queue
type Latches struct {
slots []latch
// The waiting queue for each slot(slotID => slice of Lock).
waitingQueues map[int][]*Lock
sync.RWMutex
}

type bytesSlice [][]byte

func (s bytesSlice) Len() int {
return len(s)
}

func (s bytesSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s bytesSlice) Less(i, j int) bool {
return bytes.Compare(s[i], s[j]) < 0
}

// NewLatches create a Latches with fixed length,
Expand All @@ -107,14 +113,15 @@ func NewLatches(size uint) *Latches {
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
slots := make([]latch, powerOfTwoSize)
return &Latches{
slots: slots,
waitingQueues: make(map[int][]*Lock),
slots: slots,
}
}

// genLock generates Lock for the transaction with startTS and keys.
func (latches *Latches) genLock(startTS uint64, keys [][]byte) *Lock {
sort.Sort(bytesSlice(keys))
return &Lock{
keys: keys,
requiredSlots: latches.genSlotIDs(keys),
acquiredCount: 0,
startTS: startTS,
Expand All @@ -126,17 +133,7 @@ func (latches *Latches) genSlotIDs(keys [][]byte) []int {
for _, key := range keys {
slots = append(slots, latches.slotID(key))
}
sort.Ints(slots)
if len(slots) <= 1 {
return slots
}
dedup := slots[:1]
for i := 1; i < len(slots); i++ {
if slots[i] != slots[i-1] {
dedup = append(dedup, slots[i])
}
}
return dedup
return slots
}

// slotID return slotID for current key.
Expand All @@ -150,8 +147,7 @@ func (latches *Latches) acquire(lock *Lock) acquireResult {
return acquireStale
}
for lock.acquiredCount < len(lock.requiredSlots) {
slotID := lock.requiredSlots[lock.acquiredCount]
status := latches.acquireSlot(slotID, lock)
status := latches.acquireSlot(lock)
if status != acquireSuccess {
return status
}
Expand All @@ -161,75 +157,129 @@ func (latches *Latches) acquire(lock *Lock) acquireResult {

// release releases all latches owned by the `lock` and returns the wakeup list.
// Preconditions: the caller must ensure the transaction's status is not locked.
func (latches *Latches) release(lock *Lock, commitTS uint64, wakeupList []*Lock) []*Lock {
func (latches *Latches) release(lock *Lock, wakeupList []*Lock) []*Lock {
wakeupList = wakeupList[:0]
for i := 0; i < lock.acquiredCount; i++ {
slotID := lock.requiredSlots[i]
if nextLock := latches.releaseSlot(slotID, commitTS); nextLock != nil {
for lock.acquiredCount > 0 {
if nextLock := latches.releaseSlot(lock); nextLock != nil {
wakeupList = append(wakeupList, nextLock)
}
}
return wakeupList
}

// refreshCommitTS refreshes commitTS for keys.
func (latches *Latches) refreshCommitTS(keys [][]byte, commitTS uint64) {
slotIDs := latches.genSlotIDs(keys)
for _, slotID := range slotIDs {
latches.slots[slotID].refreshCommitTS(commitTS)
}
}

func (latches *Latches) releaseSlot(slotID int, commitTS uint64) (nextLock *Lock) {
func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) {
key := lock.keys[lock.acquiredCount-1]
slotID := lock.requiredSlots[lock.acquiredCount-1]
latch := &latches.slots[slotID]
lock.acquiredCount--
latch.Lock()
defer latch.Unlock()
latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS)
if !latch.hasMoreWaiting {
latch.free()

find := findNode(latch.queue, key)
if find.value != lock {
panic("releaseSlot wrong")
}
find.maxCommitTS = mathutil.MaxUint64(find.maxCommitTS, lock.commitTS)
find.value = nil
if len(latch.waiting) == 0 {
return nil
}
nextLock, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID)
latch.waitingQueueHead = nextLock.startTS
nextLock.acquiredCount++
if latch.maxCommitTS > nextLock.startTS {
nextLock.isStale = true

var idx int
for idx = 0; idx < len(latch.waiting); idx++ {
waiting := latch.waiting[idx]
if bytes.Compare(waiting.keys[waiting.acquiredCount], key) == 0 {
break
}
}
return nextLock
}
// Wake up the first one in waiting queue.
if idx < len(latch.waiting) {
nextLock = latch.waiting[idx]
// Delete element latch.waiting[idx] from the array.
copy(latch.waiting[idx:], latch.waiting[idx+1:])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use a list for waiting locks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coocood What's your advise?

latch.waiting[len(latch.waiting)-1] = nil
latch.waiting = latch.waiting[:len(latch.waiting)-1]

func (latches *Latches) popFromWaitingQueue(slotID int) (front *Lock, hasMoreWaiting bool) {
latches.Lock()
defer latches.Unlock()
waiting := latches.waitingQueues[slotID]
front = waiting[0]
if len(waiting) == 1 {
delete(latches.waitingQueues, slotID)
} else {
latches.waitingQueues[slotID] = waiting[1:]
hasMoreWaiting = true
if find.maxCommitTS > nextLock.startTS {
nextLock.isStale = true
}
}

return
}

func (latches *Latches) acquireSlot(slotID int, lock *Lock) acquireResult {
func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
key := lock.keys[lock.acquiredCount]
slotID := lock.requiredSlots[lock.acquiredCount]
latch := &latches.slots[slotID]
latch.Lock()
defer latch.Unlock()
if latch.maxCommitTS > lock.startTS {

// Try to recycle to limit the memory usage.
if latch.count >= latchListCount {
latch.recycle(lock.startTS)
}

find := findNode(latch.queue, key)
if find == nil {
tmp := &node{
slotID: slotID,
key: key,
value: lock,
}
tmp.next = latch.queue
latch.queue = tmp
latch.count++

lock.acquiredCount++
return acquireSuccess
}

if find.maxCommitTS > lock.startTS {
lock.isStale = true
return acquireStale
}

if latch.isEmpty() {
latch.waitingQueueHead = lock.startTS
if find.value == nil {
find.value = lock
lock.acquiredCount++
return acquireSuccess
}

// Push the current transaction into waitingQueue.
latch.hasMoreWaiting = true
latches.Lock()
defer latches.Unlock()
latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], lock)
latch.waiting = append(latch.waiting, lock)
return acquireLocked
}

// recycle is not thread safe, the latch should acquire its lock before executing this function.
func (l *latch) recycle(currentTS uint64) {
fakeHead := node{next: l.queue}
prev := &fakeHead
for curr := prev.next; curr != nil; curr = curr.next {
if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration && curr.value == nil {
l.count--
prev.next = curr.next
zhangjinpeng87 marked this conversation as resolved.
Show resolved Hide resolved
} else {
prev = curr
}
}
l.queue = fakeHead.next
}

func (latches *Latches) recycle(currentTS uint64) {
for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
latch.Lock()
latch.recycle(currentTS)
latch.Unlock()
}
}

func findNode(list *node, key []byte) *node {
for n := list; n != nil; n = n.next {
if bytes.Compare(n.key, key) == 0 {
return n
}
}
return nil
}