Skip to content
Permalink
Browse files

P2P - limit connections and swarm neighbors (#719)

* limit number of connections on network
* add session-timeout, max-pending-connections, target-outbound and max-inbound config params
* limit swarm incoming peers
  • Loading branch information...
y0sher committed Mar 21, 2019
1 parent 4c71329 commit 705ac1caeb60ac5212261bee3d77a9ec1858c185
Showing with 240 additions and 66 deletions.
  1. +8 −2 cmd/root.go
  2. +4 −0 config.toml
  3. +26 −22 p2p/config/config.go
  4. +35 −7 p2p/net/conn.go
  5. +18 −13 p2p/net/conn_test.go
  6. +47 −20 p2p/net/network.go
  7. +69 −0 p2p/net/network_test.go
  8. +17 −2 p2p/swarm.go
  9. +16 −0 p2p/swarm_test.go
@@ -34,8 +34,6 @@ func AddCommands(cmd *cobra.Command) {
cmd.PersistentFlags().IntVar(&config.LayerAvgSize, "layer-average-size",
config.LayerDurationSec, "Duration between layers in seconds")
/** ======================== P2P Flags ========================== **/
cmd.PersistentFlags().IntVar(&config.P2P.SecurityParam, "security-param",
config.P2P.SecurityParam, "Consensus protocol k security param")
cmd.PersistentFlags().IntVar(&config.P2P.TCPPort, "tcp-port",
config.P2P.TCPPort, "TCP Port to listen on")
cmd.PersistentFlags().DurationVar(&config.P2P.DialTimeout, "dial-timeout",
@@ -46,12 +44,20 @@ func AddCommands(cmd *cobra.Command) {
config.P2P.NetworkID, "NetworkID to run on (0 - mainnet, 1 - testnet)")
cmd.PersistentFlags().DurationVar(&config.P2P.ResponseTimeout, "response-timeout",
config.P2P.ResponseTimeout, "Timeout for waiting on resposne message")
cmd.PersistentFlags().DurationVar(&config.P2P.SessionTimeout, "session-timeout",
config.P2P.SessionTimeout, "Timeout for waiting on session message")
cmd.PersistentFlags().StringVar(&config.P2P.NodeID, "node-id",
config.P2P.NodeID, "Load node data by id (pub key) from local store")
cmd.PersistentFlags().BoolVar(&config.P2P.NewNode, "new-node",
config.P2P.NewNode, "Load node data by id (pub key) from local store")
cmd.PersistentFlags().IntVar(&config.P2P.BufferSize, "buffer-size",
config.P2P.BufferSize, "Size of the messages handler's buffer")
cmd.PersistentFlags().IntVar(&config.P2P.MaxPendingConnections, "max-pending-connections",
config.P2P.MaxPendingConnections, "The maximum number of pending connections")
cmd.PersistentFlags().IntVar(&config.P2P.OutboundPeersTarget, "outbound-target",
config.P2P.OutboundPeersTarget, "The outbound peer target we're trying to connect")
cmd.PersistentFlags().IntVar(&config.P2P.MaxInboundPeers, "max-inbound",
config.P2P.MaxInboundPeers, "The maximum number of inbound peers ")
cmd.PersistentFlags().BoolVar(&config.P2P.SwarmConfig.Gossip, "gossip",
config.P2P.SwarmConfig.Gossip, "should we start a gossiping node?")
cmd.PersistentFlags().BoolVar(&config.P2P.SwarmConfig.Bootstrap, "bootstrap",
@@ -25,6 +25,10 @@ dial-timeout = "1m"
conn-keepalive = "48h"
network-id = 1 # 0 - MainNet, 1 - TestNet
response-timeout = "2s"
session-timeout = "2s"
max-pending-connections = 50
target-outbound = 10
max-inbound = 100
buffer-size = 100

# Node Swarm Config
@@ -26,17 +26,19 @@ func duration(duration string) (dur time.Duration) {

// Config defines the configuration options for the Spacemesh peer-to-peer networking layer
type Config struct {
SecurityParam int `mapstructure:"security-param"`
FastSync bool `mapstructure:"fast-sync"`
TCPPort int `mapstructure:"tcp-port"`
NodeID string `mapstructure:"node-id"`
NewNode bool `mapstructure:"new-node"`
DialTimeout time.Duration `mapstructure:"dial-timeout"`
ConnKeepAlive time.Duration `mapstructure:"conn-keepalive"`
NetworkID int8 `mapstructure:"network-id"`
ResponseTimeout time.Duration `mapstructure:"response-timeout"`
SwarmConfig SwarmConfig `mapstructure:"swarm"`
BufferSize int `mapstructure:"buffer-size"`
TCPPort int `mapstructure:"tcp-port"`
NodeID string `mapstructure:"node-id"`
NewNode bool `mapstructure:"new-node"`
DialTimeout time.Duration `mapstructure:"dial-timeout"`
ConnKeepAlive time.Duration `mapstructure:"conn-keepalive"`
NetworkID int8 `mapstructure:"network-id"`
ResponseTimeout time.Duration `mapstructure:"response-timeout"`
SessionTimeout time.Duration `mapstructure:"session-timeout"`
MaxPendingConnections int `mapstructure:"max-pending-connections"`
OutboundPeersTarget int `mapstructure:"outbound-target"`
MaxInboundPeers int `mapstructure:"max-inbound"`
SwarmConfig SwarmConfig `mapstructure:"swarm"`
BufferSize int `mapstructure:"buffer-size"`
}

// SwarmConfig specifies swarm config params.
@@ -64,16 +66,18 @@ func DefaultConfig() Config {
}

return Config{
SecurityParam: 20,
FastSync: true,
TCPPort: 7513,
NodeID: "",
NewNode: false,
DialTimeout: duration("1m"),
ConnKeepAlive: duration("48h"),
NetworkID: TestNet,
ResponseTimeout: duration("15s"),
SwarmConfig: SwarmConfigValues,
BufferSize: 100,
TCPPort: 7513,
NodeID: "",
NewNode: false,
DialTimeout: duration("1m"),
ConnKeepAlive: duration("48h"),
NetworkID: TestNet,
ResponseTimeout: duration("15s"),
SessionTimeout: duration("5s"),
MaxPendingConnections: 100,
OutboundPeersTarget: 10,
MaxInboundPeers: 100,
SwarmConfig: SwarmConfigValues,
BufferSize: 100,
}
}
@@ -177,6 +177,36 @@ func (c *FormattedConnection) shutdown(err error) {
c.formatter.Close()
}

var ErrTriedToSetupExistingConn = errors.New("tried to setup existing connection")
var ErrIncomingSessionTimeout = errors.New("timeout waiting for handshake message")

func (c *FormattedConnection) setupIncoming(timeout time.Duration) error {
var err error
tm := time.NewTimer(timeout)
select {
case msg, ok := <-c.formatter.In():
if !ok { // chan closed
err = ErrClosedIncomingChannel
break
}

if c.session == nil {
err = c.networker.HandlePreSessionIncomingMessage(c, msg)
if err != nil {
break
}
return nil
}
err = ErrTriedToSetupExistingConn
break
case <-tm.C:
err = ErrIncomingSessionTimeout
}

c.Close()
return err
}

// Push outgoing message to the connections
// Read from the incoming new messages and send down the connection
func (c *FormattedConnection) beginEventProcessing() {
@@ -193,15 +223,13 @@ Loop:
}

if c.session == nil {
err = c.networker.HandlePreSessionIncomingMessage(c, msg)
if err != nil {
break Loop
}
} else {
metrics.PeerRecv.With(metrics.PeerIdLabel, c.remotePub.String()).Add(float64(len(msg)))
c.publish(msg)
err = ErrTriedToSetupExistingConn
break Loop
}

metrics.PeerRecv.With(metrics.PeerIdLabel, c.remotePub.String()).Add(float64(len(msg)))
c.publish(msg)

case <-c.closeChan:
err = ErrConnectionClosed
break Loop
@@ -6,6 +6,7 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p/delimited"
"github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net"
"runtime"
"sync"
@@ -72,10 +73,23 @@ func TestPreSessionMessage(t *testing.T) {
rPub := p2pcrypto.NewRandomPubkey()
formatter := delimited.NewChan(10)
conn := newConnection(rwcam, netw, formatter, rPub, nil, netw.logger)
go conn.beginEventProcessing()
rwcam.SetReadResult([]byte{3, 1, 1, 1}, nil)
err := conn.setupIncoming(time.Second)
require.NoError(t, err)
require.Equal(t, conn.Closed(), false)
require.Equal(t, int32(1), netw.PreSessionCount())
}

func TestPreSessionMessageAfterSession(t *testing.T) {
netw := NewNetworkMock()
rwcam := NewReadWriteCloseAddresserMock()
rPub := p2pcrypto.NewRandomPubkey()
formatter := delimited.NewChan(10)
conn := newConnection(rwcam, netw, formatter, rPub, nil, netw.logger)
rwcam.SetReadResult([]byte{3, 1, 1, 1}, nil)
go conn.beginEventProcessing()
time.Sleep(50 * time.Millisecond)
assert.Equal(t, int32(1), netw.PreSessionCount())
assert.Equal(t, conn.Closed(), true)
}

func TestPreSessionError(t *testing.T) {
@@ -86,17 +100,8 @@ func TestPreSessionError(t *testing.T) {
conn := newConnection(rwcam, netw, formatter, rPub, nil, netw.logger)
netw.SetPreSessionResult(fmt.Errorf("fail"))

var wg sync.WaitGroup
wg.Add(1)
netw.SubscribeClosingConnections(func(closedConn Connection) {
assert.Equal(t, conn.id, closedConn.ID())
wg.Done()
})

go conn.beginEventProcessing()
rwcam.SetReadResult([]byte{3, 1, 1, 1}, nil)
wg.Wait()
assert.Equal(t, int32(1), netw.PreSessionCount())
err := conn.setupIncoming(time.Second)
require.Error(t, err)
}

func TestErrClose(t *testing.T) {
@@ -50,8 +50,8 @@ type Net struct {
localNode *node.LocalNode
logger log.Log

tcpListener net.Listener
tcpListenAddress *net.TCPAddr // Address to open connection: localhost:9999\
listener net.Listener
listenAddress *net.TCPAddr // Address to open connection: localhost:9999\
isShuttingDown bool

@@ -89,7 +89,7 @@ func NewNet(conf config.Config, localEntity *node.LocalNode) (*Net, error) {
networkID: conf.NetworkID,
localNode: localEntity,
logger: localEntity.Log,
tcpListenAddress: tcpAddress,
listenAddress: tcpAddress,
regNewRemoteConn: make([]func(NewConnectionEvent), 0, 3),
closingConnections: make([]func(Connection), 0, 3),
queuesCount: qcount,
@@ -101,17 +101,16 @@ func NewNet(conf config.Config, localEntity *node.LocalNode) (*Net, error) {
n.incomingMessagesQueue[imq] = make(chan IncomingMessageEvent, qsize)
}

err = n.listen()

if err != nil {
return nil, err
}

n.logger.Debug("created network with tcp address: %s", n.tcpListenAddress)
n.logger.Debug("created network with tcp address: %s", n.listenAddress)

return n, nil
}

func (n *Net) Start() error { // todo: maybe add context
err := n.listen(n.newTcpListener)
return err
}

// Logger returns a reference to logger
func (n *Net) Logger() log.Log {
return n.logger
@@ -204,7 +203,7 @@ func (n *Net) createSecuredConnection(address string, remotePubkey p2pcrypto.Pub
return nil, err
}

handshakeMessage, err := generateHandshakeMessage(session, n.networkID, n.tcpListenAddress.Port, n.localNode.PublicKey())
handshakeMessage, err := generateHandshakeMessage(session, n.networkID, n.listenAddress.Port, n.localNode.PublicKey())
if err != nil {
conn.Close()
return nil, err
@@ -239,25 +238,45 @@ func (n *Net) Dial(address string, remotePubkey p2pcrypto.PublicKey) (Connection
// Shutdown initiate a graceful closing of the TCP listener and all other internal routines
func (n *Net) Shutdown() {
n.isShuttingDown = true
n.tcpListener.Close()
if n.listener != nil {
err := n.listener.Close()
if err != nil {
n.logger.Error("Error closing listener err=%v", err)
}
}
}

func (n *Net) newTcpListener() (net.Listener, error) {
n.logger.Info("Starting to listen on tcp:%v", n.listenAddress)
tcpListener, err := net.Listen("tcp", n.listenAddress.String())
if err != nil {
return nil, err
}
return tcpListener, nil
}

// Start network server
func (n *Net) listen() error {
n.logger.Info("Starting to listen on %v", n.tcpListenAddress)
tcpListener, err := net.Listen("tcp", n.tcpListenAddress.String())
func (n *Net) listen(lis func() (listener net.Listener, err error)) error {
listener, err := lis()
n.listener = listener
if err != nil {
return err
}
n.tcpListener = tcpListener
go n.acceptTCP()
go n.accept(listener)
return nil
}

func (n *Net) acceptTCP() {
func (n *Net) accept(listen net.Listener) {
n.logger.Debug("Waiting for incoming connections...")
pending := make(chan struct{}, n.config.MaxPendingConnections)

for i := 0; i < n.config.MaxPendingConnections; i++ {
pending <- struct{}{}
}

for {
netConn, err := n.tcpListener.Accept()
<-pending
netConn, err := listen.Accept()
if err != nil {

if !n.isShuttingDown {
@@ -270,8 +289,16 @@ func (n *Net) acceptTCP() {
n.logger.Debug("Got new connection... Remote Address: %s", netConn.RemoteAddr())
formatter := delimited.NewChan(10)
c := newConnection(netConn, n, formatter, nil, nil, n.logger)
go func(con Connection) {
defer func() { pending <- struct{}{} }()
err := c.setupIncoming(n.config.SessionTimeout)
if err != nil {
n.logger.Warning("Error handling incoming connection with ", c.remoteAddr.String())
return
}
go c.beginEventProcessing()
}(c)

go c.beginEventProcessing()
// network won't publish the connection before it the remote node had established a session
}
}
Oops, something went wrong.

0 comments on commit 705ac1c

Please sign in to comment.
You can’t perform that action at this time.