-
Notifications
You must be signed in to change notification settings - Fork 111
/
fromUDP.go
198 lines (158 loc) · 4.31 KB
/
fromUDP.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package library
import (
"encoding/json"
"github.com/nytlabs/streamtools/st/blocks"
"github.com/nytlabs/streamtools/st/util"
"net"
"sync"
)
const (
MAX_UDP_MESSAGE_SIZE = 1024
)
type listenerUDP struct {
block blocks.BlockInterface
out chan []byte
udpConn net.PacketConn
wait sync.WaitGroup
closed bool
}
func NewListenerUDP(block blocks.BlockInterface, connectionString string, out chan []byte) (*listenerUDP, error) {
l := &listenerUDP{
block: block,
out: out,
}
// Try to open a new UDP connection, returning any error.
if conn, err := net.ListenPacket("udp", connectionString); err != nil {
return l, err
} else {
l.udpConn = conn
}
// Start the listener.
l.wait.Add(1)
go l.listen()
return l, nil
}
func (l *listenerUDP) Close() {
// Signal the listener loop to exit and close the UDP connection.
l.closed = true
l.udpConn.Close()
// Wait for the listener loop to exit.
l.wait.Wait()
}
func (l *listenerUDP) listen() {
// Defer notification that the listener is done.
defer l.wait.Done()
// Create a byte buffer
buffer := make([]byte, MAX_UDP_MESSAGE_SIZE)
// Loop continuously.
for !l.closed {
// Try to read from the connection, and log the error if there is one.
if bytes, _, err := l.udpConn.ReadFrom(buffer); err != nil {
// Log the error.
l.block.Error(err)
} else {
// Copy the message from the buffer.
message := make([]byte, bytes)
copy(message, buffer)
// Dump the message onto the listener chanel.
l.out <- message
}
}
}
// specify those channels we're going to use to communicate with streamtools
type FromUDP struct {
blocks.Block
queryrule chan chan interface{}
inrule chan interface{}
inpoll chan interface{}
in chan interface{}
out chan interface{}
quit chan interface{}
connectionString string
listener *listenerUDP
listenerLock sync.RWMutex
listenerChan chan []byte
}
// we need to build a simple factory so that streamtools can make new blocks of this kind
func NewFromUDP() blocks.BlockInterface {
return &FromUDP{}
}
// Setup is called once before running the block. We build up the channels and
// specify what kind of block this is.
func (u *FromUDP) Setup() {
u.Kind = "fromUDP"
u.Desc = "listens for messages sent over UDP, emitting each into streamtools"
u.inrule = u.InRoute("rule")
u.queryrule = u.QueryRoute("rule")
u.quit = u.Quit()
u.out = u.Broadcast()
u.listenerChan = make(chan []byte)
}
// Run is the block's main loop. Here we listen on the different channels we
// set up.
func (u *FromUDP) Run() {
var ConnectionString string
for {
select {
// Handle a rule change.
case msgI := <-u.inrule:
// Check for a new connection string.
if cs, err := util.ParseString(msgI, "ConnectionString"); err != nil {
u.Error(err)
break
} else {
ConnectionString = cs
}
// Get the listener lock for writing.
u.listenerLock.Lock()
// Check if the connection string has been modified.
if u.connectionString != ConnectionString {
// Save the new connection string.
u.connectionString = ConnectionString
// Close any existing connection.
if u.listener != nil {
u.listener.Close()
u.listener = nil
}
// Try to get a new connection.
if l, err := NewListenerUDP(u, ConnectionString, u.listenerChan); err != nil {
u.Error(err)
} else {
u.listener = l
}
}
// Release the listener lock.
u.listenerLock.Unlock()
// Recieving a message from the listener. This is the same as from SQS
// etc.
case msg := <-u.listenerChan:
var outMsg interface{}
if err := json.Unmarshal(msg, &outMsg); err != nil {
u.Error(err)
} else {
u.out <- outMsg
}
// Respond to a rule query.
case respChan := <-u.queryrule:
// Get the listener lock for reading.
u.listenerLock.RLock()
respChan <- map[string]interface{}{
"ConnectionString": u.connectionString,
}
// Release the listener lock.
u.listenerLock.RUnlock()
// Shutdown everything.
case <-u.quit:
// Get the listener lock for writing and defer its closing.
u.listenerLock.Lock()
defer u.listenerLock.Unlock()
// Clean up the listener if it exists.
if u.listener != nil {
u.listener.Close()
u.listener = nil
}
// quit the block
return
}
}
}