Skip to content

Commit

Permalink
P2p ping address (#676)
Browse files Browse the repository at this point in the history
* change p2p cmd to debug and json log for automation reading.
* update ping callback, add small test
* add metadata to message
* adapt message server and protocols to changes
* code review fixes
  • Loading branch information
y0sher committed Mar 18, 2019
1 parent a51148b commit 6a50382
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 92 deletions.
22 changes: 20 additions & 2 deletions cmd/p2p/p2p.go
Expand Up @@ -43,7 +43,9 @@ func (app *P2PApp) Cleanup() {
} }


func (app *P2PApp) Start(cmd *cobra.Command, args []string) { func (app *P2PApp) Start(cmd *cobra.Command, args []string) {
// start p2p services // init p2p services
log.JSONLog(true)
log.DebugMode(true)
log.Info("Initializing P2P services") log.Info("Initializing P2P services")
swarm, err := p2p.New(cmdp.Ctx, app.Config.P2P) swarm, err := p2p.New(cmdp.Ctx, app.Config.P2P)
if err != nil { if err != nil {
Expand All @@ -52,10 +54,12 @@ func (app *P2PApp) Start(cmd *cobra.Command, args []string) {
} }
app.p2p = swarm app.p2p = swarm


// Testing stuff
api.ApproveAPIGossipMessages(cmdp.Ctx, app.p2p) api.ApproveAPIGossipMessages(cmdp.Ctx, app.p2p)

metrics.StartCollectingMetrics(app.Config.MetricsPort) metrics.StartCollectingMetrics(app.Config.MetricsPort)


// start the node

err = app.p2p.Start() err = app.p2p.Start()
defer app.p2p.Shutdown() defer app.p2p.Shutdown()


Expand All @@ -64,6 +68,20 @@ func (app *P2PApp) Start(cmd *cobra.Command, args []string) {
panic(err) panic(err)
} }


// start api servers
if app.Config.API.StartGrpcServer || app.Config.API.StartJSONServer {
// start grpc if specified or if json rpc specified
log.Info("Started the GRPC Service")
grpc := api.NewGrpcService(app.p2p, nil)
grpc.StartService(nil)
}

if app.Config.API.StartJSONServer {
log.Info("Started the JSON Service")
json := api.NewJSONHTTPServer()
json.StartService(nil)
}

<-cmdp.Ctx.Done() <-cmdp.Ctx.Done()
} }


Expand Down
43 changes: 32 additions & 11 deletions p2p/dht/dht.go
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p/service" "github.com/spacemeshos/go-spacemesh/p2p/service"
"github.com/spacemeshos/go-spacemesh/ping" "github.com/spacemeshos/go-spacemesh/ping"
"github.com/spacemeshos/go-spacemesh/ping/pb" "github.com/spacemeshos/go-spacemesh/ping/pb"
"net"
"strings"
"time" "time"
) )


Expand Down Expand Up @@ -41,7 +43,7 @@ var (
) )


type Pinger interface { type Pinger interface {
RegisterCallback(f func(ping *pb.Ping) error) OnPing(f func(from net.Addr, ping *pb.Ping) error)
Ping(p p2pcrypto.PublicKey) error Ping(p p2pcrypto.PublicKey) error
} }


Expand Down Expand Up @@ -84,20 +86,39 @@ func New(ln *node.LocalNode, config config.SwarmConfig, service service.Service)
} }
d.fnp = newFindNodeProtocol(service, d.rt) d.fnp = newFindNodeProtocol(service, d.rt)


pinger.RegisterCallback(func(p *pb.Ping) error { pinger.OnPing(d.PingerCallback)
//todo: check the address provided with an extra ping before upading. ( if we haven't checked it for a while )
k, err := p2pcrypto.NewPubkeyFromBytes(p.ID)

if err != nil {
return err
}
d.rt.Update(node.New(k, p.ListenAddress))
return nil
})


return d return d
} }


func (d *KadDHT) PingerCallback(from net.Addr, p *pb.Ping) error {
//todo: check the address provided with an extra ping before upading. ( if we haven't checked it for a while )
k, err := p2pcrypto.NewPubkeyFromBytes(p.ID)

if err != nil {
return err
}

//extract port
_, port, err := net.SplitHostPort(p.ListenAddress)
if err != nil {
return err
}
var addr string

if spl := strings.Split(from.String(), ":"); len(spl) > 1 {
addr, _, err = net.SplitHostPort(from.String())
}

if err != nil {
return err
}

// todo: decide on best way to know our ext address
d.rt.Update(node.New(k, net.JoinHostPort(addr, port)))
return nil
}

// Update insert or updates a node in the routing table. // Update insert or updates a node in the routing table.
func (d *KadDHT) Update(p node.Node) { func (d *KadDHT) Update(p node.Node) {
d.rt.Update(p) d.rt.Update(p)
Expand Down
14 changes: 14 additions & 0 deletions p2p/dht/findnode_test.go
Expand Up @@ -2,6 +2,7 @@ package dht


import ( import (
"fmt" "fmt"
"github.com/btcsuite/btcutil/base58"
"github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/config" "github.com/spacemeshos/go-spacemesh/p2p/config"
"github.com/spacemeshos/go-spacemesh/p2p/node" "github.com/spacemeshos/go-spacemesh/p2p/node"
Expand Down Expand Up @@ -79,3 +80,16 @@ func TestFindNodeProtocol_FindNode2(t *testing.T) {
assert.NoError(t, err, "Should not return error") assert.NoError(t, err, "Should not return error")
assert.Equal(t, expected, idarr, "Should be array that contains the node") assert.Equal(t, expected, idarr, "Should be array that contains the node")
} }

func Test_ToNodeInfo(t *testing.T) {
many := node.GenerateRandomNodesData(100)

for i := 0; i < len(many); i++ {
nds := toNodeInfo(many, many[i].String())
for j := 0; j < len(many)-1; j++ {
if base58.Encode(nds[j].NodeId) == many[i].String() {
t.Error("it was there")
}
}
}
}
4 changes: 4 additions & 0 deletions p2p/gossip/protocol_test.go
Expand Up @@ -140,6 +140,10 @@ type TestMessage struct {
data service.Data data service.Data
} }


func (tm TestMessage) Metadata() service.P2PMetadata {
return service.P2PMetadata{}
}

func (tm TestMessage) Sender() p2pcrypto.PublicKey { func (tm TestMessage) Sender() p2pcrypto.PublicKey {
return tm.sender return tm.sender
} }
Expand Down
65 changes: 65 additions & 0 deletions p2p/message.go
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p/config" "github.com/spacemeshos/go-spacemesh/p2p/config"
"github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto" "github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto"
"github.com/spacemeshos/go-spacemesh/p2p/pb" "github.com/spacemeshos/go-spacemesh/p2p/pb"
"github.com/spacemeshos/go-spacemesh/p2p/service"
"time" "time"
) )


Expand All @@ -16,3 +17,67 @@ func NewProtocolMessageMetadata(author p2pcrypto.PublicKey, protocol string) *pb
AuthPubkey: author.Bytes(), AuthPubkey: author.Bytes(),
} }
} }

//type p2pMetadata struct {
// data map[string]string // connection address
// // todo: anything more ?
//}
//
//func (m p2pMetadata) set(k, v string) {
// m.data[k] = v
//}
//
//// Address is the address on which a connection
//func (m p2pMetadata) Get(k string) (string, bool) {
// v, ok := m.data[k]
// return v, ok
//}
//
//func newP2PMetadata() p2pMetadata {
// return p2pMetadata{make(map[string]string)}
//}

type directProtocolMessage struct {
metadata service.P2PMetadata
sender p2pcrypto.PublicKey
data service.Data
}

func (pm directProtocolMessage) Metadata() service.P2PMetadata {
return pm.metadata
}

func (pm directProtocolMessage) Sender() p2pcrypto.PublicKey {
return pm.sender
}

func (pm directProtocolMessage) Data() service.Data {
return pm.data
}

func (pm directProtocolMessage) Bytes() []byte {
return pm.data.Bytes()
}

type gossipProtocolMessage struct {
data service.Data
validationChan chan service.MessageValidation
}

func (pm gossipProtocolMessage) Data() service.Data {
return pm.data
}

func (pm gossipProtocolMessage) Bytes() []byte {
return pm.data.Bytes()
}

func (pm gossipProtocolMessage) ValidationCompletedChan() chan service.MessageValidation {
return pm.validationChan
}

func (pm gossipProtocolMessage) ReportValidation(protocol string, isValid bool) {
if pm.validationChan != nil {
pm.validationChan <- service.NewMessageValidation(pm.Bytes(), protocol, isValid)
}
}
41 changes: 29 additions & 12 deletions p2p/server/msgserver.go
Expand Up @@ -23,6 +23,11 @@ type Message interface {
Data() service.Data Data() service.Data
} }


func extractPayload(m Message) []byte {
data := m.Data().(*service.DataMsgWrapper)
return data.Payload
}

type Item struct { type Item struct {
id uint64 id uint64
timestamp time.Time timestamp time.Time
Expand All @@ -34,11 +39,11 @@ type MessageServer struct {
name string //server name name string //server name
network Service network Service
pendMutex sync.RWMutex pendMutex sync.RWMutex
pendingQueue *list.List //queue of pending messages pendingQueue *list.List //queue of pending messages
resHandlers map[uint64]func(msg []byte) //response handlers by request ReqId resHandlers map[uint64]func(msg []byte) //response handlers by request ReqId
msgRequestHandlers map[MessageType]func(msg []byte) []byte //request handlers by request type msgRequestHandlers map[MessageType]func(message Message) []byte //request handlers by request type
ingressChannel chan service.DirectMessage //chan to relay messages into the server ingressChannel chan service.DirectMessage //chan to relay messages into the server
requestLifetime time.Duration //time a request can stay in the pending queue until evicted requestLifetime time.Duration //time a request can stay in the pending queue until evicted
workerCount sync.WaitGroup workerCount sync.WaitGroup
workerLimiter chan int workerLimiter chan int
exit chan struct{} exit chan struct{}
Expand All @@ -52,7 +57,7 @@ func NewMsgServer(network Service, name string, requestLifetime time.Duration, c
pendingQueue: list.New(), pendingQueue: list.New(),
network: network, network: network,
ingressChannel: network.RegisterDirectProtocolWithChannel(name, c), ingressChannel: network.RegisterDirectProtocolWithChannel(name, c),
msgRequestHandlers: make(map[MessageType]func(msg []byte) []byte), msgRequestHandlers: make(map[MessageType]func(message Message) []byte),
requestLifetime: requestLifetime, requestLifetime: requestLifetime,
exit: make(chan struct{}), exit: make(chan struct{}),
workerLimiter: make(chan int, runtime.NumCPU()), workerLimiter: make(chan int, runtime.NumCPU()),
Expand Down Expand Up @@ -133,17 +138,18 @@ func (p *MessageServer) removeFromPending(reqID uint64) {


func (p *MessageServer) handleMessage(msg Message) { func (p *MessageServer) handleMessage(msg Message) {
data := msg.Data().(*service.DataMsgWrapper) data := msg.Data().(*service.DataMsgWrapper)

if data.Req { if data.Req {
p.handleRequestMessage(msg.Sender(), data) p.handleRequestMessage(msg, data)
} else { } else {
p.handleResponseMessage(data) p.handleResponseMessage(data)
} }
} }


func (p *MessageServer) handleRequestMessage(sender p2pcrypto.PublicKey, headers *service.DataMsgWrapper) { func (p *MessageServer) handleRequestMessage(msg Message, data *service.DataMsgWrapper) {
if payload := p.msgRequestHandlers[MessageType(headers.MsgType)](headers.Payload); payload != nil { if payload := p.msgRequestHandlers[MessageType(data.MsgType)](msg); payload != nil {
rmsg := &service.DataMsgWrapper{MsgType: headers.MsgType, ReqID: headers.ReqID, Payload: payload} rmsg := &service.DataMsgWrapper{MsgType: data.MsgType, ReqID: data.ReqID, Payload: payload}
sendErr := p.network.SendWrappedMessage(sender, p.name, rmsg) sendErr := p.network.SendWrappedMessage(msg.Sender(), p.name, rmsg)
if sendErr != nil { if sendErr != nil {
p.Error("Error sending response message, err:", sendErr) p.Error("Error sending response message, err:", sendErr)
} }
Expand All @@ -164,10 +170,21 @@ func (p *MessageServer) handleResponseMessage(headers *service.DataMsgWrapper) {
} }
} }


func (p *MessageServer) RegisterMsgHandler(msgType MessageType, reqHandler func(msg []byte) []byte) { func (p *MessageServer) RegisterMsgHandler(msgType MessageType, reqHandler func(message Message) []byte) {
p.msgRequestHandlers[msgType] = reqHandler p.msgRequestHandlers[msgType] = reqHandler
} }


func handlerFromBytesHandler(in func(msg []byte) []byte) func(message Message) []byte {
return func(message Message) []byte {
payload := extractPayload(message)
return in(payload)
}
}

func (p *MessageServer) RegisterBytesMsgHandler(msgType MessageType, reqHandler func([]byte) []byte) {
p.msgRequestHandlers[msgType] = handlerFromBytesHandler(reqHandler)
}

func (p *MessageServer) SendRequest(msgType MessageType, payload []byte, address p2pcrypto.PublicKey, resHandler func(msg []byte)) error { func (p *MessageServer) SendRequest(msgType MessageType, payload []byte, address p2pcrypto.PublicKey, resHandler func(msg []byte)) error {
reqID := p.newRequestId() reqID := p.newRequestId()
p.pendMutex.Lock() p.pendMutex.Lock()
Expand Down
8 changes: 4 additions & 4 deletions p2p/server/msgserver_test.go
Expand Up @@ -22,8 +22,8 @@ func TestProtocol_SendRequest(t *testing.T) {
handler := func(msg []byte) []byte { handler := func(msg []byte) []byte {
return []byte("some value to return") return []byte("some value to return")
} }

// todo test nonbyte handlers
fnd1.RegisterMsgHandler(1, handler) fnd1.RegisterBytesMsgHandler(1, handler)


n2 := sim.NewNode() n2 := sim.NewNode()
fnd2 := NewMsgServer(n2, protocol, 5*time.Second, make(chan service.DirectMessage, config.ConfigValues.BufferSize), log.New("t2", "", "")) fnd2 := NewMsgServer(n2, protocol, 5*time.Second, make(chan service.DirectMessage, config.ConfigValues.BufferSize), log.New("t2", "", ""))
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestProtocol_CleanOldPendingMessages(t *testing.T) {
return nil return nil
} }


fnd1.RegisterMsgHandler(1, handler) fnd1.RegisterBytesMsgHandler(1, handler)


n2 := sim.NewNode() n2 := sim.NewNode()
fnd2 := NewMsgServer(n2, protocol, 10*time.Millisecond, make(chan service.DirectMessage, config.ConfigValues.BufferSize), log.New("t4", "", "")) fnd2 := NewMsgServer(n2, protocol, 10*time.Millisecond, make(chan service.DirectMessage, config.ConfigValues.BufferSize), log.New("t4", "", ""))
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestProtocol_Close(t *testing.T) {
return nil return nil
} }


fnd1.RegisterMsgHandler(1, handler) fnd1.RegisterBytesMsgHandler(1, handler)


n2 := sim.NewNode() n2 := sim.NewNode()
fnd2 := NewMsgServer(n2, protocol, 10*time.Millisecond, make(chan service.DirectMessage, config.ConfigValues.BufferSize), log.New("t6", "", "")) fnd2 := NewMsgServer(n2, protocol, 10*time.Millisecond, make(chan service.DirectMessage, config.ConfigValues.BufferSize), log.New("t6", "", ""))
Expand Down
10 changes: 9 additions & 1 deletion p2p/service/service.go
Expand Up @@ -2,6 +2,7 @@ package service


import ( import (
"github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto" "github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto"
"net"
) )


type MessageValidation struct { type MessageValidation struct {
Expand All @@ -22,12 +23,19 @@ func (mv MessageValidation) IsValid() bool {
return mv.isValid return mv.isValid
} }


// Metadata is a generic metadata interface
type P2PMetadata struct {
FromAddress net.Addr
// add here more fields that are needed by protocols
}

func NewMessageValidation(msg []byte, prot string, isValid bool) MessageValidation { func NewMessageValidation(msg []byte, prot string, isValid bool) MessageValidation {
return MessageValidation{msg, prot, isValid} return MessageValidation{msg, prot, isValid}
} }


// DirectMessage is an interface that represents a simple direct message structure // DirectMessage is an interface that represents a simple direct message structure
type DirectMessage interface { type DirectMessage interface {
Metadata() P2PMetadata
Sender() p2pcrypto.PublicKey Sender() p2pcrypto.PublicKey
Bytes() []byte Bytes() []byte
} }
Expand All @@ -47,7 +55,7 @@ type Service interface {
RegisterDirectProtocolWithChannel(protocol string, ingressChannel chan DirectMessage) chan DirectMessage RegisterDirectProtocolWithChannel(protocol string, ingressChannel chan DirectMessage) chan DirectMessage
SendMessage(peerPubkey p2pcrypto.PublicKey, protocol string, payload []byte) error SendMessage(peerPubkey p2pcrypto.PublicKey, protocol string, payload []byte) error
SubscribePeerEvents() (new chan p2pcrypto.PublicKey, del chan p2pcrypto.PublicKey) SubscribePeerEvents() (new chan p2pcrypto.PublicKey, del chan p2pcrypto.PublicKey)
ProcessDirectProtocolMessage(sender p2pcrypto.PublicKey, protocol string, payload Data) error ProcessDirectProtocolMessage(sender p2pcrypto.PublicKey, protocol string, payload Data, metadata P2PMetadata) error
ProcessGossipProtocolMessage(protocol string, data Data, validationCompletedChan chan MessageValidation) error ProcessGossipProtocolMessage(protocol string, data Data, validationCompletedChan chan MessageValidation) error
Broadcast(protocol string, payload []byte) error Broadcast(protocol string, payload []byte) error
Shutdown() Shutdown()
Expand Down

0 comments on commit 6a50382

Please sign in to comment.