Skip to content

Commit

Permalink
Merge pull request #204 from spacemeshos/p2p-port-on-hs
Browse files Browse the repository at this point in the history
BUG #203
  • Loading branch information
y0sher committed Oct 12, 2018
2 parents eef1bf5 + 3f953e4 commit 0a20c73
Show file tree
Hide file tree
Showing 16 changed files with 267 additions and 175 deletions.
11 changes: 5 additions & 6 deletions p2p/connectionpool/connectionpool.go
Expand Up @@ -17,7 +17,7 @@ type dialResult struct {


type networker interface { type networker interface {
Dial(address string, remotePublicKey crypto.PublicKey) (net.Connection, error) // Connect to a remote node. Can send when no error. Dial(address string, remotePublicKey crypto.PublicKey) (net.Connection, error) // Connect to a remote node. Can send when no error.
SubscribeOnNewRemoteConnections() chan net.Connection SubscribeOnNewRemoteConnections() chan net.NewConnectionEvent
NetworkID() int8 NetworkID() int8
ClosingConnections() chan net.Connection ClosingConnections() chan net.Connection
Logger() *logging.Logger Logger() *logging.Logger
Expand All @@ -37,13 +37,12 @@ type ConnectionPool struct {
dialWait sync.WaitGroup dialWait sync.WaitGroup
shutdown bool shutdown bool


newRemoteConn chan net.Connection newRemoteConn chan net.NewConnectionEvent
teardown chan struct{} teardown chan struct{}
} }


// NewConnectionPool creates new ConnectionPool // NewConnectionPool creates new ConnectionPool
func NewConnectionPool(network networker, lPub crypto.PublicKey) *ConnectionPool { func NewConnectionPool(network networker, lPub crypto.PublicKey) *ConnectionPool {
connC := network.SubscribeOnNewRemoteConnections()
cPool := &ConnectionPool{ cPool := &ConnectionPool{
localPub: lPub, localPub: lPub,
net: network, net: network,
Expand All @@ -53,7 +52,7 @@ func NewConnectionPool(network networker, lPub crypto.PublicKey) *ConnectionPool
pendMutex: sync.Mutex{}, pendMutex: sync.Mutex{},
dialWait: sync.WaitGroup{}, dialWait: sync.WaitGroup{},
shutdown: false, shutdown: false,
newRemoteConn: connC, newRemoteConn: network.SubscribeOnNewRemoteConnections(),
teardown: make(chan struct{}), teardown: make(chan struct{}),
} }
go cPool.beginEventProcessing() go cPool.beginEventProcessing()
Expand Down Expand Up @@ -205,8 +204,8 @@ func (cp *ConnectionPool) beginEventProcessing() {
Loop: Loop:
for { for {
select { select {
case conn := <-cp.newRemoteConn: case nce := <-cp.newRemoteConn:
cp.handleNewConnection(conn.RemotePublicKey(), conn, net.Remote) cp.handleNewConnection(nce.Conn.RemotePublicKey(), nce.Conn, net.Remote)


case conn := <-cp.net.ClosingConnections(): case conn := <-cp.net.ClosingConnections():
cp.handleClosedConnection(conn) cp.handleClosedConnection(conn)
Expand Down
9 changes: 5 additions & 4 deletions p2p/connectionpool/connectionpool_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/spacemeshos/go-spacemesh/crypto" "github.com/spacemeshos/go-spacemesh/crypto"
"github.com/spacemeshos/go-spacemesh/p2p/net" "github.com/spacemeshos/go-spacemesh/p2p/net"
"github.com/spacemeshos/go-spacemesh/p2p/node"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"math/rand" "math/rand"
"testing" "testing"
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestRemoteConnectionWithNoConnection(t *testing.T) {
cPool := NewConnectionPool(n, generatePublicKey()) cPool := NewConnectionPool(n, generatePublicKey())
rConn := net.NewConnectionMock(remotePub) rConn := net.NewConnectionMock(remotePub)
rConn.SetSession(net.NewSessionMock([]byte("aaa"))) rConn.SetSession(net.NewSessionMock([]byte("aaa")))
cPool.newRemoteConn <- rConn cPool.newRemoteConn <- net.NewConnectionEvent{rConn, node.EmptyNode}
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
conn, err := cPool.GetConnection(addr, remotePub) conn, err := cPool.GetConnection(addr, remotePub)
assert.Equal(t, remotePub.String(), conn.RemotePublicKey().String()) assert.Equal(t, remotePub.String(), conn.RemotePublicKey().String())
Expand All @@ -133,7 +134,7 @@ func TestRemoteConnectionWithExistingConnection(t *testing.T) {
lConn, _ := cPool.GetConnection(addr, remotePub) lConn, _ := cPool.GetConnection(addr, remotePub)
rConn := net.NewConnectionMock(remotePub) rConn := net.NewConnectionMock(remotePub)
rConn.SetSession(net.NewSessionMock([]byte("111"))) rConn.SetSession(net.NewSessionMock([]byte("111")))
cPool.newRemoteConn <- rConn cPool.newRemoteConn <- net.NewConnectionEvent{rConn, node.EmptyNode}
time.Sleep(20 * time.Millisecond) time.Sleep(20 * time.Millisecond)
assert.Equal(t, remotePub.String(), lConn.RemotePublicKey().String()) assert.Equal(t, remotePub.String(), lConn.RemotePublicKey().String())
assert.Equal(t, int32(1), n.DialCount()) assert.Equal(t, int32(1), n.DialCount())
Expand All @@ -147,7 +148,7 @@ func TestRemoteConnectionWithExistingConnection(t *testing.T) {
lConn, _ = cPool.GetConnection(addr, remotePub) lConn, _ = cPool.GetConnection(addr, remotePub)
rConn = net.NewConnectionMock(remotePub) rConn = net.NewConnectionMock(remotePub)
rConn.SetSession(net.NewSessionMock([]byte("110"))) rConn.SetSession(net.NewSessionMock([]byte("110")))
cPool.newRemoteConn <- rConn cPool.newRemoteConn <- net.NewConnectionEvent{rConn, node.EmptyNode}
time.Sleep(20 * time.Millisecond) time.Sleep(20 * time.Millisecond)
assert.Equal(t, remotePub.String(), lConn.RemotePublicKey().String()) assert.Equal(t, remotePub.String(), lConn.RemotePublicKey().String())
assert.Equal(t, int32(2), n.DialCount()) assert.Equal(t, int32(2), n.DialCount())
Expand Down Expand Up @@ -267,7 +268,7 @@ func TestRandom(t *testing.T) {
sID := make([]byte, 4) sID := make([]byte, 4)
rand.Read(sID) rand.Read(sID)
rConn.SetSession(net.NewSessionMock(sID)) rConn.SetSession(net.NewSessionMock(sID))
cPool.newRemoteConn <- rConn cPool.newRemoteConn <- net.NewConnectionEvent{rConn, node.EmptyNode}
}() }()
} else if r == 1 { } else if r == 1 {
go func() { go func() {
Expand Down
83 changes: 35 additions & 48 deletions p2p/dht/dht_test.go
Expand Up @@ -62,11 +62,14 @@ func TestDHT_Update(t *testing.T) {


assert.True(t, size > config.DefaultConfig().SwarmConfig.RoutingTableBucketSize, "Routing table should be at least as big as bucket size") assert.True(t, size > config.DefaultConfig().SwarmConfig.RoutingTableBucketSize, "Routing table should be at least as big as bucket size")


looked, err := dht.Lookup(randnode.PublicKey().String()) lastnode := evenmorenodes[0]


assert.NoError(t, err, "error finding existing node") looked, err := dht.Lookup(lastnode.PublicKey().String())


assert.True(t, looked.String() == randnode.String(), "didnt find the same node") assert.NoError(t, err, "error finding existing node ")

assert.Equal(t, looked.String(), lastnode.String(), "didnt find the same node")
assert.Equal(t, looked.Address(), lastnode.Address(), "didnt find the same node")


} }


Expand Down Expand Up @@ -120,47 +123,41 @@ func TestDHT_Lookup2(t *testing.T) {


} }


func simNodeWithDHT(t *testing.T, sc config.SwarmConfig, sim *simulator.Simulator) (*simulator.Node, DHT) {
ln, _ := node.GenerateTestNode(t)
n := sim.NewNodeFrom(ln.Node)
dht := New(ln, sc, n)
n.AttachDHT(dht)

return n, dht
}

func bootAndWait(t *testing.T, dht DHT, errchan chan error) {
err := dht.Bootstrap()
errchan <- err
}

func TestDHT_Bootstrap(t *testing.T) { func TestDHT_Bootstrap(t *testing.T) {
// Create a bootstrap node // Create a bootstrap node
ln, _ := node.GenerateTestNode(t)
cfg := config.DefaultConfig()
sim := simulator.New() sim := simulator.New()
n1 := sim.NewNodeFrom(ln.Node) bn, _ := simNodeWithDHT(t, config.DefaultConfig().SwarmConfig, sim)

dht := New(ln, cfg.SwarmConfig, n1)


// config for other nodes // config for other nodes
cfg2 := config.DefaultConfig() cfg2 := config.DefaultConfig()
cfg2.SwarmConfig.RandomConnections = 2 // min numbers of peers to succeed in bootstrap cfg2.SwarmConfig.RandomConnections = 2 // min numbers of peers to succeed in bootstrap
cfg2.SwarmConfig.BootstrapNodes = []string{node.StringFromNode(dht.local.Node)} cfg2.SwarmConfig.BootstrapNodes = []string{node.StringFromNode(bn.Node)}


booted := make(chan error) booted := make(chan error)


// Boot 3 more nodes // boot 3 more dhts
ln2, _ := node.GenerateTestNode(t)
n2 := sim.NewNodeFrom(ln2.Node) _, dht2 := simNodeWithDHT(t, cfg2.SwarmConfig, sim)
dht2 := New(ln2, cfg2.SwarmConfig, n2) _, dht3 := simNodeWithDHT(t, cfg2.SwarmConfig, sim)

_, dht4 := simNodeWithDHT(t, cfg2.SwarmConfig, sim)
ln3, _ := node.GenerateTestNode(t)
n3 := sim.NewNodeFrom(ln3.Node) go bootAndWait(t, dht2, booted)
dht3 := New(ln3, cfg2.SwarmConfig, n3) go bootAndWait(t, dht3, booted)

go bootAndWait(t, dht4, booted)
ln4, _ := node.GenerateTestNode(t)
n4 := sim.NewNodeFrom(ln4.Node)
dht4 := New(ln4, cfg2.SwarmConfig, n4)

go func() {
err2 := dht2.Bootstrap()
booted <- err2
}()
go func() {
err3 := dht3.Bootstrap()
booted <- err3
}()
go func() {
err4 := dht4.Bootstrap()
booted <- err4
}()


// Collect errors // Collect errors
err := <-booted err := <-booted
Expand All @@ -181,29 +178,19 @@ func TestDHT_Bootstrap2(t *testing.T) {
sim := simulator.New() sim := simulator.New()


// Create a bootstrap node // Create a bootstrap node
bs, _ := node.GenerateTestNode(t)
cfg := config.DefaultConfig() cfg := config.DefaultConfig()
bsn := sim.NewNodeFrom(bs.Node) bn, _ := simNodeWithDHT(t, cfg.SwarmConfig, sim)

dht := New(bs, cfg.SwarmConfig, bsn)


// config for other nodes // config for other nodes
cfg2 := config.DefaultConfig() cfg2 := config.DefaultConfig()
cfg2.SwarmConfig.RandomConnections = minToBoot // min numbers of peers to succeed in bootstrap cfg2.SwarmConfig.RandomConnections = minToBoot // min numbers of peers to succeed in bootstrap
cfg2.SwarmConfig.BootstrapNodes = []string{node.StringFromNode(dht.local.Node)} cfg2.SwarmConfig.BootstrapNodes = []string{node.StringFromNode(bn.Node)}


booted := make(chan error) booted := make(chan error)


dhts := make([]*KadDHT, nodesNum)

for i := 0; i < nodesNum; i++ { for i := 0; i < nodesNum; i++ {
lln, _ := node.GenerateTestNode(t) _, d := simNodeWithDHT(t, cfg2.SwarmConfig, sim)
nn := sim.NewNodeFrom(lln.Node) go bootAndWait(t, d, booted)

d := New(lln, cfg2.SwarmConfig, nn)

dhts[i] = d
go func(d *KadDHT) { err := d.Bootstrap(); booted <- err }(d)
} }


timer := time.NewTimer(timeout) timer := time.NewTimer(timeout)
Expand Down
2 changes: 0 additions & 2 deletions p2p/dht/findnode.go
Expand Up @@ -154,8 +154,6 @@ func (p *findNodeProtocol) readLoop() {
return return
} }


p.rt.Update(msg.Sender())

if headers.Req { if headers.Req {
p.handleIncomingRequest(msg.Sender().PublicKey(), headers.ReqID, headers.Payload) p.handleIncomingRequest(msg.Sender().PublicKey(), headers.ReqID, headers.Payload)
return return
Expand Down
6 changes: 6 additions & 0 deletions p2p/dht/table.go
Expand Up @@ -245,6 +245,12 @@ func (rt *routingTableImpl) update(p node.Node) {
return return
} }


if p.Address() == "" {
rt.log.Errorf("Updated non-existing peer without an address pubkey: %v", p.PublicKey().String())
return
}
// this is a new node.

// todo: consider connection metrics // todo: consider connection metrics
if bucket.Len() >= rt.bucketsize { // bucket overflows if bucket.Len() >= rt.bucketsize { // bucket overflows
// TODO: if bucket is full ping oldest node and replace if it fails to answer // TODO: if bucket is full ping oldest node and replace if it fails to answer
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/conn.go
Expand Up @@ -152,12 +152,12 @@ func (c *FormattedConnection) Send(m []byte) error {
// Close closes the connection (implements io.Closer). It is go safe. // Close closes the connection (implements io.Closer). It is go safe.
func (c *FormattedConnection) Close() { func (c *FormattedConnection) Close() {
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
atomic.AddInt32(&c.closed, int32(1)) atomic.AddInt32(&c.closed, 1)
c.closeChan <- struct{}{} c.closeChan <- struct{}{}
}) })
} }


// Reports whether the connection was closed. It is go safe. // Closed Reports whether the connection was closed. It is go safe.
func (c *FormattedConnection) Closed() bool { func (c *FormattedConnection) Closed() bool {
return atomic.LoadInt32(&c.closed) > 0 return atomic.LoadInt32(&c.closed) > 0
} }
Expand Down
11 changes: 4 additions & 7 deletions p2p/net/handshake.go
Expand Up @@ -32,7 +32,7 @@ const HandshakeResp = "/handshake/1.0/handshake-resp/"
// Node that NetworkSession is not yet authenticated - this happens only when the handshake response is processed and authenticated // Node that NetworkSession is not yet authenticated - this happens only when the handshake response is processed and authenticated
// This is called by node1 (initiator) // This is called by node1 (initiator)
func GenerateHandshakeRequestData(localPublicKey crypto.PublicKey, localPrivateKey crypto.PrivateKey, remotePublicKey crypto.PublicKey, func GenerateHandshakeRequestData(localPublicKey crypto.PublicKey, localPrivateKey crypto.PrivateKey, remotePublicKey crypto.PublicKey,
networkID int8) (*pb.HandshakeData, NetworkSession, error) { networkID int8, port uint16) (*pb.HandshakeData, NetworkSession, error) {


// we use the Elliptic Curve Encryption Scheme // we use the Elliptic Curve Encryption Scheme
// https://en.wikipedia.org/wiki/Integrated_Encryption_Scheme // https://en.wikipedia.org/wiki/Integrated_Encryption_Scheme
Expand Down Expand Up @@ -74,6 +74,8 @@ func GenerateHandshakeRequestData(localPublicKey crypto.PublicKey, localPrivateK
data.Hmac = hm.Sum(nil) data.Hmac = hm.Sum(nil)
data.Sign = "" data.Sign = ""


data.Port = uint32(port)

// sign corpus - marshall data without the signature to protobufs3 binary format // sign corpus - marshall data without the signature to protobufs3 binary format
bin, err := proto.Marshal(data) bin, err := proto.Marshal(data)
if err != nil { if err != nil {
Expand Down Expand Up @@ -105,7 +107,6 @@ func ProcessHandshakeRequest(networkID int8, lPub crypto.PublicKey, lPri crypto.
// check that received clientversion is valid client string // check that received clientversion is valid client string
reqVersion := strings.Split(req.ClientVersion, "/") reqVersion := strings.Split(req.ClientVersion, "/")
if len(reqVersion) != 2 { if len(reqVersion) != 2 {
//node.Warning("Dropping incoming message - invalid client version")
return nil, nil, errors.New("invalid client version") return nil, nil, errors.New("invalid client version")
} }


Expand Down Expand Up @@ -231,7 +232,7 @@ func ProcessHandshakeResponse(remotePub crypto.PublicKey, s NetworkSession, resp
expectedMAC := hm.Sum(nil) expectedMAC := hm.Sum(nil)


if !hmac.Equal(resp.Hmac, expectedMAC) { if !hmac.Equal(resp.Hmac, expectedMAC) {
return errors.New("invalid hmac") return fmt.Errorf("invalid hmac need %v got %v", resp.Hmac, expectedMAC)
} }


// verify signature // verify signature
Expand All @@ -253,9 +254,5 @@ func ProcessHandshakeResponse(remotePub crypto.PublicKey, s NetworkSession, resp
return errors.New("invalid signature") return errors.New("invalid signature")
} }


// TODO does peer need to hold all sessions?
// update remote node session here
//r.UpdateSession(s.String(), s)

return nil return nil
} }
17 changes: 13 additions & 4 deletions p2p/net/handshake_test.go
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p/node" "github.com/spacemeshos/go-spacemesh/p2p/node"
"github.com/spacemeshos/go-spacemesh/p2p/pb" "github.com/spacemeshos/go-spacemesh/p2p/pb"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"net"
"strconv"
"testing" "testing"
) )


Expand All @@ -18,20 +20,27 @@ func TestGenerateHandshakeRequestData(t *testing.T) {
assert.NoError(t, err, "should be able to create localnode") assert.NoError(t, err, "should be able to create localnode")
con := NewConnectionMock(remoteNode.PublicKey()) con := NewConnectionMock(remoteNode.PublicKey())
remoteNet, _ := NewNet(config.ConfigValues, remoteNode) remoteNet, _ := NewNet(config.ConfigValues, remoteNode)

//outchan := remoteNet.SubscribeOnNewRemoteConnections() //outchan := remoteNet.SubscribeOnNewRemoteConnections()
_, _, er := GenerateHandshakeRequestData(localNode.PublicKey(), localNode.PrivateKey(), con.RemotePublicKey(), remoteNet.NetworkID()) _, _, er := GenerateHandshakeRequestData(localNode.PublicKey(), localNode.PrivateKey(), con.RemotePublicKey(), remoteNet.NetworkID(), getPort(t, remoteNode.Node))
assert.NoError(t, er, "Sanity failed") assert.NoError(t, er, "Sanity failed")


} }


func getPort(t *testing.T, remote node.Node) uint16 {
_, port, err := net.SplitHostPort(remote.Address())
assert.NoError(t, err)
portint, err := strconv.Atoi(port)
assert.NoError(t, err)
return uint16(portint)
}

func generateRequestData(t *testing.T) (*pb.HandshakeData, node.LocalNode, node.LocalNode, int8) { func generateRequestData(t *testing.T) (*pb.HandshakeData, node.LocalNode, node.LocalNode, int8) {


localNode, _ := node.GenerateTestNode(t) localNode, _ := node.GenerateTestNode(t)
remoteNode, _ := node.GenerateTestNode(t) remoteNode, _ := node.GenerateTestNode(t)
netId := int8(1) netId := int8(1)
con := NewConnectionMock(remoteNode.PublicKey()) out, _, err := GenerateHandshakeRequestData(localNode.PublicKey(), localNode.PrivateKey(), remoteNode.PublicKey(), netId, getPort(t, remoteNode.Node))
//outchan := remoteNet.SubscribeOnNewRemoteConnections()
out, _, err := GenerateHandshakeRequestData(localNode.PublicKey(), localNode.PrivateKey(), con.RemotePublicKey(), netId)
assert.NoError(t, err, "Failed to generate request") assert.NoError(t, err, "Failed to generate request")
return out, *localNode, *remoteNode, netId return out, *localNode, *remoteNode, netId
} }
Expand Down

0 comments on commit 0a20c73

Please sign in to comment.