/
responsehandler.go
130 lines (113 loc) · 2.95 KB
/
responsehandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package messagix
import (
"sync"
"time"
)
type ChannelType int
const (
RequestChannel ChannelType = iota
PacketChannel
)
type ResponseHandler struct {
client *Client
lock sync.RWMutex
requestChannels map[uint16]chan interface{}
packetChannels map[uint16]chan interface{}
}
func (p *ResponseHandler) hasPacket(packetId uint16) bool {
p.lock.RLock()
_, ok := p.requestChannels[packetId]
p.lock.RUnlock()
return ok
}
func (p *ResponseHandler) addPacketChannel(packetId uint16) {
p.lock.Lock()
p.packetChannels[packetId] = make(chan interface{}, 1) // buffered channel with capacity of 1
p.lock.Unlock()
}
func (p *ResponseHandler) addRequestChannel(packetId uint16) {
p.lock.Lock()
p.requestChannels[packetId] = make(chan interface{}, 1)
p.lock.Unlock()
}
func (p *ResponseHandler) updatePacketChannel(packetId uint16, packetData interface{}) bool {
p.lock.RLock()
ch, ok := p.packetChannels[packetId]
p.lock.RUnlock()
if ok {
ch <- packetData
return true
}
return false
}
func (p *ResponseHandler) updateRequestChannel(packetId uint16, packetData interface{}) bool {
p.lock.RLock()
ch, ok := p.requestChannels[packetId]
p.lock.RUnlock()
if ok {
ch <- packetData
return true
}
return false
}
func (p *ResponseHandler) waitForPubACKDetails(packetId uint16) *Event_PublishACK {
return castIfNotNil[Event_PublishACK](p.waitForDetails(packetId, PacketChannel))
}
func (p *ResponseHandler) waitForSubACKDetails(packetId uint16) *Event_SubscribeACK {
return castIfNotNil[Event_SubscribeACK](p.waitForDetails(packetId, PacketChannel))
}
func (p *ResponseHandler) waitForPubResponseDetails(packetId uint16) *Event_PublishResponse {
return castIfNotNil[Event_PublishResponse](p.waitForDetails(packetId, RequestChannel))
}
func castIfNotNil[T any](i interface{}) *T {
if i != nil {
return i.(*T)
}
return nil
}
func (p *ResponseHandler) waitForDetails(packetId uint16, channelType ChannelType) interface{} {
ch, ok := p.getChannel(packetId, channelType)
if !ok {
return nil
}
select {
case response := <-ch:
p.deleteDetails(packetId, channelType)
return response
case <-time.After(packetTimeout):
p.deleteDetails(packetId, channelType)
// TODO this should probably be an error
return nil
}
}
func (p *ResponseHandler) deleteDetails(packetId uint16, channelType ChannelType) {
p.lock.Lock()
defer p.lock.Unlock()
switch channelType {
case RequestChannel:
ch, ok := p.requestChannels[packetId]
if ok {
close(ch)
delete(p.requestChannels, packetId)
}
case PacketChannel:
ch, ok := p.packetChannels[packetId]
if ok {
close(ch)
delete(p.packetChannels, packetId)
}
}
}
func (p *ResponseHandler) getChannel(packetId uint16, channelType ChannelType) (chan interface{}, bool) {
var ch chan interface{}
var ok bool
p.lock.RLock()
switch channelType {
case RequestChannel:
ch, ok = p.requestChannels[packetId]
case PacketChannel:
ch, ok = p.packetChannels[packetId]
}
p.lock.RUnlock()
return ch, ok
}