Replace dead peers & increase replacement cache size #59

Merged 4 commits on Mar 20, 2020
Changes from 1 commit
108 changes: 70 additions & 38 deletions table.go
@@ -14,8 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"

logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
mh "github.com/multiformats/go-multihash"
)

@@ -44,7 +42,11 @@ type CplRefresh struct {

// RoutingTable defines the routing table.
type RoutingTable struct {
// the routing table context
ctx context.Context
// function to cancel the RT context
ctxCancel context.CancelFunc

// ID of the local peer
local ID

@@ -71,17 +73,20 @@ type RoutingTable struct {
PeerRemoved func(peer.ID)
PeerAdded func(peer.ID)

// is peer replacement enabled ?
isReplaceEnabled bool
// peerReplaceCh is the channel to write a peer replacement request to
peerReplaceCh chan peer.ID

// function to determine the validity of a peer for RT membership
peerValidationFnc PeerValidationFunc

// timeout for a single call to the peer validation function
peerValidationTimeout time.Duration

// interval between two runs of the table cleanup routine
tableCleanupInterval time.Duration
// function to select peers that need to be validated
// function to select peers that need to be validated during cleanup
peersForValidationFnc PeerSelectionFunc

proc goprocess.Process
}

// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
@@ -95,7 +100,6 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
}

rt := &RoutingTable{
ctx: context.Background(),
buckets: []*bucket{newBucket()},
bucketsize: bucketsize,
local: localID,
@@ -108,18 +112,24 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {},

peerReplaceCh: make(chan peer.ID, bucketsize*2),

peerValidationFnc: cfg.tableCleanup.peerValidationFnc,
peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc,
peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout,
tableCleanupInterval: cfg.tableCleanup.interval,
}

rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize)
rt.proc = goprocessctx.WithContext(rt.ctx)
// create the replacement cache
rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize*2)

rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())

// schedule periodic RT cleanup if peer validation function has been passed
if rt.peerValidationFnc != nil {
rt.proc.Go(rt.cleanup)
rt.isReplaceEnabled = (rt.peerValidationFnc != nil)
if rt.isReplaceEnabled {
go rt.cleanup()
go rt.startPeerReplacement()
}

return rt, nil
@@ -128,10 +138,11 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
// Close shuts down the Routing Table & all associated processes.
// It is safe to call this multiple times.
func (rt *RoutingTable) Close() error {
return rt.proc.Close()
rt.ctxCancel()
return nil
}

func (rt *RoutingTable) cleanup(proc goprocess.Process) {
func (rt *RoutingTable) cleanup() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
@@ -153,32 +164,45 @@ func (rt *RoutingTable) cleanup(proc goprocess.Process) {
continue
}

// peer does not seem to be alive, let's try candidates now
log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id)
// evict missing peer
// peer does not seem to be alive, let's try to replace it
log.Infof("failed to validate missing peer=%s, evicting it from the RT & requesting a replace", pinfo.Id)
// evict missing peer & request replacement
rt.HandlePeerDead(pinfo.Id)
}
case <-rt.ctx.Done():
return
}
}
}

// keep trying replacement candidates for the missing peer till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id)
break
}
log.Infof("failed to validated candidate=%s", c)
// remove candidate
rt.HandlePeerDead(c)

c, notEmpty = rt.cplReplacementCache.pop(cpl)
}
// replaces a peer using a valid peer from the replacement cache
func (rt *RoutingTable) startPeerReplacement() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}

if !notEmpty {
log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id)
for {
select {
case p := <-rt.peerReplaceCh:
// keep trying replacement candidates till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(p), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Infof("successfully validated candidate=%s for peer=%s", c, p)
break
}
log.Infof("failed to validated candidate=%s", c)
c, notEmpty = rt.cplReplacementCache.pop(cpl)
}
case <-proc.Closing():

if !notEmpty {
log.Infof("failed to replace missing peer=%s as all candidates were invalid", p)
}
Review comment (Member):

Are we doing anything with the replacement peer?

Reply (Contributor, Author):

  1. The peer to be replaced -> we use it to calculate the CPL. But we could just pass the CPL in the channel rather than the peer.
  2. The peer we replace with -> we just "validate" it. The way things are wired right now, a side effect of the validation is that the peer gets added to the RT because of the DHT connectedness notification handler. I've documented that in the cleanup function. Should I document it here as well?
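
For illustration only, a minimal, self-contained toy sketch (not code from this PR) of the side effect described above: a successful validation dial triggers a "connected" notification, and that handler is what actually inserts the validated candidate into the table. The names `toyRT`, `validate`, and `onConnected` are hypothetical stand-ins for the real routing table, validation function, and DHT connectedness handler.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// toyRT is a stand-in for the routing table; HandlePeerAlive mirrors the real
// method that inserts a live peer into the table.
type toyRT struct {
	peers map[string]bool
}

func (rt *toyRT) HandlePeerAlive(p string) { rt.peers[p] = true }

// validate simulates the validation dial with a timeout. On success it fires
// the "connected" notification, which is the side effect that adds the
// candidate back into the table.
func (rt *toyRT) validate(ctx context.Context, p string, onConnected func(string)) bool {
	select {
	case <-time.After(10 * time.Millisecond): // pretend the dial succeeded
		onConnected(p)
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	rt := &toyRT{peers: map[string]bool{}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	candidate := "QmCandidate"
	if rt.validate(ctx, candidate, rt.HandlePeerAlive) {
		fmt.Println("candidate present in table after validation:", rt.peers[candidate])
	}
}
```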

case <-rt.ctx.Done():
return
}
}
@@ -341,17 +365,25 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
}

// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable.
// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one.
// It evicts the peer from the Routing Table and tries to replace it with a valid & eligible
// candidate from the replacement cache.
func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
// remove it as a candidate
rt.cplReplacementCache.remove(p)

// remove it from the RT
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
if bucket.remove(p) {
// request a replacement
if rt.isReplaceEnabled {
select {
case rt.peerReplaceCh <- p:
default:
log.Errorf("unable to request replacement for peer=%s as queue for replace requests is full", p)
}
}

// peer removed callback
rt.PeerRemoved(p)
}
}
83 changes: 57 additions & 26 deletions table_test.go
@@ -167,37 +167,57 @@ func TestHandlePeerDead(t *testing.T) {
t.Parallel()

local := test.RandPeerIDFatal(t)
var candidate peer.ID
for {
candidate = test.RandPeerIDFatal(t)
if CommonPrefixLen(ConvertPeerID(candidate), ConvertPeerID(local)) == 0 {
break
}
}

var lk sync.Mutex
var added peer.ID
f := func(ctx context.Context, p peer.ID) bool {
if p == candidate {
lk.Lock()
added = p
lk.Unlock()
}
return true
}

m := pstore.NewMetrics()
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc))
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(f))
require.NoError(t, err)

// push 3 peers -> 2 for the first bucket, and 1 as candidates
var peers []peer.ID
for i := 0; i < 3; i++ {
p, err := rt.GenRandPeerID(uint(0))
require.NoError(t, err)
require.NotEmpty(t, p)
rt.HandlePeerAlive(p)
peers = append(peers, p)
}
p1, _ := rt.GenRandPeerID(0)
p2, _ := rt.GenRandPeerID(0)
rt.HandlePeerAlive(p1)
rt.HandlePeerAlive(p2)
rt.HandlePeerAlive(candidate)

// ensure p1 & p2 are in the RT
require.Len(t, rt.ListPeers(), 2)
require.Contains(t, rt.ListPeers(), p1)
require.Contains(t, rt.ListPeers(), p2)

// ensure we have 1 candidate
rt.cplReplacementCache.Lock()
require.NotNil(t, rt.cplReplacementCache.candidates[uint(0)])
require.True(t, len(rt.cplReplacementCache.candidates[uint(0)]) == 1)
require.Len(t, rt.cplReplacementCache.candidates[uint(0)], 1)
require.Contains(t, rt.cplReplacementCache.candidates[uint(0)], candidate)
rt.cplReplacementCache.Unlock()

// mark a peer as dead and ensure it's not in the RT
require.NotEmpty(t, rt.Find(peers[0]))
rt.HandlePeerDead(peers[0])
require.Empty(t, rt.Find(peers[0]))

// mark the peer as dead & verify we don't get it as a candidate
rt.HandlePeerDead(peers[2])

// mark a peer as dead and ensure it's not in the RT & it gets replaced
require.NotEmpty(t, rt.Find(p1))
rt.HandlePeerDead(p1)
require.Empty(t, rt.Find(p1))
time.Sleep(1 * time.Second)
rt.cplReplacementCache.Lock()
require.Nil(t, rt.cplReplacementCache.candidates[uint(0)])
require.Empty(t, rt.cplReplacementCache.candidates)
rt.cplReplacementCache.Unlock()
lk.Lock()
require.Equal(t, candidate, added)
lk.Unlock()
}

func TestTableCallbacks(t *testing.T) {
@@ -593,7 +613,6 @@ func TestTableCleanup(t *testing.T) {
p := test.RandPeerIDFatal(t)
if CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == cpl {
cplPeerMap[cpl] = append(cplPeerMap[cpl], p)

i++
if i == 6 {
break
@@ -634,7 +653,7 @@ }
}
}

// validate current state
// validate current TABLE state
rt.tabLock.RLock()
require.Len(t, rt.ListPeers(), 6)
ps0 := rt.buckets[0].peerIds()
@@ -649,15 +668,22 @@
require.Contains(t, ps1, cplPeerMap[1][2])
rt.tabLock.RUnlock()

// now disconnect peers 0 1 & 2 from both buckets so it has only 0 left after it gets validated
// validate current state of replacement cache
rt.cplReplacementCache.Lock()
require.Len(t, rt.cplReplacementCache.candidates, 2)
require.Len(t, rt.cplReplacementCache.candidates[0], 3)
require.Len(t, rt.cplReplacementCache.candidates[1], 3)
rt.cplReplacementCache.Unlock()

// now disconnect peers 0 1 & 2 from both buckets so it has only peer 1 left after it gets validated
for _, peers := range cplPeerMap {
rt.HandlePeerDisconnect(peers[0])
rt.HandlePeerDisconnect(peers[1])
rt.HandlePeerDisconnect(peers[2])
}

// let RT cleanup complete
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)

// verify RT state
rt.tabLock.RLock()
@@ -670,7 +696,7 @@ func TestTableCleanup(t *testing.T) {
require.Contains(t, ps1, cplPeerMap[1][1])
rt.tabLock.RUnlock()

// verify candidate state
// verify peers were replaced with candidates
addedCandidatesLk.Lock()
require.Len(t, addedCandidates, 4)
require.Contains(t, addedCandidates, cplPeerMap[0][3])
@@ -679,6 +705,11 @@ func TestTableCleanup(t *testing.T) {
require.Contains(t, addedCandidates, cplPeerMap[1][5])
addedCandidatesLk.Unlock()

// verify candidates were removed from the replacement cache
rt.cplReplacementCache.Lock()
require.Empty(t, rt.cplReplacementCache.candidates)
rt.cplReplacementCache.Unlock()

// close RT
require.NoError(t, rt.Close())
}