Replace dead peers & increase replacement cache size #59

Merged · 4 commits · Mar 20, 2020 (diff below shows changes from 3 commits)
sorting.go (6 changes: 4 additions & 2 deletions)

@@ -19,8 +19,10 @@ type peerDistanceSorter struct {
 	target ID
 }
 
-func (pds *peerDistanceSorter) Len() int { return len(pds.peers) }
-func (pds *peerDistanceSorter) Swap(a, b int) { pds.peers[a], pds.peers[b] = pds.peers[b], pds.peers[a] }
+func (pds *peerDistanceSorter) Len() int { return len(pds.peers) }
+func (pds *peerDistanceSorter) Swap(a, b int) {
+	pds.peers[a], pds.peers[b] = pds.peers[b], pds.peers[a]
+}
 func (pds *peerDistanceSorter) Less(a, b int) bool {
 	return pds.peers[a].distance.less(pds.peers[b].distance)
 }
table.go (120 changes: 48 additions & 72 deletions)

@@ -14,8 +14,6 @@ import (
 	"github.com/libp2p/go-libp2p-core/peerstore"
 
 	logging "github.com/ipfs/go-log"
-	"github.com/jbenet/goprocess"
-	goprocessctx "github.com/jbenet/goprocess/context"
 	mh "github.com/multiformats/go-multihash"
 )

@@ -44,7 +42,11 @@ type CplRefresh struct {
 
 // RoutingTable defines the routing table.
 type RoutingTable struct {
+	// the routing table context
+	ctx context.Context
+	// function to cancel the RT context
+	ctxCancel context.CancelFunc
+
 	// ID of the local peer
 	local ID
 
@@ -71,17 +73,20 @@ type RoutingTable struct {
 	PeerRemoved func(peer.ID)
 	PeerAdded func(peer.ID)
 
+	// is peer replacement enabled?
+	isReplaceEnabled bool
+	// peerReplaceCh is the channel to write a peer replacement request to
+	peerReplaceCh chan peer.ID
+
 	// function to determine the validity of a peer for RT membership
 	peerValidationFnc PeerValidationFunc
 
 	// timeout for a single call to the peer validation function
 	peerValidationTimeout time.Duration
 
 	// interval between two runs of the table cleanup routine
 	tableCleanupInterval time.Duration
-	// function to select peers that need to be validated
+	// function to select peers that need to be validated during cleanup
 	peersForValidationFnc PeerSelectionFunc
 
-	proc goprocess.Process
 }

// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
@@ -95,7 +100,6 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
 	}
 
 	rt := &RoutingTable{
-		ctx: context.Background(),
 		buckets: []*bucket{newBucket()},
 		bucketsize: bucketsize,
 		local: localID,
@@ -108,18 +112,24 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
 		PeerRemoved: func(peer.ID) {},
 		PeerAdded: func(peer.ID) {},
 
+		peerReplaceCh: make(chan peer.ID, bucketsize*2),
+
 		peerValidationFnc: cfg.tableCleanup.peerValidationFnc,
 		peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc,
 		peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout,
 		tableCleanupInterval: cfg.tableCleanup.interval,
 	}
 
-	rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize)
-	rt.proc = goprocessctx.WithContext(rt.ctx)
+	// create the replacement cache
+	rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize*2)
+
+	rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())
+
 	// schedule periodic RT cleanup if peer validation function has been passed
-	if rt.peerValidationFnc != nil {
-		rt.proc.Go(rt.cleanup)
+	rt.isReplaceEnabled = (rt.peerValidationFnc != nil)
+	if rt.isReplaceEnabled {
+		go rt.cleanup()
+		go rt.startPeerReplacement()
 	}
 
 	return rt, nil
@@ -128,60 +138,8 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
 
 // Close shuts down the Routing Table & all associated processes.
 // It is safe to call this multiple times.
 func (rt *RoutingTable) Close() error {
-	return rt.proc.Close()
-}
-
-func (rt *RoutingTable) cleanup(proc goprocess.Process) {
-	validatePeerF := func(p peer.ID) bool {
-		queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
-		defer cancel()
-		return rt.peerValidationFnc(queryCtx, p)
-	}
-
-	cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
-	defer cleanupTickr.Stop()
-	for {
-		select {
-		case <-cleanupTickr.C:
-			ps := rt.peersToValidate()
-			for _, pinfo := range ps {
-				// continue if we are able to successfully validate the peer
-				// it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive()
-				// TODO Should we revisit this ? It makes more sense for the RT to mark it as active here
-				if validatePeerF(pinfo.Id) {
-					log.Infof("successfully validated missing peer=%s", pinfo.Id)
-					continue
-				}
-
-				// peer does not seem to be alive, let's try candidates now
-				log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id)
-				// evict missing peer
-				rt.HandlePeerDead(pinfo.Id)
-
-				// keep trying replacement candidates for the missing peer till we get a successful validation or
-				// we run out of candidates
-				cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local))
-				c, notEmpty := rt.cplReplacementCache.pop(cpl)
-				for notEmpty {
-					if validatePeerF(c) {
-						log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id)
-						break
-					}
-					log.Infof("failed to validated candidate=%s", c)
-					// remove candidate
-					rt.HandlePeerDead(c)
-
-					c, notEmpty = rt.cplReplacementCache.pop(cpl)
-				}
-
-				if !notEmpty {
-					log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id)
-				}
-			}
-		case <-proc.Closing():
-			return
-		}
-	}
-}
+	rt.ctxCancel()
+	return nil
 }
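This hunk replaces the goprocess machinery with the standard context-cancellation shutdown pattern: the background goroutines select on rt.ctx.Done(), and Close simply cancels the context. A minimal, self-contained sketch of the pattern (names are illustrative, not this repo's API):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type table struct {
	ctx       context.Context
	ctxCancel context.CancelFunc
}

func newTable() *table {
	t := &table{}
	t.ctx, t.ctxCancel = context.WithCancel(context.Background())
	go t.cleanupLoop()
	return t
}

func (t *table) cleanupLoop() {
	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			fmt.Println("cleanup pass")
		case <-t.ctx.Done(): // Close cancelled the context
			return
		}
	}
}

// Close is safe to call multiple times: re-cancelling a context is a no-op.
func (t *table) Close() error {
	t.ctxCancel()
	return nil
}

func main() {
	t := newTable()
	time.Sleep(25 * time.Millisecond)
	t.Close()
	t.Close() // idempotent
}
```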

// returns the peers that need to be validated.
@@ -299,6 +257,11 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
 	rt.tabLock.Lock()
 	defer rt.tabLock.Unlock()
 
+	return rt.addPeer(p)
+}
+
+// locking is the responsibility of the caller
+func (rt *RoutingTable) addPeer(p peer.ID) (evicted peer.ID, err error) {
 	bucketID := rt.bucketIdForPeer(p)
 	bucket := rt.buckets[bucketID]
 	if peer := bucket.getPeer(p); peer != nil {
@@ -341,17 +304,29 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
 }
 
 // HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable.
-// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one.
+// It evicts the peer from the Routing Table and tries to replace it with a valid & eligible
+// candidate from the replacement cache.
 func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
-	// remove it as a candidate
-	rt.cplReplacementCache.remove(p)
-
-	// remove it from the RT
 	rt.tabLock.Lock()
 	defer rt.tabLock.Unlock()
+	rt.removePeer(p)
+}
+
+// locking is the responsibility of the caller
+func (rt *RoutingTable) removePeer(p peer.ID) {
 	bucketID := rt.bucketIdForPeer(p)
 	bucket := rt.buckets[bucketID]
 	if bucket.remove(p) {
+		// request a replacement
+		if rt.isReplaceEnabled {
+			select {
+			case rt.peerReplaceCh <- p:
+			default:
+				log.Errorf("unable to request replacement for peer=%s as queue for replace requests is full", p)
+			}
+		}
+
 		// peer removed callback
 		rt.PeerRemoved(p)
 	}
 }
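removePeer runs while the table lock is held, so the replacement request must never block: the select with an empty default drops (and logs) the request whenever peerReplaceCh's buffer is full. A self-contained sketch of this non-blocking send pattern (names are illustrative):

```go
package main

import "fmt"

func main() {
	// buffered like peerReplaceCh (bucketsize*2 in the PR)
	replaceCh := make(chan string, 2)

	requestReplace := func(p string) {
		select {
		case replaceCh <- p:
			fmt.Println("queued replacement request for", p)
		default:
			// queue full: drop instead of blocking the caller,
			// which may be holding the table lock
			fmt.Println("dropped replacement request for", p)
		}
	}

	requestReplace("QmPeerA")
	requestReplace("QmPeerB")
	requestReplace("QmPeerC") // buffer full: dropped
}
```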
@@ -469,12 +444,13 @@ func (rt *RoutingTable) Size() int {

 // ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
 func (rt *RoutingTable) ListPeers() []peer.ID {
-	var peers []peer.ID
 	rt.tabLock.RLock()
+	defer rt.tabLock.RUnlock()
+
+	var peers []peer.ID
 	for _, buck := range rt.buckets {
 		peers = append(peers, buck.peerIds()...)
 	}
-	rt.tabLock.RUnlock()
 	return peers
 }
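Moving the unlock into a defer directly after RLock ties the lock's lifetime to the function: it is released on every exit path, even if a later change adds an early return. A minimal sketch of the idiom (illustrative names):

```go
package main

import (
	"fmt"
	"sync"
)

type table struct {
	mu    sync.RWMutex
	peers []string
}

// snapshot copies the peer list under a read lock; the deferred RUnlock
// runs on every exit path, so future edits cannot leak the lock.
func (t *table) snapshot() []string {
	t.mu.RLock()
	defer t.mu.RUnlock()

	out := make([]string, len(t.peers))
	copy(out, t.peers)
	return out
}

func main() {
	t := &table{peers: []string{"QmA", "QmB"}}
	fmt.Println(t.snapshot())
}
```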

table_cleanup.go (97 changes: 97 additions & 0 deletions, new file)

@@ -0,0 +1,97 @@
package kbucket

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/peer"
)

func (rt *RoutingTable) cleanup() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}

cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
defer cleanupTickr.Stop()
for {
select {
case <-cleanupTickr.C:
ps := rt.peersToValidate()
for _, pinfo := range ps {
// TODO This is racy.
// A peer could disconnect immediately after we validate it & would thus be in the missing state again,
// which means we would wrongly mark it as active here. The longer-term solution is to
// handle all peer-related events in a single event loop in the RT, or use more fine-grained locking at the bucket/peer level.
// See https://github.com/libp2p/go-libp2p-kbucket/issues/60
if validatePeerF(pinfo.Id) {
rt.tabLock.Lock()
// add it back/mark it as active ONLY if it is still in the RT
// to avoid adding it back if it's been marked as dead
i := rt.bucketIdForPeer(pinfo.Id)
if rt.buckets[i].getPeer(pinfo.Id) != nil {
log.Infof("successfully validated missing peer=%s, marking it as active", pinfo.Id)
rt.addPeer(pinfo.Id)
**Member:** Can't we just set the peer state to active here instead of calling addPeer?

**Contributor (Author):** Done.

}
rt.tabLock.Unlock()
continue
}

// peer does not seem to be alive, let's try to replace it
// evict missing peer & request replacement ONLY if it's NOT marked as active to avoid removing a peer that connected after
// the failed validation
rt.tabLock.Lock()
i := rt.bucketIdForPeer(pinfo.Id)
p := rt.buckets[i].getPeer(pinfo.Id)
if p != nil && p.State != PeerStateActive {
log.Infof("failed to validate missing peer=%s, evicting it from the RT & requesting a replace", pinfo.Id)
rt.removePeer(pinfo.Id)
}
rt.tabLock.Unlock()
}
case <-rt.ctx.Done():
return
}
}
}
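Per the exchange above, the fourth commit (not shown in this three-commit view) presumably marks the validated peer as active in place rather than re-running addPeer. A toy model of that suggestion; every name below is an assumption for illustration, not this repo's actual types:

```go
package main

import "fmt"

type peerState int

const (
	peerStateMissing peerState = iota
	peerStateActive
)

type peerInfo struct {
	id    string
	state peerState
}

func main() {
	// a bucket entry whose liveness check just succeeded
	bucket := map[string]*peerInfo{
		"QmPeer": {id: "QmPeer", state: peerStateMissing},
	}

	// mark it active in place; no need to re-run the full insert logic
	if p, ok := bucket["QmPeer"]; ok {
		p.state = peerStateActive
	}

	fmt.Println(bucket["QmPeer"].state == peerStateActive) // true
}
```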

// replaces a peer using a valid peer from the replacement cache
func (rt *RoutingTable) startPeerReplacement() {
**Member:** Do we need another goroutine or can we just have a single "run" goroutine?

**Contributor (Author):** Replacement is also requested by the DHT in addition to the cleanup routine. What do you have in mind when you say have a single "run" goroutine?

**Member:** I mean merge the cleanup and replacement goroutines to reduce the number of components updating the routing table. But it's not really an important change.

validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}

for {
select {
case p := <-rt.peerReplaceCh:
// keep trying replacement candidates till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(p), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Infof("successfully validated candidate=%s for peer=%s", c, p)
// TODO There is a race here. The peer could disconnect from us or stop supporting the DHT
// protocol after the validation, which means we should not be adding it to the RT here.
// See https://github.com/libp2p/go-libp2p-kbucket/issues/60
rt.tabLock.Lock()
rt.addPeer(c)
rt.tabLock.Unlock()
break
}
log.Infof("failed to validated candidate=%s", c)
c, notEmpty = rt.cplReplacementCache.pop(cpl)
}

if !notEmpty {
log.Infof("failed to replace missing peer=%s as all candidates were invalid", p)
}
case <-rt.ctx.Done():
return
}
}
}
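The reviewer's alternative, a single "run" goroutine that services both the cleanup ticker and replacement requests so only one component mutates the table, might look roughly like this sketch (illustrative names; the PR as merged keeps two goroutines):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// run merges the cleanup and replacement loops into one goroutine,
// so a single component serializes all routing-table mutations.
func run(ctx context.Context, replaceCh <-chan string, cleanupEvery time.Duration) {
	tick := time.NewTicker(cleanupEvery)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			fmt.Println("cleanup pass")
		case p := <-replaceCh:
			fmt.Println("replacing peer", p)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	replaceCh := make(chan string, 4)

	go run(ctx, replaceCh, 20*time.Millisecond)
	replaceCh <- "QmDeadPeer"
	time.Sleep(50 * time.Millisecond)
	cancel()
}
```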