-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
locks.go
227 lines (188 loc) · 5.58 KB
/
locks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package core
import (
"errors"
"runtime"
"runtime/debug"
"sync"
"time"
)
// SMART_LOCK_HOLD_TIMEOUT is the maximum duration a state may hold a SmartLock
// before a waiting caller of SmartLock.Lock is allowed to take the lock over.
const SMART_LOCK_HOLD_TIMEOUT time.Duration = 100 * time.Millisecond
// Sentinel errors related to SmartLock.
var (
	// ErrValueNotShared is the panic value used when a value is expected
	// to be shared but is not.
	ErrValueNotShared = errors.New("value is not shared")

	// ErrLockReEntry is the panic value used when a state tries to acquire
	// a lock it already holds.
	ErrLockReEntry = errors.New("lock re-entry")

	// ErrHeldLockWithoutHolder describes a lock that is held although no
	// holder is recorded.
	ErrHeldLockWithoutHolder = errors.New("held lock without holder")
)
// A SmartLock is a lock that ignores locking operations until the value it protects is shared
// (Lock and Unlock return immediately while isValueShared is false).
// It is not intended to be held for long durations and it should not be used to isolate transactions.
// The context of the current holder state (module) may be cancelled by other modules calling the Lock method:
// once the holder has kept the lock for at least SMART_LOCK_HOLD_TIMEOUT, a waiting caller takes the lock over
// and calls CancelGracefully on the previous holder's context.
type SmartLock struct {
lockLock sync.Mutex //protects the SmartLock's fields; note that isValueShared is read and written without it in this file.
//holderState is the state currently holding the lock, nil if the lock is not held.
holderState *GlobalState
//holdStart is the instant at which the current holder acquired the lock; Lock compares it with SMART_LOCK_HOLD_TIMEOUT.
holdStart RelativeTimeInstant64
//firstEntry is the stack trace (debug.Stack) captured when the current holder acquired the lock.
firstEntry string
//totalWaitPressure must be zero for a free lock to be acquired. It is never updated in this file.
//TODO: clamp updates to this field. The original note says max(new value, math.MaxInt32); presumably min was intended - confirm.
totalWaitPressure ModulePriority
//takeover is set to true when a state takes the lock over from a holder that exceeded the hold timeout.
//It is reset on regular acquisitions and by Unlock.
takeover bool
//isValueShared is set to true by Share.
isValueShared bool
}
// IsValueShared reports whether the protected value has been marked as shared.
// The flag is read without acquiring the internal mutex.
func (lock *SmartLock) IsValueShared() bool {
	shared := lock.isValueShared
	return shared
}
// AssertValueShared panics with ErrValueNotShared if the protected value has
// not been marked as shared; otherwise it does nothing.
func (lock *SmartLock) AssertValueShared() {
	if lock.isValueShared {
		return
	}
	panic(ErrValueNotShared)
}
// Share marks the protected value as shared and then invokes fn.
// If the value is already shared, Share does nothing and fn is not called.
// originState is not used by this method. The internal mutex is not acquired.
func (lock *SmartLock) Share(originState *GlobalState, fn func()) {
	if !lock.isValueShared {
		// The flag is set before fn runs, so fn observes the value as shared.
		lock.isValueShared = true
		fn()
	}
}
// IsHeld reports whether a holder is currently recorded for the lock.
// The holder's own status (cancelled or still running) is not taken into account.
func (lock *SmartLock) IsHeld() bool {
	lock.lockLock.Lock()
	held := lock.holderState != nil
	lock.lockLock.Unlock()

	return held
}
// Lock acquires the lock on behalf of state.
//
// Lock returns immediately without acquiring anything if the protected value is
// not shared, or if embedder is present in state.lockedValues (this check is
// skipped when ignoreLockedValues[0] is true).
//
// If the lock cannot be acquired right away, Lock pauses the CPU time depletion
// of state's context and spins (yielding the processor between attempts) until
// one of the following happens:
//   - the lock has no holder and the wait pressure is zero: the lock is acquired;
//   - the holder's context is done: the lock is acquired;
//   - the holder has held the lock for at least SMART_LOCK_HOLD_TIMEOUT: the lock
//     is taken over and the previous holder's context is gracefully cancelled;
//   - state's context is done: Lock panics with state.Ctx.Err().
//
// Lock panics if state is nil, and panics with ErrLockReEntry if state already
// holds the lock.
func (lock *SmartLock) Lock(state *GlobalState, embedder PotentiallySharable, ignoreLockedValues ...bool) {
	if state == nil {
		panic(errors.New("cannot lock smart lock: nil state"))
	}

	//IMPORTANT:
	//Locking/unlocking of SmartLock should be cheap because there are potentially thousands of operations per second.
	//No channel or goroutine should be created by default.

	if !lock.isValueShared {
		return
	}

	//TODO: extract logic for reuse ?
	if len(ignoreLockedValues) == 0 || !ignoreLockedValues[0] {
		for _, e := range state.lockedValues {
			if e == embedder {
				return //already locked
			}
		}
	}

	//Fast path.
	if lock.tryLockIfNoPressure(state) {
		return
	}

	//Time spent waiting for the lock should not count as CPU time.
	state.Ctx.PauseCPUTimeDepletion()
	needResumingDepletion := true
	defer func() {
		if needResumingDepletion {
			state.Ctx.ResumeCPUTimeDepletion()
		}
	}()

	//TODO: give priority to modules with a high wait pressure (priority * time spent waiting).

	for {
		select {
		case <-state.Ctx.Done():
			panic(state.Ctx.Err())
		default:
		}

		lock.lockLock.Lock()
		holder := lock.holderState

		switch {
		case holder == state:
			lock.lockLock.Unlock()
			panic(ErrLockReEntry)

		case holder == nil:
			//Acquire the lock if there is no holder and the wait pressure is zero.
			if lock.totalWaitPressure == 0 {
				lock.setHolderAndUnlock(state, false)
				return
			}
			//There is no holder but the wait pressure is not zero: keep waiting.
			//The holder must not be dereferenced here because it is nil.

		case holder.Ctx.IsDone():
			//Acquire the lock if the holder is not running.
			lock.setHolderAndUnlock(state, false)
			return

		case time.Since(lock.holdStart.Time()) >= SMART_LOCK_HOLD_TIMEOUT:
			//Cancel the execution of the holder if it has held the lock for too long, and acquire the lock.
			//The internal lock is released before cancelling the previous holder.
			lock.setHolderAndUnlock(state, true)

			//Resume CPU time depletion early because CancelGracefully() may perform some work.
			needResumingDepletion = false
			state.Ctx.ResumeCPUTimeDepletion()

			holder.Ctx.CancelGracefully()
			return
		}

		lock.lockLock.Unlock()
		runtime.Gosched()
	}
}

// setHolderAndUnlock records state as the new holder of the lock and then
// releases lock.lockLock, which must be held by the caller.
func (lock *SmartLock) setHolderAndUnlock(state *GlobalState, takeover bool) {
	defer lock.lockLock.Unlock()

	lock.holderState = state
	lock.takeover = takeover
	lock.holdStart = GetRelativeTimeInstant64()
	lock.firstEntry = string(debug.Stack())
}
// tryLockIfNoPressure makes a single attempt to acquire the lock for state.
// The attempt succeeds, and true is returned, only if the lock has no holder
// and the wait pressure is zero. It panics with ErrLockReEntry if state is
// already the holder.
func (lock *SmartLock) tryLockIfNoPressure(state *GlobalState) bool {
	lock.lockLock.Lock()
	defer lock.lockLock.Unlock()

	switch {
	case lock.holderState == state:
		panic(ErrLockReEntry)
	case lock.holderState != nil, lock.totalWaitPressure != 0:
		//Held by another state, or some wait pressure is present.
		return false
	}

	lock.holderState = state
	lock.takeover = false
	lock.holdStart = GetRelativeTimeInstant64()
	lock.firstEntry = string(debug.Stack())
	return true
}
// Unlock releases the lock held by state.
//
// Unlock returns immediately without releasing anything if the protected value
// is not shared, or if embedder is present in state.lockedValues (this check is
// skipped when ignoreLockedValues[0] is true).
//
// If state is the current holder, the lock is released. Otherwise, if a takeover
// is pending, the call is assumed to come from the state that lost the lock: the
// takeover flag is cleared and nothing is released. In any other case a warning
// is logged using the logger of state's context.
//
// Unlock panics if state is nil.
func (lock *SmartLock) Unlock(state *GlobalState, embedder PotentiallySharable, ignoreLockedValues ...bool) {
	if state == nil {
		panic(errors.New("cannot unlock smart lock: nil state"))
	}

	//IMPORTANT:
	//Locking/unlocking of SmartLock should be cheap because there are potentially thousands of operations per second.
	//No channel or goroutine should be created by default.

	if !lock.isValueShared {
		return
	}

	if len(ignoreLockedValues) == 0 || !ignoreLockedValues[0] {
		for _, e := range state.lockedValues {
			if e == embedder {
				return //no need to unlock now.
			}
		}
	}

	lock.lockLock.Lock()

	if lock.holderState == state {
		//The current holder always releases the lock, even if it obtained it through a takeover
		//and the previous holder has not called Unlock yet. The takeover flag is left untouched
		//so that a late Unlock from the previous holder is still silently ignored.
		lock.holderState = nil
		lock.lockLock.Unlock()
		return
	}

	if lock.takeover { //state is not the holder: it is assumed to be the state that lost the lock.
		lock.takeover = false
		lock.lockLock.Unlock()
		return
	}

	//The internal lock is released before logging.
	lock.lockLock.Unlock()
	state.Ctx.Logger().Warn().Msg("holder state is not the state provided for unlocking a smart lock")
}