/
packet_transport.go
102 lines (93 loc) · 3 KB
/
packet_transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package metcd
import (
"net"
"github.com/coreos/etcd/raft/raftpb"
"github.com/weaveworks/mesh"
"github.com/weaveworks/mesh/meshconn"
)
// packetTransport takes ownership of the net.PacketConn.
// Incoming messages are unmarshaled from the conn and send to incomingc.
// Outgoing messages are received from outgoingc and marshaled to the conn.
type packetTransport struct {
conn net.PacketConn
translate peerTranslator
incomingc chan<- raftpb.Message // to controller
outgoingc <-chan raftpb.Message // from controller
unreachablec chan<- uint64 // to controller
logger mesh.Logger
}
func newPacketTransport(
conn net.PacketConn,
translate peerTranslator,
incomingc chan<- raftpb.Message,
outgoingc <-chan raftpb.Message,
unreachablec chan<- uint64,
logger mesh.Logger,
) *packetTransport {
t := &packetTransport{
conn: conn,
translate: translate,
incomingc: incomingc,
outgoingc: outgoingc,
unreachablec: unreachablec,
logger: logger,
}
go t.recvLoop()
go t.sendLoop()
return t
}
type peerTranslator func(uid mesh.PeerUID) (mesh.PeerName, error)
func (t *packetTransport) stop() {
t.conn.Close()
}
func (t *packetTransport) recvLoop() {
defer t.logger.Printf("packet transport: recv loop exit")
const maxRecvLen = 8192
b := make([]byte, maxRecvLen)
for {
n, remote, err := t.conn.ReadFrom(b)
if err != nil {
t.logger.Printf("packet transport: recv: %v (aborting)", err)
return
} else if n >= cap(b) {
t.logger.Printf("packet transport: recv from %s: short read, %d >= %d (continuing)", remote, n, cap(b))
continue
}
var msg raftpb.Message
if err := msg.Unmarshal(b[:n]); err != nil {
t.logger.Printf("packet transport: recv from %s (sz %d): %v (%s) (continuing)", remote, n, err, b[:n])
continue
}
//t.logger.Printf("packet transport: recv from %s (sz %d/%d) OK", remote, n, msg.Size())
t.incomingc <- msg
}
}
func (t *packetTransport) sendLoop() {
defer t.logger.Printf("packet transport: send loop exit")
for msg := range t.outgoingc {
b, err := msg.Marshal()
if err != nil {
t.logger.Printf("packet transport: send to Raft ID %x: %v (continuing)", msg.To, err)
continue
}
peerName, err := t.translate(mesh.PeerUID(msg.To))
if err != nil {
select {
case t.unreachablec <- msg.To:
t.logger.Printf("packet transport: send to Raft ID %x: %v (unreachable; continuing) (%s)", msg.To, err, msg.Type)
default:
t.logger.Printf("packet transport: send to Raft ID %x: %v (unreachable, report dropped; continuing) (%s)", msg.To, err, msg.Type)
}
continue
}
dst := meshconn.MeshAddr{PeerName: peerName}
if n, err := t.conn.WriteTo(b, dst); err != nil {
t.logger.Printf("packet transport: send to Mesh peer %s: %v (continuing)", dst, err)
continue
} else if n < len(b) {
t.logger.Printf("packet transport: send to Mesh peer %s: short write, %d < %d (continuing)", dst, n, len(b))
continue
}
//t.logger.Printf("packet transport: send to %s (sz %d/%d) OK", dst, msg.Size(), len(b))
}
}