Skip to content

Commit

Permalink
Merge pull request #7127 from ipfs/feat/dual-dht
Browse files Browse the repository at this point in the history
feat: introduce the dual WAN/LAN DHT
  • Loading branch information
Stebalien committed Apr 14, 2020
2 parents b760d89 + d0d508b commit 28e31e1
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 62 deletions.
35 changes: 21 additions & 14 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,28 @@ var queryDhtCmd = &cmds.Command{
ctx, cancel := context.WithCancel(req.Context)
ctx, events := routing.RegisterForQueryEvents(ctx)

closestPeers, err := nd.DHT.GetClosestPeers(ctx, string(id))
if err != nil {
cancel()
return err
dht := nd.DHT.WAN
if !nd.DHT.WANActive() {
dht = nd.DHT.LAN
}

errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
for p := range closestPeers {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
ID: p,
Type: routing.FinalPeer,
})
closestPeers, err := dht.GetClosestPeers(ctx, string(id))
if closestPeers != nil {
for p := range closestPeers {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
ID: p,
Type: routing.FinalPeer,
})
}
}

if err != nil {
errCh <- err
return
}
}()

Expand All @@ -98,15 +107,13 @@ var queryDhtCmd = &cmds.Command{
}
}

return nil
return <-errCh
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {
pfm := pfuncMap{
routing.PeerResponse: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error {
for _, p := range obj.Responses {
fmt.Fprintf(out, "%s\n", p.ID.Pretty())
}
routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error {
fmt.Fprintf(out, "%s\n", obj.ID)
return nil
},
}
Expand Down
4 changes: 2 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
routing "github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
ddht "github.com/libp2p/go-libp2p-kad-dht/dual"
pubsub "github.com/libp2p/go-libp2p-pubsub"
psrouter "github.com/libp2p/go-libp2p-pubsub-router"
record "github.com/libp2p/go-libp2p-record"
Expand Down Expand Up @@ -92,7 +92,7 @@ type IpfsNode struct {

PubSub *pubsub.PubSub `optional:"true"`
PSRouter *psrouter.PubsubValueStore `optional:"true"`
DHT *dht.IpfsDHT `optional:"true"`
DHT *ddht.DHT `optional:"true"`
P2P *p2p.P2P `optional:"true"`

Process goprocess.Process
Expand Down
10 changes: 6 additions & 4 deletions core/coreapi/test/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/coreapi"
mock "github.com/ipfs/go-ipfs/core/mock"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/keystore"
"github.com/ipfs/go-ipfs/repo"

Expand Down Expand Up @@ -65,7 +66,7 @@ func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int)
}

c := config.Config{}
c.Addresses.Swarm = []string{fmt.Sprintf("/ip4/127.0.%d.1/tcp/4001", i)}
c.Addresses.Swarm = []string{fmt.Sprintf("/ip4/18.0.%d.1/tcp/4001", i)}
c.Identity = ident
c.Experimental.FilestoreEnabled = true

Expand All @@ -78,9 +79,10 @@ func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int)
}

node, err := core.NewNode(ctx, &core.BuildCfg{
Repo: r,
Host: mock.MockHostOption(mn),
Online: fullIdentity,
Routing: libp2p.DHTServerOption,
Repo: r,
Host: mock.MockHostOption(mn),
Online: fullIdentity,
ExtraOpts: map[string]bool{
"pubsub": true,
},
Expand Down
24 changes: 24 additions & 0 deletions core/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package coremock

import (
"context"
"fmt"
"io/ioutil"

libp2p2 "github.com/ipfs/go-ipfs/core/node/libp2p"

Expand Down Expand Up @@ -75,3 +77,25 @@ func MockCmdsCtx() (commands.Context, error) {
},
}, nil
}

func MockPublicNode(ctx context.Context, mn mocknet.Mocknet) (*core.IpfsNode, error) {
ds := syncds.MutexWrap(datastore.NewMapDatastore())
cfg, err := config.Init(ioutil.Discard, 2048)
if err != nil {
return nil, err
}
count := len(mn.Peers())
cfg.Addresses.Swarm = []string{
fmt.Sprintf("/ip4/18.0.%d.%d/tcp/4001", count>>16, count&0xFF),
}
cfg.Datastore = config.Datastore{}
return core.NewNode(ctx, &core.BuildCfg{
Online: true,
Routing: libp2p2.DHTServerOption,
Repo: &repo.Mock{
C: *cfg,
D: ds,
},
Host: MockHostOption(mn),
})
}
11 changes: 6 additions & 5 deletions core/node/libp2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"sort"
"time"

"github.com/ipfs/go-ipfs/core/node/helpers"

host "github.com/libp2p/go-libp2p-core/host"
routing "github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
ddht "github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p-pubsub"
namesys "github.com/libp2p/go-libp2p-pubsub-router"
record "github.com/libp2p/go-libp2p-record"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"go.uber.org/fx"

"github.com/ipfs/go-ipfs/core/node/helpers"
"go.uber.org/fx"
)

type BaseIpfsRouting routing.Routing
Expand All @@ -31,8 +32,8 @@ type p2pRouterOut struct {
Router Router `group:"routers"`
}

func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *dht.IpfsDHT) {
if dht, ok := in.(*dht.IpfsDHT); ok {
func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *ddht.DHT) {
if dht, ok := in.(*ddht.DHT); ok {
dr = dht

lc.Append(fx.Hook{
Expand Down
7 changes: 3 additions & 4 deletions core/node/libp2p/routingopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
host "github.com/libp2p/go-libp2p-core/host"
routing "github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
dual "github.com/libp2p/go-libp2p-kad-dht/dual"
record "github.com/libp2p/go-libp2p-record"
)

type RoutingOption func(context.Context, host.Host, datastore.Batching, record.Validator) (routing.Routing, error)

func constructDHTRouting(mode dht.ModeOpt) func(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.Routing, error) {
return func(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.Routing, error) {
return dht.New(
return dual.New(
ctx, host,
dht.Concurrency(10),
dht.Mode(mode),
Expand All @@ -26,9 +27,7 @@ func constructDHTRouting(mode dht.ModeOpt) func(ctx context.Context, host host.H
}

var (
// FIXME: Set this to dht.ModeAuto once we resolve
// https://github.com/libp2p/go-libp2p-kad-dht/issues/564
DHTOption RoutingOption = constructDHTRouting(dht.ModeServer)
DHTOption RoutingOption = constructDHTRouting(dht.ModeAuto)
DHTClientOption = constructDHTRouting(dht.ModeClient)
DHTServerOption = constructDHTRouting(dht.ModeServer)
NilRouterOption = nilrouting.ConstructNilRouting
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ require (
github.com/libp2p/go-libp2p-core v0.5.1
github.com/libp2p/go-libp2p-discovery v0.3.0
github.com/libp2p/go-libp2p-http v0.1.5
github.com/libp2p/go-libp2p-kad-dht v0.6.2
github.com/libp2p/go-libp2p-kbucket v0.3.3
github.com/libp2p/go-libp2p-kad-dht v0.7.2
github.com/libp2p/go-libp2p-kbucket v0.4.1
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-mplex v0.2.3
github.com/libp2p/go-libp2p-peerstore v0.2.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,10 @@ github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM
github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.5/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k=
github.com/libp2p/go-libp2p-kad-dht v0.6.2 h1:ZKXN7iqjIGC3+z4MKoBoyOGq6zvJ294J/tAA7LfihV0=
github.com/libp2p/go-libp2p-kad-dht v0.6.2/go.mod h1:LzZi6RR6NrgfFboyk03I5Yzg1Rr4eoQictDXpKpI45c=
github.com/libp2p/go-libp2p-kbucket v0.3.3 h1:V2Zwv6QnCK6Who0iiJW2eUKwdlTYGJ2HnLViaolDOcs=
github.com/libp2p/go-libp2p-kbucket v0.3.3/go.mod h1:IWFdYRBOYzaLEHnvrfzEkr+UcuveCXIoeO8QeFZSI6A=
github.com/libp2p/go-libp2p-kad-dht v0.7.2 h1:pmjjvk0q3wAQVLWWcFaYb0Ha+QXpa7NlzeXkKvhGYXU=
github.com/libp2p/go-libp2p-kad-dht v0.7.2/go.mod h1:+XvZEgO0gOAd9liN1wjdewxIdcdtWLJXG8U8ou2oo/A=
github.com/libp2p/go-libp2p-kbucket v0.4.1 h1:6FyzbQuGLPzbMv3HiD232zqscIz5iB8ppJwb380+OGI=
github.com/libp2p/go-libp2p-kbucket v0.4.1/go.mod h1:7sCeZx2GkNK1S6lQnGUW5JYZCFPnXzAZCCBBS70lytY=
github.com/libp2p/go-libp2p-loggables v0.0.1/go.mod h1:lDipDlBNYbpyqyPX/KcoO+eq0sJYEVR2JgOexcivchg=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
Expand Down
5 changes: 1 addition & 4 deletions namesys/republisher/repub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ func TestRepublish(t *testing.T) {

var nodes []*core.IpfsNode
for i := 0; i < 10; i++ {
nd, err := core.NewNode(ctx, &core.BuildCfg{
Online: true,
Host: mock.MockHostOption(mn),
})
nd, err := mock.MockPublicNode(ctx, mn)
if err != nil {
t.Fatal(err)
}
Expand Down
16 changes: 3 additions & 13 deletions test/integration/three_legged_cat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"testing"
"time"

core "github.com/ipfs/go-ipfs/core"
bootstrap2 "github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/coreapi"
mock "github.com/ipfs/go-ipfs/core/mock"
Expand Down Expand Up @@ -76,28 +75,19 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
Bandwidth: math.MaxInt32,
})

bootstrap, err := core.NewNode(ctx, &core.BuildCfg{
Online: true,
Host: mock.MockHostOption(mn),
})
bootstrap, err := mock.MockPublicNode(ctx, mn)
if err != nil {
return err
}
defer bootstrap.Close()

adder, err := core.NewNode(ctx, &core.BuildCfg{
Online: true,
Host: mock.MockHostOption(mn),
})
adder, err := mock.MockPublicNode(ctx, mn)
if err != nil {
return err
}
defer adder.Close()

catter, err := core.NewNode(ctx, &core.BuildCfg{
Online: true,
Host: mock.MockHostOption(mn),
})
catter, err := mock.MockPublicNode(ctx, mn)
if err != nil {
return err
}
Expand Down
28 changes: 18 additions & 10 deletions test/sharness/t0170-dht.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,24 @@ test_dht() {


# ipfs dht query <peerID>
## We query 3 different keys, to statisically lower the chance that the queryer
## turns out to be the closest to what a key hashes to.
# TODO: flaky. tracked by https://github.com/ipfs/go-ipfs/issues/2620
test_expect_success 'query' '
ipfsi 3 dht query "$(echo banana | ipfsi 3 add -q)" >actual &&
ipfsi 3 dht query "$(echo apple | ipfsi 3 add -q)" >>actual &&
ipfsi 3 dht query "$(echo pear | ipfsi 3 add -q)" >>actual &&
PEERS=$(wc -l actual | cut -d '"'"' '"'"' -f 1) &&
[ -s actual ] ||
test_might_fail test_fsh cat actual
#
# We test all nodes. 4 nodes should see the same peer ID, one node (the
# closest) should see a different one.

for i in $(test_seq 0 4); do
test_expect_success "query from $i" '
ipfsi "$i" dht query "$HASH" | head -1 >closest-$i
'
done

test_expect_success "collecting results" '
cat closest-* | sort | uniq -c | sed -e "s/ *\([0-9]\+\) .*/\1/g" | sort -g > actual &&
echo 1 > expected &&
echo 4 >> expected
'

test_expect_success "checking results" '
test_cmp actual expected
'

test_expect_success 'stop iptb' '
Expand Down

0 comments on commit 28e31e1

Please sign in to comment.