-
Notifications
You must be signed in to change notification settings - Fork 212
/
udp.go
285 lines (223 loc) · 7.75 KB
/
udp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
package p2p
import (
"context"
"errors"
"fmt"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/p2p/config"
"github.com/spacemeshos/go-spacemesh/p2p/connectionpool"
"github.com/spacemeshos/go-spacemesh/p2p/version"
"net"
"time"
"github.com/spacemeshos/go-spacemesh/log"
inet "github.com/spacemeshos/go-spacemesh/p2p/net"
"github.com/spacemeshos/go-spacemesh/p2p/node"
"github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto"
"github.com/spacemeshos/go-spacemesh/p2p/service"
)
// Lookuper is a function used to look up nodes we already know.
// Given a node's public key it returns that node's info, or an error.
type Lookuper func(key p2pcrypto.PublicKey) (*node.Info, error)
// udpNetwork is the subset of the udp network layer that UDPMux depends on.
type udpNetwork interface {
// Shutdown is called by UDPMux.Shutdown to stop the network layer.
Shutdown()
// Dial opens a connection to address for the given remote public key.
// NewUDPMux hands this method to the connection pool as its dial function.
Dial(ctx context.Context, address net.Addr, remotePublicKey p2pcrypto.PublicKey) (inet.Connection, error)
// IncomingMessages returns the channel that UDPMux reads incoming message events from.
IncomingMessages() chan inet.IncomingMessageEvent
// SubscribeOnNewRemoteConnections registers f as a callback for new connection events.
SubscribeOnNewRemoteConnections(f func(event inet.NewConnectionEvent))
// SubscribeClosingConnections registers f as a callback for closing connections.
SubscribeClosingConnections(f func(connection inet.ConnectionWithErr))
}
// UDPMux is a server for receiving and sending udp messages through protocols.
type UDPMux struct {
// logger is used for all log output of the mux.
logger log.Log
// local is the local node; its public key is attached to outgoing messages.
local node.LocalNode
// networkid is written into outgoing message metadata and compared against
// the NetworkID of incoming messages.
networkid int8
// cpool provides the connections used to send messages.
cpool cPool
// lookuper resolves a peer's public key to its node info before sending.
lookuper Lookuper
// network is the underlying udp network layer.
network udpNetwork
// messages maps a protocol name to the channel its incoming messages are delivered on.
messages map[string]chan service.DirectMessage
// shutdown is closed by Shutdown to stop the listening loop.
shutdown chan struct{}
}
// NewUDPMux creates a new udp protocol server.
//
// It builds a connection pool that dials through udpNet, and subscribes the
// pool to udpNet's new-connection and closing-connection events so the pool
// tracks connections opened by remote peers as well. The returned mux does not
// process incoming messages until Start is called.
func NewUDPMux(localNode node.LocalNode, lookuper Lookuper, udpNet udpNetwork, networkid int8, logger log.Log) *UDPMux {
	cpool := connectionpool.NewConnectionPool(udpNet.Dial, localNode.PublicKey(), logger.WithName("udp_cpool"))

	um := &UDPMux{
		logger:    logger,
		local:     localNode,
		networkid: networkid,
		lookuper:  lookuper,
		network:   udpNet,
		cpool:     cpool,
		messages:  make(map[string]chan service.DirectMessage),
		shutdown:  make(chan struct{}, 1),
	}

	udpNet.SubscribeOnNewRemoteConnections(func(event inet.NewConnectionEvent) {
		if err := cpool.OnNewConnection(event); err != nil {
			// %v, not %c: err is an error value, not a rune.
			um.logger.Warning("error adding udp connection to cpool err=%v", err)
		}
	})

	udpNet.SubscribeClosingConnections(func(connection inet.ConnectionWithErr) {
		cpool.OnClosedConnection(connection)
	})

	return um
}
// Start launches the background goroutine that listens for incoming network
// messages. It always returns nil.
func (mux *UDPMux) Start() error {
	go func() {
		mux.listenToNetworkMessage()
	}()
	return nil
}
// Shutdown stops the server: it closes the shutdown channel (which ends the
// listening loop), then shuts down the network layer and finally the
// connection pool, in that order.
func (mux *UDPMux) Shutdown() {
	// Signal the listening loop first so it stops picking up new messages.
	close(mux.shutdown)

	mux.network.Shutdown()
	mux.cpool.Shutdown()
}
// listenToNetworkMessage reads events from the network layer's incoming
// message channel and processes each one in its own goroutine. The loop ends
// when that channel is closed or when the mux is shut down.
func (mux *UDPMux) listenToNetworkMessage() {
	// handle processes a single event and logs any processing error.
	handle := func(event inet.IncomingMessageEvent) {
		if err := mux.processUDPMessage(event); err != nil {
			mux.logger.Error("Error handing network message err=%v", err)
			// todo: blacklist ?
		}
	}

	incoming := mux.network.IncomingMessages()
	for {
		select {
		case <-mux.shutdown:
			return
		case event, open := <-incoming:
			if !open {
				// the network layer closed its channel
				return
			}
			go handle(event)
		}
	}
}
// RegisterDirectProtocolWithChannel registers the channel c as the destination
// for messages of protocol name and returns c. It should be called before
// Start; it is not thread-safe.
//
// Note: for now udp is only direct.
// todo: no need to return chan, but for now stay consistent with api
func (mux *UDPMux) RegisterDirectProtocolWithChannel(name string, c chan service.DirectMessage) chan service.DirectMessage {
	mux.messages[name] = c

	return c
}
// ProcessDirectProtocolMessage passes a message to the channel registered for
// protocol.
//
// It returns an error when no channel is registered for protocol. Delivery
// blocks until the protocol's channel accepts the message; if the mux is shut
// down while waiting, the message is dropped and an error is returned, so the
// calling goroutine is not left blocked forever on a channel nobody reads.
func (mux *UDPMux) ProcessDirectProtocolMessage(sender p2pcrypto.PublicKey, protocol string, data service.Data, metadata service.P2PMetadata) error {
	// route authenticated message to the registered protocol
	msgchan := mux.messages[protocol]
	if msgchan == nil {
		return errors.New("no protocol")
	}

	select {
	case msgchan <- &udpProtocolMessage{metadata, sender, data}:
		return nil
	case <-mux.shutdown:
		// A nil shutdown channel never becomes ready, so a mux built without
		// NewUDPMux keeps the plain blocking-send behaviour.
		return errors.New("udp mux is shutting down, message dropped")
	}
}
// SendWrappedMessage sends an already wrapped message to nodeID over protocol
// by delegating to sendMessageImpl. It is used within MessageServer.
func (mux *UDPMux) SendWrappedMessage(nodeID p2pcrypto.PublicKey, protocol string, payload *service.DataMsgWrapper) error {
	err := mux.sendMessageImpl(nodeID, protocol, payload)
	return err
}
// SendMessage wraps the raw payload bytes in a service.DataBytes and sends
// them to peerPubkey over protocol by delegating to sendMessageImpl.
func (mux *UDPMux) SendMessage(peerPubkey p2pcrypto.PublicKey, protocol string, payload []byte) error {
	wrapped := service.DataBytes{Payload: payload}
	return mux.sendMessageImpl(peerPubkey, protocol, wrapped)
}
// sendMessageImpl looks up the peer, wraps the payload as a protocol message
// with p2p metadata, seals it with the connection's session and sends it.
//
// It fails when the peer lookup fails, when no connection can be obtained,
// when the connection has no session (ErrNoSession), when the payload or the
// message cannot be encoded, or when the send itself fails.
func (mux *UDPMux) sendMessageImpl(peerPubkey p2pcrypto.PublicKey, protocol string, payload service.Data) error {
	peer, err := mux.lookuper(peerPubkey)
	if err != nil {
		return err
	}

	// TODO: node.address should have IP address, UDP and TCP PORT.
	// for now assuming it's the same port for both.
	target := &net.UDPAddr{
		IP:   net.ParseIP(peer.IP.String()),
		Port: int(peer.DiscoveryPort),
	}
	conn, err := mux.cpool.GetConnection(target, peer.PublicKey())
	if err != nil {
		return err
	}

	session := conn.Session()
	if session == nil {
		return ErrNoSession
	}

	// Metadata fields are given positionally, in declaration order.
	message := ProtocolMessage{
		Metadata: &ProtocolMessageMetadata{
			protocol,
			config.ClientVersion,
			time.Now().UnixNano(),
			mux.local.PublicKey().Bytes(),
			int32(mux.networkid),
		},
	}
	if message.Payload, err = CreatePayload(payload); err != nil {
		return fmt.Errorf("can't create payload, err:%v", err)
	}

	encoded, err := types.InterfaceToBytes(&message)
	if err != nil {
		return fmt.Errorf("failed to encode signed message err: %v", err)
	}

	// Seal with the session, then prepend our public key to the sealed bytes.
	sealed := session.SealMessage(encoded)
	packet := p2pcrypto.PrependPubkey(sealed, mux.local.PublicKey())
	if err = conn.Send(packet); err != nil {
		return err
	}

	mux.logger.With().Debug("Sent UDP message", log.String("protocol", protocol), log.String("to", peer.String()), log.Int("len", len(packet)))
	return nil
}
// udpProtocolMessage is the message value delivered to a registered protocol
// channel. It bundles the p2p metadata, the sender's public key and the data.
// NOTE: ProcessDirectProtocolMessage builds it with a positional literal, so
// the field order below must not change.
type udpProtocolMessage struct {
// meta is the p2p metadata attached to the message.
meta service.P2PMetadata
// sender is the public key of the message's sender.
sender p2pcrypto.PublicKey
// msg is the message data.
msg service.Data
}
// Sender returns the public key of the message's sender.
func (upm *udpProtocolMessage) Sender() p2pcrypto.PublicKey {
	from := upm.sender
	return from
}
// Metadata returns the p2p metadata attached to the message.
func (upm *udpProtocolMessage) Metadata() service.P2PMetadata {
	meta := upm.meta
	return meta
}
// Bytes returns the message data as raw bytes, as produced by the data's own
// Bytes method.
func (upm *udpProtocolMessage) Bytes() []byte {
	data := upm.msg
	return data.Bytes()
}
// Data returns the message data.
func (upm *udpProtocolMessage) Data() service.Data {
	data := upm.msg
	return data
}
// processUDPMessage processes a received udp message and passes it to the
// protocol named in its metadata, adding the related p2p metadata.
//
// Steps, in order: strip the public key prefix from the message, decrypt the
// rest with the connection's session, decode it into a ProtocolMessage, check
// the network id and the client version, extract the payload data and hand it
// to ProcessDirectProtocolMessage.
//
// It returns ErrBadFormat1 when the event has no message or connection,
// ErrNoSession when the connection has no session, ErrFailDecrypt when
// decryption fails, ErrBadFormat2 when the message cannot be decoded or
// carries no metadata, and a descriptive error for a wrong network id, an
// unsupported client version or a payload that cannot be extracted.
func (mux *UDPMux) processUDPMessage(msg inet.IncomingMessageEvent) error {
	if msg.Message == nil || msg.Conn == nil {
		return ErrBadFormat1
	}

	// protocol messages are encrypted in payload
	// Locate the session
	session := msg.Conn.Session()
	if session == nil {
		return ErrNoSession
	}

	// The extracted public key is deliberately discarded: the sender passed on
	// to the protocol is the connection's remote public key (see below).
	rawmsg, _, err := p2pcrypto.ExtractPubkey(msg.Message)
	if err != nil {
		return err
	}

	decPayload, err := session.OpenMessage(rawmsg)
	if err != nil {
		mux.logger.Warning("failed decrypting message err=%v", err)
		return ErrFailDecrypt
	}

	pm := &ProtocolMessage{}
	if err = types.BytesToInterface(decPayload, pm); err != nil {
		mux.logger.Error("deserialization err=%v", err)
		return ErrBadFormat2
	}

	// Metadata is a pointer and the message comes from the network; guard
	// against a decoded message without metadata instead of panicking below.
	if pm.Metadata == nil {
		return ErrBadFormat2
	}

	if pm.Metadata.NetworkID != int32(mux.networkid) {
		// todo: tell net to blacklist the ip or sender ?
		return fmt.Errorf("wrong NetworkID, want: %v, got: %v", mux.networkid, pm.Metadata.NetworkID)
	}

	if ok, err := version.CheckNodeVersion(pm.Metadata.ClientVersion, config.MinClientVersion); err != nil || !ok {
		return fmt.Errorf("wrong client version want at least: %v, got: %v, err: %v", config.MinClientVersion, pm.Metadata.ClientVersion, err)
	}

	data, err := ExtractData(pm.Payload)
	if err != nil {
		return fmt.Errorf("failed extracting data from message err:%v", err)
	}

	p2pmeta := service.P2PMetadata{FromAddress: msg.Conn.RemoteAddr()}

	return mux.ProcessDirectProtocolMessage(msg.Conn.RemotePublicKey(), pm.Metadata.NextProtocol, data, p2pmeta)
}