Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossip V3 #310

Merged
merged 24 commits into from Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
76365a7
move neighborhood to swarm
y0sher Nov 29, 2018
77b3c17
Remove peers management from Gossip protocol and send message using s…
Nov 28, 2018
369b8bc
changes in swarm to support changes in the Gossip protocol impl.
Dec 3, 2018
d8cb079
add gossip layer in gossip.Protocol
Dec 4, 2018
6170602
return to use old log
y0sher Dec 6, 2018
b290565
add ProcessProtocolMessage, change what's needed in onRemoteClientMes…
y0sher Dec 6, 2018
0d06ad7
add new messages methods and tests
y0sher Dec 6, 2018
50abd57
fix bugs and tests in gossip protocol
y0sher Dec 6, 2018
d1cd960
move neighborhood to swarm
y0sher Nov 29, 2018
13cc0e7
Remove peers management from Gossip protocol and send message using s…
Nov 28, 2018
195e2ec
changes in swarm to support changes in the Gossip protocol impl.
Dec 3, 2018
333b2fa
add gossip layer in gossip.Protocol
Dec 4, 2018
7cb2d3b
return to use old log
y0sher Dec 6, 2018
cda3545
add ProcessProtocolMessage, change what's needed in onRemoteClientMes…
y0sher Dec 6, 2018
c90b6d1
add new messages methods and tests
y0sher Dec 6, 2018
193652f
fix bugs and tests in gossip protocol
y0sher Dec 6, 2018
81100f2
Merge remote-tracking branch 'origin/p2p-gossip-swarm' into p2p-gossi…
y0sher Dec 8, 2018
d5138fd
rebasing on develop and fixing some tests
y0sher Dec 8, 2018
6b76184
give some more tolerance in test for slower systems and change log to…
y0sher Dec 8, 2018
6e767c4
fix Dockerfile and some comments.
y0sher Dec 9, 2018
ac14158
give it a second to fail
y0sher Dec 9, 2018
1d877c4
fix more tests races and bugs
y0sher Dec 9, 2018
70a5a60
Also block and check that peers are added correctly.
y0sher Dec 9, 2018
5606898
use AuthAuthor and remove AuthSender, remove extra lock, some cr refa…
y0sher Dec 10, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/dht/bootstrap.go
Expand Up @@ -63,7 +63,7 @@ func (d *KadDHT) Bootstrap(ctx context.Context) error {
return ErrConnectToBootNode
}

d.local.Debug("lookup using %d preloaded bootnodes ", bn)
d.local.Debug("Lookup using %d preloaded bootnodes ", bn)

ctx, _ = context.WithTimeout(ctx, BootstrapTimeout)
err := d.tryBoot(ctx, c)
Expand Down
27 changes: 19 additions & 8 deletions p2p/dht/dht.go
Expand Up @@ -14,6 +14,8 @@ import (
// DHT is an interface to a general distributed hash table.
type DHT interface {
Update(node node.Node)

InternalLookup(dhtid node.DhtID) []node.Node
Lookup(pubkey string) (node.Node, error)

SelectPeers(qty int) []node.Node
Expand Down Expand Up @@ -52,8 +54,7 @@ func (d *KadDHT) Size() int {
return <-req
}

// SelectPeers asks routingtable to randomly select a slice of nodes in size `qty`
// SelectPeers asks routingtable to randomly select a slice of nodes in size `qty`
// SelectPeers asks routing table to randomly select a slice of nodes in size `qty`
func (d *KadDHT) SelectPeers(qty int) []node.Node {
return d.rt.SelectPeers(qty)
}
Expand All @@ -70,7 +71,7 @@ func New(node *node.LocalNode, config config.SwarmConfig, service service.Servic
return d
}

// Update insert or update a node in the routing table.
// Update insert or updates a node in the routing table.
func (d *KadDHT) Update(node node.Node) {
d.rt.Update(node)
}
Expand All @@ -79,11 +80,11 @@ func (d *KadDHT) Update(node node.Node) {
// if the node can't be found there it sends a query to the network.
func (d *KadDHT) Lookup(pubkey string) (node.Node, error) {
dhtid := node.NewDhtIDFromBase58(pubkey)
poc := make(PeersOpChannel)
d.rt.NearestPeers(NearestPeersReq{dhtid, d.config.RoutingTableAlpha, poc})
res := (<-poc).Peers
if len(res) == 0 {
return node.EmptyNode, ErrEmptyRoutingTable

res := d.InternalLookup(dhtid)

if res == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Print an error to the log that we lost the bootstrap nodes

return node.EmptyNode, errors.New("no peers found in routing table")
}

if res[0].DhtID().Equals(dhtid) {
Expand All @@ -92,6 +93,16 @@ func (d *KadDHT) Lookup(pubkey string) (node.Node, error) {

return d.kadLookup(pubkey, res)
}
// InternalLookup finds a node in the dht by its public key, it issues a search inside the local routing table
func (d *KadDHT) InternalLookup(dhtid node.DhtID) []node.Node {
poc := make(PeersOpChannel)
d.rt.NearestPeers(NearestPeersReq{dhtid, d.config.RoutingTableAlpha, poc})
res := (<-poc).Peers
if len(res) == 0 {
return nil
}
return res
}

// Implements the kad algo for locating a remote node
// Precondition - node is not in local routing table
Expand Down
45 changes: 34 additions & 11 deletions p2p/dht/dht_mock.go
@@ -1,20 +1,26 @@
package dht

import "github.com/spacemeshos/go-spacemesh/p2p/node"
import (
"context"
"github.com/spacemeshos/go-spacemesh/p2p/node"
)

// MockDHT is a mocked dht
type MockDHT struct {
update func(n node.Node)
updateCount int
bsres error
bsCount int
lookupRes node.Node
lookupErr error
UpdateFunc func(n node.Node)
updateCount int
SelectPeersFunc func(qty int) []node.Node
bsres error
bsCount int
InternalLookupFunc func(dhtid node.DhtID) []node.Node
LookupFunc func(string) (node.Node, error)
lookupRes node.Node
lookupErr error
}

// SetUpdate sets the function to run on an issued update
func (m *MockDHT) SetUpdate(f func(n node.Node)) {
m.update = f
m.UpdateFunc = f
}

// SetLookupResult sets the result ok a lookup operation
Expand All @@ -25,8 +31,8 @@ func (m *MockDHT) SetLookupResult(node node.Node, err error) {

// Update is a dht update operation it updates the updatecount
func (m *MockDHT) Update(node node.Node) {
if m.update != nil {
m.update(node)
if m.UpdateFunc != nil {
m.UpdateFunc(node)
}
m.updateCount++
}
Expand All @@ -43,24 +49,41 @@ func (m *MockDHT) BootstrapCount() int {

// Lookup is a dht lookup operation
func (m *MockDHT) Lookup(pubkey string) (node.Node, error) {
if m.LookupFunc != nil {
return m.LookupFunc(pubkey)
}
return m.lookupRes, m.lookupErr
}

// InternalLookup is a lookup only in the local routing table
func (m *MockDHT) InternalLookup(dhtid node.DhtID) []node.Node {
if m.InternalLookupFunc != nil {
return m.InternalLookupFunc(dhtid)
}
return nil
}

// SetBootstrap set the bootstrap result
func (m *MockDHT) SetBootstrap(err error) {
m.bsres = err
}

// Bootstrap is a dht bootstrap operation function it update the bootstrap count
func (m *MockDHT) Bootstrap() error {
func (m *MockDHT) Bootstrap(ctx context.Context) error {
m.bsCount++
return m.bsres
}


// SelectPeers mocks selecting peers.
func (m *MockDHT) SelectPeers(qty int) []node.Node {
if m.SelectPeersFunc != nil {
return m.SelectPeersFunc(qty)
}
return []node.Node{}
}

// Size returns the size of peers in the dht
func (m *MockDHT) Size() int {
//todo: set size
return m.updateCount
Expand Down
37 changes: 20 additions & 17 deletions p2p/dht/table_test.go
Expand Up @@ -2,6 +2,7 @@ package dht

import (
"math/rand"
"sync"
"testing"
"time"

Expand All @@ -10,7 +11,6 @@ import (

"github.com/spacemeshos/go-spacemesh/p2p/node"
"github.com/stretchr/testify/assert"
"sync"
)

func GetTestLogger(name string) *logging.Logger {
Expand Down Expand Up @@ -236,17 +236,14 @@ func TestRoutingTableImpl_SelectPeersDuplicates(t *testing.T) {

}
func TestRoutingTableImpl_SelectPeers_EnoughPeers(t *testing.T) {
const n = 500
const n = 100
const random = 5

ids := make(map[string]node.Node)
sids := make(map[string]RoutingTable)
toselect := make(map[string]struct{})

var wg sync.WaitGroup
var wg2 sync.WaitGroup

wg.Add(1)
for i := 0; i < n; i++ {
local := node.GenerateRandomNodeData()
localID := local.DhtID()
Expand All @@ -256,20 +253,26 @@ func TestRoutingTableImpl_SelectPeers_EnoughPeers(t *testing.T) {
ids[local.String()] = local
sids[local.String()] = rt

go func(l node.Node, rt RoutingTable) {
wg.Wait()
wg2.Add(1)
for nn := range ids {
if ids[nn].String() != l.String() {
rt.Update(ids[nn])
}
}
wg2.Done()
}

}(local, rt)
var wg sync.WaitGroup

for _, id := range ids {
id := id
for _, secondID := range ids {
if id.DhtID().Equals(secondID.DhtID()) {
continue
}
wg.Add(1)
go func(id, secondID node.Node) {
sids[id.String()].Update(secondID)
wg.Done()
}(id, secondID)
}
}
wg.Done()
wg2.Wait()

wg.Wait()

for rtid := range sids {
sel := sids[rtid].SelectPeers(random)
assert.NotNil(t, sel)
Expand Down