Skip to content

Commit

Permalink
p2p: persistent - redial if first dial fails
Browse files Browse the repository at this point in the history
Fixes #1401
  • Loading branch information
tuxcanfly committed Apr 3, 2018
1 parent 6f99569 commit 5ef639f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -24,6 +24,7 @@ IMPROVEMENTS:
BUG FIXES:
- Graceful handling/recovery for apps that have non-determinism or fail to halt
- Graceful handling/recovery for violations of safety, or liveness
- Fix reconnect to persistent peer when first dial fails

## 0.17.1 (March 27th, 2018)

Expand Down
1 change: 1 addition & 0 deletions p2p/CHANGELOG.md
Expand Up @@ -7,6 +7,7 @@ BREAKING CHANGES:
- Remove or unexport methods from FuzzedConnection: Active, Mode, ProbDropRW, ProbDropConn, ProbSleep, MaxDelayMilliseconds, Fuzz
- switch.AddPeerWithConnection is unexported and replaced by switch.AddPeer
- switch.DialPeerWithAddress takes a bool, setting the peer as persistent or not
- PeerConfig requires a Dial function

FEATURES:

Expand Down
5 changes: 4 additions & 1 deletion p2p/peer.go
Expand Up @@ -87,6 +87,8 @@ func newPeer(pc peerConn, nodeInfo NodeInfo,
type PeerConfig struct {
AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption

Dial func(addr *NetAddress, config *PeerConfig) (net.Conn, error)

// times are in seconds
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`
Expand All @@ -101,6 +103,7 @@ type PeerConfig struct {
func DefaultPeerConfig() *PeerConfig {
return &PeerConfig{
AuthEnc: true,
Dial: dial,
HandshakeTimeout: 20, // * time.Second,
DialTimeout: 3, // * time.Second,
MConfig: tmconn.DefaultMConnConfig(),
Expand All @@ -112,7 +115,7 @@ func DefaultPeerConfig() *PeerConfig {
func newOutboundPeerConn(addr *NetAddress, config *PeerConfig, persistent bool, ourNodePrivKey crypto.PrivKey) (peerConn, error) {
var pc peerConn

conn, err := dial(addr, config)
conn, err := config.Dial(addr, config)
if err != nil {
return pc, errors.Wrap(err, "Error creating peer")
}
Expand Down
35 changes: 21 additions & 14 deletions p2p/switch.go
Expand Up @@ -56,6 +56,7 @@ type Switch struct {
reactorsByCh map[byte]Reactor
peers *PeerSet
dialing *cmn.CMap
reconnecting *cmn.CMap
nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey
addrBook AddrBook
Expand All @@ -75,6 +76,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(),
dialing: cmn.NewCMap(),
reconnecting: cmn.NewCMap(),
}

// Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws
Expand Down Expand Up @@ -255,7 +257,7 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
sw.stopAndRemovePeer(peer, reason)

if peer.IsPersistent() {
go sw.reconnectToPeer(peer)
go sw.reconnectToPeer(peer.NodeInfo().NetAddress())
}
}

Expand All @@ -274,24 +276,28 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
}
}

// reconnectToPeer tries to reconnect to the peer, first repeatedly
// reconnectToPeer tries to reconnect to the addr, first repeatedly
// with a fixed interval, then with exponential backoff.
// If no success after all that, it stops trying, and leaves it
// to the PEX/Addrbook to find the peer again
func (sw *Switch) reconnectToPeer(peer Peer) {
// NOTE this will connect to the self reported address,
// not necessarily the original we dialed
netAddr := peer.NodeInfo().NetAddress()
// to the PEX/Addrbook to find the peer with the addr again
func (sw *Switch) reconnectToPeer(addr *NetAddress) {
if sw.reconnecting.Has(string(addr.ID)) {
return
}

sw.reconnecting.Set(string(addr.ID), addr)
defer sw.reconnecting.Delete(string(addr.ID))

start := time.Now()
sw.Logger.Info("Reconnecting to peer", "peer", peer)
sw.Logger.Info("Reconnecting to peer", "addr", addr)
for i := 0; i < reconnectAttempts; i++ {
if !sw.IsRunning() {
return
}

err := sw.DialPeerWithAddress(netAddr, true)
err := sw.DialPeerWithAddress(addr, true)
if err != nil {
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer)
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
// sleep a set amount
sw.randomSleep(reconnectInterval)
continue
Expand All @@ -301,7 +307,7 @@ func (sw *Switch) reconnectToPeer(peer Peer) {
}

sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
"peer", peer, "elapsed", time.Since(start))
"addr", addr, "elapsed", time.Since(start))
for i := 0; i < reconnectBackOffAttempts; i++ {
if !sw.IsRunning() {
return
Expand All @@ -310,13 +316,13 @@ func (sw *Switch) reconnectToPeer(peer Peer) {
// sleep an exponentially increasing amount
sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
err := sw.DialPeerWithAddress(netAddr, true)
err := sw.DialPeerWithAddress(addr, true)
if err == nil {
return // success
}
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer)
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
}
sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start))
sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
}

// SetAddrBook allows to set address book on Switch.
Expand Down Expand Up @@ -470,6 +476,7 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig
peerConn, err := newOutboundPeerConn(addr, config, persistent, sw.nodeKey.PrivKey)
if err != nil {
sw.Logger.Error("Failed to dial peer", "address", addr, "err", err)
go sw.reconnectToPeer(addr)
return err
}

Expand Down
29 changes: 29 additions & 0 deletions p2p/switch_test.go
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -23,6 +24,11 @@ var (
config *cfg.P2PConfig
)

// badDial returns an error for testing dial errors
func badDial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
return nil, errors.New("dial err")
}

func init() {
config = cfg.DefaultP2PConfig()
config.PexReactor = true
Expand Down Expand Up @@ -295,6 +301,29 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
}
assert.NotZero(npeers)
assert.False(peer.IsRunning())

// simulate another remote peer
rp = &remotePeer{PrivKey: crypto.GenPrivKeyEd25519().Wrap(), Config: DefaultPeerConfig()}
rp.Start()
defer rp.Stop()

// simulate first time dial failure
peerConfig := DefaultPeerConfig()
peerConfig.Dial = badDial
err = sw.addOutboundPeerWithConfig(rp.Addr(), peerConfig, true)
require.NotNil(err)

// DialPeerWithAddres - sw.peerConfig resets the dialer

// TODO: same as above
for i := 0; i < 20; i++ {
time.Sleep(250 * time.Millisecond)
npeers = sw.Peers().Size()
if npeers > 1 {
break
}
}
assert.EqualValues(2, npeers)
}

func TestSwitchFullConnectivity(t *testing.T) {
Expand Down

0 comments on commit 5ef639f

Please sign in to comment.