Skip to content

Commit

Permalink
P2p ping (#585)
Browse files Browse the repository at this point in the history
* new ping protocol that uses `MessageServer`
* remove sim methods we had to use when there was no ping
* use random key from second try in bootstrap, refactor loop
* add ping to dht, remove places covered for ping
* minor suite changes, added a really small suite
* swarm: (1) remove from dht on closed connection; (2) only do local lookups on sendmessage
* fixed dht test
  • Loading branch information
y0sher committed Feb 27, 2019
1 parent c7ba2f9 commit b9f788a
Show file tree
Hide file tree
Showing 15 changed files with 501 additions and 420 deletions.
41 changes: 18 additions & 23 deletions p2p/dht/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ const (
LookupIntervals = 3 * time.Second
// RefreshInterval is the time we wait between dht refreshes
RefreshInterval = 5 * time.Minute

bootstrapTries = 5
)

var (
Expand Down Expand Up @@ -72,18 +70,18 @@ func (d *KadDHT) Bootstrap(ctx context.Context) error {
func (d *KadDHT) tryBoot(ctx context.Context, minPeers int) error {

searchFor := d.local.PublicKey()
gotpeers := false
tries := 0
var size int
d.local.Debug("BOOTSTRAP: Running kademlia lookup for ourselves")

loop:
for {
reschan := make(chan error)

go func() {
if gotpeers || tries >= bootstrapTries {
// TODO: consider choosing a random key that is close to the local id
// or TODO: implement real kademlia refreshes - #241
if tries > 0 {
//TODO: consider choosing a random key that is close to the local id
//or TODO: implement real kademlia refreshes - #241
searchFor = p2pcrypto.NewRandomPubkey()
d.local.Debug("BOOTSTRAP: Running kademlia lookup for random peer")
}
Expand All @@ -100,30 +98,27 @@ loop:
// if we got the peer we were looking for (us or random)
// the best thing we can do is just try again or try another random peer.
// hence we continue here.
//todo : maybe if we gotpeers then we can just break ?
continue
}
req := make(chan int)
d.rt.Size(req)
size := <-req

if (size) >= minPeers {
if gotpeers {
break loop
}
gotpeers = true
} else {
d.local.Warning("%d lookup didn't bootstrap the routing table. RT now has %d peers", tries, size)
}
size = <-req

timer := time.NewTimer(LookupIntervals)
select {
case <-ctx.Done():
return ErrBootAbort
case <-timer.C:
continue loop
if size >= minPeers {
break loop
}
}

d.local.Warning("%d lookup didn't bootstrap the routing table. RT now has %d peers", tries, size)

timer := time.NewTimer(LookupIntervals)
select {
case <-ctx.Done():
return ErrBootAbort
case <-timer.C:
continue loop
}

}

return nil
Expand Down
136 changes: 105 additions & 31 deletions p2p/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@
package dht

import (
"context"
"errors"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/config"
"github.com/spacemeshos/go-spacemesh/p2p/node"
"github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto"
"github.com/spacemeshos/go-spacemesh/p2p/server"
"github.com/spacemeshos/go-spacemesh/p2p/service"

"context"
"errors"
"github.com/spacemeshos/go-spacemesh/ping"
"github.com/spacemeshos/go-spacemesh/ping/pb"
"time"
)

// DHT is an interface to a general distributed hash table.
type DHT interface {
Update(node node.Node)
Update(p node.Node)
Remove(p node.Node)

// todo: change lookup to be an internal lookup. (do not expose net lookup)
InternalLookup(dhtid node.DhtID) []node.Node
Lookup(pubkey p2pcrypto.PublicKey) (node.Node, error)

Expand All @@ -35,15 +40,22 @@ var (
ErrEmptyRoutingTable = errors.New("no nodes to query - routing table is empty")
)

// Pinger is the ping capability the DHT depends on. KadDHT stores one in its
// ping field and uses it both to learn about peers (via the registered
// callback) and to probe a server before sending it a FindNode request.
type Pinger interface {
// RegisterCallback registers f to be called with a ping message.
// NOTE(review): presumably f runs for every incoming ping and its error
// rejects the ping — confirm against the ping package.
RegisterCallback(f func(ping *pb.Ping) error)
// Ping pings the peer identified by public key p and returns a non-nil
// error when the ping did not succeed.
Ping(p p2pcrypto.PublicKey) error
}

// KadDHT represents the Distributed Hash Table. It holds the Routing Table local node cache and a FindNode kademlia protocol.
// KadDHT is created with a localNode identity as base (DhtID).
type KadDHT struct {
config config.SwarmConfig

local *node.LocalNode

rt RoutingTable
fnp *findNodeProtocol
rt RoutingTable

fnp *findNodeProtocol
ping Pinger

service service.Service
}
Expand All @@ -61,20 +73,39 @@ func (d *KadDHT) SelectPeers(qty int) []node.Node {
}

// New creates a new dht
func New(node *node.LocalNode, config config.SwarmConfig, service service.Service) *KadDHT {
func New(ln *node.LocalNode, config config.SwarmConfig, service service.Service) *KadDHT {
pinger := ping.New(ln.Node, service.(server.Service), ln.Log) // TODO : get from outside
d := &KadDHT{
config: config,
local: node,
rt: NewRoutingTable(config.RoutingTableBucketSize, node.DhtID(), node.Log),
local: ln,
rt: NewRoutingTable(config.RoutingTableBucketSize, ln.DhtID(), ln.Log),
ping: pinger,
service: service,
}
d.fnp = newFindNodeProtocol(service, d.rt)

pinger.RegisterCallback(func(p *pb.Ping) error {
//todo: check the address provided with an extra ping before updating (if we haven't checked it for a while)
k, err := p2pcrypto.NewPubkeyFromBytes(p.ID)

if err != nil {
return err
}
d.rt.Update(node.New(k, p.ListenAddress))
return nil
})

return d
}

// Update insert or updates a node in the routing table.
func (d *KadDHT) Update(node node.Node) {
d.rt.Update(node)
func (d *KadDHT) Update(p node.Node) {
d.rt.Update(p)
}

// Remove removes the record for node p from the routing table.
// It delegates directly to the routing table's Remove.
func (d *KadDHT) Remove(p node.Node) {
d.rt.Remove(p)
}

// Lookup finds a node in the dht by its public key, it issues a search inside the local routing table,
Expand Down Expand Up @@ -113,7 +144,7 @@ func (d *KadDHT) InternalLookup(dhtid node.DhtID) []node.Node {
// Also used as a bootstrap function to populate the routing table with the results.
func (d *KadDHT) kadLookup(id p2pcrypto.PublicKey, searchList []node.Node) (node.Node, error) {
// save queried node ids for the operation
queried := map[string]struct{}{}
queried := make(map[string]bool)

// iterative lookups for nodeId using searchList

Expand All @@ -139,10 +170,20 @@ func (d *KadDHT) kadLookup(id p2pcrypto.PublicKey, searchList []node.Node) (node
return node.EmptyNode, ErrLookupFailed
}

probed := 0
for _, active := range queried {
if active {
probed++
}

if probed >= d.config.RoutingTableBucketSize {
return node.EmptyNode, ErrLookupFailed // todo: maybe just return what we have
}
}

// lookup nodeId using the target servers
res := d.findNodeOp(servers, queried, id, closestNode)
res := d.findNodeOp(servers, queried, id)
if len(res) > 0 {

// merge newly found nodes
searchList = node.Union(searchList, res)
// sort by distance from target
Expand All @@ -155,7 +196,7 @@ func (d *KadDHT) kadLookup(id p2pcrypto.PublicKey, searchList []node.Node) (node
}

// filterFindNodeServers picks up to count servers that haven't been queried recently.
func filterFindNodeServers(nodes []node.Node, queried map[string]struct{}, alpha int) []node.Node {
func filterFindNodeServers(nodes []node.Node, queried map[string]bool, alpha int) []node.Node {

// If no servers have been queried yet, just make sure the list len is alpha
if len(queried) == 0 {
Expand Down Expand Up @@ -185,57 +226,90 @@ func filterFindNodeServers(nodes []node.Node, queried map[string]struct{}, alpha
return newlist
}

// findNodeOpRes is the per-server result that findNodeOp's worker goroutines
// send on the results channel.
type findNodeOpRes struct {
// res holds the nodes returned by FindNode; it is nil when the ping or the
// FindNode call for this server returned an error.
res []node.Node
// server is the node that was queried; the collector uses it to mark the
// server as active (res != nil) or not in the queried map.
server node.Node
}

// findNodeOp queries one or more servers for a target node.
// It returns the closest nodes, which are closer to targetId than closestNode.
// If the node is found, it will be at the top of the results list.
func (d *KadDHT) findNodeOp(servers []node.Node, queried map[string]struct{}, id p2pcrypto.PublicKey,
closestNode node.Node) []node.Node {
func (d *KadDHT) findNodeOp(servers []node.Node, queried map[string]bool, id p2pcrypto.PublicKey) []node.Node {

var out []node.Node
startTime := time.Now()

defer func() {
d.local.With().Debug("findNodeOp", log.Int("servers", len(servers)), log.Int("result_count", len(out)), log.Duration("time_elapsed", time.Since(startTime)))
}()

l := len(servers)

if l == 0 {
return []node.Node{}
}

// results channel
results := make(chan []node.Node)
results := make(chan *findNodeOpRes)

// todo : kademlia says: This recursion can begin before all of the previous RPCs have returned
// currently we wait for all.

// Issue a parallel FindNode op to all servers on the list
for i := 0; i < l; i++ {
server := servers[i]
queried[server.String()] = struct{}{}
idx := id

// find node protocol adds found nodes to the local routing table
// populates queried node's routing table with us and return.
go func(server node.Node, id p2pcrypto.PublicKey) {
fnd, err := d.fnp.FindNode(server, id)
go func(server node.Node, idx p2pcrypto.PublicKey) {
d.rt.Update(server) // must do this prior to message to make sure we have that node

var err error

defer func() {
if err != nil {
d.local.With().Debug("find_node_failed", log.String("server", server.String()), log.Err(err))
results <- &findNodeOpRes{nil, server}
d.rt.Remove(server) // remove node if it didn't work
return
}
}()

err = d.ping.Ping(server.PublicKey())
if err != nil {
return
}

fnd, err := d.fnp.FindNode(server, idx)
if err != nil {
//TODO: handle errors
return
}
results <- fnd
}(server, idx)

results <- &findNodeOpRes{fnd, server}
}(servers[i], id)
}

done := 0 // To know when all operations finished
idSet := make(map[string]struct{}) // to remove duplicates

out := make([]node.Node, 0) // the end result we collect

timeout := time.NewTimer(LookupTimeout)
Loop:
for {
select {
case res := <-results:
case qres := <-results:

// we mark active nodes
queried[qres.server.String()] = qres.res != nil

res := qres.res
for _, n := range res {

if _, ok := idSet[n.PublicKey().String()]; ok {
continue
}
idSet[n.PublicKey().String()] = struct{}{}

d.rt.Update(n)
if _, ok := queried[n.PublicKey().String()]; ok {
continue
}
out = append(out, n)
}

Expand Down
20 changes: 20 additions & 0 deletions p2p/dht/dht_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,24 @@ import (
"context"
"github.com/spacemeshos/go-spacemesh/p2p/node"
"github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto"
"github.com/spacemeshos/go-spacemesh/ping/pb"
"sync/atomic"
)

// mockPing is a test double providing the Ping and RegisterCallback methods.
// NOTE(review): presumably used in place of the dht Pinger in tests — confirm
// against the test files.
type mockPing struct {
// called counts how many times Ping was invoked; it is incremented atomically.
called uint32
// res is the error value that every Ping call returns (nil means success).
res error
}

// Ping records the call by atomically incrementing mp.called and returns the
// preconfigured mp.res. The key argument is ignored.
func (mp *mockPing) Ping(key p2pcrypto.PublicKey) error {
atomic.AddUint32(&mp.called, 1)
return mp.res
}

// RegisterCallback is a no-op: the supplied callback is not stored and is
// never invoked by the mock.
func (mp *mockPing) RegisterCallback(func(pb *pb.Ping) error) {

}

// MockDHT is a mocked dht
type MockDHT struct {
UpdateFunc func(n node.Node)
Expand All @@ -19,6 +35,10 @@ type MockDHT struct {
lookupErr error
}

// Remove is a no-op on the mock; p is ignored.
// NOTE(review): presumably present so MockDHT keeps satisfying the DHT
// interface, which declares Remove — confirm.
func (m *MockDHT) Remove(p node.Node) {

}

// SetUpdate sets the function to run on an issued update
func (m *MockDHT) SetUpdate(f func(n node.Node)) {
m.UpdateFunc = f
Expand Down
Loading

0 comments on commit b9f788a

Please sign in to comment.