store/tikv/latch: write a custom allocator for latch #8344

Closed
wants to merge 3 commits into from
@@ -0,0 +1,112 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package latch

import (
"sync"
"unsafe"
)

const blockSize = 1 * 1024 * 1024

type block [blockSize]byte

const metaSize = unsafe.Sizeof(meta{})

// meta occupies the first metaSize bytes of every block; offset is the current
// bump-allocation position and minTS is the allocator timestamp at which the
// block was created.
type meta struct {
offset int
minTS uint64
}

func newBlock(currentTS uint64) *block {
b := new(block)
m := b.meta()
m.offset = int(metaSize)
m.minTS = currentTS
return b
}

func (b *block) meta() *meta {
return (*meta)(unsafe.Pointer(b))
}

func (b *block) alloc(size int) []byte {
m := b.meta()
if m.offset+size >= blockSize {
return nil
}

start := m.offset
m.offset += size
return (*b)[start:m.offset]
}

// allocator is a customized bump allocator for latch.
// The allocated memory is volatile: buffers allocated before safeTS may be
// released by a later GC(safeTS) call.
// The allocator cannot allocate data larger than the block size.
type allocator struct {
mu sync.Mutex
blocks []*block
currentTS uint64
}

func newAllocator() *allocator {
a := &allocator{}
b := newBlock(a.currentTS)
a.blocks = append(a.blocks, b)
return a
}

var gAlloc = newAllocator()

// Alloc returns the allocated buffer with a timestamp.
func (a *allocator) Alloc(size int) ([]byte, uint64) {
const dataSize = blockSize - metaSize
if size >= int(dataSize) {
panic("alloc fail")
}

// It's a pity this is not a lock-free allocator.
a.mu.Lock()
defer a.mu.Unlock()

a.currentTS++
b := a.blocks[len(a.blocks)-1]
ret := b.alloc(size)
if ret == nil {
b = newBlock(a.currentTS)
a.blocks = append(a.blocks, b)
ret = b.alloc(size)
}

return ret, a.currentTS
}

// GC releases objects that were created before safeTS.
// After this function returns, objects created during [safeTS, MaxUint64) are safe.
func (a *allocator) GC(safeTS uint64) {
// GC runs in a separate goroutine (see Latches.recycle), so hold the mutex to
// avoid racing with Alloc on currentTS and blocks.
a.mu.Lock()
defer a.mu.Unlock()
i := 0
a.currentTS++
for ; i < len(a.blocks)-1; i++ {
b := a.blocks[i+1]
m := b.meta()
if safeTS < m.minTS {
break
}
// Delete block to free memory.
a.blocks[i] = nil
}
a.blocks = a.blocks[i:]
}
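
A rough usage sketch of the allocator above (illustrative only, not part of the diff; exampleAllocUsage is a made-up name):

func exampleAllocUsage() {
	// Every Alloc bumps the allocator's logical clock and returns the new value.
	buf, ts := gAlloc.Alloc(16)
	copy(buf, "example-key")
	// GC(safeTS) releases a block only when the block created after it has
	// minTS <= safeTS, i.e. every allocation inside it predates safeTS.
	// The buffer allocated at ts above therefore survives this call.
	gAlloc.GC(ts)
	_ = buf
}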
@@ -0,0 +1,113 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package latch

import (
"bytes"
"math/rand"
"testing"

. "github.com/pingcap/check"
)

var _ = Suite(&testAllocSuite{})

type testAllocSuite struct{}

func (s *testAllocSuite) SetUpSuite(c *C) {}

func (s *testAllocSuite) TestAlloc(c *C) {
a := newAllocator()
b, _ := a.Alloc(0)
c.Assert(len(b), Equals, 0)

_, t1 := a.Alloc(666)
_, t2 := a.Alloc(42)
_, t3 := a.Alloc(2)
a.GC(t1)
c.Assert(len(a.blocks), Equals, 1)
a.GC(t2)
c.Assert(len(a.blocks), Equals, 1)
a.GC(t3)
c.Assert(len(a.blocks), Equals, 1)

var t, tmp uint64
for i := 0; i < 100; i++ {
_, tmp = a.Alloc(rand.Intn(blockSize - int(metaSize))) // stay below Alloc's panic threshold
if i == 50 {
t = tmp
}
}
blockCount := len(a.blocks)
a.GC(t)
c.Assert(len(a.blocks), Less, blockCount)

a.GC(tmp)
c.Assert(len(a.blocks), Equals, 1)

var fail bool
func() {
defer func() {
if recover() != nil {
fail = true
}
}()
a.Alloc(blockSize)
}()

c.Assert(fail, IsTrue)
}

func (s *testAllocSuite) TestAllocKey(c *C) {
const count = 100000
srcs := make([][]byte, count)
dests := make([][]byte, count)
var xx byte
for i := 0; i < count; i++ {
length := 10 + rand.Intn(40)
src := make([]byte, length)
for idx := range src {
src[idx] = xx
xx++
}
dst := allocKey(src)

srcs[i] = src
dests[i] = dst
}
for i := 0; i < count; i++ {
c.Assert(bytes.Compare(srcs[i], dests[i]), Equals, 0)
}
}

func BenchmarkAlloc(b *testing.B) {
a := newAllocator()
b.ResetTimer()
for i := 0; i < b.N; i++ {
a.Alloc(7)
}
b.ReportAllocs()
}

func BenchmarkStandard(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
standard(7)
}
b.ReportAllocs()
}

func standard(n int) []byte {
return make([]byte, n)
}
@@ -19,6 +19,7 @@ import (
"sort"
"sync"
"time"
"unsafe"

"github.com/cznic/mathutil"
log "github.com/sirupsen/logrus"
@@ -34,6 +35,15 @@ type node struct {
next *node
}

const szUint64 = unsafe.Sizeof(uint64(0))

// refresh re-copies the node's key into the allocator's current block when the key
// was allocated at or before safeTS. createTime is the timestamp that allocKey
// stores in the 8 bytes preceding the key; re-copying old keys lets the allocator's
// GC release the blocks they used to live in.
func (n *node) refresh(safeTS uint64) {
createTime := *((*uint64)(unsafe.Pointer(uintptr(unsafe.Pointer(&n.key[0])) - szUint64)))
if createTime <= safeTS {
n.key = allocKey(n.key)
}
}

// latch stores a key's waiting transactions information.
type latch struct {
queue *node
@@ -169,6 +179,14 @@ func (latches *Latches) release(lock *Lock, wakeupList []*Lock) []*Lock {
return wakeupList
}

// allocKey copies key into the global latch allocator, storing the allocation
// timestamp in the 8 bytes immediately before the returned slice.
func allocKey(key []byte) []byte {
ptr, ver := gAlloc.Alloc(int(szUint64) + len(key))
*((*uint64)(unsafe.Pointer(&ptr[0]))) = ver
copyKey := ptr[szUint64:]
copy(copyKey, key)
return copyKey
}
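
// Layout sketch (my reading of allocKey/refresh, not part of the diff):
//
//	gAlloc buffer:   [ ver uint64 (8 bytes) | key bytes ... ]
//	returned slice:                           ^ copyKey starts here
//
// refresh recovers ver by stepping szUint64 bytes back from &key[0].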

func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) {
key := lock.keys[lock.acquiredCount-1]
slotID := lock.requiredSlots[lock.acquiredCount-1]
@@ -186,9 +204,7 @@ func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) {
// Make a copy of the key, so latch does not reference the transaction's memory.
// If we do not do it, the transaction's memory can't be recycled by GC and
// there will be a leak.
copyKey := make([]byte, len(find.key))
copy(copyKey, find.key)
find.key = copyKey
find.key = allocKey(find.key)
if len(latch.waiting) == 0 {
return nil
}
@@ -227,7 +243,7 @@ func (latches *Latches) acquireSlot(lock *Lock) acquireResult {

// Try to recycle to limit the memory usage.
if latch.count >= latchListCount {
latch.recycle(lock.startTS)
latch.recycle(lock.startTS, 0)
}

find := findNode(latch.queue, key)
@@ -262,7 +278,7 @@ func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
}

// recycle is not thread-safe; the latch's lock should be acquired before executing this function.
func (l *latch) recycle(currentTS uint64) int {
func (l *latch) recycle(currentTS uint64, safeTS uint64) int {
total := 0
fakeHead := node{next: l.queue}
prev := &fakeHead
@@ -272,21 +288,23 @@ func (l *latch) recycle(currentTS uint64) int {
prev.next = curr.next
total++
} else {
curr.refresh(safeTS)
prev = curr
}
}
l.queue = fakeHead.next
return total
}

func (latches *Latches) recycle(currentTS uint64) {
func (latches *Latches) recycle(currentTS uint64, safeTS uint64) {
total := 0
for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
latch.Lock()
total += latch.recycle(currentTS)
total += latch.recycle(currentTS, safeTS)
latch.Unlock()
}
gAlloc.GC(safeTS)
log.Debugf("recycle run at %v, recycle count = %d...\n", time.Now(), total)
}

@@ -143,7 +143,7 @@ func (s *testLatchSuite) TestRecycle(c *C) {
c.Assert(allEmpty, IsFalse)

currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(expireDuration)), 3)
latches.recycle(currentTS)
latches.recycle(currentTS, 0)

for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
@@ -51,6 +51,7 @@ const latchListCount = 5

func (scheduler *LatchesScheduler) run() {
var counter int
var safeTS uint64
wakeupList := make([]*Lock, 0)
for lock := range scheduler.unlockCh {
wakeupList = scheduler.latches.release(lock, wakeupList)
@@ -62,10 +63,13 @@ func (scheduler *LatchesScheduler) run() {
currentTS := lock.commitTS
elapsed := tsoSub(currentTS, scheduler.lastRecycleTime)
if elapsed > checkInterval || counter > checkCounter {
go scheduler.latches.recycle(lock.commitTS)
go scheduler.latches.recycle(lock.commitTS, safeTS)
scheduler.lastRecycleTime = currentTS
counter = 0
}
if elapsed > checkInterval {
_, safeTS = gAlloc.Alloc(0)
}
}
counter++
}
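
My reading of the safeTS handshake above (illustrative comments, not part of the diff):

// interval N:   go latches.recycle(commitTS, safeTS)  // safeTS was captured at interval N-1
//               _, safeTS = gAlloc.Alloc(0)           // snapshot the allocator clock for interval N+1
//
// Inside Latches.recycle, surviving nodes older than safeTS are re-copied via
// refresh before gAlloc.GC(safeTS) runs, so GC only drops blocks whose keys have
// either expired or been moved to a newer block.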