Skip to content

Commit

Permalink
Merge pull request #75 from libp2p/feat/peer-queried-at
Browse files Browse the repository at this point in the history
Add LastUsefulAt and LastSuccessfulQueryAt for each peer
  • Loading branch information
Stebalien committed Apr 9, 2020
2 parents e79a3b7 + 8ee3433 commit 6f708c1
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 33 deletions.
11 changes: 8 additions & 3 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ import (
// PeerInfo holds all related information for a peer in the K-Bucket.
type PeerInfo struct {
Id peer.ID
// LastSuccessfulOutboundQuery is the time instant when we last made a successful
// outbound query to this peer
LastSuccessfulOutboundQuery time.Time

// LastUsefulAt is the time instant at which the peer was last "useful" to us.
// Please see the DHT docs for the definition of usefulness.
LastUsefulAt time.Time

// LastSuccessfulOutboundQueryAt is the time instant at which we last got a
// successful query response from the peer.
LastSuccessfulOutboundQueryAt time.Time

// Id of the peer in the DHT XOR keyspace
dhtId ID
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/libp2p/go-libp2p-kbucket
require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v1.0.3
github.com/jbenet/goprocess v0.1.3
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-libp2p-core v0.5.1
github.com/libp2p/go-libp2p-peerstore v0.2.2
github.com/minio/sha256-simd v0.1.1
Expand Down
48 changes: 33 additions & 15 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,14 @@ type RoutingTable struct {
PeerRemoved func(peer.ID)
PeerAdded func(peer.ID)

// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "LastSuccessfulOutboundQuery"
// of the peer in the bucket above which we will evict it to make place for a new peer if the bucket
// usefulnessGracePeriod is the maximum grace period we will give to a
// peer in the bucket to be useful to us, failing which, we will evict it to make place for a new peer if the bucket
// is full
maxLastSuccessfulOutboundThreshold float64
usefulnessGracePeriod float64
}

// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
// Passing a nil PeerValidationFunc disables periodic table cleanup.
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, maxLastSuccessfulOutboundThreshold float64) (*RoutingTable, error) {
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, usefulnessGracePeriod float64) (*RoutingTable, error) {
rt := &RoutingTable{
buckets: []*bucket{newBucket()},
bucketsize: bucketsize,
Expand All @@ -71,7 +70,7 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {},

maxLastSuccessfulOutboundThreshold: maxLastSuccessfulOutboundThreshold,
usefulnessGracePeriod: usefulnessGracePeriod,
}

rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -111,9 +110,9 @@ func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) {
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
var lastSuccessfulOutboundQuery time.Time
var lastUsefulAt time.Time
if queryPeer {
lastSuccessfulOutboundQuery = time.Now()
lastUsefulAt = time.Now()
}

// peer already exists in the Routing Table.
Expand All @@ -129,7 +128,8 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {

// We have enough space in the bucket (whether spawned or grouped).
if bucket.len() < rt.bucketsize {
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(),
dhtId: ConvertPeerID(p)})
rt.PeerAdded(p)
return true, nil
}
Expand All @@ -143,7 +143,8 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {

// push the peer only if the bucket isn't overflowing after slitting
if bucket.len() < rt.bucketsize {
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(),
dhtId: ConvertPeerID(p)})
rt.PeerAdded(p)
return true, nil
}
Expand All @@ -153,10 +154,11 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
allPeers := bucket.peers()
for _, pc := range allPeers {
if float64(time.Since(pc.LastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold {
if float64(time.Since(pc.LastUsefulAt)) > rt.usefulnessGracePeriod {
// let's evict it and add the new peer
if bucket.remove(pc.Id) {
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(),
dhtId: ConvertPeerID(p)})
rt.PeerAdded(p)
return true, nil
}
Expand All @@ -180,17 +182,33 @@ func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
return pis
}

// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQuery time of the peer
// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQueryAt time of the peer.
// Returns true if the update was successful, false otherwise.
func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool {
func (rt *RoutingTable) UpdateLastSuccessfulOutboundQueryAt(p peer.ID, t time.Time) bool {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()

bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]

if pc := bucket.getPeer(p); pc != nil {
pc.LastSuccessfulOutboundQuery = t
pc.LastSuccessfulOutboundQueryAt = t
return true
}
return false
}

// UpdateLastUsefulAt updates the LastUsefulAt time of the peer.
// Returns true if the update was successful, false otherwise.
func (rt *RoutingTable) UpdateLastUsefulAt(p peer.ID, t time.Time) bool {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()

bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]

if pc := bucket.getPeer(p); pc != nil {
pc.LastUsefulAt = t
return true
}
return false
Expand Down
53 changes: 39 additions & 14 deletions table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ func TestPrint(t *testing.T) {
func TestBucket(t *testing.T) {
t.Parallel()
testTime1 := time.Now()
testTime2 := time.Now().AddDate(1, 0, 0)

b := newBucket()

peers := make([]peer.ID, 100)
for i := 0; i < 100; i++ {
peers[i] = test.RandPeerIDFatal(t)
b.pushFront(&PeerInfo{peers[i], testTime1, ConvertPeerID(peers[i])})
b.pushFront(&PeerInfo{peers[i], testTime1, testTime2, ConvertPeerID(peers[i])})
}

local := test.RandPeerIDFatal(t)
Expand All @@ -47,14 +48,17 @@ func TestBucket(t *testing.T) {
require.NotNil(t, p)
require.Equal(t, peers[i], p.Id)
require.Equal(t, ConvertPeerID(peers[i]), p.dhtId)
require.EqualValues(t, testTime1, p.LastSuccessfulOutboundQuery)
require.EqualValues(t, testTime1, p.LastUsefulAt)
require.EqualValues(t, testTime2, p.LastSuccessfulOutboundQueryAt)

// mark as missing
t2 := time.Now().Add(1 * time.Hour)
p.LastSuccessfulOutboundQuery = t2
t3 := t2.Add(1 * time.Hour)
p.LastSuccessfulOutboundQueryAt = t2
p.LastUsefulAt = t3
p = b.getPeer(peers[i])
require.NotNil(t, p)
require.EqualValues(t, t2, p.LastSuccessfulOutboundQuery)
require.EqualValues(t, t2, p.LastSuccessfulOutboundQueryAt)
require.EqualValues(t, t3, p.LastUsefulAt)

spl := b.split(0, ConvertPeerID(local))
llist := b.list
Expand Down Expand Up @@ -201,7 +205,7 @@ func TestTableFind(t *testing.T) {
}
}

func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) {
func TestUpdateLastSuccessfulOutboundQueryAt(t *testing.T) {
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
Expand All @@ -214,11 +218,32 @@ func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) {

// increment and assert
t2 := time.Now().Add(1 * time.Hour)
rt.UpdateLastSuccessfulOutboundQuery(p, t2)
rt.UpdateLastSuccessfulOutboundQueryAt(p, t2)
rt.tabLock.Lock()
pi := rt.buckets[0].getPeer(p)
require.NotNil(t, pi)
require.EqualValues(t, t2, pi.LastSuccessfulOutboundQuery)
require.EqualValues(t, t2, pi.LastSuccessfulOutboundQueryAt)
rt.tabLock.Unlock()
}

func TestUpdateLastUsefulAt(t *testing.T) {
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
require.NoError(t, err)

p := test.RandPeerIDFatal(t)
b, err := rt.TryAddPeer(p, true)
require.True(t, b)
require.NoError(t, err)

// increment and assert
t2 := time.Now().Add(1 * time.Hour)
rt.UpdateLastUsefulAt(p, t2)
rt.tabLock.Lock()
pi := rt.buckets[0].getPeer(p)
require.NotNil(t, pi)
require.EqualValues(t, t2, pi.LastUsefulAt)
rt.tabLock.Unlock()
}

Expand Down Expand Up @@ -257,9 +282,9 @@ func TestTryAddPeer(t *testing.T) {
require.True(t, b)
require.Equal(t, p4, rt.Find(p4))

// adding a peer with cpl 0 works if an existing peer has LastSuccessfulOutboundQuery above the max threshold
// adding a peer with cpl 0 works if an existing peer has LastUsefulAt above the max threshold
// because that existing peer will get replaced
require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p2, time.Now().AddDate(0, 0, -2)))
require.True(t, rt.UpdateLastUsefulAt(p2, time.Now().AddDate(0, 0, -2)))
b, err = rt.TryAddPeer(p3, true)
require.NoError(t, err)
require.True(t, b)
Expand All @@ -271,7 +296,7 @@ func TestTryAddPeer(t *testing.T) {
// however adding peer fails if below threshold
p5, err := rt.GenRandPeerID(0)
require.NoError(t, err)
require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p1, time.Now()))
require.True(t, rt.UpdateLastUsefulAt(p1, time.Now()))
b, err = rt.TryAddPeer(p5, true)
require.Error(t, err)
require.False(t, b)
Expand All @@ -285,7 +310,7 @@ func TestTryAddPeer(t *testing.T) {
rt.tabLock.Lock()
pi := rt.buckets[rt.bucketIdForPeer(p6)].getPeer(p6)
require.NotNil(t, p6)
require.True(t, pi.LastSuccessfulOutboundQuery.IsZero())
require.True(t, pi.LastUsefulAt.IsZero())
rt.tabLock.Unlock()

}
Expand Down Expand Up @@ -425,9 +450,9 @@ func TestGetPeerInfos(t *testing.T) {
}

require.Equal(t, p1, ms[p1].Id)
require.True(t, ms[p1].LastSuccessfulOutboundQuery.IsZero())
require.True(t, ms[p1].LastUsefulAt.IsZero())
require.Equal(t, p2, ms[p2].Id)
require.False(t, ms[p2].LastSuccessfulOutboundQuery.IsZero())
require.False(t, ms[p2].LastUsefulAt.IsZero())
}

func BenchmarkAddPeer(b *testing.B) {
Expand Down

0 comments on commit 6f708c1

Please sign in to comment.