p2p: persistent - redial if first dial fails #1404

Merged
merged 3 commits into from Apr 28, 2018
Changes from 1 commit
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ IMPROVEMENTS:
 BUG FIXES:
 - Graceful handling/recovery for apps that have non-determinism or fail to halt
 - Graceful handling/recovery for violations of safety, or liveness
+- Fix reconnect to persistent peer when first dial fails

 ## 0.17.1 (March 27th, 2018)

1 change: 1 addition & 0 deletions p2p/CHANGELOG.md
@@ -7,6 +7,7 @@ BREAKING CHANGES:
 - Remove or unexport methods from FuzzedConnection: Active, Mode, ProbDropRW, ProbDropConn, ProbSleep, MaxDelayMilliseconds, Fuzz
 - switch.AddPeerWithConnection is unexported and replaced by switch.AddPeer
 - switch.DialPeerWithAddress takes a bool, setting the peer as persistent or not
+- PeerConfig requires a Dial function

 FEATURES:

5 changes: 4 additions & 1 deletion p2p/peer.go
@@ -87,6 +87,8 @@ func newPeer(pc peerConn, nodeInfo NodeInfo,
 type PeerConfig struct {
 	AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption

+	Dial func(addr *NetAddress, config *PeerConfig) (net.Conn, error)
Contributor

I'd rather not have functions inside config.

Contributor

Maybe we can have a badAddr, which returns an error on DialTimeout. This would require introducing a NetAddress interface.

Contributor

@ebuchman, Apr 3, 2018

I think it would be better to set a flag on the remotePeer that lets it reject the connection in its accept routine. We should try to avoid putting methods in config structs like this, and I'm not sure we want NetAddress to be an interface (though perhaps I could be convinced; it just doesn't feel like the right way to solve the problem of writing these tests, even if it turns out to be a good idea after all).

Contributor Author

Maybe we can have the Dial func somewhere else? It seems more convenient to have configurable dial behavior for testing; e.g., it could be used to simulate slow connections or timeouts.

Contributor Author

Maybe Switch?


 	// times are in seconds
 	HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
 	DialTimeout time.Duration `mapstructure:"dial_timeout"`
@@ -101,6 +103,7 @@ type PeerConfig struct {
 func DefaultPeerConfig() *PeerConfig {
 	return &PeerConfig{
 		AuthEnc: true,
+		Dial: dial,
 		HandshakeTimeout: 20, // * time.Second,
 		DialTimeout: 3, // * time.Second,
 		MConfig: tmconn.DefaultMConnConfig(),
@@ -112,7 +115,7 @@ func DefaultPeerConfig() *PeerConfig {
 func newOutboundPeerConn(addr *NetAddress, config *PeerConfig, persistent bool, ourNodePrivKey crypto.PrivKey) (peerConn, error) {
 	var pc peerConn

-	conn, err := dial(addr, config)
+	conn, err := config.Dial(addr, config)
 	if err != nil {
 		return pc, errors.Wrap(err, "Error creating peer")
 	}
35 changes: 21 additions & 14 deletions p2p/switch.go
@@ -56,6 +56,7 @@ type Switch struct {
 	reactorsByCh map[byte]Reactor
 	peers *PeerSet
 	dialing *cmn.CMap
+	reconnecting *cmn.CMap
 	nodeInfo NodeInfo // our node info
 	nodeKey *NodeKey // our node privkey
 	addrBook AddrBook
@@ -75,6 +76,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
 		reactorsByCh: make(map[byte]Reactor),
 		peers: NewPeerSet(),
 		dialing: cmn.NewCMap(),
+		reconnecting: cmn.NewCMap(),
 	}

 	// Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws
@@ -255,7 +257,7 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
 	sw.stopAndRemovePeer(peer, reason)

 	if peer.IsPersistent() {
-		go sw.reconnectToPeer(peer)
+		go sw.reconnectToPeer(peer.NodeInfo().NetAddress())
 	}
 }

@@ -274,24 +276,28 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
 	}
 }

-// reconnectToPeer tries to reconnect to the peer, first repeatedly
+// reconnectToPeer tries to reconnect to the addr, first repeatedly
 // with a fixed interval, then with exponential backoff.
 // If no success after all that, it stops trying, and leaves it
-// to the PEX/Addrbook to find the peer again
-func (sw *Switch) reconnectToPeer(peer Peer) {
-	// NOTE this will connect to the self reported address,
-	// not necessarily the original we dialed
-	netAddr := peer.NodeInfo().NetAddress()
+// to the PEX/Addrbook to find the peer with the addr again
+func (sw *Switch) reconnectToPeer(addr *NetAddress) {
+	if sw.reconnecting.Has(string(addr.ID)) {
+		return
+	}
+
+	sw.reconnecting.Set(string(addr.ID), addr)
+	defer sw.reconnecting.Delete(string(addr.ID))
+
 	start := time.Now()
-	sw.Logger.Info("Reconnecting to peer", "peer", peer)
+	sw.Logger.Info("Reconnecting to peer", "addr", addr)
 	for i := 0; i < reconnectAttempts; i++ {
 		if !sw.IsRunning() {
 			return
 		}

-		err := sw.DialPeerWithAddress(netAddr, true)
+		err := sw.DialPeerWithAddress(addr, true)
 		if err != nil {
-			sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer)
+			sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
 			// sleep a set amount
 			sw.randomSleep(reconnectInterval)
 			continue
@@ -301,7 +307,7 @@ func (sw *Switch) reconnectToPeer(peer Peer) {
 	}

 	sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
-		"peer", peer, "elapsed", time.Since(start))
+		"addr", addr, "elapsed", time.Since(start))
 	for i := 0; i < reconnectBackOffAttempts; i++ {
 		if !sw.IsRunning() {
 			return
@@ -310,13 +316,13 @@ func (sw *Switch) reconnectToPeer(peer Peer) {
 		// sleep an exponentially increasing amount
 		sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
 		sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
-		err := sw.DialPeerWithAddress(netAddr, true)
+		err := sw.DialPeerWithAddress(addr, true)
 		if err == nil {
 			return // success
 		}
-		sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer)
+		sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
 	}
-	sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start))
+	sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
 }

 // SetAddrBook allows to set address book on Switch.
@@ -470,6 +476,7 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig
 	peerConn, err := newOutboundPeerConn(addr, config, persistent, sw.nodeKey.PrivKey)
 	if err != nil {
 		sw.Logger.Error("Failed to dial peer", "address", addr, "err", err)
+		go sw.reconnectToPeer(addr)
Contributor

We only want to do this if persistent == true.

Contributor Author

Done.

 		return err
 	}

29 changes: 29 additions & 0 deletions p2p/switch_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"

+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

@@ -23,6 +24,11 @@ var (
 	config *cfg.P2PConfig
 )

+// badDial returns an error for testing dial errors
+func badDial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
+	return nil, errors.New("dial err")
+}
+
 func init() {
 	config = cfg.DefaultP2PConfig()
 	config.PexReactor = true
@@ -295,6 +301,29 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
 	}
 	assert.NotZero(npeers)
 	assert.False(peer.IsRunning())
+
+	// simulate another remote peer
+	rp = &remotePeer{PrivKey: crypto.GenPrivKeyEd25519().Wrap(), Config: DefaultPeerConfig()}
+	rp.Start()
+	defer rp.Stop()
+
+	// simulate first time dial failure
+	peerConfig := DefaultPeerConfig()
+	peerConfig.Dial = badDial
+	err = sw.addOutboundPeerWithConfig(rp.Addr(), peerConfig, true)
+	require.NotNil(err)
+
+	// DialPeerWithAddress - sw.peerConfig resets the dialer
+
+	// TODO: same as above
+	for i := 0; i < 20; i++ {
+		time.Sleep(250 * time.Millisecond)
+		npeers = sw.Peers().Size()
+		if npeers > 1 {
+			break
+		}
+	}
+	assert.EqualValues(2, npeers)
 }

 func TestSwitchFullConnectivity(t *testing.T) {