Skip to content

Commit

Permalink
p2p: (seed mode) limit the number of attempts to connect to a peer (#…
Browse files Browse the repository at this point in the history
…3573)

* use dialPeer function in a seed mode

Fixes #3532

by storing a number of attempts we've tried to connect in-memory and
removing the address from addrbook when number of attempts > 16
  • Loading branch information
melekes committed Apr 17, 2019
1 parent 5b8888b commit c0e8fb5
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@

### BUG FIXES:
- [state] [\#3537](https://github.com/tendermint/tendermint/pull/3537#issuecomment-482711833) LoadValidators: do not return an empty validator set
- [p2p] \#3532 limit the number of attempts to connect to a peer in seed mode
to 16 (as a result, the node will stop retrying after a 35 hours time window)
79 changes: 54 additions & 25 deletions p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,24 @@ const (
biasToSelectNewPeers = 30 // 70 to select good peers
)

type errMaxAttemptsToDial struct {
}

func (e errMaxAttemptsToDial) Error() string {
return fmt.Sprintf("reached max attempts %d to dial", maxAttemptsToDial)
}

type errTooEarlyToDial struct {
backoffDuration time.Duration
lastDialed time.Time
}

func (e errTooEarlyToDial) Error() string {
return fmt.Sprintf(
"too early to dial (backoff duration: %d, last dialed: %v, time since: %v)",
e.backoffDuration, e.lastDialed, time.Since(e.lastDialed))
}

// PEXReactor handles PEX (peer exchange) and ensures that an
// adequate number of peers are connected to the switch.
//
Expand Down Expand Up @@ -449,7 +467,19 @@ func (r *PEXReactor) ensurePeers() {

// Dial picked addresses
for _, addr := range toDial {
go r.dialPeer(addr)
go func(addr *p2p.NetAddress) {
err := r.dialPeer(addr)
if err != nil {
switch err.(type) {
case errMaxAttemptsToDial:
r.Logger.Debug(err.Error(), "addr", addr)
case errTooEarlyToDial:
r.Logger.Debug(err.Error(), "addr", addr)
default:
r.Logger.Error(err.Error(), "addr", addr)
}
}
}(addr)
}

// If we need more addresses, pick a random peer and ask for more.
Expand Down Expand Up @@ -479,17 +509,16 @@ func (r *PEXReactor) dialAttemptsInfo(addr *p2p.NetAddress) (attempts int, lastD
return atd.number, atd.lastDialed
}

func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error {
attempts, lastDialed := r.dialAttemptsInfo(addr)

if attempts > maxAttemptsToDial {
// Do not log the message if the addr gets readded.
if attempts+1 == maxAttemptsToDial {
r.Logger.Info("Reached max attempts to dial", "addr", addr, "attempts", attempts)
r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()})
}
// TODO(melekes): have a blacklist in the addrbook with peers whom we've
// failed to connect to. Then we can clean up attemptsToDial, which acts as
// a blacklist currently.
// https://github.com/tendermint/tendermint/issues/3572
r.book.MarkBad(addr)
return
return errMaxAttemptsToDial{}
}

// exponential backoff if it's not our first attempt to dial given address
Expand All @@ -498,33 +527,31 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second)
sinceLastDialed := time.Since(lastDialed)
if sinceLastDialed < backoffDuration {
r.Logger.Debug("Too early to dial", "addr", addr, "backoff_duration", backoffDuration, "last_dialed", lastDialed, "time_since", sinceLastDialed)
return
return errTooEarlyToDial{backoffDuration, lastDialed}
}
}

err := r.Switch.DialPeerWithAddress(addr, false)

if err != nil {
if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok {
return
return err
}

r.Logger.Error("Dialing failed", "addr", addr, "err", err, "attempts", attempts)
markAddrInBookBasedOnErr(addr, r.book, err)
if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok {
switch err.(type) {
case p2p.ErrSwitchAuthenticationFailure:
// NOTE: addr is removed from addrbook in markAddrInBookBasedOnErr
r.attemptsToDial.Delete(addr.DialString())
} else {
// FIXME: if the addr is going to be removed from the addrbook (hard to
// tell at this point), we need to Delete it from attemptsToDial, not
// record another attempt.
// record attempt
default:
r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()})
}
return
return errors.Wrapf(err, "dialing failed (attempts: %d)", attempts+1)
}

// cleanup any history
r.attemptsToDial.Delete(addr.DialString())
return nil
}

// checkSeeds checks that addresses are well formed.
Expand Down Expand Up @@ -633,14 +660,16 @@ func (r *PEXReactor) crawlPeers(addrs []*p2p.NetAddress) {
LastCrawled: now,
}

err := r.Switch.DialPeerWithAddress(addr, false)
err := r.dialPeer(addr)
if err != nil {
if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok {
continue
switch err.(type) {
case errMaxAttemptsToDial:
r.Logger.Debug(err.Error(), "addr", addr)
case errTooEarlyToDial:
r.Logger.Debug(err.Error(), "addr", addr)
default:
r.Logger.Error(err.Error(), "addr", addr)
}

r.Logger.Error("Dialing failed", "addr", addr, "err", err)
markAddrInBookBasedOnErr(addr, r.book, err)
continue
}

Expand Down
28 changes: 28 additions & 0 deletions p2p/pex/pex_reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,34 @@ func TestPEXReactorSeedMode(t *testing.T) {
assert.Equal(t, 0, sw.Peers().Size())
}

func TestPEXReactorDialsPeerUpToMaxAttemptsInSeedMode(t *testing.T) {
// directory to store address books
dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(t, err)
defer os.RemoveAll(dir) // nolint: errcheck

pexR, book := createReactor(&PEXReactorConfig{SeedMode: true})
defer teardownReactor(book)

sw := createSwitchAndAddReactors(pexR)
sw.SetAddrBook(book)
err = sw.Start()
require.NoError(t, err)
defer sw.Stop()

peer := mock.NewPeer(nil)
addr := peer.SocketAddr()

err = book.AddAddress(addr, addr)
require.NoError(t, err)

assert.True(t, book.HasAddress(addr))
// imitate maxAttemptsToDial reached
pexR.attemptsToDial.Store(addr.DialString(), _attemptsToDial{maxAttemptsToDial + 1, time.Now()})
pexR.crawlPeers([]*p2p.NetAddress{addr})
assert.False(t, book.HasAddress(addr))
}

// connect a peer to a seed, wait a bit, then stop it.
// this should give it time to request addrs and for the seed
// to call FlushStop, and allows us to test calling Stop concurrently
Expand Down

0 comments on commit c0e8fb5

Please sign in to comment.