Skip to content

Commit

Permalink
LockManager wait queue adds priority support
Browse files Browse the repository at this point in the history
  • Loading branch information
snower committed May 23, 2024
1 parent a45a7c8 commit 18eb3ee
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 9 deletions.
97 changes: 91 additions & 6 deletions server/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ import (
"sync/atomic"
)

type ILockManagerRingQueue interface {
Push(lock *Lock)
Pop() *Lock
Head() *Lock
IterNodes() [][]*Lock
}

type LockManagerRingQueue struct {
queue []*Lock
index int
Expand Down Expand Up @@ -47,6 +54,73 @@ func (self *LockManagerRingQueue) Head() *Lock {
return self.queue[self.index]
}

func (self *LockManagerRingQueue) IterNodes() [][]*Lock {
if self.index < len(self.queue) {
return [][]*Lock{self.queue[self.index:]}
}
return make([][]*Lock, 0)
}

type LockManagerPriorityRingQueueNode struct {
ringQueue *LockManagerRingQueue
priority uint8
}

type LockManagerPriorityRingQueue struct {
priorityNodes []*LockManagerPriorityRingQueueNode
size int
}

func NewLockManagerPriorityRingQueue(size int) *LockManagerPriorityRingQueue {
return &LockManagerPriorityRingQueue{make([]*LockManagerPriorityRingQueueNode, 0), size}
}

func (self *LockManagerPriorityRingQueue) Push(lock *Lock) {
for _, node := range self.priorityNodes {
if node.priority == lock.command.Rcount {
node.ringQueue.Push(lock)
return
}
}
node := &LockManagerPriorityRingQueueNode{NewLockManagerRingQueue(self.size), lock.command.Rcount}
self.priorityNodes = append(self.priorityNodes, node)
for i := len(self.priorityNodes) - 2; i >= 0; i-- {
if self.priorityNodes[i].priority <= self.priorityNodes[i+1].priority {
break
}
self.priorityNodes[i], self.priorityNodes[i+1] = self.priorityNodes[i+1], self.priorityNodes[i]
}
node.ringQueue.Push(lock)
}

func (self *LockManagerPriorityRingQueue) Pop() *Lock {
for _, node := range self.priorityNodes {
lock := node.ringQueue.Pop()
if lock != nil {
return lock
}
}
return nil
}

func (self *LockManagerPriorityRingQueue) Head() *Lock {
for _, node := range self.priorityNodes {
lock := node.ringQueue.Head()
if lock != nil {
return lock
}
}
return nil
}

func (self *LockManagerPriorityRingQueue) IterNodes() [][]*Lock {
iterNodes := make([][]*Lock, 0)
for _, node := range self.priorityNodes {
iterNodes = append(iterNodes, node.ringQueue.IterNodes()...)
}
return make([][]*Lock, 0)
}

type LockManagerLockQueue struct {
queue *LockQueue
maps map[[16]byte]*Lock
Expand Down Expand Up @@ -74,10 +148,13 @@ func (self *LockManagerLockQueue) Head() *Lock {
type LockManagerWaitQueue struct {
fastQueue []*Lock
fastIndex int
ringQueue *LockManagerRingQueue
ringQueue ILockManagerRingQueue
}

func NewLockManagerWaitQueue() *LockManagerWaitQueue {
func NewLockManagerWaitQueue(priorityQueue bool) *LockManagerWaitQueue {
if priorityQueue {
return &LockManagerWaitQueue{make([]*Lock, 0, 8), 0, NewLockManagerPriorityRingQueue(16)}
}
return &LockManagerWaitQueue{make([]*Lock, 0, 8), 0, nil}
}

Expand Down Expand Up @@ -134,12 +211,12 @@ func (self *LockManagerWaitQueue) Rellac() {
}

func (self *LockManagerWaitQueue) IterNodes() [][]*Lock {
lockNodes := make([][]*Lock, 0, 2)
lockNodes := make([][]*Lock, 0)
if self.fastIndex < len(self.fastQueue) {
lockNodes = append(lockNodes, self.fastQueue[self.fastIndex:])
}
if self.ringQueue != nil && self.ringQueue.index < len(self.ringQueue.queue) {
lockNodes = append(lockNodes, self.ringQueue.queue[self.ringQueue.index:])
if self.ringQueue != nil {
lockNodes = append(lockNodes, self.ringQueue.IterNodes()...)
}
return lockNodes
}
Expand Down Expand Up @@ -381,7 +458,15 @@ func (self *LockManager) UpdateLockedLock(lock *Lock, command *protocol.LockComm

func (self *LockManager) AddWaitLock(lock *Lock) *Lock {
if self.waitLocks == nil {
self.waitLocks = NewLockManagerWaitQueue()
if lock.command.TimeoutFlag&protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY != 0 {
self.waitLocks = NewLockManagerWaitQueue(true)
} else {
self.waitLocks = NewLockManagerWaitQueue(false)
}
} else {
if lock.command.TimeoutFlag&protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY != 0 && self.waitLocks.ringQueue == nil {
self.waitLocks.ringQueue = NewLockManagerPriorityRingQueue(16)
}
}
self.waitLocks.Push(lock)
lock.refCount++
Expand Down
61 changes: 58 additions & 3 deletions server/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,62 @@ func TestLockManagerRingQueue(t *testing.T) {
}
}

func TestLockManagerPriorityRingQueue(t *testing.T) {
queue := NewLockManagerPriorityRingQueue(4)

lock := &Lock{command: &protocol.LockCommand{TimeoutFlag: protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY, Rcount: 1}}
queue.Push(lock)
if queue.Head() != lock || queue.Pop() != lock || len(queue.priorityNodes) != 1 || queue.priorityNodes[0].priority != 1 || queue.priorityNodes[0].ringQueue.index != 0 {
t.Errorf("LockManagerPriorityRingQueue Push Pop fail")
return
}

lock1 := &Lock{command: &protocol.LockCommand{TimeoutFlag: protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY, Rcount: 2}}
queue.Push(lock1)
lock2 := &Lock{command: &protocol.LockCommand{TimeoutFlag: protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY, Rcount: 1}}
queue.Push(lock2)
if len(queue.priorityNodes) != 2 || queue.priorityNodes[0].priority != 1 || queue.priorityNodes[1].priority != 2 {
t.Errorf("LockManagerPriorityRingQueue Push Priority fail")
return
}
if queue.Head() != lock2 || queue.Pop() != lock2 || len(queue.priorityNodes) != 2 || queue.priorityNodes[0].priority != 1 || queue.priorityNodes[0].ringQueue.index != 0 {
t.Errorf("LockManagerPriorityRingQueue Push Pop fail")
return
}
if queue.Head() != lock1 || queue.Pop() != lock1 || len(queue.priorityNodes) != 2 || queue.priorityNodes[1].priority != 2 || queue.priorityNodes[1].ringQueue.index != 0 {
t.Errorf("LockManagerPriorityRingQueue Push Pop fail")
return
}

for i := 0; i < 10000; i++ {
lock = &Lock{command: &protocol.LockCommand{TimeoutFlag: protocol.TIMEOUT_FLAG_RCOUNT_IS_PRIORITY, Rcount: uint8(rand.Intn(50) + 1)}}
queue.Push(lock)
}
currentPriority := uint8(0)
for queue.Head() != nil {
lock = queue.Pop()
if lock == nil || lock.command.Rcount < currentPriority {
t.Errorf("LockManagerPriorityRingQueue Pop fail")
return
}
currentPriority = lock.command.Rcount
}
currentPriority = uint8(0)
for _, node := range queue.priorityNodes {
if node.priority <= currentPriority {
t.Errorf("LockManagerPriorityRingQueue priorityNodes fail")
return
}
if node.ringQueue.index != 0 {
t.Errorf("LockManagerPriorityRingQueue priorityNodes ringQueue fail")
return
}
currentPriority = node.priority
}
}

func TestLockManagerWaitQueue(t *testing.T) {
queue := NewLockManagerWaitQueue()
queue := NewLockManagerWaitQueue(false)

lock := &Lock{}
queue.Push(lock)
Expand Down Expand Up @@ -117,7 +171,8 @@ func TestLockManagerWaitQueue(t *testing.T) {
}
for i := 0; i < 1024; i++ {
queue.Push(lock)
if len(queue.fastQueue) != 8 || cap(queue.fastQueue) != 8 || queue.ringQueue == nil || len(queue.ringQueue.queue) != i+1 {
ringQueue := queue.ringQueue.(*LockManagerRingQueue)
if len(queue.fastQueue) != 8 || cap(queue.fastQueue) != 8 || queue.ringQueue == nil || len(ringQueue.queue) != i+1 {
t.Errorf("LockManagerWaitQueue Push Size fail")
return
}
Expand Down Expand Up @@ -151,7 +206,7 @@ func TestLockManagerWaitQueue(t *testing.T) {

func BenchmarkLockManagerWaitQueue(b *testing.B) {
lock := &Lock{}
queue := NewLockManagerWaitQueue()
queue := NewLockManagerWaitQueue(false)
for i := 0; i < b.N; i++ {
for j := 0; j < 10000; j++ {
n := rand.Intn(1024)
Expand Down

0 comments on commit 18eb3ee

Please sign in to comment.