forked from VolantMQ/volantmq
/
ack.go
59 lines (45 loc) · 1005 Bytes
/
ack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package session
import (
"sync"
"github.com/troian/surgemq/message"
)
// onRelease is the signature of the callback an ackQueue invokes from release,
// passing the message that was stored in the queue.
type onRelease func(msg message.Provider)
// ackQueue keeps messages indexed by packet id until they are released.
type ackQueue struct {
// lock is taken by store and release around every access to messages.
lock sync.Mutex
// messages maps a packet id to the message stored under it.
messages map[message.PacketID]message.Provider
// onRelease, when non-nil, is called by release with the stored message.
onRelease onRelease
}
// newAckQueue returns an empty ackQueue. cb may be nil; when it is set,
// release calls it with each message it removes from the queue.
func newAckQueue(cb onRelease) *ackQueue {
	return &ackQueue{
		onRelease: cb,
		messages:  make(map[message.PacketID]message.Provider),
	}
}
// store files msg in the queue under its packet id, replacing any message
// already held under that id.
//
// NOTE(review): the error returned by msg.PacketID is discarded, so whatever
// id comes back alongside an error is used as the key. Confirm that callers
// only pass packets that carry an id.
func (a *ackQueue) store(msg message.Provider) {
	a.lock.Lock()
	defer a.lock.Unlock()

	pid, _ := msg.PacketID()
	a.messages[pid] = msg
}
// release removes the message stored under msg's packet id, if there is one,
// and passes the stored message (not msg itself) to the onRelease callback
// when a callback is set. It does nothing when no message is stored under
// that id.
//
// The callback runs while the queue lock is held, so it must not call back
// into a method of the same queue that takes the lock: sync.Mutex is not
// reentrant and the call would deadlock.
func (a *ackQueue) release(msg message.Provider) {
	a.lock.Lock()
	defer a.lock.Unlock()

	// The PacketID error is ignored; a lookup miss below makes this a no-op.
	id, _ := msg.PacketID()

	stored, ok := a.messages[id]
	if !ok {
		return
	}

	if a.onRelease != nil {
		a.onRelease(stored)
	}

	// delete drops both the key and the map's reference to the message, so
	// there is no need to overwrite the value with nil first.
	delete(a.messages, id)
}
// get returns a snapshot of the messages currently held, keyed by packet id.
//
// The snapshot is a copy made while holding the queue lock, so the caller can
// range over it while other goroutines keep calling store and release.
// Changes to the returned map do not affect the queue, and later changes to
// the queue are not reflected in the returned map.
//
// Because get takes the queue lock, it must not be called from the onRelease
// callback, which release invokes with the lock already held.
func (a *ackQueue) get() map[message.PacketID]message.Provider {
	a.lock.Lock()
	defer a.lock.Unlock()

	snapshot := make(map[message.PacketID]message.Provider, len(a.messages))
	for id, msg := range a.messages {
		snapshot[id] = msg
	}

	return snapshot
}
//func (a *ackQueue) wipe() {
// a.lock.Lock()
// defer a.lock.Unlock()
//
// a.messages = make(map[message.PacketID]message.Provider)
//}