/
transport.go
269 lines (255 loc) · 7.12 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
package transport
import (
"context"
"crypto/cipher"
"crypto/rand"
"errors"
"io"
"net"
"sync"
"time"
"github.com/p9c/monorepo/pkg/fec"
)
// HandleFunc is a map of handlers for working on received, decoded packets.
//
// Listen looks up the entry whose key equals the first 4 bytes (the magic) of an incoming packet. The entry is called
// with the caller supplied context value and must return the function that processes the decoded message bytes.
type HandleFunc map[string]func(ctx interface{}) func(b []byte) (e error)
// Connection is the state and working memory references for a simple reliable UDP LAN transport, encrypted by a GCM
// AES cipher, with the simple protocol of sending out 9 packets containing encrypted FEC shards containing a slice of
// bytes.
//
// This protocol probably won't work well outside of a multicast LAN in adverse conditions, but it is designed for
// local network control systems.
//
// TODO: the above only holds once the updated FEC segmenting code is put in.
type Connection struct {
// maxDatagramSize is the size of the read buffer Listen allocates for incoming datagrams.
maxDatagramSize int
// buffers holds the shards received so far for each message, keyed by the packet nonce converted to a string.
buffers map[string]*MsgBuffer
// sendAddress is not read by any code visible in this file.
sendAddress *net.UDPAddr
// SendConn is the default outbound connection used by Send and SendShards; SetSendConn replaces it.
SendConn net.Conn
// listenAddress is not read by any code visible in this file.
listenAddress *net.UDPAddr
// listenConn points at the packet connection Listen reads from.
listenConn *net.PacketConn
// ciph seals outgoing shards in CreateShards and opens incoming ones in Listen.
ciph cipher.AEAD
// ctx stops the Listen goroutine once it is done.
ctx context.Context
// mx is not locked by any code visible in this file.
// NOTE(review): presumably intended to guard buffers - confirm against the rest of the package.
mx *sync.Mutex
}
//
// // NewConnection creates a new connection with a defined default send
// // connection and listener and pre shared key password for encryption on the
// // local network
// func NewConnection(send, listen, preSharedKey string,
// maxDatagramSize int, ctx context.Context) (c *Connection, e error) {
// sendAddr := &net.UDPAddr{}
// var sendConn net.Conn
// listenAddr := &net.UDPAddr{}
// var listenConn net.PacketConn
// if listen != "" {
// config := &net.ListenConfig{Control: reusePort}
// listenConn, e = config.ListenPacket(context.Background(), "udp4", listen)
// if e != nil {
// E.Ln(e)
// }
// }
// if send != "" {
// // sendAddr, e = net.ResolveUDPAddr("udp4", send)
// // if e != nil {
// // E.Ln(e)
// // }
// sendConn, e = net.Dial("udp4", send)
// if e != nil {
// Error(err, sendAddr)
// }
// // L.Spew(sendConn)
// }
// var ciph cipher.AEAD
// if ciph, e = gcm.GetCipher(preSharedKey); E.Chk(e) {
// }
// return &Connection{
// maxDatagramSize: maxDatagramSize,
// buffers: make(map[string]*MsgBuffer),
// sendAddress: sendAddr,
// SendConn: sendConn,
// listenAddress: listenAddr,
// listenConn: &listenConn,
// ciph: ciph, // gcm.GetCipher(*cx.Config.MinerPass),
// ctx: ctx,
// mx: &sync.Mutex{},
// }, err
// }
// SetSendConn dials ad with the "udp4" network and, when the dial is accepted by E.Chk as error free, stores the
// new connection in SendConn, the default outbound connection used by Send and SendShards.
//
// When the dial fails SendConn is left as it was and the dial error is returned.
func (c *Connection) SetSendConn(ad string) (e error) {
	var dialled net.Conn
	dialled, e = net.Dial("udp4", ad)
	if E.Chk(e) {
		return
	}
	c.SendConn = dialled
	return
}
// CreateShards takes a slice of bytes, FEC-encodes it into shards and wraps every shard in a packet that is ready to
// be written to the network.
//
// Each returned packet is laid out as: the first 4 bytes of magic, then the nonce, then the shard sealed with the
// connection's AEAD cipher. One random nonce is generated per call and placed in every packet of the message; Listen
// uses it as the message ID under which shards are collected.
//
// An error is returned, with nil shards, when magic is shorter than 4 bytes, when the random nonce cannot be read or
// when fec.Encode fails.
//
// NOTE(review): all shards of one message are sealed with the same nonce under the same key. AEAD modes such as GCM
// require a nonce to be unique per Seal call for a given key, so this weakens the encryption. It is left unchanged
// here because the receiver depends on the shared nonce to group shards - changing it means changing the wire format.
func (c *Connection) CreateShards(b, magic []byte) (
	shards [][]byte,
	e error,
) {
	const magicLen = 4
	// slicing magic below would panic on a short slice, and this method can be called without going through Send
	if len(magic) < magicLen {
		e = errors.New("magic must be at least 4 bytes long")
		return
	}
	// get a nonce for the packet, it is both message ID and salt
	nonceLen := c.ciph.NonceSize()
	nonce := make([]byte, nonceLen)
	if _, e = io.ReadFull(rand.Reader, nonce); E.Chk(e) {
		return
	}
	// generate the shards; stop here on failure rather than wrapping whatever was returned alongside the error
	if shards, e = fec.Encode(b); E.Chk(e) {
		return nil, e
	}
	headerLen := magicLen + nonceLen
	for i := range shards {
		// assemble the packet: magic, nonce, and encrypted shard. Seal appends the ciphertext to the header, the
		// capacity is reserved up front so that append does not reallocate
		packet := make([]byte, headerLen, headerLen+len(shards[i])+c.ciph.Overhead())
		copy(packet, magic[:magicLen])
		copy(packet[magicLen:], nonce)
		shards[i] = c.ciph.Seal(packet, nonce, shards[i], nil)
	}
	return
}
// send writes every shard to sendConn in slice order, one Write call per shard.
//
// The result of each Write is handed to E.Chk and the loop carries on regardless, so a failed write does not stop
// the remaining shards from being attempted. The value returned is the result of the last Write only; it is nil
// when shards is empty.
func send(shards [][]byte, sendConn net.Conn) (e error) {
	for _, packet := range shards {
		_, e = sendConn.Write(packet)
		E.Chk(e)
	}
	return
}
// Send turns b into encrypted FEC shard packets with CreateShards and writes them to SendConn, the default outbound
// connection.
//
// magic must be exactly 4 bytes long; it is placed at the start of every packet.
//
// An error is returned when magic has the wrong length, when no send connection has been set, when CreateShards
// fails, or with whatever send reports for the writes.
func (c *Connection) Send(b, magic []byte) (e error) {
	if len(magic) != 4 {
		e = errors.New("magic must be 4 bytes long")
		return
	}
	// writing to a nil connection would panic
	if c.SendConn == nil {
		e = errors.New("send connection is not set")
		return
	}
	var shards [][]byte
	// stop on failure: this error used to be overwritten by the result of send, so callers never saw it
	if shards, e = c.CreateShards(b, magic); E.Chk(e) {
		return
	}
	e = send(shards, c.SendConn)
	E.Chk(e)
	return
}
// SendTo turns b into encrypted FEC shard packets with CreateShards and writes them to addr over a UDP socket that
// is dialled for this call and closed again before returning.
//
// magic must be exactly 4 bytes long; it is placed at the start of every packet.
//
// An error is returned when magic has the wrong length, when dialling addr fails, when CreateShards fails, or with
// whatever send reports for the writes.
func (c *Connection) SendTo(addr *net.UDPAddr, b, magic []byte) (e error) {
	if len(magic) != 4 {
		e = errors.New("magic must be 4 bytes long")
		E.Chk(e)
		return
	}
	var sendConn *net.UDPConn
	if sendConn, e = net.DialUDP("udp", nil, addr); E.Chk(e) {
		return
	}
	// the socket only lives for this call; without closing it every call leaks a file descriptor. A close failure is
	// passed to E.Chk and does not change the returned error
	defer func() { E.Chk(sendConn.Close()) }()
	var shards [][]byte
	// stop on failure: this error used to be overwritten by the result of send, so callers never saw it
	if shards, e = c.CreateShards(b, magic); E.Chk(e) {
		return
	}
	e = send(shards, sendConn)
	E.Chk(e)
	return
}
// SendShards writes the given, already assembled packets to SendConn, the default outbound connection, and returns
// what send reports after also passing it to E.Chk.
func (c *Connection) SendShards(shards [][]byte) (e error) {
	e = send(shards, c.SendConn)
	E.Chk(e)
	return
}
// SendShardsTo writes the given, already assembled packets to addr over a UDP socket that is dialled for this call
// and closed again before returning.
//
// An error is returned when dialling addr fails, otherwise whatever send reports for the writes.
func (c *Connection) SendShardsTo(shards [][]byte, addr *net.UDPAddr) (e error) {
	var sendConn *net.UDPConn
	if sendConn, e = net.DialUDP("udp", nil, addr); E.Chk(e) {
		return
	}
	// the socket only lives for this call; without closing it every call leaks a file descriptor. A close failure is
	// passed to E.Chk and does not change the returned error
	defer func() { E.Chk(sendConn.Close()) }()
	e = send(shards, sendConn)
	E.Chk(e)
	return
}
// Listen runs a goroutine that collects and attempts to decode the FEC shards once it has enough intact pieces. It
// returns as soon as the goroutine has been started.
//
// For every datagram read from the listen connection the goroutine:
//
//   - drops it when it is too short to hold the 4 byte magic plus a nonce, or when handlers has no entry for its
//     magic;
//   - sets *lastSent to the current time when both lastSent and firstSender are non-nil (firstSender is not otherwise
//     used here);
//   - opens the shard with the connection's cipher, dropping the datagram when that fails;
//   - stores the shard under its nonce and, once at least 3 shards of a message are held, tries to FEC-decode them and
//     passes the result to handlers[magic](ifc);
//   - when a shard arrives for a message that has already been decoded, deletes the buffers of all other messages.
//
// The goroutine exits once c.ctx is done. The check runs before each read, so cancellation only takes effect after
// the read in progress has returned.
//
// The returned error is non-nil only when the connection has no listen socket to read from.
func (c *Connection) Listen(handlers HandleFunc, ifc interface{}, lastSent *time.Time, firstSender *string) (e error) {
	// dereferencing a missing listener inside the goroutine would take the whole process down
	if c.listenConn == nil || *c.listenConn == nil {
		e = errors.New("listen connection is not set")
		return
	}
	// storing into a nil map panics
	if c.buffers == nil {
		c.buffers = make(map[string]*MsgBuffer)
	}
	const magicLen = 4
	// take the nonce length from the cipher, as CreateShards does, rather than assuming 12 bytes
	headerLen := magicLen + c.ciph.NonceSize()
	F.Ln("setting read buffer")
	buffer := make([]byte, c.maxDatagramSize)
	go func() {
		F.Ln("starting connection handler")
		// read from socket until context is cancelled
		for {
			// checked at the top so that every continue below passes through it
			select {
			case <-c.ctx.Done():
				return
			default:
			}
			// errors stay local to the goroutine: the caller has long since received Listen's return value
			n, src, err := (*c.listenConn).ReadFrom(buffer)
			if E.Chk(err) {
				continue
			}
			buf := buffer[:n]
			// datagrams come from the network and may be any length; slicing a short one would panic
			if len(buf) < headerLen {
				continue
			}
			magic := string(buf[:magicLen])
			handler, ok := handlers[magic]
			if !ok {
				continue
			}
			// if caller needs to know the liveness status of the controller it is working on, the code below
			if lastSent != nil && firstSender != nil {
				*lastSent = time.Now()
			}
			nonceBytes := buf[magicLen:headerLen]
			nonce := string(nonceBytes)
			// decipher; Open is given a nil destination so the shard does not alias the reused read buffer
			var shard []byte
			if shard, err = c.ciph.Open(nil, nonceBytes, buf[headerLen:], nil); E.Chk(err) {
				// corrupted or irrelevant message
				continue
			}
			bn, known := c.buffers[nonce]
			if !known {
				// first shard of a new message
				bn = &MsgBuffer{
					[][]byte{},
					time.Now(), false, src,
				}
				bn.Buffers = append(bn.Buffers, shard)
				c.buffers[nonce] = bn
				continue
			}
			if bn.Decoded {
				// superseded messages can be deleted from the buffers, we don't add more data for the already
				// decoded.
				for i := range c.buffers {
					if i != nonce {
						delete(c.buffers, i)
					}
				}
				continue
			}
			bn.Buffers = append(bn.Buffers, shard)
			if len(bn.Buffers) < 3 {
				continue
			}
			// try to decode it; on failure the shards are kept and the next shard triggers another attempt
			var cipherText []byte
			if cipherText, err = fec.Decode(bn.Buffers); E.Chk(err) {
				continue
			}
			bn.Decoded = true
			if err = handler(ifc)(cipherText); E.Chk(err) {
				continue
			}
		}
	}()
	return
}
//
// func GetUDPAddr(address string) (sendAddr *net.UDPAddr) {
// sendHost, sendPort, e := net.SplitHostPort(address)
// if e != nil {
// // return
// }
// sendPortI, e := strconv.ParseInt(sendPort, 10, 64)
// if e != nil {
// // return
// }
// sendAddr = &net.UDPAddr{IP: net.ParseIP(sendHost),
// Port: int(sendPortI)}
// // D.Ln("multicast", Address)
// // L.Spew(sendAddr)
// return
// }