-
Notifications
You must be signed in to change notification settings - Fork 8
/
flow.go
129 lines (111 loc) · 3.6 KB
/
flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package client
import (
"github.com/snower/slock/protocol"
"math"
"sync"
"time"
)
// MaxConcurrentFlow bundles the parameters from which Acquire and Release
// build a single Lock. That Lock is created on first use and reused by every
// later call on the same MaxConcurrentFlow.
type MaxConcurrentFlow struct {
// db provides GenLockId for the Lock's id and is handed to the Lock itself.
db *Database
// flowKey is the 16-byte key handed to the Lock.
flowKey [16]byte
// count is what NewMaxConcurrentFlow stored: the requested count minus one,
// with a requested count of 0 kept as 0.
count uint16
// timeout carries a value in its low 16 bits and a flag word in its high
// 16 bits (SetTimeoutFlag rewrites only the high half).
// NOTE(review): the unit of the low half is not visible here — confirm.
timeout uint32
// expried uses the same low-value / high-flag layout as timeout
// (SetExpriedFlag rewrites only the high half). The spelling is part of the
// existing API.
expried uint32
// flowLock stays nil until the first Acquire or Release creates it.
flowLock *Lock
// glock guards the lazy creation of flowLock.
glock *sync.Mutex
}
// NewMaxConcurrentFlow returns a MaxConcurrentFlow for flowKey on db.
//
// The stored count is one less than the count argument; a count of 0 is kept
// as 0. timeout and expried are stored unchanged. The returned value has no
// Lock yet and owns a fresh mutex.
func NewMaxConcurrentFlow(db *Database, flowKey [16]byte, count uint16, timeout uint32, expried uint32) *MaxConcurrentFlow {
	stored := count
	if stored != 0 {
		stored--
	}
	return &MaxConcurrentFlow{
		db:       db,
		flowKey:  flowKey,
		count:    stored,
		timeout:  timeout,
		expried:  expried,
		flowLock: nil,
		glock:    &sync.Mutex{},
	}
}
// GetTimeoutFlag returns the flag word held in the upper 16 bits of timeout,
// i.e. the value most recently stored by SetTimeoutFlag (or passed in the
// high half of the constructor's timeout argument).
func (self *MaxConcurrentFlow) GetTimeoutFlag() uint16 {
	// Shift before truncating: masking with 0xffff first keeps only the low
	// half, which the shift then discards, so the result would always be 0.
	return uint16(self.timeout >> 16)
}
// SetTimeoutFlag writes flag into the upper 16 bits of timeout while keeping
// the lower 16 bits as they were. It returns what GetTimeoutFlag reported
// before the write.
func (self *MaxConcurrentFlow) SetTimeoutFlag(flag uint16) uint16 {
	previous := self.GetTimeoutFlag()
	low := self.timeout & 0xffff
	self.timeout = uint32(flag)<<16 | low
	return previous
}
// GetExpriedFlag returns the flag word held in the upper 16 bits of expried,
// i.e. the value most recently stored by SetExpriedFlag (or passed in the
// high half of the constructor's expried argument).
func (self *MaxConcurrentFlow) GetExpriedFlag() uint16 {
	// Shift before truncating: masking with 0xffff first keeps only the low
	// half, which the shift then discards, so the result would always be 0.
	return uint16(self.expried >> 16)
}
// SetExpriedFlag writes flag into the upper 16 bits of expried while keeping
// the lower 16 bits as they were. It returns what GetExpriedFlag reported
// before the write.
func (self *MaxConcurrentFlow) SetExpriedFlag(flag uint16) uint16 {
	previous := self.GetExpriedFlag()
	low := self.expried & 0xffff
	self.expried = uint32(flag)<<16 | low
	return previous
}
// Acquire calls Lock on the flow's Lock and returns that call's result and
// error unchanged.
//
// The Lock is built on first use from db, a fresh id from db.GenLockId,
// flowKey, timeout, expried and count. glock is held only while the Lock is
// looked up or created; it is released before Lock is called.
func (self *MaxConcurrentFlow) Acquire() (*protocol.LockResultCommand, error) {
	self.glock.Lock()
	lock := self.flowLock
	if lock == nil {
		lock = &Lock{self.db, self.db.GenLockId(), self.flowKey, self.timeout, self.expried, self.count, 0}
		self.flowLock = lock
	}
	self.glock.Unlock()

	return lock.Lock()
}
// Release calls Unlock on the flow's Lock and returns that call's result and
// error unchanged.
//
// If no Lock exists yet, one is built exactly as Acquire would build it
// (db, a fresh id from db.GenLockId, flowKey, timeout, expried, count) before
// Unlock is called. glock is held only while the Lock is looked up or
// created; it is released before Unlock is called.
func (self *MaxConcurrentFlow) Release() (*protocol.LockResultCommand, error) {
	self.glock.Lock()
	lock := self.flowLock
	if lock == nil {
		lock = &Lock{self.db, self.db.GenLockId(), self.flowKey, self.timeout, self.expried, self.count, 0}
		self.flowLock = lock
	}
	self.glock.Unlock()

	return lock.Unlock()
}
// TokenBucketFlow bundles the parameters from which Acquire builds a new Lock
// on every call, with an expiry derived from period.
type TokenBucketFlow struct {
// db provides GenLockId for each Lock's id and is handed to the Lock itself.
db *Database
// flowKey is the 16-byte key handed to each Lock.
flowKey [16]byte
// count is what NewTokenBucketFlow stored: the requested count minus one,
// with a requested count of 0 kept as 0.
count uint16
// timeout carries a value in its low 16 bits and a flag word in its high
// 16 bits (SetTimeoutFlag rewrites only the high half).
// NOTE(review): the unit of the low half is not visible here — confirm.
timeout uint32
// period drives the expiry computed in Acquire: below 3 it is multiplied by
// 1000 and rounded up, otherwise it is rounded up as-is.
// NOTE(review): presumably seconds — confirm against callers.
period float64
// expriedFlag is shifted into bits 16 and up of every expiry word Acquire
// builds. The spelling is part of the existing API.
expriedFlag uint16
// flowLock holds the Lock created by the most recent Acquire step.
flowLock *Lock
// glock is held while Acquire builds a Lock and stores it in flowLock.
glock *sync.Mutex
}
// NewTokenBucketFlow returns a TokenBucketFlow for flowKey on db.
//
// The stored count is one less than the count argument; a count of 0 is kept
// as 0. timeout and period are stored unchanged, the expiry flag starts at 0,
// and the returned value has no Lock yet and owns a fresh mutex.
func NewTokenBucketFlow(db *Database, flowKey [16]byte, count uint16, timeout uint32, period float64) *TokenBucketFlow {
	stored := count
	if stored != 0 {
		stored--
	}
	return &TokenBucketFlow{
		db:          db,
		flowKey:     flowKey,
		count:       stored,
		timeout:     timeout,
		period:      period,
		expriedFlag: 0,
		flowLock:    nil,
		glock:       &sync.Mutex{},
	}
}
// GetTimeoutFlag returns the flag word held in the upper 16 bits of timeout,
// i.e. the value most recently stored by SetTimeoutFlag (or passed in the
// high half of the constructor's timeout argument).
func (self *TokenBucketFlow) GetTimeoutFlag() uint16 {
	// Shift before truncating: masking with 0xffff first keeps only the low
	// half, which the shift then discards, so the result would always be 0.
	return uint16(self.timeout >> 16)
}
// SetTimeoutFlag writes flag into the upper 16 bits of timeout while keeping
// the lower 16 bits as they were. It returns what GetTimeoutFlag reported
// before the write.
func (self *TokenBucketFlow) SetTimeoutFlag(flag uint16) uint16 {
	previous := self.GetTimeoutFlag()
	low := self.timeout & 0xffff
	self.timeout = uint32(flag)<<16 | low
	return previous
}
// GetExpriedFlag reports the expiry flag word currently stored on the flow.
func (self *TokenBucketFlow) GetExpriedFlag() uint16 {
	current := self.expriedFlag
	return current
}
// SetExpriedFlag replaces the stored expiry flag word with flag and returns
// the word that was stored before the call.
func (self *TokenBucketFlow) SetExpriedFlag(flag uint16) uint16 {
	previous := self.expriedFlag
	self.expriedFlag = flag
	return previous
}
// Acquire builds a new Lock for this flow and calls Lock on it, returning
// that call's result and error.
//
// When period is below 3, a single attempt is made with the configured
// timeout and an expiry of ceil(period*1000) combined with 0x04000000.
// NOTE(review): 0x04000000 presumably marks the expiry as milliseconds —
// confirm against the protocol package.
//
// Otherwise a first attempt is made with timeout 0 and an expiry equal to the
// time left until the next multiple of ceil(period) on the Unix clock. If
// that attempt returns an error together with a result whose Result is
// protocol.RESULT_TIMEOUT, a second attempt is made with the configured
// timeout and an expiry of ceil(period); its outcome is returned instead.
//
// In every case expriedFlag is shifted into bits 16 and up of the expiry.
// NOTE(review): a rounded period above 0xffff would spill into those flag
// bits — confirm callers keep period in range.
func (self *TokenBucketFlow) Acquire() (*protocol.LockResultCommand, error) {
	if self.period < 3 {
		expried := uint32(math.Ceil(self.period*1000)) | 0x04000000
		return self.newFlowLock(self.timeout, expried).Lock()
	}

	period := int64(math.Ceil(self.period))
	// Expire at the end of the current period-aligned window.
	remaining := uint32(period - time.Now().Unix()%period)
	result, err := self.newFlowLock(0, remaining).Lock()
	if err != nil && result != nil && result.Result == protocol.RESULT_TIMEOUT {
		// The current window is exhausted: wait up to the configured timeout
		// for a slot that then lasts one full period.
		return self.newFlowLock(self.timeout, uint32(period)).Lock()
	}
	return result, err
}

// newFlowLock builds a Lock with a fresh id, the given timeout, and expried
// combined with the flow's expiry flag, records it in flowLock and returns it.
//
// Callers must use the returned pointer rather than re-reading self.flowLock:
// once glock is released a concurrent Acquire may replace the field, and two
// callers would then operate on the same Lock.
func (self *TokenBucketFlow) newFlowLock(timeout uint32, expried uint32) *Lock {
	self.glock.Lock()
	defer self.glock.Unlock()

	expried |= uint32(self.expriedFlag) << 16
	lock := &Lock{self.db, self.db.GenLockId(), self.flowKey, timeout, expried, self.count, 0}
	self.flowLock = lock
	return lock
}