forked from tendermint/tendermint
/
transport_mconn.go
479 lines (433 loc) · 12.7 KB
/
transport_mconn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
package p2p
import (
"context"
"errors"
"fmt"
"io"
"math"
"net"
"strconv"
"sync"
"golang.org/x/net/netutil"
"github.com/providenetwork/tendermint/crypto"
"github.com/providenetwork/tendermint/internal/libs/protoio"
"github.com/providenetwork/tendermint/internal/p2p/conn"
"github.com/providenetwork/tendermint/libs/log"
p2pproto "github.com/providenetwork/tendermint/proto/tendermint/p2p"
"github.com/providenetwork/tendermint/types"
)
// Protocol names accepted by MConnTransport (see Protocols and
// validateEndpoint).
const (
// MConnProtocol is the protocol name of the MConn transport. It is the
// protocol reported in endpoints produced by this transport and its
// connections.
MConnProtocol Protocol = "mconn"
// TCPProtocol is the "tcp" protocol name, which MConnTransport also accepts
// for backwards-compatibility.
TCPProtocol Protocol = "tcp"
)
// MConnTransportOptions sets options for MConnTransport.
type MConnTransportOptions struct {
// MaxAcceptedConnections is the maximum number of simultaneous accepted
// (incoming) connections. Beyond this, new connections will block until
// a slot is free. 0 means unlimited.
//
// When non-zero, Listen enforces the limit by wrapping its listener with
// netutil.LimitListener.
//
// FIXME: We may want to replace this with connection accounting in the
// Router, since it will need to do e.g. rate limiting and such as well.
// But it might also make sense to have per-transport limits.
MaxAcceptedConnections uint32
}
// MConnTransport is a Transport implementation using the current multiplexed
// Tendermint protocol ("MConn").
type MConnTransport struct {
// logger is handed to every connection created by Accept and Dial.
logger log.Logger
// options holds the transport-level settings supplied at construction.
options MConnTransportOptions
// mConnConfig is passed through to each connection's MConnection.
mConnConfig conn.MConnConfig
// channelDescs are the channel descriptors given to new connections; they
// can be extended with AddChannelDescriptors.
channelDescs []*ChannelDescriptor
// closeCh is closed by Close to signal that the transport has shut down.
closeCh chan struct{}
// closeOnce ensures the shutdown in Close runs only once.
closeOnce sync.Once
// listener is nil until Listen succeeds.
listener net.Listener
}
// NewMConnTransport builds an MConnTransport from the given logger,
// MConnection configuration, channel descriptors and transport options. The
// returned transport is not listening yet; call Listen to accept inbound
// connections. Connections speak the Tendermint MConnection protocol
// implemented by conn.MConnection.
func NewMConnTransport(
	logger log.Logger,
	mConnConfig conn.MConnConfig,
	channelDescs []*ChannelDescriptor,
	options MConnTransportOptions,
) *MConnTransport {
	transport := new(MConnTransport)
	transport.logger = logger
	transport.options = options
	transport.mConnConfig = mConnConfig
	transport.channelDescs = channelDescs
	// closeCh is closed by Close() to signal shutdown.
	transport.closeCh = make(chan struct{})
	return transport
}
// String implements Transport. It returns the MConn protocol name.
func (m *MConnTransport) String() string {
	name := string(MConnProtocol)
	return name
}
// Protocols implements Transport. Besides the native MConn protocol, tcp is
// reported as supported for backwards-compatibility.
func (m *MConnTransport) Protocols() []Protocol {
	supported := []Protocol{
		MConnProtocol,
		TCPProtocol,
	}
	return supported
}
// Endpoints implements Transport. It returns an empty slice when the
// transport has not started listening or has been closed; otherwise it
// returns a single MConn endpoint describing the listener's address. The IP
// and port are only filled in when the listener address is a *net.TCPAddr.
func (m *MConnTransport) Endpoints() []Endpoint {
	active := m.listener != nil
	if active {
		// A closed transport no longer advertises any endpoint.
		select {
		case <-m.closeCh:
			active = false
		default:
		}
	}
	if !active {
		return []Endpoint{}
	}

	local := Endpoint{Protocol: MConnProtocol}
	tcpAddr, isTCP := m.listener.Addr().(*net.TCPAddr)
	if isTCP {
		local.IP = tcpAddr.IP
		local.Port = uint16(tcpAddr.Port)
	}
	return []Endpoint{local}
}
// Listen opens a TCP listener on the given endpoint so that Accept() can
// hand out inbound connections. It must be called exactly once before
// calling Accept(), and the caller must call Close() to shut the listener
// down. It returns an error if the transport is already listening, if the
// endpoint fails validation, or if the TCP listener cannot be created.
//
// FIXME: Listen currently only supports listening on a single endpoint, it
// might be useful to support listening on multiple addresses (e.g. IPv4 and
// IPv6, or a private and public address) via multiple Listen() calls.
func (m *MConnTransport) Listen(endpoint Endpoint) error {
	if m.listener != nil {
		return errors.New("transport is already listening")
	}
	if err := m.validateEndpoint(endpoint); err != nil {
		return err
	}

	port := strconv.Itoa(int(endpoint.Port))
	address := net.JoinHostPort(endpoint.IP.String(), port)
	tcpListener, err := net.Listen("tcp", address)
	if err != nil {
		return err
	}

	if limit := m.options.MaxAcceptedConnections; limit > 0 {
		// FIXME: This will establish the inbound connection but simply hang it
		// until another connection is released. It would probably be better to
		// return an error to the remote peer or close the connection. This is
		// also a DoS vector since the connection will take up kernel resources.
		// This was just carried over from the legacy P2P stack.
		tcpListener = netutil.LimitListener(tcpListener, int(limit))
	}

	m.listener = tcpListener
	return nil
}
// Accept implements Transport. It blocks until an inbound TCP connection
// arrives and wraps it in an (un-handshaked) mConnConnection. If the
// transport is not listening an error is returned; if accepting fails
// because the transport has been closed, io.EOF is returned instead of the
// listener's error.
func (m *MConnTransport) Accept() (Connection, error) {
	if m.listener == nil {
		return nil, errors.New("transport is not listening")
	}

	tcpConn, err := m.listener.Accept()
	if err == nil {
		return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs), nil
	}

	// Translate the accept failure into io.EOF when it was caused by Close().
	select {
	case <-m.closeCh:
		err = io.EOF
	default:
	}
	return nil, err
}
// Dial implements Transport. It validates the endpoint, opens an outbound
// TCP connection to it (honoring ctx for cancellation), and wraps the result
// in an (un-handshaked) mConnConnection. If dialing fails and ctx is done,
// the context's error is returned rather than the dialer's.
func (m *MConnTransport) Dial(ctx context.Context, endpoint Endpoint) (Connection, error) {
	if err := m.validateEndpoint(endpoint); err != nil {
		return nil, err
	}

	// A zero port falls back to 26657.
	// NOTE(review): 26657 is conventionally the Tendermint RPC port (P2P is
	// usually 26656) — confirm this default is intended.
	port := endpoint.Port
	if port == 0 {
		port = 26657
	}
	address := net.JoinHostPort(endpoint.IP.String(), strconv.Itoa(int(port)))

	var dialer net.Dialer
	tcpConn, err := dialer.DialContext(ctx, "tcp", address)
	if err == nil {
		return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs), nil
	}

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		return nil, err
	}
}
// Close implements Transport. The shutdown runs at most once: it closes
// closeCh and then, if Listen was called, the listener. The listener's close
// error (if any) is returned by the call that performed the shutdown; later
// calls return nil.
func (m *MConnTransport) Close() error {
	var closeErr error
	shutdown := func() {
		// closeCh must be closed first, so that Accept() can tell a failure
		// caused by shutdown apart from a genuine listener error.
		close(m.closeCh)
		if m.listener == nil {
			return
		}
		closeErr = m.listener.Close()
	}
	m.closeOnce.Do(shutdown)
	return closeErr
}
// AddChannelDescriptors appends the given channel descriptors to the set
// that is handed to connections created by subsequent Accept and Dial calls.
//
// FIXME: To be removed when the legacy p2p stack is removed. Channel
// descriptors should be managed by the router. The underlying transport and
// connections should be agnostic to everything but the channel ID's which are
// initialized in the handshake.
func (m *MConnTransport) AddChannelDescriptors(channelDesc []*ChannelDescriptor) {
	extended := append(m.channelDescs, channelDesc...)
	m.channelDescs = extended
}
// validateEndpoint checks that an endpoint can be used by this transport: it
// must pass Endpoint.Validate, use the mconn or tcp protocol, carry an IP
// address, and have no path. The first failed check is returned as an error.
func (m *MConnTransport) validateEndpoint(endpoint Endpoint) error {
	if err := endpoint.Validate(); err != nil {
		return err
	}

	switch {
	case endpoint.Protocol != MConnProtocol && endpoint.Protocol != TCPProtocol:
		return fmt.Errorf("unsupported protocol %q", endpoint.Protocol)
	case len(endpoint.IP) == 0:
		return errors.New("endpoint has no IP address")
	case endpoint.Path != "":
		return fmt.Errorf("endpoints with path not supported (got %q)", endpoint.Path)
	}
	return nil
}
// mConnConnection implements Connection for MConnTransport.
type mConnConnection struct {
// logger is replaced with the MConnection's logger once Handshake succeeds.
logger log.Logger
// conn is the underlying network connection.
conn net.Conn
// mConnConfig and channelDescs are passed to the MConnection created during
// the handshake.
mConnConfig conn.MConnConfig
channelDescs []*ChannelDescriptor
// receiveCh carries inbound messages from onReceive to ReceiveMessage.
receiveCh chan mConnMessage
// errorCh carries errors reported through onError; it is created with a
// buffer of one in newMConnConnection.
errorCh chan error
// closeCh is closed by Close/FlushClose to unblock pending operations.
closeCh chan struct{}
// closeOnce is shared by Close and FlushClose, so only the first call to
// either of them performs the shutdown.
closeOnce sync.Once
mconn *conn.MConnection // set during Handshake()
}
// mConnMessage passes MConnection messages through internal channels. It is
// the element type of mConnConnection.receiveCh.
type mConnMessage struct {
// channelID is the channel the payload was received on.
channelID ChannelID
// payload is the message body as delivered to onReceive.
payload []byte
}
// newMConnConnection wraps an established network connection in an
// mConnConnection. The returned connection has not been handshaked yet, so
// its MConnection is unset until Handshake() succeeds.
func newMConnConnection(
	logger log.Logger,
	conn net.Conn,
	mConnConfig conn.MConnConfig,
	channelDescs []*ChannelDescriptor,
) *mConnConnection {
	c := &mConnConnection{
		logger:       logger,
		conn:         conn,
		mConnConfig:  mConnConfig,
		channelDescs: channelDescs,
	}
	c.receiveCh = make(chan mConnMessage)
	// Buffered so that onError can hand over one error without a receiver
	// being present (avoids leaking the onError caller).
	c.errorCh = make(chan error, 1)
	c.closeCh = make(chan struct{})
	return c
}
// Handshake implements Connection. It runs the handshake in a separate
// goroutine and waits for either its result or context cancellation. On
// cancellation the connection is closed (which aborts the blocking network
// calls) and the context's error is returned. On success the resulting
// MConnection is stored on the connection and started, and the peer's node
// info and public key are returned.
func (c *mConnConnection) Handshake(
	ctx context.Context,
	nodeInfo types.NodeInfo,
	privKey crypto.PrivKey,
) (types.NodeInfo, crypto.PubKey, error) {
	// outcome bundles everything the handshake goroutine produces.
	type outcome struct {
		mconn    *conn.MConnection
		peerInfo types.NodeInfo
		peerKey  crypto.PubKey
		err      error
	}
	// Buffered so the goroutine can always deliver its result and exit, even
	// if we have already returned because of context cancellation.
	doneCh := make(chan outcome, 1)

	go func() {
		// FIXME: Since the MConnection code panics, we need to recover it and
		// turn it into an error. We should remove panics instead.
		defer func() {
			if r := recover(); r != nil {
				doneCh <- outcome{err: fmt.Errorf("recovered from panic: %v", r)}
			}
		}()
		var res outcome
		res.mconn, res.peerInfo, res.peerKey, res.err = c.handshake(ctx, nodeInfo, privKey)
		doneCh <- res
	}()

	var res outcome
	select {
	case <-ctx.Done():
		_ = c.Close()
		return types.NodeInfo{}, nil, ctx.Err()
	case res = <-doneCh:
	}

	if res.err != nil {
		return types.NodeInfo{}, nil, res.err
	}

	c.mconn = res.mconn
	c.logger = res.mconn.Logger
	if err := c.mconn.Start(); err != nil {
		return types.NodeInfo{}, nil, err
	}
	return res.peerInfo, res.peerKey, nil
}
// handshake does the actual work for Handshake, which keeps context handling
// and panic recovery to itself. It upgrades the raw connection to a secret
// connection, exchanges node info with the peer (writing and reading
// concurrently), and builds an MConnection on top. The MConnection is
// returned unstarted and is not stored on c, to avoid concurrent field
// writes.
func (c *mConnConnection) handshake(
	ctx context.Context,
	nodeInfo types.NodeInfo,
	privKey crypto.PrivKey,
) (*conn.MConnection, types.NodeInfo, crypto.PubKey, error) {
	// fail produces the zero-valued results that accompany every error.
	fail := func(err error) (*conn.MConnection, types.NodeInfo, crypto.PubKey, error) {
		return nil, types.NodeInfo{}, nil, err
	}

	if c.mconn != nil {
		return fail(errors.New("connection is already handshaked"))
	}

	secretConn, err := conn.MakeSecretConnection(c.conn, privKey)
	if err != nil {
		return fail(err)
	}

	var (
		pbPeerInfo p2pproto.NodeInfo
		// One buffered slot per goroutine, so neither blocks if we return
		// early on the first error.
		exchangeCh = make(chan error, 2)
	)
	sendLocalInfo := func() {
		_, sendErr := protoio.NewDelimitedWriter(secretConn).WriteMsg(nodeInfo.ToProto())
		exchangeCh <- sendErr
	}
	receivePeerInfo := func() {
		_, recvErr := protoio.NewDelimitedReader(secretConn, types.MaxNodeInfoSize()).ReadMsg(&pbPeerInfo)
		exchangeCh <- recvErr
	}
	go sendLocalInfo()
	go receivePeerInfo()

	for pending := cap(exchangeCh); pending > 0; pending-- {
		if err = <-exchangeCh; err != nil {
			return fail(err)
		}
	}

	peerInfo, err := types.NodeInfoFromProto(&pbPeerInfo)
	if err != nil {
		return fail(err)
	}

	mc := conn.NewMConnectionWithConfig(
		secretConn,
		c.channelDescs,
		c.onReceive,
		c.onError,
		c.mConnConfig,
	)
	peerAddress := c.RemoteEndpoint().NodeAddress(peerInfo.NodeID)
	mc.SetLogger(c.logger.With("peer", peerAddress))

	return mc, peerInfo, secretConn.RemotePubKey(), nil
}
// onReceive is the MConnection callback for received messages. It forwards
// the message to receiveCh, giving up if the connection is closed before a
// receiver picks it up.
func (c *mConnConnection) onReceive(chID byte, payload []byte) {
	msg := mConnMessage{
		channelID: ChannelID(chID),
		payload:   payload,
	}
	select {
	case c.receiveCh <- msg:
	case <-c.closeCh:
	}
}
// onError is the MConnection callback for errors. It closes the connection
// and then tries to publish the error on errorCh, from which ReceiveMessage,
// SendMessage and TrySendMessage pick it up.
//
// e is the value reported by MConnection. If it is not an error, it is
// converted into one by formatting the value itself.
func (c *mConnConnection) onError(e interface{}) {
	err, ok := e.(error)
	if !ok {
		// Format the reported value: err is nil when the type assertion
		// fails, so formatting err would discard the actual failure.
		err = fmt.Errorf("%v", e)
	}
	// We have to close the connection here, since MConnection will have stopped
	// the service on any errors.
	_ = c.Close()
	// closeCh is closed by the Close call above, so this never blocks. When
	// errorCh also has room both cases are ready and select picks one at
	// random, i.e. delivering the error is best-effort.
	select {
	case c.errorCh <- err:
	case <-c.closeCh:
	}
}
// String returns the string form of the connection's remote endpoint.
func (c *mConnConnection) String() string {
	remote := c.RemoteEndpoint()
	return remote.String()
}
// SendMessage implements Connection. Channel IDs above math.MaxUint8 are
// rejected, since MConnection channel IDs are a single byte. A pending
// connection error is returned if one is available, and io.EOF is returned
// if the connection has been closed; otherwise the message is passed to
// MConnection.Send and its boolean result is returned.
func (c *mConnConnection) SendMessage(chID ChannelID, msg []byte) (bool, error) {
	if chID > math.MaxUint8 {
		return false, fmt.Errorf("MConnection only supports 1-byte channel IDs (got %v)", chID)
	}

	// Non-blocking check for a reported error or a closed connection.
	select {
	case err := <-c.errorCh:
		return false, err
	case <-c.closeCh:
		return false, io.EOF
	default:
	}

	sent := c.mconn.Send(byte(chID), msg)
	return sent, nil
}
// TrySendMessage implements Connection. It behaves like SendMessage except
// that the message is handed to MConnection.TrySend: channel IDs above
// math.MaxUint8 are rejected, a pending connection error or io.EOF (closed
// connection) is returned if present, and otherwise TrySend's boolean result
// is returned.
func (c *mConnConnection) TrySendMessage(chID ChannelID, msg []byte) (bool, error) {
	if chID > math.MaxUint8 {
		return false, fmt.Errorf("MConnection only supports 1-byte channel IDs (got %v)", chID)
	}

	// Non-blocking check for a reported error or a closed connection.
	select {
	case err := <-c.errorCh:
		return false, err
	case <-c.closeCh:
		return false, io.EOF
	default:
	}

	sent := c.mconn.TrySend(byte(chID), msg)
	return sent, nil
}
// ReceiveMessage implements Connection. It blocks until a message arrives,
// an error is reported via errorCh, or the connection is closed (in which
// case io.EOF is returned). On error the channel ID is 0 and the payload is
// nil.
func (c *mConnConnection) ReceiveMessage() (ChannelID, []byte, error) {
	var (
		chID    ChannelID
		payload []byte
		err     error
	)
	select {
	case err = <-c.errorCh:
	case <-c.closeCh:
		err = io.EOF
	case msg := <-c.receiveCh:
		chID, payload = msg.channelID, msg.payload
	}
	return chID, payload, err
}
// LocalEndpoint implements Connection. It returns an MConn endpoint for the
// local side of the connection; IP and port are only set when the local
// address is a *net.TCPAddr.
func (c *mConnConnection) LocalEndpoint() Endpoint {
	tcpAddr, isTCP := c.conn.LocalAddr().(*net.TCPAddr)
	if !isTCP {
		return Endpoint{Protocol: MConnProtocol}
	}
	return Endpoint{
		Protocol: MConnProtocol,
		IP:       tcpAddr.IP,
		Port:     uint16(tcpAddr.Port),
	}
}
// RemoteEndpoint implements Connection. It returns an MConn endpoint for the
// remote side of the connection; IP and port are only set when the remote
// address is a *net.TCPAddr.
func (c *mConnConnection) RemoteEndpoint() Endpoint {
	tcpAddr, isTCP := c.conn.RemoteAddr().(*net.TCPAddr)
	if !isTCP {
		return Endpoint{Protocol: MConnProtocol}
	}
	return Endpoint{
		Protocol: MConnProtocol,
		IP:       tcpAddr.IP,
		Port:     uint16(tcpAddr.Port),
	}
}
// Status implements Connection. It returns the MConnection's status, or a
// zero-valued status if the connection has not been handshaked yet.
func (c *mConnConnection) Status() conn.ConnectionStatus {
	var status conn.ConnectionStatus
	if c.mconn != nil {
		status = c.mconn.Status()
	}
	return status
}
// Close implements Connection. The shutdown runs at most once (shared with
// FlushClose): a running MConnection is stopped, otherwise the underlying
// network connection is closed directly; closeCh is closed afterwards. The
// error from stopping/closing is returned by the call that performed the
// shutdown; later calls return nil.
func (c *mConnConnection) Close() error {
	var closeErr error
	shutdown := func() {
		switch {
		case c.mconn != nil && c.mconn.IsRunning():
			closeErr = c.mconn.Stop()
		default:
			closeErr = c.conn.Close()
		}
		close(c.closeCh)
	}
	c.closeOnce.Do(shutdown)
	return closeErr
}
// FlushClose implements Connection. The shutdown runs at most once (shared
// with Close): a running MConnection is stopped via FlushStop, otherwise the
// underlying network connection is closed directly; closeCh is closed
// afterwards. Only the direct close can produce an error.
func (c *mConnConnection) FlushClose() error {
	var closeErr error
	shutdown := func() {
		switch {
		case c.mconn != nil && c.mconn.IsRunning():
			c.mconn.FlushStop()
		default:
			closeErr = c.conn.Close()
		}
		close(c.closeCh)
	}
	c.closeOnce.Do(shutdown)
	return closeErr
}