fix: protect useful peers in low buckets #634

Merged (3 commits) on May 25, 2020
dht.go: 37 additions & 11 deletions

@@ -42,9 +42,23 @@ var (
 	baseLogger = logger.Desugar()
 )
 
-// BaseConnMgrScore is the base of the score set on the connection manager "kbucket" tag.
-// It is added with the common prefix length between two peer IDs.
-const BaseConnMgrScore = 5
+const (
+	// baseConnMgrScore is the base of the score set on the connection
+	// manager "kbucket" tag. It is added with the common prefix length
+	// between two peer IDs.
+	baseConnMgrScore = 5
+
+	// usefulConnMgrScore is given to peers that are among the first peers
+	// to respond to a query.
+	//
+	// This score is given to peers the first time they're useful and lasts
+	// until we disconnect from the peer.
+	usefulConnMgrScore = 20
+
+	// usefulConnMgrProtectedBuckets is the number of buckets in which useful
+	// peers are _protected_, instead of just given the useful score.
+	usefulConnMgrProtectedBuckets = 2
+)
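The effect of the first constant: each connected peer's "kbucket" tag ends up at the base score plus the common prefix length (CPL) between the Kademlia keys (SHA-256 images) of the two peer IDs, so closer peers score higher. A runnable sketch of that calculation, assuming the go-libp2p-kbucket helpers; the literal peer IDs below are illustrative only:

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p-core/peer"
	kb "github.com/libp2p/go-libp2p-kbucket"
)

// kbucketScore mirrors the scoring rule above: a small base score plus
// the common prefix length between the Kademlia keys of the two peers.
func kbucketScore(self, other peer.ID) int {
	const baseConnMgrScore = 5
	cpl := kb.CommonPrefixLen(kb.ConvertPeerID(self), kb.ConvertPeerID(other))
	return baseConnMgrScore + cpl
}

func main() {
	// Illustrative IDs; real peer IDs are multihashes of public keys.
	fmt.Println(kbucketScore(peer.ID("self-id"), peer.ID("other-id")))
}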

type mode int

@@ -58,11 +72,17 @@ const (
 	kad2 protocol.ID = "/kad/2.0.0"
 )
 
+const (
+	dhtUsefulTag = "dht-useful"
+	kbucketTag   = "kbucket"
+)
+
 // IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
 // It is used to implement the base Routing module.
 type IpfsDHT struct {
-	host host.Host // the network services we need
-	self peer.ID   // Local peer (yourself)
+	host      host.Host // the network services we need
+	self      peer.ID   // Local peer (yourself)
+	selfKey   kb.ID
 	peerstore peerstore.Peerstore // Peer Registry
 
 	datastore ds.Datastore // Local data
@@ -250,6 +270,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
 	dht := &IpfsDHT{
 		datastore: cfg.datastore,
 		self:      h.ID(),
+		selfKey:   kb.ConvertPeerID(h.ID()),
 		peerstore: h.Peerstore(),
 		host:      h,
 		strmap:    make(map[peer.ID]*messageSender),
@@ -336,17 +357,22 @@ func makeRtRefreshManager
 }
 
 func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
-	self := kb.ConvertPeerID(dht.host.ID())
-
-	rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
+	rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
 	cmgr := dht.host.ConnManager()
 
 	rt.PeerAdded = func(p peer.ID) {
-		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
-		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
+		// We tag our closest peers with higher and higher scores so we
+		// stay connected to our nearest neighbors.
+		//
+		// We _also_ (elsewhere) protect useful peers in the furthest
+		// buckets (our "core" routing nodes) and give high scores to
+		// all other useful peers.
+		commonPrefixLen := kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
+		cmgr.TagPeer(p, kbucketTag, baseConnMgrScore+commonPrefixLen)
 	}
 	rt.PeerRemoved = func(p peer.ID) {
-		cmgr.UntagPeer(p, "kbucket")
+		cmgr.Unprotect(p, dhtUsefulTag)
+		cmgr.UntagPeer(p, kbucketTag)
 
 		// try to fix the RT
 		dht.fixRTIfNeeded()
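Tagging and protecting are distinct connection-manager operations: a tag only raises a peer's ranking when the manager trims connections above its high-water mark, while Protect exempts the peer from trimming entirely until Unprotect is called with the same tag. A sketch of the distinction, assuming the go-libp2p-core connmgr interface and this PR's constants; markUseful is a hypothetical helper, not part of the PR:

import (
	"github.com/libp2p/go-libp2p-core/connmgr"
	"github.com/libp2p/go-libp2p-core/peer"
)

// markUseful is a hypothetical helper contrasting the two primitives.
func markUseful(cmgr connmgr.ConnManager, p peer.ID, protect bool) {
	if protect {
		// Protected peers are skipped by the trimmer regardless of
		// their score, until Unprotect removes the protection tag.
		cmgr.Protect(p, dhtUsefulTag)
		return
	}
	// Tagged peers merely rank higher when connections are pruned;
	// a high score usually keeps them connected but doesn't guarantee it.
	cmgr.TagPeer(p, dhtUsefulTag, usefulConnMgrScore)
}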
query.go: 18 additions & 1 deletion

@@ -174,7 +174,24 @@ func (dht *IpfsDHT) runQuery
 }
 
 func (q *query) recordPeerIsValuable(p peer.ID) {
-	q.dht.routingTable.UpdateLastUsefulAt(p, time.Now())
+	if !q.dht.routingTable.UpdateLastUsefulAt(p, time.Now()) {
+		// not in routing table
+		return
+	}
+
+	// Protect useful peers, when they're actually useful. This will last
+	// through disconnects. However, we'll still evict them if they keep
+	// disconnecting from us.
+	//
+	// Restrict to buckets 0, 1 (75% of requests, max 40 peers), so we don't
+	// protect _too_ many peers.
+	commonPrefixLen := kb.CommonPrefixLen(q.dht.selfKey, kb.ConvertPeerID(p))
+	cmgr := q.dht.host.ConnManager()
+	if commonPrefixLen < usefulConnMgrProtectedBuckets {
+		cmgr.Protect(p, dhtUsefulTag)
+	} else {
+		cmgr.TagPeer(p, dhtUsefulTag, usefulConnMgrScore)
+	}
Contributor:
@Stebalien The TagPeer call is idempotent and overwrites the existing score for that tag.
How about adding to the current score here so we "reward" peers who are useful to us often.
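A sketch of that suggestion, assuming the UpsertTag method on go-libp2p-core's ConnManager interface and the constants introduced in this PR; illustrative only, not what the PR merges:

// rewardUsefulPeer accumulates the useful score instead of overwriting
// it, so peers that are useful repeatedly climb higher over time.
func rewardUsefulPeer(cmgr connmgr.ConnManager, p peer.ID) {
	cmgr.UpsertTag(p, dhtUsefulTag, func(current int) int {
		return current + usefulConnMgrScore
	})
}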

Contributor:

We could probably do that if we used decaying tags, like pubsub is planning to do (libp2p/go-libp2p-pubsub#306). However, I don't think it's currently worth the complexity, especially if we take into account the various ways the connection manager can be gamed to promote one peer over another.

Member (author):

I don't want to give them too much weight; I just want to make sure they're marked as useful.

 }

 func (q *query) recordValuablePeers() {
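The "75% of requests, max 40 peers" figure in the comment above follows from bucket geometry: a uniformly random lookup key has common prefix length 0 with us half the time and length 1 a quarter of the time, and each bucket holds at most the bucket size, which defaults to 20 in this DHT (the bucket size here is an assumption based on that default). A quick check of the arithmetic:

package main

import "fmt"

func main() {
	const bucketSize = 20      // go-libp2p-kad-dht's default bucket size
	const protectedBuckets = 2 // usefulConnMgrProtectedBuckets

	// Bucket i holds peers whose common prefix length with us is exactly
	// i, which a uniformly random ID hits with probability 1/2^(i+1):
	// 1/2 for bucket 0, 1/4 for bucket 1, and so on.
	coverage := 0.0
	for i := 0; i < protectedBuckets; i++ {
		coverage += 1 / float64(int(1)<<(i+1))
	}
	fmt.Printf("requests covered: %.0f%%, max protected peers: %d\n",
		coverage*100, protectedBuckets*bucketSize)
	// Output: requests covered: 75%, max protected peers: 40
}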