Merge pull request #59 from libp2p/feat/better-replacement
Replace dead peers & increase replacement cache size
aarshkshah1992 committed Mar 20, 2020
2 parents 6f18d79 + 2bffeff commit d1a1e92
Showing 5 changed files with 304 additions and 201 deletions.
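For context on how the new behaviour is enabled: dead-peer replacement only runs when a peer validation function is supplied at construction time (NewRoutingTable sets isReplaceEnabled and starts the cleanup and replacement goroutines only if peerValidationFnc is non-nil, as shown later in table.go). The sketch below is a hypothetical caller-side wiring, not code from this commit: the option constructors PeerValidationFnc and TableCleanupInterval and the host.Connect-based liveness check are assumptions for illustration; only the func(context.Context, peer.ID) bool shape of the validation function and the cleanup/replacement behaviour come from this diff.

package example

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"

	kb "github.com/libp2p/go-libp2p-kbucket"
)

// newTableWithReplacement builds a routing table whose cleanup loop can evict
// unreachable peers and whose replacement loop can promote candidates from the
// replacement cache. The option names below are assumptions for illustration;
// this commit only shows the resulting cfg.tableCleanup fields.
func newTableWithReplacement(h host.Host, local kb.ID, m peerstore.Metrics) (*kb.RoutingTable, error) {
	// a peer is considered valid if we can (re)connect to it before the
	// per-call validation timeout expires
	validate := func(ctx context.Context, p peer.ID) bool {
		return h.Connect(ctx, peer.AddrInfo{ID: p}) == nil
	}

	return kb.NewRoutingTable(
		20,        // bucket size
		local,     // ID of the local peer
		time.Hour, // latency tolerance
		m,         // peerstore metrics
		kb.PeerValidationFnc(validate),         // assumed option name
		kb.TableCleanupInterval(2*time.Minute), // assumed option name
	)
}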
6 changes: 4 additions & 2 deletions sorting.go
@@ -19,8 +19,10 @@ type peerDistanceSorter struct {
target ID
}

func (pds *peerDistanceSorter) Len() int { return len(pds.peers) }
func (pds *peerDistanceSorter) Swap(a, b int) { pds.peers[a], pds.peers[b] = pds.peers[b], pds.peers[a] }
func (pds *peerDistanceSorter) Len() int { return len(pds.peers) }
func (pds *peerDistanceSorter) Swap(a, b int) {
pds.peers[a], pds.peers[b] = pds.peers[b], pds.peers[a]
}
func (pds *peerDistanceSorter) Less(a, b int) bool {
return pds.peers[a].distance.less(pds.peers[b].distance)
}
120 changes: 48 additions & 72 deletions table.go
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"

logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
mh "github.com/multiformats/go-multihash"
)

@@ -44,7 +42,11 @@ type CplRefresh struct

// RoutingTable defines the routing table.
type RoutingTable struct {
// the routing table context
ctx context.Context
// function to cancel the RT context
ctxCancel context.CancelFunc

// ID of the local peer
local ID

@@ -71,17 +73,20 @@ type RoutingTable struct {
PeerRemoved func(peer.ID)
PeerAdded func(peer.ID)

// is peer replacement enabled?
isReplaceEnabled bool
// peerReplaceCh is the channel to write a peer replacement request to
peerReplaceCh chan peer.ID

// function to determine the validity of a peer for RT membership
peerValidationFnc PeerValidationFunc

// timeout for a single call to the peer validation function
peerValidationTimeout time.Duration

// interval between two runs of the table cleanup routine
tableCleanupInterval time.Duration
// function to select peers that need to be validated
// function to select peers that need to be validated during cleanup
peersForValidationFnc PeerSelectionFunc

proc goprocess.Process
}

// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
@@ -95,7 +100,6 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
}

rt := &RoutingTable{
ctx: context.Background(),
buckets: []*bucket{newBucket()},
bucketsize: bucketsize,
local: localID,
@@ -108,18 +112,24 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {},

peerReplaceCh: make(chan peer.ID, bucketsize*2),

peerValidationFnc: cfg.tableCleanup.peerValidationFnc,
peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc,
peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout,
tableCleanupInterval: cfg.tableCleanup.interval,
}

rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize)
rt.proc = goprocessctx.WithContext(rt.ctx)
// create the replacement cache
rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize*2)

rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())

// schedule periodic RT cleanup if peer validation function has been passed
if rt.peerValidationFnc != nil {
rt.proc.Go(rt.cleanup)
rt.isReplaceEnabled = (rt.peerValidationFnc != nil)
if rt.isReplaceEnabled {
go rt.cleanup()
go rt.startPeerReplacement()
}

return rt, nil
@@ -128,60 +138,8 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
// Close shuts down the Routing Table & all associated processes.
// It is safe to call this multiple times.
func (rt *RoutingTable) Close() error {
return rt.proc.Close()
}

func (rt *RoutingTable) cleanup(proc goprocess.Process) {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}

cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
defer cleanupTickr.Stop()
for {
select {
case <-cleanupTickr.C:
ps := rt.peersToValidate()
for _, pinfo := range ps {
// continue if we are able to successfully validate the peer
// it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive()
// TODO Should we revisit this ? It makes more sense for the RT to mark it as active here
if validatePeerF(pinfo.Id) {
log.Infof("successfully validated missing peer=%s", pinfo.Id)
continue
}

// peer does not seem to be alive, let's try candidates now
log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id)
// evict missing peer
rt.HandlePeerDead(pinfo.Id)

// keep trying replacement candidates for the missing peer till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id)
break
}
log.Infof("failed to validated candidate=%s", c)
// remove candidate
rt.HandlePeerDead(c)

c, notEmpty = rt.cplReplacementCache.pop(cpl)
}

if !notEmpty {
log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id)
}
}
case <-proc.Closing():
return
}
}
rt.ctxCancel()
return nil
}

// returns the peers that need to be validated.
@@ -299,6 +257,11 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
rt.tabLock.Lock()
defer rt.tabLock.Unlock()

return rt.addPeer(p)
}

// locking is the responsibility of the caller
func (rt *RoutingTable) addPeer(p peer.ID) (evicted peer.ID, err error) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
if peer := bucket.getPeer(p); peer != nil {
@@ -341,17 +304,29 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
}

// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable.
// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one.
// It evicts the peer from the Routing Table and tries to replace it with a valid & eligible
// candidate from the replacement cache.
func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
// remove it as a candidate
rt.cplReplacementCache.remove(p)

// remove it from the RT
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
rt.removePeer(p)
}

// locking is the responsibility of the caller
func (rt *RoutingTable) removePeer(p peer.ID) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
if bucket.remove(p) {
// request a replacement
if rt.isReplaceEnabled {
select {
case rt.peerReplaceCh <- p:
default:
log.Errorf("unable to request replacement for peer=%s as queue for replace requests is full", p)
}
}

// peer removed callback
rt.PeerRemoved(p)
}
}
@@ -469,12 +444,13 @@ func (rt *RoutingTable) Size() int {

// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
func (rt *RoutingTable) ListPeers() []peer.ID {
var peers []peer.ID
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

var peers []peer.ID
for _, buck := range rt.buckets {
peers = append(peers, buck.peerIds()...)
}
rt.tabLock.RUnlock()
return peers
}

97 changes: 97 additions & 0 deletions table_cleanup.go
@@ -0,0 +1,97 @@
package kbucket

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/peer"
)

func (rt *RoutingTable) cleanup() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}

cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
defer cleanupTickr.Stop()
for {
select {
case <-cleanupTickr.C:
ps := rt.peersToValidate()
for _, pinfo := range ps {
// TODO This is racy
// A peer could disconnect immediately after we validate it & would thus be in missing state again
// which means we would wrongly mark it as active here. The longer term solution is to
// handle all peer-related events in a single event loop in the RT or to use more fine-grained locking at the bucket/peer level.
// See https://github.com/libp2p/go-libp2p-kbucket/issues/60
if validatePeerF(pinfo.Id) {
rt.tabLock.Lock()
// add it back/mark it as active ONLY if it is still in the RT
// to avoid adding it back if it's been marked as dead
i := rt.bucketIdForPeer(pinfo.Id)
if peer := rt.buckets[i].getPeer(pinfo.Id); peer != nil {
log.Debugf("successfully validated missing peer=%s, marking it as active", pinfo.Id)
peer.State = PeerStateActive
}
rt.tabLock.Unlock()
continue
}

// peer does not seem to be alive, let's try to replace it
// evict missing peer & request replacement ONLY if it's NOT marked as active to avoid removing a peer that connected after
// the failed validation
rt.tabLock.Lock()
i := rt.bucketIdForPeer(pinfo.Id)
p := rt.buckets[i].getPeer(pinfo.Id)
if p != nil && p.State != PeerStateActive {
log.Debugf("failed to validate missing peer=%s, evicting it from the RT & requesting a replace", pinfo.Id)
rt.removePeer(pinfo.Id)
}
rt.tabLock.Unlock()
}
case <-rt.ctx.Done():
return
}
}
}

// replaces a peer using a valid peer from the replacement cache
func (rt *RoutingTable) startPeerReplacement() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}

for {
select {
case p := <-rt.peerReplaceCh:
// keep trying replacement candidates till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(p), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Debugf("successfully validated candidate=%s for peer=%s", c, p)
// TODO There is a race here. The peer could disconnect from us or stop supporting the DHT
// protocol after the validation which means we should not be adding it to the RT here.
// See https://github.com/libp2p/go-libp2p-kbucket/issues/60
rt.tabLock.Lock()
rt.addPeer(c)
rt.tabLock.Unlock()
break
}
log.Debugf("failed to validated candidate=%s", c)
c, notEmpty = rt.cplReplacementCache.pop(cpl)
}

if !notEmpty {
log.Debugf("failed to replace missing peer=%s as all candidates were invalid", p)
}
case <-rt.ctx.Done():
return
}
}
}
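A note on the design of the replacement path: removePeer does a non-blocking send of the dead peer onto the buffered peerReplaceCh (dropping the request when the queue is full), while startPeerReplacement is a single goroutine that drains that channel until Close cancels the table context. The standalone sketch below distils that producer/worker shape with illustrative names and a plain string channel; it is not code from this repository.

package main

import (
	"context"
	"fmt"
	"time"
)

// drain mirrors the shape of startPeerReplacement: process replacement
// requests until the context is cancelled.
func drain(ctx context.Context, requests <-chan string) {
	for {
		select {
		case r := <-requests:
			fmt.Println("processing replacement request for", r)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// buffered, like peerReplaceCh (bucketsize*2)
	requests := make(chan string, 4)
	go drain(ctx, requests)

	// non-blocking send, mirroring removePeer: drop the request rather than
	// block the caller when the queue is full
	select {
	case requests <- "peer-A":
	default:
		fmt.Println("replace queue full, dropping request")
	}

	// crude wait so the worker gets a chance to run before main exits
	// (demo only; the real table relies on Close/ctxCancel for shutdown)
	time.Sleep(100 * time.Millisecond)
}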
