forked from DrmagicE/gmqtt
/
limiter.go
116 lines (104 loc) · 2.57 KB
/
limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package server
import (
"sync"
"github.com/twxstar/gmqtt/pkg/bitmap"
"github.com/twxstar/gmqtt/pkg/packets"
)
// newPacketIDLimiter builds a limiter that lets at most limit packet ids be
// in use at the same time. Allocation starts at id 1 and the in-use bitmap is
// created with packets.MaxPacketID.
func newPacketIDLimiter(limit uint16) *packetIDLimiter {
	// used and exit are left at their zero values (0 and false).
	l := &packetIDLimiter{
		limit:     limit,
		freePid:   1,
		lockedPid: bitmap.New(packets.MaxPacketID),
	}
	l.cond = sync.NewCond(&sync.Mutex{})
	return l
}
// packetIDLimiter limits the generation of packet ids so that the number of
// inflight messages always stays less than or equal to the receive maximum
// setting of the client.
//
// All fields are read and written while holding cond.L.
type packetIDLimiter struct {
	// cond carries the mutex guarding the limiter and is used to block
	// pollPacketIDs until an id is released or the limiter is closed.
	cond *sync.Cond
	// used is the number of packet ids currently counted as in use.
	used uint16
	// limit is the value of used at which pollPacketIDs starts blocking.
	limit uint16
	// exit is set by close; once true, pollPacketIDs returns nil.
	exit bool
	// lockedPid records the packet ids in use (bit value 1 means in use).
	lockedPid *bitmap.Bitmap
	// freePid is the next candidate id handed out by pollPacketIDs; it wraps
	// from packets.MaxPacketID back to packets.MinPacketID.
	freePid packets.PacketID
}
// close marks the limiter as closed and wakes every goroutine blocked on the
// condition variable, so that each pending pollPacketIDs call observes the
// exit flag and returns nil.
func (p *packetIDLimiter) close() {
	p.cond.L.Lock()
	p.exit = true
	p.cond.L.Unlock()
	// Broadcast rather than Signal: closing is a terminal state change that
	// every waiter has to see. Signal wakes at most one goroutine and would
	// leave any additional waiter blocked forever.
	p.cond.Broadcast()
}
// pollPacketIDs reserves up to max unused packet ids, marks them as used and
// returns them.
//
// While the limiter is full (used >= limit) the call blocks until an id is
// released or the limiter is closed. A nil result means the limiter has been
// closed (nil is also returned when max is 0). Otherwise the number of
// returned ids is min(max, limit-used), with used taken at the moment the
// call stops waiting.
func (p *packetIDLimiter) pollPacketIDs(max uint16) (id []packets.PacketID) {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()
	for p.used >= p.limit && !p.exit {
		p.cond.Wait()
	}
	if p.exit {
		return nil
	}
	// The wait loop above guarantees used < limit here, so the subtraction
	// cannot underflow.
	n := max
	if remain := p.limit - p.used; remain < max {
		n = remain
	}
	if n == 0 {
		// Only reachable with max == 0; keep returning nil as before.
		return nil
	}
	// The final length is known, so allocate the result once.
	id = make([]packets.PacketID, 0, n)
	for j := uint16(0); j < n; j++ {
		// Skip over ids that are still marked as in use.
		for p.lockedPid.Get(p.freePid) == 1 {
			p.advanceFreePid()
		}
		id = append(id, p.freePid)
		p.used++
		p.lockedPid.Set(p.freePid, 1)
		p.advanceFreePid()
	}
	return id
}

// advanceFreePid moves freePid to the next candidate id, wrapping from
// packets.MaxPacketID back to packets.MinPacketID. It does not lock; it is
// called from pollPacketIDs with p.cond.L held.
func (p *packetIDLimiter) advanceFreePid() {
	if p.freePid == packets.MaxPacketID {
		p.freePid = packets.MinPacketID
		return
	}
	p.freePid++
}
// release marks a single packet id as unused and then signals one goroutine
// waiting on the limiter's condition variable.
func (p *packetIDLimiter) release(id packets.PacketID) {
	mu := p.cond.L
	mu.Lock()
	p.releaseLocked(id)
	mu.Unlock()
	p.cond.Signal()
}
// releaseLocked clears the in-use bit of id and decrements used. Ids that are
// not currently marked as in use are ignored. It does not lock; release and
// batchRelease call it with p.cond.L held.
func (p *packetIDLimiter) releaseLocked(id packets.PacketID) {
	if p.lockedPid.Get(id) != 1 {
		return
	}
	p.lockedPid.Set(id, 0)
	p.used--
}
// batchRelease marks every id in the given list as unused under a single lock
// acquisition and then signals one goroutine waiting on the condition
// variable.
func (p *packetIDLimiter) batchRelease(id []packets.PacketID) {
	p.cond.L.Lock()
	for i := range id {
		p.releaseLocked(id[i])
	}
	p.cond.L.Unlock()
	p.cond.Signal()
}
// markUsedLocked marks the given id as used and counts it in used.
//
// Marking an id that is already in use is a no-op, so used stays equal to the
// number of ids set in lockedPid; this mirrors the check in releaseLocked,
// which only decrements used for ids that are actually set. Without the
// check, a repeated mark would inflate used permanently.
//
// The method does not acquire the lock itself; as the Locked suffix suggests,
// callers are expected to hold it (e.g. via lock).
func (p *packetIDLimiter) markUsedLocked(id packets.PacketID) {
	if p.lockedPid.Get(id) == 1 {
		return
	}
	p.used++
	p.lockedPid.Set(id, 1)
}
// lock acquires the mutex of the limiter's condition variable.
func (p *packetIDLimiter) lock() {
	mu := p.cond.L
	mu.Lock()
}
// unlock releases the mutex of the limiter's condition variable without
// signalling any waiter.
func (p *packetIDLimiter) unlock() {
	mu := p.cond.L
	mu.Unlock()
}
// unlockAndSignal releases the limiter's mutex and then signals one goroutine
// waiting on the condition variable.
func (p *packetIDLimiter) unlockAndSignal() {
	c := p.cond
	c.L.Unlock()
	c.Signal()
}