Signed address records in the DHT #516

Closed
wants to merge 40 commits into from
40 commits
c013668
upgrade deprecated query events
aschmahmann Dec 27, 2019
c4e94ce
first pass at proper kad
aschmahmann Jan 17, 2020
68b1169
fix Search/GetValue to be Kad compliant. Default quorum is now 0 whic…
aschmahmann Jan 21, 2020
5cabdf6
fix refreshing buckets during query. getting topK peers is more effic…
aschmahmann Jan 21, 2020
a9e21d2
Search/GetValues stops if using quorum
aschmahmann Jan 21, 2020
719a16f
error on queries to empty routing table
aschmahmann Jan 22, 2020
6469073
query switches from alpha mode to k mode if no peers closer than one …
aschmahmann Jan 30, 2020
c79bac3
SearchValue will only update k closest peers to have latest record
aschmahmann Jan 30, 2020
1984feb
added todo comment about FindPeer and peer connectedness
aschmahmann Jan 30, 2020
79648b8
fix: use dht.bucketSize instead of KValue for queries
aschmahmann Feb 2, 2020
0ef814a
test: switch tests to use dht.bucketSize instead of KValue for queries
aschmahmann Feb 2, 2020
f94a3f2
options: KValue and AlphaValue global variables no longer used intern…
aschmahmann Feb 2, 2020
6b25a6c
try another peer sorting metric. fix bug in when findproviders returns
aschmahmann Feb 4, 2020
b4f7fda
check for stopping the query in between dialing and querying
aschmahmann Feb 4, 2020
8ef9d77
cleanup unused code
aschmahmann Feb 19, 2020
5d313b1
Utilize identify events to add peers to the routing table (#472)
aschmahmann Mar 4, 2020
e505723
fix: improve context deadline handling
Stebalien Mar 5, 2020
376300a
fix: actually ignore deadline exceeded errors in Provide (#480)
Stebalien Mar 6, 2020
bf4986e
made deprecated util values have comments that satisfy golint
aschmahmann Mar 5, 2020
c24a52f
Enable switching DHT between client and server modes (#469)
aschmahmann Mar 6, 2020
d440bbf
feat: move options to main package and make internals private (#486)
Stebalien Mar 9, 2020
7ada018
Upgrade DHT version (#479)
aschmahmann Mar 10, 2020
fbb1b36
RT connectivity changes
aarshkshah1992 Mar 6, 2020
e42ee94
Cypress Disjoint Query & Query Termination code cleanup (#489)
aarshkshah1992 Mar 11, 2020
f4e0b0d
change tests
aarshkshah1992 Mar 16, 2020
45c403c
document test failure
aarshkshah1992 Mar 17, 2020
7c27400
Merge pull request #495 from libp2p/feat/fix-tests
aschmahmann Mar 17, 2020
d7b6b1b
change peer validation func
aarshkshah1992 Mar 19, 2020
a4a6e82
fix validation
aarshkshah1992 Mar 19, 2020
c8909e9
queries can take longer
aarshkshah1992 Mar 20, 2020
42addc4
fix(test): increase timeout for notification tests since it's slow on…
aschmahmann Mar 20, 2020
075afe8
fix: protect rng with a mutex
aschmahmann Mar 20, 2020
147dc12
cleanup: documentation and code cleanup for search/getvalue quorum de…
aschmahmann Mar 20, 2020
8eade90
feat: calling FindProvidersAsync with a count of zero now completes t…
aschmahmann Mar 20, 2020
e56c237
Merge pull request #497 from libp2p/feat/change-rt-validation
aarshkshah1992 Mar 24, 2020
fcbc5f9
Asynchronous lookups (#498)
aschmahmann Mar 24, 2020
99716d1
build(deps): bump go-libp2p and kbucket repos
aschmahmann Mar 20, 2020
e09d7fe
signed peer recs
aarshkshah1992 Mar 27, 2020
6d1dcd2
bump peerstore
aarshkshah1992 Mar 27, 2020
b5f8c63
remove replace
aarshkshah1992 Mar 27, 2020
368 changes: 262 additions & 106 deletions dht.go

Large diffs are not rendered by default.

116 changes: 86 additions & 30 deletions dht_bootstrap.go
@@ -8,7 +8,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
)
@@ -17,7 +17,7 @@ var DefaultBootstrapPeers []multiaddr.Multiaddr

// Minimum number of peers in the routing table. If we drop below this and we
// see a new peer, we trigger a bootstrap round.
-var minRTRefreshThreshold = 4
+var minRTRefreshThreshold = 10
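
As a reading aid, here is a minimal sketch of the trigger described in the comment above, assuming the check runs wherever new peers are added (the function name maybeTriggerRefresh is illustrative; triggerRtRefresh and minRTRefreshThreshold are real names from this PR):

// Illustrative, not part of this diff. Sending nil means "please refresh,
// but nobody is waiting on the result", which is why the refresh worker
// below guards with `if res != nil`.
func (dht *IpfsDHT) maybeTriggerRefresh() {
	if dht.routingTable.Size() < minRTRefreshThreshold {
		select {
		case dht.triggerRtRefresh <- nil:
		default: // a refresh is already queued; don't block
		}
	}
}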

func init() {
for _, s := range []string{
@@ -35,6 +35,50 @@ func init() {
}
}

// startSelfLookup starts a goroutine that listens on a dedicated channel for requests to trigger a self walk
// and then sends the error status back on the error channel sent along with the request.
// If multiple callers "simultaneously" ask for a self walk, it performs ONLY one self walk and sends the same error status to all of them.
func (dht *IpfsDHT) startSelfLookup() error {
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)
for {
var waiting []chan<- error
select {
case res := <-dht.triggerSelfLookup:
if res != nil {
waiting = append(waiting, res)
}
case <-ctx.Done():
return
}

// batch multiple refresh requests if they're all waiting at the same time.
waiting = append(waiting, collectWaitingChannels(dht.triggerSelfLookup)...)

// Do a self walk
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
_, err := dht.GetClosestPeers(queryCtx, string(dht.self))
if err == kbucket.ErrLookupFailure {
err = nil
} else if err != nil {
err = fmt.Errorf("failed to query self during routing table refresh: %s", err)
}
cancel()

// send back the error status
for _, w := range waiting {
w <- err
close(w)
}
if err != nil {
logger.Warning(err)
}
}
})

return nil
}
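
startSelfLookup is a single-flight worker: every caller that asks while a walk is queued or running is folded into one walk and receives the same error status. A self-contained sketch of the same pattern, with illustrative names (worker, trigger, do):

package main

import (
	"errors"
	"fmt"
	"time"
)

// worker coalesces concurrent trigger requests: one unit of work runs,
// and every caller waiting at that moment receives the same error status.
func worker(trigger chan chan<- error, do func() error) {
	for req := range trigger {
		waiting := []chan<- error{req}
	drain: // batch any requests that arrived while we were idle
		for {
			select {
			case r := <-trigger:
				waiting = append(waiting, r)
			default:
				break drain
			}
		}
		err := do()
		for _, w := range waiting {
			w <- err
			close(w)
		}
	}
}

func main() {
	trigger := make(chan chan<- error)
	go worker(trigger, func() error {
		time.Sleep(50 * time.Millisecond) // stand-in for the self walk
		return errors.New("walk failed")
	})

	res := make(chan error, 1)
	trigger <- res
	fmt.Println(<-res) // every concurrent caller would see this same status
}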

// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
// scan the routing table periodically and do a random walk for CPLs that haven't been queried within the given period
@@ -65,17 +109,7 @@ func (dht *IpfsDHT) startRefreshing() error {
}

// Batch multiple refresh requests if they're all waiting at the same time.
-collectWaiting:
-for {
-select {
-case res := <-dht.triggerRtRefresh:
-if res != nil {
-waiting = append(waiting, res)
-}
-default:
-break collectWaiting
-}
-}
+waiting = append(waiting, collectWaitingChannels(dht.triggerRtRefresh)...)

err := dht.doRefresh(ctx)
for _, w := range waiting {
@@ -91,11 +125,41 @@
return nil
}

func collectWaitingChannels(source chan chan<- error) []chan<- error {
var waiting []chan<- error
for {
select {
case res := <-source:
if res != nil {
waiting = append(waiting, res)
}
default:
return waiting
}
}
}

func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
var merr error
-if err := dht.selfWalk(ctx); err != nil {
-merr = multierror.Append(merr, err)

// wait for the self walk result
selfWalkres := make(chan error, 1)

select {
case dht.triggerSelfLookup <- selfWalkres:
case <-ctx.Done():
return ctx.Err()
}

select {
case err := <-selfWalkres:
if err != nil {
merr = multierror.Append(merr, err)
}
case <-ctx.Done():
return ctx.Err()
}

if err := dht.refreshCpls(ctx); err != nil {
merr = multierror.Append(merr, err)
}
@@ -127,6 +191,12 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
continue
}

// do not refresh if bucket is full
if dht.routingTable.IsBucketFull(tcpl.Cpl) {
continue
}

// generate a random peer ID with the given CPL
randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl)
if err != nil {
@@ -136,10 +206,7 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {

// walk to the generated peer
walkFnc := func(c context.Context) error {
-_, err := dht.FindPeer(c, randPeer)
-if err == routing.ErrNotFound {
-return nil
-}
+_, err := dht.GetClosestPeers(c, string(randPeer))
return err
}

@@ -153,17 +220,6 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
return merr
}
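
The per-bucket walk above leans on go-libp2p-kbucket's ability to mint a peer ID at a chosen distance. A hedged sketch of that step in isolation (refreshBucket is an illustrative helper, not part of this diff):

// generate a random peer ID whose key shares exactly `cpl` leading bits
// with ours, then walk toward it so that bucket gets repopulated
func refreshBucket(ctx context.Context, dht *IpfsDHT, cpl uint) error {
	randPeer, err := dht.routingTable.GenRandPeerID(cpl)
	if err != nil {
		return err
	}
	_, err = dht.GetClosestPeers(ctx, string(randPeer))
	return err
}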

-// Traverse the DHT toward the self ID
-func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
-queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
-defer cancel()
-_, err := dht.FindPeer(queryCtx, dht.self)
-if err == routing.ErrNotFound {
-return nil
-}
-return fmt.Errorf("failed to query self during routing table refresh: %s", err)
-}

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
//
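For orientation, Bootstrap is the entry point most applications call right after constructing the DHT. A minimal usage sketch, assuming the go-libp2p of the same era (where libp2p.New still took a context):

package main

import (
	"context"

	libp2p "github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

func main() {
	ctx := context.Background()

	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}

	d, err := dht.New(ctx, h)
	if err != nil {
		panic(err)
	}

	// Bootstrap kicks off the refresh machinery above; it returns without
	// waiting for the routing table to actually fill up.
	if err := d.Bootstrap(ctx); err != nil {
		panic(err)
	}
}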
45 changes: 45 additions & 0 deletions dht_bootstrap_test.go
@@ -0,0 +1,45 @@
package dht

import (
"context"
ma "github.com/multiformats/go-multiaddr"
"testing"
"time"

kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/stretchr/testify/require"
)

func TestSelfWalkOnAddressChange(t *testing.T) {
ctx := context.Background()
// create three DHT instances with auto refresh disabled
d1 := setupDHT(ctx, t, false, DisableAutoRefresh())
d2 := setupDHT(ctx, t, false, DisableAutoRefresh())
d3 := setupDHT(ctx, t, false, DisableAutoRefresh())

var connectedTo *IpfsDHT
// connect d1 to whoever is "further"
if kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d2.self)) <=
kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d3.self)) {
connect(t, ctx, d1, d3)
connectedTo = d3
} else {
connect(t, ctx, d1, d2)
connectedTo = d2
}

// then connect d2 AND d3
connect(t, ctx, d2, d3)

// d1 should have ONLY 1 peer in its RT
waitForWellFormedTables(t, []*IpfsDHT{d1}, 1, 1, 2*time.Second)
require.Equal(t, connectedTo.self, d1.routingTable.ListPeers()[0])

// now change the listen address so an event is emitted and we do a self walk
require.NoError(t, d1.host.Network().Listen(ma.StringCast("/ip4/0.0.0.0/tcp/1234")))
require.True(t, waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 20*time.Second))
// it should now have both peers in the RT
ps := d1.routingTable.ListPeers()
require.Contains(t, ps, d2.self)
require.Contains(t, ps, d3.self)
}
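
For readers new to kbucket: peer IDs are hashed into the Kademlia keyspace and compared by common prefix length, so more shared leading bits means closer in XOR distance. A small sketch of the calls the test uses:

// Illustrative: rank two peers by XOR closeness to ourselves.
a := kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d2.self))
b := kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d3.self))
// a > b means d2 shares more leading bits with d1, i.e. d2 is closer.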
35 changes: 21 additions & 14 deletions dht_net.go
@@ -14,6 +14,7 @@ import (

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
msmux "github.com/multiformats/go-multistream"

ggio "github.com/gogo/protobuf/io"

@@ -80,6 +81,11 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
defer timer.Stop()

for {
if dht.getMode() != modeServer {
logger.Errorf("ignoring incoming dht message while not in server mode")
return false
}

var req pb.Message
msgbytes, err := r.ReadMsg()
msgLen := len(msgbytes)
@@ -142,8 +148,6 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

-dht.updateFromMessage(ctx, mPeer, &req)

if resp == nil {
continue
}
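
The dht.getMode() != modeServer guard at the top of this hunk relies on the client/server mode switching introduced in #469, which is defined outside this diff. A plausible shape for the accessor, offered purely as an assumption (the mode and modeLk field names are illustrative):

type mode int

const (
	modeServer mode = iota
	modeClient
)

// illustrative accessor: the mode can be flipped at runtime, so reads
// must be synchronized with writes
func (dht *IpfsDHT) getMode() mode {
	dht.modeLk.Lock()
	defer dht.modeLk.Unlock()
	return dht.mode
}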
@@ -169,6 +173,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentRequestErrors.M(1),
@@ -180,16 +187,16 @@

rpmes, err := ms.SendRequest(ctx, pmes)
if err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentRequestErrors.M(1),
)
return nil, err
}

-// update the peer (on valid msgs only)
-dht.updateFromMessage(ctx, p, rpmes)

stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentBytes.M(int64(pmes.Size())),
@@ -206,6 +213,9 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentMessages.M(1),
metrics.SentMessageErrors.M(1),
@@ -214,6 +224,9 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
}

if err := ms.SendMessage(ctx, pmes); err != nil {
if err == msmux.ErrNotSupported {
dht.peerStoppedDHT(ctx, p)
}
stats.Record(ctx,
metrics.SentMessages.M(1),
metrics.SentMessageErrors.M(1),
@@ -230,15 +243,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
return nil
}

-func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
-// Make sure that this node is actually a DHT server, not just a client.
-protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
-if err == nil && len(protos) > 0 {
-dht.Update(ctx, p)
-}
-return nil
-}
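
peerStoppedDHT is likewise defined outside this diff. Judging by the routing-table connectivity commits in this PR, the intent is that a peer failing with msmux.ErrNotSupported no longer speaks our protocol and should be dropped as a router. A hedged sketch of that intent only:

// Illustrative only; the real implementation lives outside this diff.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
	logger.Debugf("peer %s no longer speaks our DHT protocol", p)
	// a peer that rejects our protocol cannot answer queries,
	// so it should not be handed out as a router
	dht.routingTable.RemovePeer(p)
}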

func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
@@ -314,6 +318,9 @@ func (ms *messageSender) prep(ctx context.Context) error {
return nil
}

// We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks
// one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for
// backwards compatibility reasons).
nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
if err != nil {
return err
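That comment describes a deliberate asymmetry: the node may answer on several protocol IDs (including legacy ones) but should only originate streams on the primary set. A hedged sketch of the idea (serverProtocols is an assumed field name; protocols and handleNewStream are real):

// Illustrative wiring, not from this diff.
func (dht *IpfsDHT) wireProtocols(ctx context.Context, p peer.ID) (network.Stream, error) {
	// answer on every protocol we support, old and new, so legacy
	// peers can still query us...
	for _, proto := range dht.serverProtocols {
		dht.host.SetStreamHandler(proto, dht.handleNewStream)
	}
	// ...but dial out only on the primary protocols, so a legacy-only
	// responder is never mistaken for a full DHT server
	return dht.host.NewStream(ctx, p, dht.protocols...)
}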