-
Notifications
You must be signed in to change notification settings - Fork 22
/
dagmutex.go
149 lines (126 loc) · 4.29 KB
/
dagmutex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package syncutils
import (
"sync"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
)
// DAGMutex is a multi-entity reader/writer mutual exclusion lock that allows for starvation.
// Entities can be registered dynamically by providing a comparable identifier.
// The structure and relation of these entities MUST NOT contain any cycles as this may lead to deadlocks while waiting
// to acquire locks cyclically.
//
// Entities can be Lock-ed one at a time. The call blocks until the lock for this entity can be acquired.
// Multiple entities can be RLock-ed at once, in arbitrary order. The call blocks until all read locks can be acquired.
//
// Consider the following example of a DAG with 3 entities A,B,C.
//
//	 ┌───┬─┐
//	 │   │B◄─┐
//	┌▼┐  └─┘ │
//	│A│      │
//	└▲┘     ┌┴┐
//	 │      │C│
//	 └──────┴─┘
//
// Let's assume the following 3 goroutines are competing for access. Time is advancing with every row.
//
//	Goroutine 1   Goroutine 2   Goroutine 3
//
//	Lock(A)       -             RLock(A,B)    <- blocking, not able to acquire locks
//	work          Lock(B)       wait
//	Unlock(A)     work          wait
//	-             Unlock(B)     wait          <- (internally) now RLock(A) is successful, but still waiting for B
//	-             -             RLock(A, B)   <- successfully acquired, holding the locks now
type DAGMutex[T comparable] struct {
	// consumerCounter holds, per entity, the number of Lock/RLock registrations that have not been unregistered yet.
	consumerCounter *shrinkingmap.ShrinkingMap[T, int]
	// mutexes holds the per-entity mutex. An entry is created on the first registration of an entity and removed again
	// when its last consumer unregisters.
	mutexes *shrinkingmap.ShrinkingMap[T, *StarvingMutex]
	// Mutex guards the bookkeeping (registering and unregistering consumers) in the two maps above.
	sync.Mutex
}
// NewDAGMutex returns an empty DAGMutex that is ready for use.
// Entities do not have to be declared up front; they are registered on their first Lock or RLock.
func NewDAGMutex[T comparable]() *DAGMutex[T] {
	d := new(DAGMutex[T])
	d.consumerCounter = shrinkingmap.New[T, int]()
	d.mutexes = shrinkingmap.New[T, *StarvingMutex]()

	return d
}
// RLock acquires a read lock on every given entity and returns once all of them are held.
//
// All entities are registered in one step; the read locks are then taken one after another in the order in which the
// ids were passed.
//
// It should not be used for recursive read locking.
// A blocked Lock call DOES NOT exclude new readers from acquiring the lock. Hence, it is starving.
func (d *DAGMutex[T]) RLock(ids ...T) {
	entityMutexes := d.registerMutexes(ids...)
	for i := range entityMutexes {
		entityMutexes[i].RLock()
	}
}
// RUnlock releases the read locks of all given entities.
// It does not affect other simultaneous readers.
//
// Entities for which the caller was the last registered consumer are dropped from the bookkeeping instead of being
// unlocked, so only the mutexes that are still shared with other consumers are touched here.
func (d *DAGMutex[T]) RUnlock(ids ...T) {
	stillShared := d.unregisterMutexes(ids...)
	for i := range stillShared {
		stillShared[i].RUnlock()
	}
}
// Lock acquires the write lock of the given entity.
// If the entity is already locked for reading or writing, Lock blocks until the lock is available.
func (d *DAGMutex[T]) Lock(id T) {
	// Only the registration happens under the bookkeeping lock; it is released before the (potentially blocking)
	// acquisition of the entity's own mutex so that other entities can still be locked and unlocked meanwhile.
	d.Mutex.Lock()
	entityMutex := d.registerMutex(id)
	d.Mutex.Unlock()

	entityMutex.Lock()
}
// Unlock unlocks the given entity for writing.
//
// If the caller was the last registered consumer of the entity, the entity's mutex is dropped from the bookkeeping
// instead of being unlocked. Otherwise the mutex is unlocked so that a waiting consumer can proceed.
//
// Unlock panics if it is called for an entity that has no registered mutex, i.e. more often than Lock.
//
// As with Mutexes, a locked DAGMutex is not associated with a particular goroutine. One goroutine may RLock (Lock) an
// entity within DAGMutex and then arrange for another goroutine to RUnlock (Unlock) it.
func (d *DAGMutex[T]) Unlock(id T) {
	// unregisterMutex panics on unbalanced calls. Releasing the bookkeeping lock via defer guarantees that d.Mutex is
	// not left locked in that case, which would otherwise block every later call on this DAGMutex for callers that
	// recover from the panic.
	mutex := func() *StarvingMutex {
		d.Mutex.Lock()
		defer d.Mutex.Unlock()

		return d.unregisterMutex(id)
	}()

	// nil means the entity was removed entirely; there is nothing left to unlock.
	if mutex == nil {
		return
	}

	mutex.Unlock()
}
// registerMutexes registers one consumer for each of the given entities while holding the bookkeeping lock and
// returns the entities' mutexes in the same order as ids.
func (d *DAGMutex[T]) registerMutexes(ids ...T) (mutexes []*StarvingMutex) {
	d.Mutex.Lock()
	defer d.Mutex.Unlock()

	mutexes = make([]*StarvingMutex, 0, len(ids))
	for _, id := range ids {
		mutexes = append(mutexes, d.registerMutex(id))
	}

	return mutexes
}
// registerMutex adds one consumer registration for the given entity and returns the entity's mutex, creating and
// storing a new one if the entity has none yet. Every caller in this file holds d.Mutex while calling it.
func (d *DAGMutex[T]) registerMutex(id T) (mutex *StarvingMutex) {
	var known bool
	if mutex, known = d.mutexes.Get(id); !known {
		mutex = NewStarvingMutex()
		d.mutexes.Set(id, mutex)
	}

	// A missing counter entry yields the zero value, so the first registration stores 1.
	consumers, _ := d.consumerCounter.Get(id)
	d.consumerCounter.Set(id, consumers+1)

	return mutex
}
// unregisterMutexes removes one consumer registration for each of the given entities while holding the bookkeeping
// lock. It returns only the mutexes that still have to be unlocked by the caller; entities that were removed entirely
// (unregisterMutex returned nil) are left out.
func (d *DAGMutex[T]) unregisterMutexes(ids ...T) (mutexes []*StarvingMutex) {
	d.Mutex.Lock()
	defer d.Mutex.Unlock()

	mutexes = []*StarvingMutex{}
	for _, id := range ids {
		remaining := d.unregisterMutex(id)
		if remaining == nil {
			continue
		}
		mutexes = append(mutexes, remaining)
	}

	return mutexes
}
// unregisterMutex removes one consumer registration for the given entity. Every caller in this file holds d.Mutex
// while calling it.
//
// It returns nil when the caller was the last registered consumer: the entity is then deleted from both maps and there
// is nothing left to unlock. Otherwise the consumer count is decremented and the entity's mutex is returned so that the
// caller can release it.
//
// It panics if no mutex is registered for the entity, i.e. Unlock or RUnlock was called too often.
func (d *DAGMutex[T]) unregisterMutex(id T) (mutex *StarvingMutex) {
	// One lookup is enough for both the "last consumer" check and the decrement below.
	count, _ := d.consumerCounter.Get(id)
	if count == 1 {
		d.consumerCounter.Delete(id)
		d.mutexes.Delete(id)

		return nil
	}

	mutex, mutexExists := d.mutexes.Get(id)
	if !mutexExists {
		panic(ierrors.Errorf("called Unlock or RUnlock too often for entity with %v", id))
	}

	d.consumerCounter.Set(id, count-1)

	return mutex
}