Asynchronous lookups #498

Merged
merged 19 commits into from
Mar 24, 2020

Conversation

aschmahmann
Contributor

The reworked query logic follows Kademlia, but does so in a very non-optimal way since it queries peers in sequential chunks (i.e. alpha queries at a time). This PR makes this query process asynchronous (i.e. at most alpha outstanding queries at a time).
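A rough sketch of what "at most alpha outstanding queries" means, as opposed to querying in chunks of alpha. This is not the code in this PR: `peerID`, `response`, and `asyncLookup` are hypothetical names, XOR on small integers stands in for the Kademlia distance, and the beta-based termination rule is left out, so the loop simply runs until no unqueried peers remain.

```go
package main

import (
	"fmt"
	"sort"
)

// peerID is a stand-in for peer.ID; XOR on small integers stands in for the
// Kademlia distance metric.
type peerID int

// response is what a finished query reports back to the lookup loop.
type response struct {
	from   peerID
	closer []peerID
}

// asyncLookup keeps at most alpha queries outstanding. Whenever a response
// arrives, the closest not-yet-queried peer is dispatched right away rather
// than waiting for a whole chunk of alpha queries to finish.
func asyncLookup(target peerID, seeds []peerID, alpha int, query func(peerID) []peerID) []peerID {
	dist := func(p peerID) int { return int(p ^ target) }
	byDistance := func(ps []peerID) {
		sort.Slice(ps, func(i, j int) bool { return dist(ps[i]) < dist(ps[j]) })
	}

	seen := map[peerID]bool{}
	var pending []peerID // heard about but not yet queried
	add := func(ps []peerID) {
		for _, p := range ps {
			if !seen[p] {
				seen[p] = true
				pending = append(pending, p)
			}
		}
		byDistance(pending)
	}
	add(seeds)

	responses := make(chan response)
	inFlight := 0
	var queried []peerID
	for {
		// Top up to alpha outstanding queries, closest peers first.
		for inFlight < alpha && len(pending) > 0 {
			p := pending[0]
			pending = pending[1:]
			inFlight++
			go func() { responses <- response{from: p, closer: query(p)} }()
		}
		if inFlight == 0 {
			break // nothing outstanding and nothing left to query
		}
		r := <-responses
		inFlight--
		queried = append(queried, r.from)
		add(r.closer)
	}
	byDistance(queried)
	return queried
}

func main() {
	// A toy network: each peer knows a few other peers.
	network := map[peerID][]peerID{1: {2, 3}, 2: {4}, 3: {5}, 4: {6}, 5: {6}, 6: {}}
	query := func(p peerID) []peerID { return network[p] }
	fmt.Println(asyncLookup(6, []peerID{1}, 2, query))
}
```

The point of the sketch is the inner top-up loop: a slow peer only occupies one of the alpha slots instead of stalling the whole chunk.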

Contributor Author

@aschmahmann aschmahmann left a comment

@petar Maymounkov thanks, the changes you made look pretty good. However, I think that now that we're not using mutexes it's a lot simpler to scrap all of the NumX, IsX, and MarkX functions and replace them with len(GetX), GetState and SetState.

Comment on lines 11 to 23
type PeerState int

const (
PeerSeen PeerState = iota
PeerWaiting
PeerQueried
PeerUnreachable
)
Contributor Author

Why are this type and these consts exported? They're only used through IsX(peer.ID) and MarkX(peer.ID), so we should probably either make them unexported or replace all the IsX/MarkX functions with GetState(peer.ID) PeerState and SetState(peer.ID, PeerState).

The Get and Set functions to me seem easier to manage than a whole set of functions.
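Roughly what I have in mind, as a sketch rather than a drop-in replacement. `peerID`, `queryPeerset`, and `GetPeersInState` are hypothetical names for illustration, and panicking on an unknown peer is an assumption of the sketch, not a decision:

```go
package main

import "fmt"

// peerID is a stand-in for peer.ID so the sketch has no external dependencies.
type peerID string

type PeerState int

const (
	PeerSeen PeerState = iota
	PeerWaiting
	PeerQueried
	PeerUnreachable
)

// queryPeerset is a hypothetical, trimmed-down peerset. It is not safe for
// concurrent use, since the query loop no longer relies on mutexes.
type queryPeerset struct {
	order  []peerID // insertion order, for deterministic iteration
	states map[peerID]PeerState
}

func newQueryPeerset() *queryPeerset {
	return &queryPeerset{states: make(map[peerID]PeerState)}
}

// TryAdd records p as PeerSeen unless it is already tracked.
func (qp *queryPeerset) TryAdd(p peerID) bool {
	if _, ok := qp.states[p]; ok {
		return false
	}
	qp.order = append(qp.order, p)
	qp.states[p] = PeerSeen
	return true
}

// GetState replaces the IsX family. Panicking on an unknown peer is an
// assumption of this sketch.
func (qp *queryPeerset) GetState(p peerID) PeerState {
	s, ok := qp.states[p]
	if !ok {
		panic("peer not in set")
	}
	return s
}

// SetState replaces the MarkX family.
func (qp *queryPeerset) SetState(p peerID, s PeerState) {
	if _, ok := qp.states[p]; !ok {
		panic("peer not in set")
	}
	qp.states[p] = s
}

// GetPeersInState lets callers write len(GetPeersInState(s)) instead of NumX().
func (qp *queryPeerset) GetPeersInState(s PeerState) []peerID {
	var out []peerID
	for _, p := range qp.order {
		if qp.states[p] == s {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	qp := newQueryPeerset()
	qp.TryAdd("a")
	qp.TryAdd("b")
	qp.SetState("a", PeerWaiting)
	fmt.Println(qp.GetState("a") == PeerWaiting, len(qp.GetPeersInState(PeerSeen)))
}
```

Four small methods cover what the NumX/IsX/MarkX set did, and adding a new state later does not add new methods.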

Contributor

Agreed. Done.

query.go Outdated
// closest nodes it has heard about.
if len(peersToQuery) == 0 {
select {
case sawPeers := <-ch:
Contributor Author

this isn't just seen peers, maybe rename this variable and the populatePeers function to something indicating that we're updating the states of the peers based on query responses. Perhaps something like response and q.updatePeerState().

Contributor

Done.

query.go Outdated
queryDone <- struct{}{}
}()
}

loop:
// wait for all the "d" disjoint queries to complete before we return
// XXX: waiting until all queries are done is a security bug!!!
Contributor Author

Leaving this comment in the code without context will probably be confusing. Perhaps put in a couple line explanation like:

While running disjoint path queries may protect users from malicious peers they are directly connected to (assuming the peer is not eclipsed) it still allows the malicious peers to cause you to make many queries since we wait until all paths have completed.

Contributor

Done.

return
}

func (qp *QueryPeerset) GetClosestNotUnreachable(k int) (result []peer.ID) {
Contributor Author

should probably be b or beta instead of k since this isn't strictly about the bucket size (and we're passing in beta = 3 by default here)

Contributor

Done.

@aschmahmann aschmahmann force-pushed the feat/async-query branch 2 times, most recently from fe4f7b0 to 36261f0 Compare March 20, 2020 09:12
Contributor

@aarshkshah1992 aarshkshah1992 left a comment

This is poetry.

query.go Outdated
continue
}
q.queryPeers.TryAdd(p)
q.queryPeers.MarkUnreachable(p)
Contributor

I've commented this elsewhere too:

We can simply remove the peer from queryPeers here and not maintain this state at all.

Contributor

Two reasons:

  • (Main) The QueryPeerset data structure does not support fast removes.
  • Remembering the unreached peers allows me to add richer breakpoint conditions in code, while debugging.
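To illustrate the first point: if unreachable peers stay in the sorted set and are only flagged, reads can skip them without any removal. A minimal sketch, where `entry` and `closestNotUnreachable` are hypothetical names and the slice is assumed to be sorted closest-first already:

```go
package main

import "fmt"

// peerID is a stand-in for peer.ID.
type peerID string

type peerState int

const (
	peerSeen peerState = iota
	peerWaiting
	peerQueried
	peerUnreachable
)

// entry pairs a peer with its query state. The slice holding entries is
// assumed to be kept sorted by distance to the target, closest first.
type entry struct {
	id    peerID
	state peerState
}

// closestNotUnreachable returns up to count peers, closest first, skipping
// those marked unreachable. Nothing is removed from the underlying slice.
func closestNotUnreachable(sorted []entry, count int) []peerID {
	var out []peerID
	for _, e := range sorted {
		if len(out) >= count {
			break
		}
		if e.state != peerUnreachable {
			out = append(out, e.id)
		}
	}
	return out
}

func main() {
	peers := []entry{
		{"a", peerQueried},
		{"b", peerUnreachable},
		{"c", peerWaiting},
		{"d", peerSeen},
	}
	fmt.Println(closestNotUnreachable(peers, 3))
}
```

The unreachable entries cost a skipped iteration on reads, and the state is still there to inspect while debugging.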

routing.go Outdated
@@ -412,7 +411,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, queries []*query) {
shortcutTaken := false
for _, q := range queries {
- if q.localPeers.LenUnqueriedFromKClosest() > 0 {
+ if len(q.queryPeers.GetClosestNotUnreachable(3)) > 0 {
Contributor

@aschmahmann

  1. It should be Beta, not 3.
  2. Won't this always be true? If we terminate because of isLookupTermination(), it will be because this was greater than zero.

dht_test.go Outdated
Comment on lines 1406 to 1408
for _, d := range dhts {
d.RefreshRoutingTable()
}
Contributor Author

@Stebalien this is sort of interesting in that it shows that our new logic works on valid routing tables and not invalid/random tables.

Contributor

@petar petar left a comment

All comments addressed.

@aschmahmann aschmahmann force-pushed the feat/async-query branch 3 times, most recently from 4f2396a to 0a01437 Compare March 23, 2020 05:03
Contributor Author

@aschmahmann aschmahmann left a comment

Made a few updates. Summary:

  1. The query state for a peer used to be able to regress, now it can't
  2. We return a lookup result struct instead of the query structs themselves when completing a lookup operation.
  3. We ensure that the top k peers have been queried before a lookup returns since our higher level logic requires that invariant even though our query logic now only requires the top beta peers to have been queried (the other k-beta can just be peers we've heard about but have yet to connect to).
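For point 1, a minimal sketch of a forward-only state update. The ordering used here (seen, then waiting, then either queried or unreachable as terminal states) is an assumption for illustration, and `rank` and `advance` are hypothetical helpers rather than functions in this PR:

```go
package main

import "fmt"

type PeerState int

const (
	PeerSeen PeerState = iota
	PeerWaiting
	PeerQueried
	PeerUnreachable
)

// rank orders states by progress. PeerQueried and PeerUnreachable share the
// top rank because both are treated as terminal in this sketch.
func rank(s PeerState) int {
	switch s {
	case PeerSeen:
		return 0
	case PeerWaiting:
		return 1
	default:
		return 2
	}
}

// advance returns the state to store and whether the transition was applied.
// Transitions that would regress, or swap one terminal state for another,
// are rejected and the current state is kept.
func advance(cur, next PeerState) (PeerState, bool) {
	if rank(next) > rank(cur) {
		return next, true
	}
	return cur, false
}

func main() {
	s := PeerSeen
	s, ok := advance(s, PeerWaiting)
	fmt.Println(s == PeerWaiting, ok)
	s, ok = advance(s, PeerQueried)
	fmt.Println(s == PeerQueried, ok)
	s, ok = advance(s, PeerSeen) // hearing about the peer again must not reset it
	fmt.Println(s == PeerQueried, ok)
}
```

Returning the boolean gives the caller a place to log a rejected transition instead of silently overwriting state.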

Comment on lines +1470 to +1471
if len(out) < querier.beta {
t.Fatalf("got wrong number of peers (got %d, expected at least %d)", len(out), querier.beta)
Contributor Author

This test is pretty strange but indicative of the changes we've been making. The test seems to set up a ring of peers and then call GetClosestPeers. In the past we were much more thorough in our queries and effectively ignored most of the benefits that can be assumed in a Kademlia network with fairly accurate routing tables. This meant we could have a ring of 30 peers and get 20 of them no problem.

Now we rely more strongly on the routing tables being good, so in this ring setup it seems like we're only guaranteed to return at least beta peers instead of k.

Thoughts?

Contributor

We've basically broken the contract of GetClosestPeers as I've commented elsewhere. There are a couple of things we can/should do to address these which have been mentioned in the comment.

@@ -399,26 +393,18 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
if err != nil {
return
}
- queriesCh <- queries
+ lookupResCh <- lookupRes

if ctx.Err() == nil {
Contributor Author

I suspect we could bundle this check into the lookup.completed variable, but haven't gotten around to it yet.

@@ -629,13 +615,13 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash

return peers, nil
},
- func(peerset *kpeerset.SortedPeerset) bool {
+ func() bool {
return ps.Size() >= count
},
)

if err != nil && ctx.Err() == nil {
Contributor Author

I suspect we could bundle this check into the lookup.completed variable, but haven't gotten around to it yet.

out <- p
}

- if ctx.Err() == nil {
+ if ctx.Err() == nil && lookupRes.completed {
Contributor Author

I suspect we could bundle this check into the lookup.completed variable, but haven't gotten around to it yet.

Contributor

If ctx.Err != nil, we set lookupRes.completed to false. So, if lookupRes.completed is true, we know the context has not errored.

Hence, we can remove that check. We can just call refreshRTIfNoShortcut here.

Contributor Author

I'm going to wait to coalesce the context errors until after disjoint paths are removed. I think it'll make ensuring correctness a lot easier.

Contributor

@aarshkshah1992 aarshkshah1992 left a comment

Will push the tests once the PR is good to go.

Let me know if I've missed breaking changes in any of the APIs. I think I've covered them all.

if p == q.dht.self { // don't add self.
continue
}
q.queryPeers.TryAdd(p)
Contributor

@aschmahmann

Just because you've heard about a peer again does not mean it's available. You could easily have the same unreachable peer in multiple RT's and so I'm not sure how helpful this would be.

If you choose to go ahead with it anyway, we should at least wait for signed peer records, as this opens up another attack vector otherwise.


return &queryResult{
foundCloserPeer: foundCloserPeer,
func (q *query) updateState(up *queryUpdate) {
Contributor

@aschmahmann

We need a TryAdd call ONLY for up.seen. If the peer is in the queried or unreachable collection but NOT in the queryPeers set, something has gone wrong.

Contributor Author

Yes, I left them in pending comments from @petar since he was the one who advocated for them being left in defensively. I'm inclined to agree that we should be more closely controlling state transitions and logging errors though.

Contributor

I agree here with both of you. We can remove TryAdd from queried and unreachable.

The comment I made that I think Adin is referring to was (in my mind) about the "if p == q.dht.self " checks.

@@ -669,7 +655,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
return pi, nil
Contributor

@aschmahmann We've changed the contract of FindPeer. In Balsa (master), we don't require a peer to be connected for this API to return the address.

Now, we mandate connectivity. IMO, the API should not enforce this AND connectivity will anyways matter less once we have signed peer records.

Contributor Author

Overall I agree that the API should not guarantee connectivity.

Are you referring to the return statement or to the stop function? For the return statement we could be a little more clever and separately track if any peer addresses are discovered for the peer during the target query, but this falls apart if we separately find the peer via another service while FindPeer is going on.

However, what's the abort condition? If the abort condition is just hearing about the peer then we could end up not being able to connect to the peer because we got a single stale address for them and then gave up.

Once we have signed peer records in place FindPeer should no longer require connectedness (although we could have it as a secondary abort condition). See libp2p/go-libp2p#784 for a little more background on FindPeer thoughts.

Contributor

@aschmahmann

1. The return statement:

// TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar
	if dht.host.Network().Connectedness(id) == network.Connected {
		return dht.peerstore.PeerInfo(id), nil
	}

	return peer.AddrInfo{}, routing.ErrNotFound

If we have an address for that peer in the peerstore, irrespective of whether we have a connection or not, we should return it. Even if we separately find the peer via another service while FindPeer is going on (low probability), there is no harm in doing this. I feel like mandating connectivity is too restrictive here.

2. Abort condition

> If the abort condition is just hearing about the peer then we could end up not being able to connect to the peer because we got a single stale address for them and then gave up.

Sure. Signed peer records will solve this. Are you saying you want to wait till it goes live before you remove this? If yes, that's fine by me.

Contributor Author

@aarshkshah1992 I modified this a bit lmk if it seems reasonable to you.

  1. I ideally want to return routing.ErrNotFound whenever the DHT didn't cause you to find the records. Most people are probably doing FindPeer and then host.Connect, or peerstore.PeerInfo anyhow instead of actually using the records returned here. This is somewhat impractical currently so instead I've thrown in a few conditions that indicate you may have received valid peerinfo (we can only verify via connecting until signed peer records lands) from the DHT query. lmk if you have issues/concerns with my proposal.

  2. I think we'll have to discuss this once the signed peer records land. I suspect what we'll end up with is similar to the proposals in FindPeer Design Review go-libp2p#784. Have a FindPeerAsync function that returns the records as we get them and perhaps abort early if we're connected since then we know the peerstore definitely has good addresses.

@@ -76,7 +75,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
Contributor

@aschmahmann

Balsa: Returns the K (bucket size) closest peers we KNOW are alive.
Cypress: Returns the K closest peers, but we KNOW ONLY at most ANY Beta (default=3, no ordering) are alive.

We should either:

  1. Document this OR
  2. Make Beta=K OR
  3. Allow users to pass in the number of closest peers they want alive.

Contributor

This also applies by extension to Provide & PutValue.

Contributor Author

I agree that this is tricky, especially for Provide and PutValue since they imply replication semantics. I'm less concerned about GetClosestPeers since it has no inherent semantics and could just get a documentation change (also, given its lack of semantics, I doubt it's in real use).

WDYT @petar?

Contributor Author

My understanding of the decision here was that even though the internals have changed the guarantees are similar and the nuances are both not something the consumer of the DHT should care about and are a bit much to just throw in as a documentation comment. If we want we can add comments to GetClosestPeers since it's really an implementation specific function (i.e. the contract is, and probably should be, tightly coupled to the implementation).

We are planning on rewriting the DHT spec once the new protocol version is released (or at least in the RC phase) and IMO detailing the specifics there or in a sister document would be excellent.

query.go Outdated
- func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) {
- queryCtx, cancelQuery := context.WithCancel(ctx)
+ type lookupResult struct {
+ peers []peer.ID
Contributor

Add a comment here what "peers" are those.

type lookupResult struct {
peers []peer.ID
state []qpeerset.PeerState
completed bool
Contributor

I would add a comment here too, since the meaning of completed is subtle.

query.go Outdated
numQueriesComplete := 0
queryDone := make(chan struct{}, d)
// runLookup executes the lookup on the target using the given query function and stopping when either the context is
// cancelled or the stop function returns true (if the stop function is not sticky it is not guaranteed to cause a stop
Contributor

Instead of saying "if the stop function is not sticky ...", I would say "the stop function MUST be sticky".

Contributor Author

left this mostly as it is, the connectedness check we do during FindPeer is not sticky but we're mostly ok with that and I'm hoping to upgrade that logic a bit once we have signed peer records.
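For reference, "sticky" here means that once the stop function has returned true it keeps returning true. A non-sticky check could be wrapped along these lines; `sticky` is a hypothetical helper for illustration and is not part of this PR:

```go
package main

import (
	"fmt"
	"sync"
)

// sticky wraps a stop function so that the first true result latches.
// After latching, the underlying function is no longer called.
func sticky(stop func() bool) func() bool {
	var mu sync.Mutex
	stopped := false
	return func() bool {
		mu.Lock()
		defer mu.Unlock()
		if !stopped && stop() {
			stopped = true
		}
		return stopped
	}
}

func main() {
	calls := 0
	// flaky is true only on its third call, similar to a connectedness
	// check that can flip back after an unlucky disconnect.
	flaky := func() bool { calls++; return calls == 3 }
	stop := sticky(flaky)
	for i := 0; i < 5; i++ {
		fmt.Println(stop())
	}
}
```

With the wrapper the loop prints false, false, true, true, true, whereas the bare `flaky` function would go back to false on the fourth call.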

aarshkshah1992 and others added 6 commits March 24, 2020 01:05
…strap logic now uses GetClosestPeers instead of FindPeer. FindPeer can return addresses even if not Connected as long as it was either recently connected (CanConnect) or was discovered during the lookup.
Contributor

@aarshkshah1992 aarshkshah1992 left a comment

Beautiful.

@aschmahmann aschmahmann marked this pull request as ready for review March 24, 2020 14:57
Comment on lines +692 to 707
dialedPeerDuringQuery := false
for i, p := range lookupRes.peers {
if p == id {
// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
// server peer and is not identified as such is a bug.
dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard
break
}
}

// TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar
if dht.host.Network().Connectedness(id) == network.Connected {
// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
// to the peer.
connectedness := dht.host.Network().Connectedness(id)
if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
return dht.peerstore.PeerInfo(id), nil
Contributor Author

@aarshkshah1992 just a heads up that I changed this around, going to do the squash (I'd like to make our lives easier when we merge cypress into master so it's not like 1000 commits). If we have any issues we'll just do a follow up

@aschmahmann aschmahmann changed the title from Asynchronous queries to Asynchronous lookups Mar 24, 2020
@aschmahmann aschmahmann merged commit fcbc5f9 into cypress Mar 24, 2020
@Stebalien Stebalien mentioned this pull request Mar 24, 2020
Stebalien pushed a commit that referenced this pull request Apr 3, 2020
* feat(query): fully async implementation of Kademlia lookup. peers returned from the lookup are not guaranteed to be alive (i.e. we're only guaranteed to have dialed the closest beta peers to the target), but given stable and correct routing tables the expectation that most of the peers returned are alive is high.
* feat(query): add wrapper lookup followup function to followup after the lookup is completed and ensure that the closest k returned peers from a lookup have been queried even for beta < k
* refactor(query) modified the structure returned from lookups to be a useful subset of the full query state instead of the entire query state
* feat(options): beta parameter exposed as the Resiliency parameter
* feat(routing): do not mark the routing table as updated after a FindPeer query
* feat(routing): FindPeer can return addresses even if not Connected as long as it was either recently connected (CanConnect) or was discovered during the lookup
* feat(bootstrap): bootstrap logic now uses GetClosestPeers instead of FindPeer
* refactor(dht): stopFn no longer takes any state
* fix(test): changed GetClosestPeers test to only assume beta instead of k peers since that is now more appropriate given the query logic changes and that the routing tables in that test are bad, i.e. a ring network with arbitrary peerIDs

Co-authored-by: Petar Maymounkov <petarm@gmail.com>
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>

3 participants