Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in periodic peer pinging #579

Merged
merged 5 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 24 additions & 8 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ type IpfsDHT struct {
// networks).
enableProviders, enableValues bool

// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "lastSuccessfulOutboundQuery"
// of the peer in the bucket above which we will evict it to make place for a new peer if the bucket
// is full
maxLastSuccessfulOutboundThreshold time.Duration
// successfulOutboundQueryGracePeriod is the maximum grace period we will give to a peer
// to between two successful query responses from it, failing which,
// we will ping it to see if it's alive.
successfulOutboundQueryGracePeriod float64

fixLowPeersChan chan struct{}
}
Expand Down Expand Up @@ -294,6 +294,7 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(dht.host.ID())

rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold
cmgr := dht.host.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand Down Expand Up @@ -448,22 +449,37 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=N/A
// If we connect to a peer and exchange a query RPC ->
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
// LastQueriedAt=time.Now()
// LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
logger.Debugw("peer found", "peer", p)
b, err := dht.validRTPeer(p)
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
_, err := dht.routingTable.TryAddPeer(p, queryPeer)
newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
if err != nil {
// peer not added.
return
}

// If we discovered the peer because of a query, we need to ensure we override the "zero" lastSuccessfulOutboundQuery
// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
// value that must have been set in the Routing Table for this peer when it was first added during a connection.
if queryPeer {
dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now())
if newlyAdded && queryPeer {
dht.routingTable.UpdateLastUsefulAt(p, time.Now())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a new peer, don't set this to now in addPeer regardless?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aarshkshah1992 not blocking a merge on this as it shouldn't really matter. We can fix this later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien The idea is that if we discover and add a peer because of an inbound/outbound query, we do NOT want to evict the peer for one ''cycle".

Note that the time will NOT be "now" for peers we connect to without querying them .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talking this over with @aschmahmann, I think I was right. In addPeer (in the routing table), if queryPeer is true, we'll set "last useful" to now when the peer wasn't already in our routing table. Then, when the peer wasn't already in our routing table, we'll mark them as useful again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien I see what you mean. I am removing this particular check from here and adding this commit so when we first query a peer after connecting to it(that's two calls to addPeer, once on connection and once upon a successful query), we give it one and ONLY one usefulness bump.

} else if queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (dht *IpfsDHT) startRefreshing() {
// ping Routing Table peers that haven't been hear of/from in the interval they should have been.
for _, ps := range dht.routingTable.GetPeerInfos() {
// ping the peer if it's due for a ping and evict it if the ping fails
if time.Since(ps.LastSuccessfulOutboundQuery) > dht.maxLastSuccessfulOutboundThreshold {
if float64(time.Since(ps.LastSuccessfulOutboundQueryAt)) > dht.successfulOutboundQueryGracePeriod {
livelinessCtx, cancel := context.WithTimeout(ctx, peerPingTimeout)
if err := dht.host.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.7.4
github.com/libp2p/go-libp2p-core v0.5.1
github.com/libp2p/go-libp2p-kbucket v0.3.3
github.com/libp2p/go-libp2p-kbucket v0.4.0
github.com/libp2p/go-libp2p-peerstore v0.2.2
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-swarm v0.2.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
github.com/libp2p/go-libp2p-discovery v0.3.0 h1:+JnYBRLzZQtRq0mK3xhyjBwHytLmJXMTZkQfbw+UrGA=
github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw=
github.com/libp2p/go-libp2p-kbucket v0.3.3 h1:V2Zwv6QnCK6Who0iiJW2eUKwdlTYGJ2HnLViaolDOcs=
github.com/libp2p/go-libp2p-kbucket v0.3.3/go.mod h1:IWFdYRBOYzaLEHnvrfzEkr+UcuveCXIoeO8QeFZSI6A=
github.com/libp2p/go-libp2p-kbucket v0.4.0 h1:kBzW3HCXA1g48XAEjnFf+sQicCZ3OhvZPvi4LZlwzY4=
github.com/libp2p/go-libp2p-kbucket v0.4.0/go.mod h1:sDQeCkD6yf/Yq8O+HPTNLyGa0TsJi+CURa6ELp1ppkk=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
Expand Down
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn
}

func (q *query) recordPeerIsValuable(p peer.ID) {
q.dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now())
q.dht.routingTable.UpdateLastUsefulAt(p, time.Now())
}

func (q *query) recordValuablePeers() {
Expand Down