diff --git a/p2p/dht/bootstrap.go b/p2p/dht/bootstrap.go index 37d6be39a8..936908e1d1 100644 --- a/p2p/dht/bootstrap.go +++ b/p2p/dht/bootstrap.go @@ -63,7 +63,7 @@ func (d *KadDHT) Bootstrap(ctx context.Context) error { return ErrConnectToBootNode } - d.local.Debug("lookup using %d preloaded bootnodes ", bn) + d.local.Debug("Lookup using %d preloaded bootnodes ", bn) ctx, _ = context.WithTimeout(ctx, BootstrapTimeout) err := d.tryBoot(ctx, c) diff --git a/p2p/dht/dht.go b/p2p/dht/dht.go index 659f5dcfc2..a37d06dad3 100644 --- a/p2p/dht/dht.go +++ b/p2p/dht/dht.go @@ -14,6 +14,8 @@ import ( // DHT is an interface to a general distributed hash table. type DHT interface { Update(node node.Node) + + InternalLookup(dhtid node.DhtID) []node.Node Lookup(pubkey string) (node.Node, error) SelectPeers(qty int) []node.Node @@ -52,8 +54,7 @@ func (d *KadDHT) Size() int { return <-req } -// SelectPeers asks routingtable to randomly select a slice of nodes in size `qty` -// SelectPeers asks routingtable to randomly select a slice of nodes in size `qty` +// SelectPeers asks routing table to randomly select a slice of nodes in size `qty` func (d *KadDHT) SelectPeers(qty int) []node.Node { return d.rt.SelectPeers(qty) } @@ -70,7 +71,7 @@ func New(node *node.LocalNode, config config.SwarmConfig, service service.Servic return d } -// Update insert or update a node in the routing table. +// Update inserts or updates a node in the routing table. func (d *KadDHT) Update(node node.Node) { d.rt.Update(node) } @@ -79,11 +80,11 @@ func (d *KadDHT) Update(node node.Node) { // if the node can't be found there it sends a query to the network. func (d *KadDHT) Lookup(pubkey string) (node.Node, error) { dhtid := node.NewDhtIDFromBase58(pubkey) - poc := make(PeersOpChannel) - d.rt.NearestPeers(NearestPeersReq{dhtid, d.config.RoutingTableAlpha, poc}) - res := (<-poc).Peers - if len(res) == 0 { - return node.EmptyNode, ErrEmptyRoutingTable + + res := d.InternalLookup(dhtid) + + if res == nil { + return node.EmptyNode, errors.New("no peers found in routing table") } if res[0].DhtID().Equals(dhtid) { @@ -92,6 +93,16 @@ func (d *KadDHT) Lookup(pubkey string) (node.Node, error) { return d.kadLookup(pubkey, res) } +// InternalLookup returns the closest known nodes to a dht id, searching only the local routing table +func (d *KadDHT) InternalLookup(dhtid node.DhtID) []node.Node { + poc := make(PeersOpChannel) + d.rt.NearestPeers(NearestPeersReq{dhtid, d.config.RoutingTableAlpha, poc}) + res := (<-poc).Peers + if len(res) == 0 { + return nil + } + return res +} // Implements the kad algo for locating a remote node // Precondition - node is not in local routing table diff --git a/p2p/dht/dht_mock.go b/p2p/dht/dht_mock.go index 25a06b80da..c7529ae07a 100644 --- a/p2p/dht/dht_mock.go +++ b/p2p/dht/dht_mock.go @@ -1,20 +1,26 @@ package dht -import "github.com/spacemeshos/go-spacemesh/p2p/node" +import ( + "context" + "github.com/spacemeshos/go-spacemesh/p2p/node" +) // MockDHT is a mocked dht type MockDHT struct { - update func(n node.Node) - updateCount int - bsres error - bsCount int - lookupRes node.Node - lookupErr error + UpdateFunc func(n node.Node) + updateCount int + SelectPeersFunc func(qty int) []node.Node + bsres error + bsCount int + InternalLookupFunc func(dhtid node.DhtID) []node.Node + LookupFunc func(string) (node.Node, error) + lookupRes node.Node + lookupErr error } // SetUpdate sets the function to run on an issued update func (m *MockDHT) SetUpdate(f func(n node.Node)) { - m.update = f + m.UpdateFunc = f 
} // SetLookupResult sets the result of a lookup operation @@ -25,8 +31,8 @@ func (m *MockDHT) SetLookupResult(node node.Node, err error) { // Update is a dht update operation; it updates the updatecount func (m *MockDHT) Update(node node.Node) { - if m.update != nil { - m.update(node) + if m.UpdateFunc != nil { + m.UpdateFunc(node) } m.updateCount++ } @@ -43,24 +49,41 @@ func (m *MockDHT) BootstrapCount() int { // Lookup is a dht lookup operation func (m *MockDHT) Lookup(pubkey string) (node.Node, error) { + if m.LookupFunc != nil { + return m.LookupFunc(pubkey) + } return m.lookupRes, m.lookupErr } +// InternalLookup is a lookup only in the local routing table +func (m *MockDHT) InternalLookup(dhtid node.DhtID) []node.Node { + if m.InternalLookupFunc != nil { + return m.InternalLookupFunc(dhtid) + } + return nil +} + // SetBootstrap sets the bootstrap result func (m *MockDHT) SetBootstrap(err error) { m.bsres = err } // Bootstrap is a dht bootstrap operation; it updates the bootstrap count -func (m *MockDHT) Bootstrap() error { +func (m *MockDHT) Bootstrap(ctx context.Context) error { m.bsCount++ return m.bsres } + +// SelectPeers mocks selecting peers. func (m *MockDHT) SelectPeers(qty int) []node.Node { + if m.SelectPeersFunc != nil { + return m.SelectPeersFunc(qty) + } return []node.Node{} } +// Size returns the number of peers in the dht func (m *MockDHT) Size() int { //todo: set size return m.updateCount diff --git a/p2p/dht/table_test.go b/p2p/dht/table_test.go index 63f4b82c1b..53e54e4452 100644 --- a/p2p/dht/table_test.go +++ b/p2p/dht/table_test.go @@ -2,6 +2,7 @@ package dht import ( "math/rand" + "sync" "testing" "time" @@ -10,7 +11,6 @@ import ( "github.com/spacemeshos/go-spacemesh/p2p/node" "github.com/stretchr/testify/assert" - "sync" ) func GetTestLogger(name string) *logging.Logger { @@ -236,17 +236,14 @@ func TestRoutingTableImpl_SelectPeersDuplicates(t *testing.T) { } func TestRoutingTableImpl_SelectPeers_EnoughPeers(t *testing.T) { - const n = 500 + const n = 100 const random = 5 ids := make(map[string]node.Node) sids := make(map[string]RoutingTable) toselect := make(map[string]struct{}) - var wg sync.WaitGroup - var wg2 sync.WaitGroup - wg.Add(1) for i := 0; i < n; i++ { local := node.GenerateRandomNodeData() localID := local.DhtID() @@ -256,20 +253,26 @@ func TestRoutingTableImpl_SelectPeers_EnoughPeers(t *testing.T) { ids[local.String()] = local sids[local.String()] = rt - go func(l node.Node, rt RoutingTable) { - wg.Wait() - wg2.Add(1) - for nn := range ids { - if ids[nn].String() != l.String() { - rt.Update(ids[nn]) - } - } - wg2.Done() + } - }(local, rt) + var wg sync.WaitGroup + + for _, id := range ids { + id := id + for _, secondID := range ids { + if id.DhtID().Equals(secondID.DhtID()) { + continue + } + wg.Add(1) + go func(id, secondID node.Node) { + sids[id.String()].Update(secondID) + wg.Done() + }(id, secondID) + } } - wg.Done() - wg2.Wait() + + wg.Wait() + for rtid := range sids { sel := sids[rtid].SelectPeers(random) assert.NotNil(t, sel) diff --git a/p2p/gossip/protocol.go b/p2p/gossip/protocol.go index 3d2c00c6f8..0696e6b185 100644 --- a/p2p/gossip/protocol.go +++ b/p2p/gossip/protocol.go @@ -2,405 +2,327 @@ package gossip import ( "errors" - "fmt" - "github.com/davecgh/go-spew/spew" + "github.com/golang/protobuf/proto" "github.com/spacemeshos/go-spacemesh/crypto" "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/p2p/config" "github.com/spacemeshos/go-spacemesh/p2p/message" - 
"github.com/spacemeshos/go-spacemesh/p2p/net" "github.com/spacemeshos/go-spacemesh/p2p/node" + "github.com/spacemeshos/go-spacemesh/p2p/pb" + "github.com/spacemeshos/go-spacemesh/p2p/service" "hash/fnv" "sync" "time" ) -// NoResultsInterval is the timeout we wait between requesting more peers repeatedly -const NoResultsInterval = 1 * time.Second +const messageQBufferSize = 100 -// PeerMessageQueueSize is the len of the msgQ each peer holds. -const PeerMessageQueueSize = 100 +const ProtocolName = "/p2p/1.0/gossip" +const protocolVer = "0" -// ConnectingTimeout is the timeout we wait when trying to connect a neighborhood -const ConnectingTimeout = 20 * time.Second //todo: add to the config +type hash uint32 // fnv.New32 must be used everytime to be sure we get consistent results. -func generateID(msg []byte) uint32 { +func calcHash(msg []byte) hash { msghash := fnv.New32() // todo: Add nonce to messages instead msghash.Write(msg) - return msghash.Sum32() + return hash(msghash.Sum32()) } -// Protocol is a simple declaration of the gossip protocol -type Protocol interface { - Broadcast(payload []byte) error - - Start() error - Close() - - Initial() <-chan struct{} - AddIncomingPeer(node.Node, net.Connection) - Disconnect(key crypto.PublicKey) -} - -// PeerSampler is a our interface to select peers -type PeerSampler interface { - SelectPeers(count int) []node.Node +// Interface for the underlying p2p layer +type baseNetwork interface { + SendMessage(peerPubKey string, protocol string, payload []byte) error + RegisterProtocol(protocol string) chan service.Message + SubscribePeerEvents() (conn chan crypto.PublicKey, disc chan crypto.PublicKey) + ProcessProtocolMessage(sender node.Node, protocol string, data service.Data) error } -// ConnectionFactory is our interface to get connections -type ConnectionFactory interface { - GetConnection(address string, pk crypto.PublicKey) (net.Connection, error) +type signer interface { + PublicKey() crypto.PublicKey + Sign(data []byte) ([]byte, error) } -// nodeConPair -type nodeConPair struct { - node.Node - net.Connection +type protocolMessage struct { + msg *pb.ProtocolMessage } -// Neighborhood is the gossip protocol, it manages a list of peers and sends them broadcasts -type Neighborhood struct { +// Protocol is the gossip protocol +type Protocol struct { log.Log config config.SwarmConfig + net baseNetwork + signer signer - initOnce sync.Once - initial chan struct{} - - peers map[string]*peer - inpeers map[string]*peer - - morePeersReq chan struct{} - connectingTimeout time.Duration - - inc chan nodeConPair // report incoming connection - - // we make sure we don't send a message twice - oldMessageMu sync.RWMutex - oldMessageQ map[uint32]struct{} - - ps PeerSampler - - cp ConnectionFactory - + peers map[string]*peer shutdown chan struct{} + oldMessageMu sync.RWMutex + oldMessageQ map[hash]struct{} peersMutex sync.RWMutex - inpeersMutex sync.RWMutex + + relayQ chan service.Message + messageQ chan protocolMessage } -// NewNeighborhood creates a new gossip protocol from type Neighborhood. 
-func NewNeighborhood(config config.SwarmConfig, ps PeerSampler, cp ConnectionFactory, log2 log.Log) *Neighborhood { - return &Neighborhood{ - Log: log2, - config: config, - initial: make(chan struct{}), - morePeersReq: make(chan struct{}, config.RandomConnections), - connectingTimeout: ConnectingTimeout, - peers: make(map[string]*peer, config.RandomConnections), - inpeers: make(map[string]*peer, config.RandomConnections), - oldMessageQ: make(map[uint32]struct{}), // todo : remember to drain this - shutdown: make(chan struct{}), - ps: ps, - cp: cp, +// NewProtocol creates a new gossip protocol instance. Call Start to start processing peer events +func NewProtocol(config config.SwarmConfig, base baseNetwork, signer signer, log2 log.Log) *Protocol { + // intentionally not subscribing to peer events yet so that the channels won't block in case Start is delayed + relayChan := base.RegisterProtocol(ProtocolName) + return &Protocol{ + Log: log2, + config: config, + net: base, + signer: signer, + peers: make(map[string]*peer), + shutdown: make(chan struct{}), + oldMessageQ: make(map[hash]struct{}), // todo : remember to drain this + peersMutex: sync.RWMutex{}, + relayQ: relayChan, + messageQ: make(chan protocolMessage, messageQBufferSize), } } -// Make sure that neighborhood works as a Protocol in compile time -var _ Protocol = new(Neighborhood) +// sender is an interface for the peer's p2p layer +type sender interface { + SendMessage(peerPubKey string, protocol string, payload []byte) error +} +// peer is a struct storing a peer's state type peer struct { log.Log - node.Node - connected time.Time - conn net.Connection - knownMessages map[uint32]struct{} - msgQ chan []byte + pubKey crypto.PublicKey + msgMutex sync.RWMutex + knownMessages map[hash]struct{} + net sender } -func makePeer(node2 node.Node, c net.Connection, log log.Log) *peer { +func newPeer(net sender, pubKey crypto.PublicKey, log log.Log) *peer { return &peer{ log, - node2, - time.Now(), - c, - make(map[uint32]struct{}), - make(chan []byte, PeerMessageQueueSize), + pubKey, + sync.RWMutex{}, + make(map[hash]struct{}), + net, } } -func (p *peer) send(message []byte) error { - if p.conn == nil || p.conn.Session() == nil { - return fmt.Errorf("the connection does not exist for this peer") +// send sends a gossip message to the peer +func (p *peer) send(msg []byte, checksum hash) error { + // don't do anything if this peer knows this msg + p.msgMutex.RLock() + if _, ok := p.knownMessages[checksum]; ok { + p.msgMutex.RUnlock() + return errors.New("already got this msg") } - return p.conn.Send(message) + p.msgMutex.RUnlock() + go func() { + err := p.net.SendMessage(p.pubKey.String(), ProtocolName, msg) + if err != nil { + p.Log.Info("Gossip protocol failed to send msg (hash %d) to peer %v, first attempt. err=%v", checksum, p.pubKey, err) + // doing one retry before giving up + err = p.net.SendMessage(p.pubKey.String(), ProtocolName, msg) + if err != nil { + p.Log.Info("Gossip protocol failed to send msg (hash %d) to peer %v, second attempt. 
err=%v", checksum, p.pubKey, err) + return + } + } + p.msgMutex.Lock() + p.knownMessages[checksum] = struct{}{} + p.msgMutex.Unlock() + }() + return nil +} + +func (prot *Protocol) Close() { + close(prot.shutdown) } -// addMessages adds a message to this peer's queue -func (p *peer) addMessage(msg []byte, checksum uint32) error { - // dont do anything if this peer know this msg +// markMessage adds the calcHash to the old message queue so the message won't be processed in case received again +func (prot *Protocol) markMessage(h hash) { + prot.oldMessageMu.Lock() + prot.oldMessageQ[h] = struct{}{} + prot.oldMessageMu.Unlock() +} - if _, ok := p.knownMessages[checksum]; ok { - return errors.New("already got this msg") +func (prot *Protocol) propagateMessage(msg []byte, h hash) { + prot.peersMutex.RLock() + for p := range prot.peers { + peer := prot.peers[p] + prot.Debug("sending message to peer %v, hash %d", peer.pubKey, h) + peer.send(msg, h) // non blocking } + prot.peersMutex.RUnlock() +} - // check if connection and session are ok - c := p.conn - session := c.Session() - if c == nil || session == nil { - // todo: refresh Neighborhood or create session - return errors.New("no session") - } +func (prot *Protocol) validateMessage(msg *pb.ProtocolMessage) error { - data, err := message.PrepareMessage(session, msg) + err := message.AuthAuthor(msg) if err != nil { + prot.Log.Error("fail to authorize gossip message, err %v", err) return err } - select { - case p.msgQ <- data: - p.knownMessages[checksum] = struct{}{} - default: - return errors.New("Q was full") - + if msg.Metadata.ClientVersion != protocolVer { + prot.Log.Error("fail to validate message's protocol version when validating gossip message, err %v", err) + return err } - return nil } -func (p *peer) start() { - for { - m := <-p.msgQ - err := p.send(m) - if err != nil { - // todo: stop only when error is critical (identify errors) - log.Error("Failed sending message to this peer %v", p.Node.PublicKey().String()) - return - } - } - -} - -// Close closes the neighborhood and shutsdown requests for more peers -func (s *Neighborhood) Close() { - // no need to shutdown con, conpool will do so in a shutdown. the morepeerreq won't work - close(s.shutdown) - //todo close all peers -} - // Broadcast is the actual broadcast procedure, loop on peers and add the message to their queues -func (s *Neighborhood) Broadcast(msg []byte) error { - - if len(s.peers)+len(s.inpeers) == 0 { - return errors.New("No peers in neighborhood") +func (prot *Protocol) Broadcast(payload []byte, nextProt string) error { + // add gossip header + header := &pb.Metadata{ + NextProtocol: nextProt, + ClientVersion: protocolVer, + Timestamp: time.Now().Unix(), + AuthPubKey: prot.signer.PublicKey().Bytes(), + MsgSign: nil, } - oldmessage := false + msg := &pb.ProtocolMessage{ + Metadata: header, + Data: &pb.ProtocolMessage_Payload{Payload: payload}, + } - checksum := generateID(msg) + bin, err := proto.Marshal(msg) + if err != nil { + prot.Log.Error("failed to marshal message when generating gossip header, err %v", err) + return err - s.oldMessageMu.RLock() - if _, ok := s.oldMessageQ[checksum]; ok { - // todo : - have some more metrics for termination - // todo : - maybe tell the peer weg ot this message already? 
- oldmessage = true } - s.oldMessageMu.RUnlock() - if !oldmessage { - s.oldMessageMu.Lock() - s.oldMessageQ[checksum] = struct{}{} - s.oldMessageMu.Unlock() + sign, err2 := prot.signer.Sign(bin) + if err2 != nil { + prot.Log.Error("failed to sign header when generating gossip header, err %v", err2) + return err2 } - s.peersMutex.RLock() - for p := range s.peers { - peer := s.peers[p] - go peer.addMessage(msg, checksum) - s.Debug("adding message to peer %v", peer.Pretty()) - } - s.peersMutex.RUnlock() - s.inpeersMutex.RLock() - for p := range s.inpeers { - peer := s.inpeers[p] - go peer.addMessage(msg, checksum) - s.Debug("adding message to peer %v", peer.Pretty()) - } - s.inpeersMutex.RUnlock() + msg.Metadata.MsgSign = sign - //TODO: if we didn't send to RandomConnections then try to other peers. - if oldmessage { - return errors.New("old message") + finbin, err := proto.Marshal(msg) + if err != nil { + return err } + // so we won't process our own messages + hash := calcHash(finbin) + prot.markMessage(hash) + prot.propagateMessage(finbin, hash) return nil } -// getMorePeers tries to fill the `peers` slice with dialed outbound peers that we selected from the dht. -func (s *Neighborhood) getMorePeers(numpeers int) int { +// Start starts a loop that processes peer events +func (prot *Protocol) Start() { + peerConn, peerDisc := prot.net.SubscribePeerEvents() // blocks until we are registered + go prot.eventLoop(peerConn, peerDisc) +} - if numpeers == 0 { - return 0 - } +func (prot *Protocol) addPeer(peer crypto.PublicKey) { + prot.peersMutex.Lock() + prot.peers[peer.String()] = newPeer(prot.net, peer, prot.Log) + prot.peersMutex.Unlock() +} - // dht should provide us with random peers to connect to - nds := s.ps.SelectPeers(numpeers) - ndsLen := len(nds) - if ndsLen == 0 { - s.Debug("Peer sampler returned nothing.") - // this gets busy at start so we spare a second - return 0 // zero samples here so no reason to proceed - } +func (prot *Protocol) removePeer(peer crypto.PublicKey) { + prot.peersMutex.Lock() + delete(prot.peers, peer.String()) + prot.peersMutex.Unlock() +} - type cnErr struct { - n node.Node - c net.Connection - err error +func (prot *Protocol) isOldMessage(h hash) bool { + var oldmessage bool + prot.oldMessageMu.RLock() + if _, ok := prot.oldMessageQ[h]; ok { + oldmessage = true + } else { + oldmessage = false } + prot.oldMessageMu.RUnlock() + return oldmessage +} - res := make(chan cnErr, numpeers) +func (prot *Protocol) handleRelayMessage(msgB []byte) error { + hash := calcHash(msgB) - // Try a connection to each peer. - // TODO: try splitting the load and don't connect to more than X at a time - for i := 0; i < ndsLen; i++ { - go func(nd node.Node, reportChan chan cnErr) { - c, err := s.cp.GetConnection(nd.Address(), nd.PublicKey()) - reportChan <- cnErr{nd, c, err} - }(nds[i], res) - } + // in case the message was received through the relay channel we need to remove the Gossip layer and hand the payload to the next protocol to process + if prot.isOldMessage(hash) { + // todo : - have some more metrics for termination + // todo : - maybe tell the peer we got this message already? 
+ prot.Log.Debug("got old message, hash %d", hash) + } else { - total, bad := 0, 0 - tm := time.NewTimer(s.connectingTimeout) // todo: configure -loop: - for { - select { - case cne := <-res: - total++ // We count i everytime to know when to close the channel - - if cne.err != nil { - s.Error("can't establish connection with sampled peer %v, %v", cne.n.String(), cne.err) - bad++ - if total == ndsLen { - break loop - } - continue // this peer didn't work, todo: tell dht - } + msg := &pb.ProtocolMessage{} + err := proto.Unmarshal(msgB, msg) + if err != nil { + prot.Log.Error("failed to unmarshal when handling relay message, err %v", err) + return err + } - p := makePeer(cne.n, cne.c, s.Log) - s.inpeersMutex.Lock() - //_, ok := s.outbound[cne.n.String()] - _, ok := s.inpeers[cne.n.PublicKey().String()] - if ok { - delete(s.inpeers, cne.n.PublicKey().String()) - } - s.inpeersMutex.Unlock() + err = prot.validateMessage(msg) + if err != nil { + prot.Log.Error("failed to validate message when handling relay message, err %v", err) + return err + } - s.peersMutex.Lock() - s.peers[cne.n.PublicKey().String()] = p - s.peersMutex.Unlock() + prot.markMessage(hash) - go p.start() - s.Debug("Neighborhood: Added peer to peer list %v", cne.n.Pretty()) + var data service.Data - if total == ndsLen { - break loop - } - case <-tm.C: - break loop - case <-s.shutdown: - break loop + if payload := msg.GetPayload(); payload != nil { + data = service.Data_Bytes{Payload: payload} + } else if wrap := msg.GetMsg(); wrap != nil { + data = service.Data_MsgWrapper{Req: wrap.Req, MsgType: wrap.Type, ReqID: wrap.ReqID, Payload: wrap.Payload} } - } - return total - bad -} + authKey, err := crypto.NewPublicKey(msg.Metadata.AuthPubKey) + if err != nil { + prot.Log.Error("failed to decode the auth public key when handling relay message, err %v", err) + return err + } -// Start a loop that manages the peers we are connected to all the time -// It connects to config.RandomConnections and after that maintains this number -// of connections, if a connection is closed we notify the loop that we need more peers now. -func (s *Neighborhood) Start() error { - //TODO: Save and load persistent peers ? - s.Info("Neighborhood service started") + go prot.net.ProcessProtocolMessage(node.New(authKey, ""), msg.Metadata.NextProtocol, data) + } - // initial request for peers - go s.loop() - s.morePeersReq <- struct{}{} + prot.propagateMessage(msgB, hash) return nil } -func (s *Neighborhood) loop() { +func (prot *Protocol) eventLoop(peerConn chan crypto.PublicKey, peerDisc chan crypto.PublicKey) { loop: for { select { - case <-s.morePeersReq: - s.Debug("loop: got morePeersReq") - go s.askForMorePeers() - case <-s.shutdown: + case msg := <-prot.relayQ: + // incoming messages from p2p layer for process and relay + go func() { + // [todo some err handling + prot.handleRelayMessage(msg.Bytes()) + }() + case peer := <-peerConn: + go prot.addPeer(peer) + case peer := <-peerDisc: + go prot.removePeer(peer) + case <-prot.shutdown: break loop // maybe error ? } } } -func (s *Neighborhood) askForMorePeers() { - numpeers := len(s.peers) - req := s.config.RandomConnections - numpeers - - if req <= 0 { - return - } - - s.getMorePeers(req) - - // todo: better way then going in this everytime ? 
- if len(s.peers) >= s.config.RandomConnections { - s.initOnce.Do(func() { - s.Info("gossip; connected to initial required neighbors - %v", len(s.peers)) - close(s.initial) - s.peersMutex.RLock() - s.Debug(spew.Sdump(s.peers)) - s.peersMutex.RUnlock() - }) - return - } - // if we could'nt get any maybe were initializing - // wait a little bit before trying again - time.Sleep(NoResultsInterval) - s.morePeersReq <- struct{}{} -} - -// Initial returns when the neighborhood was initialized. -func (s *Neighborhood) Initial() <-chan struct{} { - return s.initial -} - -// Disconnect removes a peer from the neighborhood, it requests more peers if our outbound peer count is less than configured -func (s *Neighborhood) Disconnect(key crypto.PublicKey) { - peer := key.String() - - s.inpeersMutex.Lock() - if _, ok := s.inpeers[peer]; ok { - delete(s.inpeers, peer) - s.inpeersMutex.Unlock() - return - } - s.inpeersMutex.Unlock() - - s.peersMutex.Lock() - if _, ok := s.peers[peer]; ok { - delete(s.peers, peer) - } - s.peersMutex.Unlock() - s.morePeersReq <- struct{}{} +// peersCount returns the number of peers known to the protocol, used for testing only +func (prot *Protocol) peersCount() int { + prot.peersMutex.RLock() + cnt := len(prot.peers) + prot.peersMutex.RUnlock() + return cnt } -// AddIncomingPeer inserts a peer to the neighborhood as a remote peer. -func (s *Neighborhood) AddIncomingPeer(n node.Node, c net.Connection) { - p := makePeer(n, c, s.Log) - s.inpeersMutex.Lock() - s.inpeers[n.PublicKey().String()] = p - s.inpeersMutex.Unlock() - go p.start() +// hasPeer returns whether or not a peer is known to the protocol, used for testing only +func (prot *Protocol) hasPeer(key crypto.PublicKey) bool { + prot.peersMutex.RLock() + _, ok := prot.peers[key.String()] + prot.peersMutex.RUnlock() + return ok } diff --git a/p2p/gossip/protocol_test.go b/p2p/gossip/protocol_test.go index 8b781ea834..c845132e35 100644 --- a/p2p/gossip/protocol_test.go +++ b/p2p/gossip/protocol_test.go @@ -1,28 +1,117 @@ package gossip import ( - "errors" + "github.com/gogo/protobuf/proto" "github.com/spacemeshos/go-spacemesh/crypto" "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/p2p/config" - "github.com/spacemeshos/go-spacemesh/p2p/net" "github.com/spacemeshos/go-spacemesh/p2p/node" + "github.com/spacemeshos/go-spacemesh/p2p/pb" + "github.com/spacemeshos/go-spacemesh/p2p/service" "github.com/stretchr/testify/assert" + "sync" "testing" "time" ) -type mockCpool struct { - f func(address string, pk crypto.PublicKey) (net.Connection, error) +type mockBaseNetwork struct { + msgSentByPeer map[string]uint32 + inbox chan service.Message + connSubs []chan crypto.PublicKey + discSubs []chan crypto.PublicKey + totalMsgCount int + processProtocolCount int + msgMutex sync.Mutex + pcountwg *sync.WaitGroup + msgwg *sync.WaitGroup + lastMsg []byte } -func (mcp *mockCpool) GetConnection(address string, pk crypto.PublicKey) (net.Connection, error) { - if mcp.f != nil { - return mcp.f(address, pk) +func newMockBaseNetwork() *mockBaseNetwork { + return &mockBaseNetwork{ + make(map[string]uint32), + make(chan service.Message, 30), + make([]chan crypto.PublicKey, 0, 5), + make([]chan crypto.PublicKey, 0, 5), + 0, + 0, + sync.Mutex{}, + &sync.WaitGroup{}, + &sync.WaitGroup{}, + []byte(nil), } - c := net.NewConnectionMock(pk) - c.SetSession(net.SessionMock{}) - return c, nil +} + +func (mbn *mockBaseNetwork) SendMessage(peerPubKey string, protocol string, payload []byte) error { + mbn.msgMutex.Lock() + mbn.lastMsg 
= payload + mbn.msgSentByPeer[peerPubKey]++ + mbn.totalMsgCount++ + mbn.msgMutex.Unlock() + releaseWaiters(mbn.msgwg) + return nil +} + +func passOrDeadlock(t testing.TB, group *sync.WaitGroup) { + ch := make(chan struct{}) + go func(ch chan struct{}, t testing.TB) { + timer := time.NewTimer(time.Second * 3) + for { + select { + case <-ch: + return + case <-timer.C: + t.FailNow() // deadlocked + } + } + }(ch, t) + + group.Wait() + close(ch) +} + +// releaseWaiters marks one waiter on the waitgroup as done +func releaseWaiters(group *sync.WaitGroup) { + group.Done() +} + +func (mbn *mockBaseNetwork) RegisterProtocol(protocol string) chan service.Message { + return mbn.inbox +} + +func (mbn *mockBaseNetwork) SubscribePeerEvents() (conn chan crypto.PublicKey, disc chan crypto.PublicKey) { + conn = make(chan crypto.PublicKey, 20) + disc = make(chan crypto.PublicKey, 20) + + mbn.connSubs = append(mbn.connSubs, conn) + mbn.discSubs = append(mbn.discSubs, disc) + return +} + +func (mbn *mockBaseNetwork) ProcessProtocolMessage(sender node.Node, protocol string, data service.Data) error { + mbn.processProtocolCount++ + releaseWaiters(mbn.pcountwg) + return nil +} + +func (mbn *mockBaseNetwork) addRandomPeers(cnt int) { + for i := 0; i < cnt; i++ { + _, pub, _ := crypto.GenerateKeyPair() + mbn.addRandomPeer(pub) + } +} + +func (mbn *mockBaseNetwork) addRandomPeer(pub crypto.PublicKey) { + for _, p := range mbn.connSubs { + p <- pub + } +} + +func (mbn *mockBaseNetwork) totalMessageSent() int { + mbn.msgMutex.Lock() + total := mbn.totalMsgCount + mbn.msgMutex.Unlock() + return total } type mockSampler struct { @@ -36,261 +125,317 @@ func (mcs *mockSampler) SelectPeers(count int) []node.Node { return node.GenerateRandomNodesData(count) } -//todo : more unit tests +type TestMessage struct { + data service.Data +} -func TestNeighborhood_Broadcast(t *testing.T) { - n := NewNeighborhood(config.DefaultConfig().SwarmConfig, new(mockSampler), new(mockCpool), log.New("tesT", "", "")) - err := n.Broadcast([]byte("msg")) - assert.Error(t, err) +func (tm TestMessage) Sender() node.Node { + return node.Node{} } -func TestNeighborhood_AddIncomingPeer(t *testing.T) { - n := NewNeighborhood(config.DefaultConfig().SwarmConfig, new(mockSampler), new(mockCpool), log.New("tesT", "", "")) - n.Start() - rnd := node.GenerateRandomNodeData() - n.AddIncomingPeer(rnd, nil) +func (tm TestMessage) setData(msg service.Data) { + tm.data = msg +} - n.inpeersMutex.RLock() - p, ok := n.inpeers[rnd.PublicKey().String()] - n.inpeersMutex.RUnlock() +func (tm TestMessage) Data() service.Data { + return tm.data +} - assert.True(t, ok) - assert.NotNil(t, p) +func (tm TestMessage) Bytes() []byte { + return tm.data.Bytes() } -func TestNeighborhood_Broadcast2(t *testing.T) { - n := NewNeighborhood(config.DefaultConfig().SwarmConfig, new(mockSampler), new(mockCpool), log.New("tesT", "", "")) - n.Start() - <-n.Initial() - err := n.Broadcast([]byte("LOL")) - assert.NoError(t, err) - tm := time.NewTimer(time.Millisecond * 1) -loop: - for { - select { - case <-tm.C: - n.peersMutex.Lock() - for _, p := range n.peers { - i := p.conn.(*net.ConnectionMock).SendCount() - if assert.True(t, i > 0) { - n.peersMutex.Unlock() - break loop - } - } - n.peersMutex.Lock() - break loop - default: +type testSigner struct { + pv crypto.PrivateKey +} - } - } +func (ms testSigner) PublicKey() crypto.PublicKey { + return ms.pv.GetPublicKey() } -func TestNeighborhood_Broadcast3(t *testing.T) { - n := 
NewNeighborhood(config.DefaultConfig().SwarmConfig, new(mockSampler), new(mockCpool), log.New("tesT", "", "")) - n.Start() - <-n.Initial() - chk := generateID([]byte("LOL")) - n.oldMessageQ[chk] = struct{}{} - - err := n.Broadcast([]byte("LOL")) - assert.Error(t, err) - tm := time.NewTimer(time.Millisecond * 1) -loop: - for { - select { - case <-tm.C: - n.peersMutex.Lock() - for _, p := range n.peers { - i := p.conn.(*net.ConnectionMock).SendCount() - if assert.True(t, i > 0) { - n.peersMutex.Unlock() - break loop - } - } - n.peersMutex.Lock() - break loop - default: +func (ms testSigner) Sign(data []byte) ([]byte, error) { + return ms.pv.Sign(data) } - } +func newTestSigner(t testing.TB) testSigner { + pv, _, err := crypto.GenerateKeyPair() + assert.NoError(t, err) + return testSigner{pv} +} + +func newTestSignedMessageData(t testing.TB, signer signer) []byte { + pm := &pb.ProtocolMessage{ + Metadata: &pb.Metadata{ + NextProtocol: ProtocolName, + AuthPubKey: signer.PublicKey().Bytes(), + Timestamp: time.Now().Unix(), + ClientVersion: protocolVer, + }, + Data: &pb.ProtocolMessage_Payload{[]byte("LOL")}, } + + return signedMessage(t, signer, pm).Bytes() } -func TestNeighborhood_Broadcast4(t *testing.T) { - n := NewNeighborhood(config.DefaultConfig().SwarmConfig, new(mockSampler), new(mockCpool), log.New("tesT", "", "")) - n.Start() - <-n.Initial() +func addPeersAndTest(t testing.TB, num int, p *Protocol, net *mockBaseNetwork, work bool) { - rnd := node.GenerateRandomNodeData() - con, _ := new(mockCpool).GetConnection(rnd.Address(), rnd.PublicKey()) - n.AddIncomingPeer(rnd, con) + pc := p.peersCount() + reg, _ := net.SubscribePeerEvents() + net.addRandomPeers(num) - n.Broadcast([]byte("LOL")) - tm := time.NewTimer(time.Second) -loop: + i := 0 +lop: for { select { - case <-tm.C: - t.Error() + case <-reg: + i++ + time.Sleep(time.Millisecond) // we need to somehow let other goroutines work before us default: + break lop } - n.inpeersMutex.RLock() - for _, p := range n.inpeers { - i := p.conn.(*net.ConnectionMock).SendCount() - if i > 0 { - break loop - } - } - n.inpeersMutex.RUnlock() + } + if i != num { + t.Fatal("Didn't get added peers on chan") + } + + newpc := p.peersCount() + worked := pc+num == newpc + if worked != work { + t.Fatalf("adding the peers didn't work as expected; old peer count: %d, tried to add: %d, new peer count: %d", pc, num, newpc) } } -func Test_Neihborhood_getMorePeers(t *testing.T) { - // test normal flow - numpeers := 3 - sampMock := new(mockSampler) - cpoolMock := new(mockCpool) - n := NewNeighborhood(config.DefaultConfig().SwarmConfig, sampMock, cpoolMock, log.New("test", "", "")) +//todo : more unit tests - res := n.getMorePeers(0) // this should'nt work - assert.Equal(t, res, 0) +func TestNeighborhood_AddIncomingPeer(t *testing.T) { + n := NewProtocol(config.DefaultConfig().SwarmConfig, newMockBaseNetwork(), newTestSigner(t), log.New("tesT", "", "")) + n.Start() + _, pub, _ := crypto.GenerateKeyPair() + n.addPeer(pub) - //test no peers - sampMock.f = func(count int) []node.Node { - return []node.Node{} - } + assert.True(t, n.hasPeer(pub)) + assert.Equal(t, 1, n.peersCount()) +} - res = n.getMorePeers(10) - assert.Equal(t, res, 0) - sampMock.f = nil +func signedMessage(t testing.TB, s signer, message *pb.ProtocolMessage) service.Data { + pmbin, err := proto.Marshal(message) + assert.NoError(t, err) + sign, err := s.Sign(pmbin) + assert.NoError(t, err) + message.Metadata.MsgSign = sign + finbin, err := proto.Marshal(message) + assert.NoError(t, err) + return 
service.Data_Bytes{finbin} +} - // test connection error - cpoolMock.f = func(address string, pk crypto.PublicKey) (net.Connection, error) { - return nil, errors.New("can't make connection") - } +func TestNeighborhood_Relay(t *testing.T) { + net := newMockBaseNetwork() + n := NewProtocol(config.DefaultConfig().SwarmConfig, net, newTestSigner(t), log.New("tesT", "", "")) + n.Start() - res = n.getMorePeers(1) // this should'nt work - assert.Equal(t, res, 0) - cpoolMock.f = nil // for next tests + addPeersAndTest(t, 20, n, net, true) + + signer := newTestSigner(t) + pm := &pb.ProtocolMessage{ + Metadata: &pb.Metadata{ + NextProtocol: ProtocolName, + AuthPubKey: signer.PublicKey().Bytes(), + Timestamp: time.Now().Unix(), + ClientVersion: protocolVer, + }, + Data: &pb.ProtocolMessage_Payload{[]byte("LOL")}, + } - // not starting so we can test getMorePeers - res = n.getMorePeers(numpeers) - assert.Equal(t, res, numpeers) - assert.Equal(t, len(n.peers), numpeers) + signed := signedMessage(t, signer, pm) - // test inc peer + var msg service.Message = TestMessage{signed} + net.pcountwg.Add(1) + net.msgwg.Add(20) + net.inbox <- msg + passOrDeadlock(t, net.pcountwg) + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 1, net.processProtocolCount) + assert.Equal(t, 20, net.totalMsgCount) +} - nd := node.GenerateRandomNodeData() - cn, _ := cpoolMock.GetConnection(nd.Address(), nd.PublicKey()) - n.AddIncomingPeer(nd, cn) +func TestNeighborhood_Broadcast(t *testing.T) { + net := newMockBaseNetwork() + n := NewProtocol(config.DefaultConfig().SwarmConfig, net, newTestSigner(t), log.New("tesT", "", "")) + n.Start() + addPeersAndTest(t, 20, n, net, true) + net.msgwg.Add(20) - n.peersMutex.RLock() - _, ok := n.inpeers[nd.PublicKey().String()] - n.peersMutex.RUnlock() - assert.True(t, ok) + n.Broadcast([]byte("LOL"), "") + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 0, net.processProtocolCount) + assert.Equal(t, 20, net.totalMessageSent()) +} - //test replacing inc peer +func TestNeighborhood_Relay2(t *testing.T) { + net := newMockBaseNetwork() + n := NewProtocol(config.DefaultConfig().SwarmConfig, net, newTestSigner(t), log.New("tesT", "", "")) + n.Start() - sampMock.f = func(count int) []node.Node { - some := node.GenerateRandomNodesData(count - 1) - some = append(some, nd) - return some + signer := newTestSigner(t) + pm := &pb.ProtocolMessage{ + Metadata: &pb.Metadata{ + NextProtocol: ProtocolName, + AuthPubKey: signer.PublicKey().Bytes(), + Timestamp: time.Now().Unix(), + ClientVersion: protocolVer, + }, + Data: &pb.ProtocolMessage_Payload{[]byte("LOL")}, } - res = n.getMorePeers(numpeers) - assert.Equal(t, res, numpeers) - assert.Equal(t, len(n.peers), numpeers+res) - - n.peersMutex.RLock() - _, ok = n.peers[nd.PublicKey().String()] - n.peersMutex.RUnlock() - assert.True(t, ok) - - n.peersMutex.RLock() - _, ok = n.inpeers[nd.PublicKey().String()] - n.peersMutex.RUnlock() - assert.False(t, ok) -} - -func TestNeighborhood_Initial(t *testing.T) { - sampMock := new(mockSampler) - cpoolMock := new(mockCpool) - n := NewNeighborhood(config.DefaultConfig().SwarmConfig, sampMock, cpoolMock, log.New("test", "", "")) - err := n.Start() - assert.NoError(t, err, "start returned err") // should never error - in := n.Initial() - select { - case <-in: - break - case <-time.After(time.Second * 2): - t.Error("2 seconds passed") - } - n.Close() + signed := signedMessage(t, signer, pm) + var msg service.Message = TestMessage{signed} + net.pcountwg.Add(1) + net.inbox <- msg + passOrDeadlock(t, net.pcountwg) + assert.Equal(t, 
1, net.processProtocolCount) + assert.Equal(t, 0, net.totalMessageSent()) + + addPeersAndTest(t, 20, n, net, true) + net.msgwg.Add(20) + net.inbox <- msg + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 20, net.totalMessageSent()) } -func TestNeighborhood_Disconnect(t *testing.T) { - sampMock := new(mockSampler) - cpoolMock := new(mockCpool) +func TestNeighborhood_Broadcast2(t *testing.T) { + net := newMockBaseNetwork() + n := NewProtocol(config.DefaultConfig().SwarmConfig, net, newTestSigner(t), log.New("tesT", "", "")) + n.Start() - out := node.GenerateRandomNodeData() - t.Log("MY PEER ", out) + msgB := newTestSignedMessageData(t, newTestSigner(t)) + addPeersAndTest(t, 1, n, net, true) + net.msgwg.Add(1) + n.Broadcast(msgB, "") // doesn't matter + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 0, net.processProtocolCount) + assert.Equal(t, 1, net.totalMessageSent()) + + addPeersAndTest(t, 20, n, net, true) + net.msgwg.Add(20) + var msg service.Message = TestMessage{service.Data_Bytes{net.lastMsg}} + net.inbox <- msg + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 0, net.processProtocolCount) + assert.Equal(t, 21, net.totalMessageSent()) } - sampMock.f = func(count int) []node.Node { - some := node.GenerateRandomNodesData(count - 1) - some = append(some, out) - return some - } +func TestNeighborhood_Broadcast3(t *testing.T) { + // todo : Fix this test: when the first message is broadcast, `Broadcast` attaches metadata to it with the current authoring timestamp + // to test that the next message doesn't get processed by the protocol we must create an exact copy of the message produced at `Broadcast` + net := newMockBaseNetwork() + n := NewProtocol(config.DefaultConfig().SwarmConfig, net, newTestSigner(t), log.New("tesT", "", "")) + n.Start() - n := NewNeighborhood(config.DefaultConfig().SwarmConfig, sampMock, cpoolMock, log.New("tesT", "", "")) - err := n.Start() - assert.NoError(t, err) // should never error + addPeersAndTest(t, 20, n, net, true) - select { - case <-n.Initial(): - break - case <-time.After(time.Second): - t.Error("didnt initialize") - } + msgB := []byte("LOL") + net.msgwg.Add(20) + n.Broadcast(msgB, "") + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 0, net.processProtocolCount) + assert.Equal(t, 20, net.totalMessageSent()) - sampMock.f = nil // dont give out on a normal state + var msg service.Message = TestMessage{service.Data_Bytes{net.lastMsg}} + net.inbox <- msg + assert.Equal(t, 0, net.processProtocolCount) + assert.Equal(t, 20, net.totalMessageSent()) } - nd := node.GenerateRandomNodeData() - cn, _ := cpoolMock.GetConnection(nd.Address(), nd.PublicKey()) - n.AddIncomingPeer(nd, cn) +func TestNeighborhood_Relay3(t *testing.T) { + net := newMockBaseNetwork() + n := NewProtocol(config.DefaultConfig().SwarmConfig, net, newTestSigner(t), log.New("tesT", "", "")) + n.Start() - n.inpeersMutex.RLock() - _, ok := n.inpeers[nd.PublicKey().String()] - assert.True(t, ok) - n.inpeersMutex.RUnlock() + var msg service.Message = TestMessage{service.Data_Bytes{newTestSignedMessageData(t, newTestSigner(t))}} + net.pcountwg.Add(1) + net.inbox <- msg + passOrDeadlock(t, net.pcountwg) + assert.Equal(t, 1, net.processProtocolCount) + assert.Equal(t, 0, net.totalMessageSent()) - n.Disconnect(nd.PublicKey()) + addPeersAndTest(t, 20, n, net, true) - n.inpeersMutex.RLock() - _, ok = n.inpeers[nd.PublicKey().String()] - assert.False(t, ok) - n.inpeersMutex.RUnlock() + net.msgwg.Add(20) + net.inbox <- msg + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 1, net.processProtocolCount) 
+ assert.Equal(t, 20, net.totalMessageSent()) - cpoolMock.f = func(address string, pk crypto.PublicKey) (net.Connection, error) { - return nil, errors.New("no connections") - } + addPeersAndTest(t, 1, n, net, true) + + net.msgwg.Add(1) + net.inbox <- msg + passOrDeadlock(t, net.msgwg) + + assert.Equal(t, 1, net.processProtocolCount) + assert.Equal(t, 21, net.totalMessageSent()) +} + +func TestNeighborhood_Start(t *testing.T) { + net := newMockBaseNetwork() + n := NewProtocol(config.DefaultConfig().SwarmConfig, net, newTestSigner(t), log.New("tesT", "", "")) - n.Disconnect(out.PublicKey()) + // before Start + addPeersAndTest(t, 20, n, net, false) - n.peersMutex.RLock() - _, ok = n.peers[nd.PublicKey().String()] - assert.False(t, ok) - n.peersMutex.RUnlock() + n.Start() + addPeersAndTest(t, 20, n, net, true) } func TestNeighborhood_Close(t *testing.T) { - sampMock := new(mockSampler) - cpoolMock := new(mockCpool) + net := newMockBaseNetwork() + n := NewProtocol(config.DefaultConfig().SwarmConfig, net, newTestSigner(t), log.New("tesT", "", "")) - n := NewNeighborhood(config.DefaultConfig().SwarmConfig, sampMock, cpoolMock, log.New("tesT", "", "")) - err := n.Start() - assert.NoError(t, err) // should never error + n.Start() + addPeersAndTest(t, 20, n, net, true) n.Close() - <-n.shutdown + addPeersAndTest(t, 20, n, net, false) +} + +func TestNeighborhood_Disconnect(t *testing.T) { + net := newMockBaseNetwork() + n := NewProtocol(config.DefaultConfig().SwarmConfig, net, newTestSigner(t), log.New("tesT", "", "")) + + n.Start() + _, pub1, _ := crypto.GenerateKeyPair() + n.addPeer(pub1) + _, pub2, _ := crypto.GenerateKeyPair() + n.addPeer(pub2) + assert.Equal(t, 2, n.peersCount()) + + msg := newTestSignedMessageData(t, newTestSigner(t)) + + net.pcountwg.Add(1) + net.msgwg.Add(2) + net.inbox <- TestMessage{service.Data_Bytes{msg}} + passOrDeadlock(t, net.pcountwg) + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 1, net.processProtocolCount) + assert.Equal(t, 2, net.totalMessageSent()) + + msg2 := newTestSignedMessageData(t, newTestSigner(t)) + + n.removePeer(pub1) + net.pcountwg.Add(1) + net.msgwg.Add(1) + net.inbox <- TestMessage{service.Data_Bytes{msg2}} + passOrDeadlock(t, net.pcountwg) + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 2, net.processProtocolCount) + assert.Equal(t, 3, net.totalMessageSent()) + + n.addPeer(pub1) + net.msgwg.Add(1) + net.inbox <- TestMessage{service.Data_Bytes{msg2}} + passOrDeadlock(t, net.msgwg) + assert.Equal(t, 2, net.processProtocolCount) + assert.Equal(t, 4, net.totalMessageSent()) } diff --git a/p2p/gossip_test.go b/p2p/gossip_test.go index 72c7df456c..8ddcb119c8 100644 --- a/p2p/gossip_test.go +++ b/p2p/gossip_test.go @@ -16,6 +16,7 @@ import ( // this is a long test, about 60sec - will be fixed on #289 func TestGossip(t *testing.T) { + t.Skip() bootnodes := []int{1} nodes := []int{10} rcon := []int{3} diff --git a/p2p/message/message.go b/p2p/message/message.go index e1edd307a9..47eaba9fd4 100644 --- a/p2p/message/message.go +++ b/p2p/message/message.go @@ -18,28 +18,15 @@ func PrepareMessage(ns net.NetworkSession, data []byte) ([]byte, error) { return nil, fmt.Errorf("aborting send - failed to encrypt payload: %v", err) } - cmd := &pb.CommonMessageData{ - SessionId: ns.ID(), - Payload: encPayload, - Timestamp: time.Now().Unix(), - } - - final, err := proto.Marshal(cmd) - if err != nil { - e := fmt.Errorf("aborting send - invalid msg format %v", err) - return nil, e - } - - return final, nil + return encPayload, nil } // NewProtocolMessageMetadata creates 
meta-data for an outgoing protocol message authored by this node. -func NewProtocolMessageMetadata(author crypto.PublicKey, protocol string, gossip bool) *pb.Metadata { +func NewProtocolMessageMetadata(author crypto.PublicKey, protocol string) *pb.Metadata { return &pb.Metadata{ - Protocol: protocol, + NextProtocol: protocol, ClientVersion: config.ClientVersion, Timestamp: time.Now().Unix(), - Gossip: gossip, AuthPubKey: author.Bytes(), } } @@ -57,21 +44,18 @@ func SignMessage(pv crypto.PrivateKey, pm *pb.ProtocolMessage) error { return fmt.Errorf("failed to sign message err:%v", err) } - // TODO : AuthorSign: string => bytes - pm.Metadata.AuthorSign = hex.EncodeToString(sign) + pm.Metadata.MsgSign = sign return nil } // AuthAuthor authorizes that a message is signed by its claimed author func AuthAuthor(pm *pb.ProtocolMessage) error { - // TODO: consider getting pubkey from outside. attackar coul'd just manipulate the whole message pubkey and sign. if pm == nil || pm.Metadata == nil { - fmt.Println("WTF HAPPENED !?", pm.Metadata, pm) - //spew.Dump(*pm) + return fmt.Errorf("can't authorize a defective message, message or metadata was empty") } - sign := pm.Metadata.AuthorSign + sign := pm.Metadata.MsgSign sPubkey := pm.Metadata.AuthPubKey pubkey, err := crypto.NewPublicKey(sPubkey) @@ -79,7 +63,7 @@ func AuthAuthor(pm *pb.ProtocolMessage) error { return fmt.Errorf("couldn't create public key from %v, err: %v", hex.EncodeToString(sPubkey), err) } - pm.Metadata.AuthorSign = "" // we have to verify the message without the sign + pm.Metadata.MsgSign = nil // we have to verify the message without the sign bin, err := proto.Marshal(pm) @@ -87,12 +71,7 @@ ... return err } - binsig, err := hex.DecodeString(sign) - if err != nil { - return err - } - - v, err := pubkey.Verify(bin, binsig) + v, err := pubkey.Verify(bin, sign) if err != nil { return err @@ -102,7 +81,7 @@ ... return fmt.Errorf("couldn't verify message") } - pm.Metadata.AuthorSign = sign // restore sign because maybe we'll send it again ( gossip ) + pm.Metadata.MsgSign = sign // restore sign because maybe we'll send it again ( gossip ) return nil -} +} \ No newline at end of file diff --git a/p2p/message/message_test.go b/p2p/message/message_test.go index 973b42ff1f..febf44bb7e 100644 --- a/p2p/message/message_test.go +++ b/p2p/message/message_test.go @@ -1,7 +1,6 @@ package message import ( - "encoding/hex" "github.com/gogo/protobuf/proto" "github.com/spacemeshos/go-spacemesh/crypto" "github.com/spacemeshos/go-spacemesh/p2p/config" @@ -19,16 +18,14 @@ func Test_NewProtocolMessageMeatadata(t *testing.T) { assert.NotNil(t, pk) - meta := NewProtocolMessageMetadata(pk, "EX", gossip) + meta := NewProtocolMessageMetadata(pk, "EX") assert.NotNil(t, meta, "should be a metadata") assert.Equal(t, meta.Timestamp, time.Now().Unix()) assert.Equal(t, meta.ClientVersion, config.ClientVersion) assert.Equal(t, meta.AuthPubKey, pk.Bytes()) - assert.Equal(t, meta.Protocol, "EX") - assert.Equal(t, meta.Gossip, gossip) - assert.Equal(t, meta.AuthorSign, "") - + assert.Equal(t, meta.NextProtocol, "EX") + assert.Equal(t, meta.MsgSign, []byte(nil)) } func TestSwarm_AuthAuthor(t *testing.T) { @@ -41,8 +38,8 @@ func TestSwarm_AuthAuthor(t *testing.T) { assert.NotNil(t, pub) pm := &pb.ProtocolMessage{ - Metadata: NewProtocolMessageMetadata(pub, "EX", false), - Data: &pb.ProtocolMessage_Payload{Payload: []byte("EX")}, + Metadata: NewProtocolMessageMetadata(pub, "EX"), + Data: 
&pb.ProtocolMessage_Payload{[]byte("EX")}, } ppm, err := proto.Marshal(pm) assert.NoError(t, err, "cant marshal msg ", err) @@ -50,9 +47,8 @@ func TestSwarm_AuthAuthor(t *testing.T) { // sign it s, err := priv.Sign(ppm) assert.NoError(t, err, "cant sign ", err) - ssign := hex.EncodeToString(s) - pm.Metadata.AuthorSign = ssign + pm.Metadata.MsgSign = s vererr := AuthAuthor(pm) assert.NoError(t, vererr) @@ -65,9 +61,8 @@ func TestSwarm_AuthAuthor(t *testing.T) { s, err = priv2.Sign(ppm) assert.NoError(t, err, "cant sign ", err) - ssign = hex.EncodeToString(s) - pm.Metadata.AuthorSign = ssign + pm.Metadata.MsgSign = s vererr = AuthAuthor(pm) assert.Error(t, vererr) @@ -75,10 +70,9 @@ func TestSwarm_AuthAuthor(t *testing.T) { func TestSwarm_SignAuth(t *testing.T) { n, _ := node.GenerateTestNode(t) - pm := &pb.ProtocolMessage{ - Metadata: NewProtocolMessageMetadata(n.PublicKey(), "EX", false), - Data: &pb.ProtocolMessage_Payload{Payload: []byte("EX")}, + Metadata: NewProtocolMessageMetadata(n.PublicKey(), "EX"), + Data: &pb.ProtocolMessage_Payload{[]byte("EX")}, } err := SignMessage(n.PrivateKey(), pm) diff --git a/p2p/pb/message.proto b/p2p/pb/message.proto index 6b61800aff..0c37332a14 100644 --- a/p2p/pb/message.proto +++ b/p2p/pb/message.proto @@ -3,15 +3,6 @@ syntax = "proto3"; package pb; option go_package = "pb"; -// Handshake protocol message format - -// data common to all messages - Top level msg format -message CommonMessageData { - bytes sessionId = 1; // always set (handshake or other protocol messages) - bytes payload = 2; // encrypted payload with session aes key - binary protobufs. empty for handshake methods - int64 timestamp = 3; // sending time - // we don't want to add anything here even protocol names as it goes unencrypted over the wire -} // Handshake protocol data used for both request and response - sent unencrypted over the wire message HandshakeData { @@ -43,12 +34,11 @@ message ProtocolMessage { } message Metadata { - string protocol = 1; // Protocol id string + string nextProtocol = 1; // Protocol id string string clientVersion = 2; // Author client version int64 timestamp = 3; // Unix time - authoring time (not sending time) - bool gossip = 4; // True to have receiver peer gossip the message to its neighbors - bytes authPubKey = 5; // Authoring node Secp256k1 public key (32bytes) - may not be sender - string authorSign = 6; // Signature of message data by author + method specific data by message creator node. format: hexEncode([]bytes) + bytes authPubKey = 4; // Authoring node Secp256k1 public key (32bytes) - may not be sender + bytes msgSign = 5; // Signature of message data by author + method specific data by message creator node. 
} message MessageWrapper { diff --git a/p2p/service/service.go b/p2p/service/service.go index 2875090460..f73afbe43a 100644 --- a/p2p/service/service.go +++ b/p2p/service/service.go @@ -1,6 +1,7 @@ package service import ( + "github.com/spacemeshos/go-spacemesh/crypto" "github.com/spacemeshos/go-spacemesh/p2p/node" ) @@ -15,6 +16,8 @@ type Service interface { Start() error RegisterProtocol(protocol string) chan Message SendMessage(nodeID string, protocol string, payload []byte) error + SubscribePeerEvents() (new chan crypto.PublicKey, del chan crypto.PublicKey) + ProcessProtocolMessage(sender node.Node, protocol string, payload Data) error Broadcast(protocol string, payload []byte) error Shutdown() } diff --git a/p2p/service/sim.go b/p2p/service/sim.go index f90b63a9fc..5d1357f137 100644 --- a/p2p/service/sim.go +++ b/p2p/service/sim.go @@ -2,6 +2,7 @@ package service import ( "errors" + "github.com/spacemeshos/go-spacemesh/crypto" "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/p2p/node" "io" @@ -16,8 +17,14 @@ type Simulator struct { mutex sync.RWMutex protocolHandler map[string]map[string]chan Message // maps peerPubkey -> protocol -> handler nodes map[string]*Node + + subLock sync.Mutex + newPeersSubs []chan crypto.PublicKey + delPeersSubs []chan crypto.PublicKey } +var _ Service = new(Node) + type dht interface { Update(node2 node.Node) } @@ -38,11 +45,38 @@ func NewSimulator() *Simulator { return s } +func (s *Simulator) SubscribeToPeerEvents() (chan crypto.PublicKey, chan crypto.PublicKey) { + newp := make(chan crypto.PublicKey) + delp := make(chan crypto.PublicKey) + s.subLock.Lock() + s.newPeersSubs = append(s.newPeersSubs, newp) + s.delPeersSubs = append(s.delPeersSubs, delp) + s.subLock.Unlock() + return newp, delp +} + +func (s *Simulator) publishNewPeer(peer crypto.PublicKey) { + s.subLock.Lock() + for _, s := range s.newPeersSubs { + s <- peer + } + s.subLock.Unlock() +} + +func (s *Simulator) publishDelPeer(peer crypto.PublicKey) { + s.subLock.Lock() + for _, s := range s.delPeersSubs { + s <- peer + } + s.subLock.Unlock() +} + func (s *Simulator) createdNode(n *Node) { s.mutex.Lock() s.protocolHandler[n.PublicKey().String()] = make(map[string]chan Message) s.nodes[n.PublicKey().String()] = n s.mutex.Unlock() + s.publishNewPeer(n.PublicKey()) } // NewNode creates a new p2p node in this Simulator @@ -102,6 +136,18 @@ func (sn *Node) Start() error { return nil } +// ProcessProtocolMessage routes an already unwrapped protocol message to the node's registered protocol channel. +func (sn *Node) ProcessProtocolMessage(sender node.Node, protocol string, payload Data) error { + sn.sim.mutex.RLock() + c, ok := sn.sim.protocolHandler[sn.String()][protocol] + sn.sim.mutex.RUnlock() + if !ok { + return errors.New("Unknown protocol") + } + c <- simMessage{payload, sender} + return nil +} + // SendMessage sends a protocol message to the specified nodeID. // returns error if the node can't be found. corresponds to `SendMessage` @@ -139,6 +185,10 @@ func (sn *Node) Broadcast(protocol string, payload []byte) error { return nil } +func (sn *Node) SubscribePeerEvents() (chan crypto.PublicKey, chan crypto.PublicKey) { + return sn.sim.SubscribeToPeerEvents() +} + // RegisterProtocol creates and returns a channel for a given protocol. 
func (sn *Node) RegisterProtocol(protocol string) chan Message { c := make(chan Message) diff --git a/p2p/swarm.go b/p2p/swarm.go index c7c309bdd9..09437a1f5b 100644 --- a/p2p/swarm.go +++ b/p2p/swarm.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/davecgh/go-spew/spew" "github.com/golang/protobuf/proto" + "github.com/spacemeshos/go-spacemesh/crypto" "github.com/spacemeshos/go-spacemesh/p2p/config" "github.com/spacemeshos/go-spacemesh/p2p/connectionpool" "github.com/spacemeshos/go-spacemesh/p2p/dht" @@ -24,6 +25,9 @@ import ( "time" ) +// ConnectingTimeout is the timeout we wait when trying to connect to the neighborhood +const ConnectingTimeout = 20 * time.Second //todo: add to the config + type protocolMessage struct { sender node.Node data service.Data @@ -41,7 +45,12 @@ func (pm protocolMessage) Bytes() []byte { return pm.data.Bytes() } +type cPool interface { + GetConnection(address string, pk crypto.PublicKey) (net.Connection, error) +} + type swarm struct { + // init info props started uint32 bootErr error bootChan chan struct{} @@ -50,6 +59,12 @@ type swarm struct { config config.Config + // Context for cancel + ctx context.Context + + // Shutdown the loop + shutdown chan struct{} // local request to kill the swarm from outside. e.g. when local node is shutting down + // set in construction and immutable state lNode *node.LocalNode @@ -58,17 +73,26 @@ type swarm struct { protocolHandlers map[string]chan service.Message protocolHandlerMutex sync.RWMutex - gossip gossip.Protocol - + gossip *gossip.Protocol network *net.Net + cPool cPool + dht dht.DHT - cPool *connectionpool.ConnectionPool + //neighborhood + initOnce sync.Once + initial chan struct{} - dht dht.DHT - // Context for cancel - ctx context.Context - // Shutdown the loop - shutdown chan struct{} // local request to kill the swarm from outside. 
e.g when local node is shutting down + outpeersMutex sync.RWMutex + inpeersMutex sync.RWMutex + outpeers map[string]crypto.PublicKey + inpeers map[string]crypto.PublicKey + + morePeersReq chan struct{} + connectingTimeout time.Duration + + peerLock sync.RWMutex + newPeerSub []chan crypto.PublicKey + delPeerSub []chan crypto.PublicKey } func (s *swarm) waitForBoot() error { @@ -114,26 +138,54 @@ func newSwarm(ctx context.Context, config config.Config, newNode bool, persist b } s := &swarm{ - config: config, - lNode: l, - bootChan: make(chan struct{}), - gossipC: make(chan struct{}), + ctx: ctx, + config: config, + lNode: l, + bootChan: make(chan struct{}), + gossipC: make(chan struct{}), + shutdown: make(chan struct{}), // non-buffered so requests to shutdown block until swarm is shut down + + initial: make(chan struct{}), + morePeersReq: make(chan struct{}), + inpeers: make(map[string]crypto.PublicKey), + outpeers: make(map[string]crypto.PublicKey), + newPeerSub: make([]chan crypto.PublicKey, 0, 10), + delPeerSub: make([]chan crypto.PublicKey, 0, 10), + connectingTimeout: ConnectingTimeout, + protocolHandlers: make(map[string]chan service.Message), network: n, cPool: connectionpool.NewConnectionPool(n, l.PublicKey()), - shutdown: make(chan struct{}), // non-buffered so requests to shutdown block until swarm is shut down - ctx: ctx, } s.dht = dht.New(l, config.SwarmConfig, s) - s.gossip = gossip.NewNeighborhood(config.SwarmConfig, s.dht, s.cPool, s.lNode.Log) + s.gossip = gossip.NewProtocol(config.SwarmConfig, s, newSignerValidator(s), s.lNode.Log) s.lNode.Debug("Created swarm for local node %s, %s", l.Address(), l.Pretty()) return s, nil } +type signerValidator struct { + pubKey crypto.PublicKey + signFunc func([]byte) ([]byte, error) +} + +// PublicKey is the signerValidator pair's public key +func (sv *signerValidator) PublicKey() crypto.PublicKey { + return sv.pubKey +} + +// Sign delegates the sign function from the private key +func (sv *signerValidator) Sign(data []byte) ([]byte, error) { + return sv.signFunc(data) +} + +func newSignerValidator(s *swarm) *signerValidator { + return &signerValidator{s.lNode.PublicKey(), s.lNode.PrivateKey().Sign} +} + func (s *swarm) Start() error { if atomic.LoadUint32(&s.started) == 1 { return errors.New("swarm already running") @@ -153,27 +205,34 @@ func (s *swarm) Start() error { err := s.dht.Bootstrap(s.ctx) if err != nil { s.bootErr = err + close(s.bootChan) s.Shutdown() + return } close(s.bootChan) s.lNode.Info("DHT Bootstrapped with %d peers in %v", s.dht.Size(), time.Since(b)) }() } - if s.config.SwarmConfig.Gossip { - go func() { - if s.config.SwarmConfig.Bootstrap { - s.waitForBoot() - } - err := s.gossip.Start() - if err != nil { - s.gossipErr = err - s.Shutdown() - } - <-s.gossip.Initial() + // start gossip before starting to collect peers + + s.gossip.Start() // non-blocking + + // wait for neighborhood + go func() { + if s.config.SwarmConfig.Bootstrap { + s.waitForBoot() + } + err := s.startNeighborhood() + if err != nil { + s.gossipErr = err + close(s.gossipC) + s.Shutdown() + return + } + <-s.initial + close(s.gossipC) + }() // todo handle error async return nil } @@ -182,7 +241,7 @@ func (s *swarm) LocalNode() *node.LocalNode { return s.lNode } -func (s *swarm) connectionPool() *connectionpool.ConnectionPool { +func (s *swarm) connectionPool() cPool { return s.cPool } @@ -225,7 +284,7 @@ func (s *swarm) sendMessageImpl(peerPubKey string, protocol string, payload serv } protomessage := 
-		Metadata: message.NewProtocolMessageMetadata(s.lNode.PublicKey(), protocol, false),
+		Metadata: message.NewProtocolMessageMetadata(s.lNode.PublicKey(), protocol),
 	}
 
 	switch x := payload.(type) {
@@ -341,18 +400,31 @@ Loop:
 	for {
 		select {
 		case con := <-closing:
-			go s.gossip.Disconnect(con.RemotePublicKey()) //todo notify dht?
+			go s.retryOrReplace(con.RemotePublicKey()) //todo notify dht?
 		case nce := <-newConnEvents:
-			go func(nce net.NewConnectionEvent) { s.dht.Update(nce.Node); s.gossip.AddIncomingPeer(nce.Node, nce.Conn) }(nce)
+			go func(nce net.NewConnectionEvent) { s.dht.Update(nce.Node); s.addIncomingPeer(nce.Node.PublicKey()) }(nce)
 		case <-s.shutdown:
 			break Loop
 		}
 	}
 }
 
-// swarm serial event processing
-// provides concurrency safety as only one callback is executed at a time
-// so there's no need for sync internal data structures
+func (s *swarm) retryOrReplace(key crypto.PublicKey) {
+	getpeer := s.dht.InternalLookup(node.NewDhtID(key.Bytes()))
+
+	if getpeer == nil {
+		s.Disconnect(key) // if we didn't find the peer we can't try replacing it
+		return
+	}
+	peer := getpeer[0]
+
+	_, err := s.cPool.GetConnection(peer.Address(), peer.PublicKey())
+	if err != nil { // we couldn't connect :/
+		s.Disconnect(key)
+	}
+}
+
+// periodically checks that our clock is in sync
 func (s *swarm) checkTimeDrifts() {
 	checkTimeSync := time.NewTicker(config.TimeConfigValues.RefreshNtpInterval)
 Loop:
@@ -381,8 +453,6 @@ var (
 	ErrBadFormat2 = errors.New("bad msg format, could'nt deserialize 2")
 	// ErrOutOfSync is returned when messsage timestamp was out of sync
 	ErrOutOfSync = errors.New("received out of sync msg")
-	// ErrNoPayload empty payload message
-	ErrNoPayload = errors.New("deprecated code path, no payload in message")
 	// ErrFailDecrypt session cant decrypt
 	ErrFailDecrypt = errors.New("can't decrypt message payload with session key")
 	// ErrAuthAuthor message sign is wrong
@@ -403,27 +473,10 @@ func (s *swarm) onRemoteClientMessage(msg net.IncomingMessageEvent) error {
 
 	if msg.Message == nil || msg.Conn == nil {
-		s.lNode.Fatal("Fatal error: Got nil message or connection")
 		return ErrBadFormat1
 	}
 
 	s.lNode.Debug(fmt.Sprintf("Handle message from << %v", msg.Conn.RemotePublicKey().Pretty()))
 
-	c := &pb.CommonMessageData{}
-	err := proto.Unmarshal(msg.Message, c)
-	if err != nil {
-		return ErrBadFormat1
-	}
-
-	// check that the message was send within a reasonable time
-	if ok := timesync.CheckMessageDrift(c.Timestamp); !ok {
-		// TODO: consider kill connection with this node and maybe blacklist
-		// TODO : Also consider moving send timestamp into metadata(encrypted).
-		return ErrOutOfSync
-	}
-
-	if len(c.Payload) == 0 {
-		return ErrNoPayload
-	}
 
 	// protocol messages are encrypted in payload
 	// Locate the session
@@ -433,7 +486,7 @@ func (s *swarm) onRemoteClientMessage(msg net.IncomingMessageEvent) error {
 		return ErrNoSession
 	}
 
-	decPayload, err := session.Decrypt(c.Payload)
+	decPayload, err := session.Decrypt(msg.Message)
 	if err != nil {
 		return ErrFailDecrypt
 	}
@@ -444,76 +497,288 @@ func (s *swarm) onRemoteClientMessage(msg net.IncomingMessageEvent) error {
 		s.lNode.Errorf("proto marshinling err=", err)
 		return ErrBadFormat2
 	}
-	if pm.Metadata == nil {
-		spew.Dump(pm)
-		panic("this is a defected message") // todo: Session bug, session scrambles messages and remove metadata
+
+	// authenticate that the pubkey is valid and matches the session's remote pubkey, then validate the signature.
+	authPub, err := crypto.NewPublicKey(pm.Metadata.AuthPubKey)
+	if err != nil {
+		return ErrAuthAuthor
 	}
-	// authenticate message author - we already authenticated the sender via the shared session key secret
+
+	if !bytes.Equal(authPub.Bytes(), msg.Conn.RemotePublicKey().Bytes()) {
+		return ErrAuthAuthor
+	}
+
 	err = message.AuthAuthor(pm)
 	if err != nil {
 		return ErrAuthAuthor
 	}
 
-	if !pm.Metadata.Gossip && !bytes.Equal(pm.Metadata.AuthPubKey, msg.Conn.RemotePublicKey().Bytes()) {
-		//wtf ?
-		return ErrNotFromPeer
+	// check that the message was sent within a reasonable time
+	if ok := timesync.CheckMessageDrift(pm.Metadata.Timestamp); !ok {
+		// TODO: consider kill connection with this node and maybe blacklist
+		// TODO : Also consider moving send timestamp into metadata(encrypted).
+		return ErrOutOfSync
 	}
 
-	s.lNode.Debug("Authorized %v protocol message ", pm.Metadata.Protocol)
+	s.lNode.Debug("Authorized %v protocol message ", pm.Metadata.NextProtocol)
 
 	remoteNode := node.New(msg.Conn.RemotePublicKey(), "") // if we got so far, we already have the node in our rt, hence address won't be used
 
 	// update the routing table - we just heard from this authenticated node
 	s.dht.Update(remoteNode)
 
-	// participate in gossip even if we don't know this protocol
-	if pm.Metadata.Gossip { // todo : use gossip uid
-		s.LocalNode().Debug("Got gossip message! relaying it")
-		// don't block anyway
-		err = s.gossip.Broadcast(decPayload) // err only if this is an old message
-	}
+	// route authenticated message to the registered protocol
 
-	if err != nil {
-		return nil
+	var data service.Data
+
+	if payload := pm.GetPayload(); payload != nil {
+		data = service.Data_Bytes{Payload: payload}
+	} else if wrap := pm.GetMsg(); wrap != nil {
+		data = service.Data_MsgWrapper{Req: wrap.Req, MsgType: wrap.Type, ReqID: wrap.ReqID, Payload: wrap.Payload}
 	}
+
+	return s.ProcessProtocolMessage(remoteNode, pm.Metadata.NextProtocol, data)
+}
+
+// ProcessProtocolMessage passes an already decrypted message to a protocol.
+func (s *swarm) ProcessProtocolMessage(sender node.Node, protocol string, data service.Data) error {
 	// route authenticated message to the reigstered protocol
 	s.protocolHandlerMutex.RLock()
-	msgchan := s.protocolHandlers[pm.Metadata.Protocol]
+	msgchan := s.protocolHandlers[protocol]
 	s.protocolHandlerMutex.RUnlock()
 
 	if msgchan == nil {
-		s.LocalNode().Errorf("there was a bad protocol ", pm.Metadata.Protocol)
 		return ErrNoProtocol
 	}
+	s.lNode.Debug("Forwarding message to %v protocol", protocol)
 
-	s.lNode.Debug("Forwarding message to protocol")
+	msgchan <- protocolMessage{sender, data}
 
-	if payload := pm.GetPayload(); payload != nil {
-		msgchan <- protocolMessage{remoteNode, service.Data_Bytes{Payload: payload}}
-	} else if wrap := pm.GetMsg(); wrap != nil {
-		msgchan <- protocolMessage{remoteNode, service.Data_MsgWrapper{Req: wrap.Req, MsgType: wrap.Type, ReqID: wrap.ReqID, Payload: wrap.Payload}}
+	return nil
+}
+
+// Broadcast creates a gossip message, signs it and disseminates it to neighbors.
+func (s *swarm) Broadcast(protocol string, payload []byte) error {
+	return s.ProcessProtocolMessage(s.lNode.Node, gossip.ProtocolName, service.Data_Bytes{payload})
+}
+
+// Neighborhood: the neighborhood is the set of peers we keep close, meaning we try to keep connections
+// to them at any given time, and if that's not possible we replace them. Protocols use the neighborhood
+// to run their logic with peers.
+
+func (s *swarm) publishNewPeer(peer crypto.PublicKey) {
+	s.peerLock.RLock()
+	for _, p := range s.newPeerSub {
+		select {
+		case p <- peer:
+		default:
+		}
+	}
+	s.peerLock.RUnlock()
+}
+
+func (s *swarm) publishDelPeer(peer crypto.PublicKey) {
+	s.peerLock.RLock()
+	for _, p := range s.delPeerSub {
+		select {
+		case p <- peer:
+		default:
+		}
+	}
+	s.peerLock.RUnlock()
+}
+
+// SubscribePeerEvents lets clients listen on events inside the swarm about peers. the first chan is new peers, the second is deleted peers.
+func (s *swarm) SubscribePeerEvents() (chan crypto.PublicKey, chan crypto.PublicKey) {
+	in := make(chan crypto.PublicKey, s.config.SwarmConfig.RandomConnections) // todo: what size should this be? maybe let the client pass channels.
+	del := make(chan crypto.PublicKey, s.config.SwarmConfig.RandomConnections)
+	s.peerLock.Lock()
+	s.newPeerSub = append(s.newPeerSub, in)
+	s.delPeerSub = append(s.delPeerSub, del)
+	s.peerLock.Unlock()
+	// todo : send the existing peers right away ?
+	return in, del
+}
+
+// NoResultsInterval is the timeout we wait between repeated requests for more peers
+const NoResultsInterval = 1 * time.Second
+
+// startNeighborhood starts a loop that manages the peers we are connected to at all times.
+// It connects to config.RandomConnections peers and after that maintains this number
+// of connections; if a connection is closed we notify the loop that we need more peers now.
+func (s *swarm) startNeighborhood() error {
+	//TODO: Save and load persistent peers ?
+	s.lNode.Info("Neighborhood service started")
+
+	// initial request for peers
+	go s.peersLoop()
+	s.morePeersReq <- struct{}{}
 
 	return nil
 }
 
-// Broadcast creates a gossip message signs it and disseminate it to neighbors
-func (s *swarm) Broadcast(protocol string, payload []byte) error {
-	// start by making the message
-	pm := &pb.ProtocolMessage{
-		Metadata: message.NewProtocolMessageMetadata(s.LocalNode().PublicKey(), protocol, true),
-		Data:     &pb.ProtocolMessage_Payload{Payload: payload},
+func (s *swarm) peersLoop() {
+loop:
+	for {
+		select {
+		case <-s.morePeersReq:
+			s.lNode.Debug("loop: got morePeersReq")
+			go s.askForMorePeers()
+		//todo: try getting the connections (heartbeat)
+		case <-s.shutdown:
+			break loop // maybe error ?
+		}
 	}
+}
 
-	err := message.SignMessage(s.lNode.PrivateKey(), pm)
-	if err != nil {
-		return err
+func (s *swarm) askForMorePeers() {
+	numpeers := len(s.outpeers)
+	req := s.config.SwarmConfig.RandomConnections - numpeers
+	if req <= 0 {
+		return
+	}
+
+	s.getMorePeers(req)
+
+	// todo: is there a better way than checking this every time?
+	if len(s.outpeers) >= s.config.SwarmConfig.RandomConnections {
+		s.initOnce.Do(func() {
+			s.lNode.Info("gossip; connected to initial required neighbors - %v", len(s.outpeers))
+			close(s.initial)
+			s.outpeersMutex.RLock()
+			s.lNode.Debug(spew.Sdump(s.outpeers))
+			s.outpeersMutex.RUnlock()
+		})
+		return
 	}
+	// if we couldn't get any, maybe we're still initializing
+	// wait a little bit before trying again
+	time.Sleep(NoResultsInterval)
+	s.morePeersReq <- struct{}{}
+}
+
+// getMorePeers tries to fill the `peers` slice with dialed outbound peers that we selected from the dht.
+func (s *swarm) getMorePeers(numpeers int) int {
+
+	if numpeers == 0 {
+		return 0
 	}
 
-	msg, err := proto.Marshal(pm)
+	// dht should provide us with random peers to connect to
+	nds := s.dht.SelectPeers(numpeers)
+	ndsLen := len(nds)
+	if ndsLen == 0 {
+		s.lNode.Debug("Peer sampler returned nothing.")
+		// this gets busy at start so we spare a second
+		return 0 // zero samples here so no reason to proceed
 	}
 
-	if err != nil {
-		return err
+	type cnErr struct {
+		n   node.Node
+		err error
+	}
+
+	res := make(chan cnErr, numpeers)
+
+	// Try a connection to each peer.
+	// TODO: try splitting the load and don't connect to more than X at a time
+	for i := 0; i < ndsLen; i++ {
+		go func(nd node.Node, reportChan chan cnErr) {
+			_, err := s.cPool.GetConnection(nd.Address(), nd.PublicKey())
+			reportChan <- cnErr{nd, err}
+		}(nds[i], res)
+	}
+
+	total, bad := 0, 0
+	tm := time.NewTimer(s.connectingTimeout) // todo: configure
+loop:
+	for {
+		select {
+		case cne := <-res:
+			total++ // we count every result so we know when all dials have reported back
+
+			if cne.err != nil {
+				s.lNode.Debug("can't establish connection with sampled peer %v, %v", cne.n.String(), cne.err)
+				bad++
+				if total == ndsLen {
+					break loop
+				}
+				continue // this peer didn't work, todo: tell dht
+			}
+
+			s.inpeersMutex.Lock()
+			_, ok := s.inpeers[cne.n.PublicKey().String()]
+			s.inpeersMutex.Unlock()
+			if ok {
+				s.lNode.Debug("not allowing peers from inbound to upgrade to outbound to prevent poisoning, peer %v", cne.n.String())
+				bad++
+				if total == ndsLen {
+					break loop
+				}
+				continue
+			}
+
+			s.outpeersMutex.Lock()
+			s.outpeers[cne.n.PublicKey().String()] = cne.n.PublicKey()
+			s.outpeersMutex.Unlock()
+
+			s.publishNewPeer(cne.n.PublicKey())
+			s.lNode.Debug("Neighborhood: Added peer to peer list %v", cne.n.Pretty())
+
+			if total == ndsLen {
+				break loop
+			}
+		case <-tm.C:
+			break loop
+		case <-s.shutdown:
+			break loop
+		}
 	}
 
-	return s.gossip.Broadcast(msg)
+	return total - bad
+}
+
+// Disconnect removes a peer from the neighborhood; it requests more peers if our outbound peer count is less than configured.
+func (s *swarm) Disconnect(key crypto.PublicKey) {
+	peer := key.String()
+
+	s.inpeersMutex.Lock()
+	if _, ok := s.inpeers[peer]; ok {
+		delete(s.inpeers, peer)
+		s.inpeersMutex.Unlock()
+		s.publishDelPeer(key)
+		return
+	}
+	s.inpeersMutex.Unlock()
+
+	s.outpeersMutex.Lock()
+	if _, ok := s.outpeers[peer]; ok {
+		delete(s.outpeers, peer)
+	}
+	s.outpeersMutex.Unlock()
+	s.publishDelPeer(key)
+	s.morePeersReq <- struct{}{}
+}
+
+// addIncomingPeer inserts a peer to the neighborhood as a remote peer.
+func (s *swarm) addIncomingPeer(n crypto.PublicKey) {
+	s.inpeersMutex.Lock()
+	// todo limit number of inpeers
+	s.inpeers[n.String()] = n
+	s.inpeersMutex.Unlock()
+	s.publishNewPeer(n)
+}
+
+func (s *swarm) hasIncomingPeer(peer crypto.PublicKey) bool {
+	s.inpeersMutex.RLock()
+	_, ok := s.inpeers[peer.String()]
+	s.inpeersMutex.RUnlock()
+	return ok
+}
+
+func (s *swarm) hasOutgoingPeer(peer crypto.PublicKey) bool {
+	s.outpeersMutex.RLock()
+	_, ok := s.outpeers[peer.String()]
+	s.outpeersMutex.RUnlock()
+	return ok
 }
diff --git a/p2p/swarm_test.go b/p2p/swarm_test.go
index 18dd7473d5..08bee3c49c 100644
--- a/p2p/swarm_test.go
+++ b/p2p/swarm_test.go
@@ -1,13 +1,12 @@
 package p2p
 
 import (
-	"encoding/hex"
+	"github.com/spacemeshos/go-spacemesh/p2p/dht"
 	"testing"
 	"time"
 
 	"context"
 	"errors"
-	"fmt"
 	"github.com/gogo/protobuf/proto"
 	"github.com/spacemeshos/go-spacemesh/crypto"
 	"github.com/spacemeshos/go-spacemesh/p2p/config"
@@ -16,12 +15,22 @@ import (
 	"github.com/spacemeshos/go-spacemesh/p2p/node"
 	"github.com/spacemeshos/go-spacemesh/p2p/pb"
 	"github.com/spacemeshos/go-spacemesh/p2p/service"
-	"github.com/spacemeshos/go-spacemesh/timesync"
 	"github.com/stretchr/testify/assert"
 	"math/rand"
 	"sync"
 )
 
+type cpoolMock struct {
+	f func(address string, pk crypto.PublicKey) (net.Connection, error)
+}
+
+func (cp *cpoolMock) GetConnection(address string, pk crypto.PublicKey) (net.Connection, error) {
+	if cp.f != nil {
+		return cp.f(address, pk)
+	}
+	return net.NewConnectionMock(pk), nil
+}
+
 func p2pTestInstance(t testing.TB, config config.Config) *swarm {
 	port, err := node.GetUnboundedPort()
 	assert.NoError(t, err, "Error getting a port", err)
@@ -33,6 +42,16 @@ func p2pTestInstance(t testing.TB, config config.Config) *swarm {
 	return p
 }
 
+func p2pTestNoStart(t testing.TB, config config.Config) *swarm {
+	port, err := node.GetUnboundedPort()
+	assert.NoError(t, err, "Error getting a port", err)
+	config.TCPPort = port
+	p, err := newSwarm(context.TODO(), config, true, true)
+	assert.NoError(t, err, "Error creating p2p stack, err: %v", err)
+	assert.NotNil(t, p)
+	return p
+}
+
 const exampleProtocol = "EX"
 
 const examplePayload = "Example"
@@ -109,8 +128,8 @@ func TestSwarm_authAuthor(t *testing.T) {
 	assert.NotNil(t, pub)
 
 	pm := &pb.ProtocolMessage{
-		Metadata: message.NewProtocolMessageMetadata(pub, exampleProtocol, false),
-		Data:     &pb.ProtocolMessage_Payload{Payload: []byte(examplePayload)},
+		Metadata: message.NewProtocolMessageMetadata(pub, exampleProtocol),
+		Data:     &pb.ProtocolMessage_Payload{[]byte(examplePayload)},
 	}
 	ppm, err := proto.Marshal(pm)
 	assert.NoError(t, err, "cant marshal msg ", err)
@@ -118,9 +137,8 @@ func TestSwarm_authAuthor(t *testing.T) {
 	// sign it
 	s, err := priv.Sign(ppm)
 	assert.NoError(t, err, "cant sign ", err)
-	ssign := hex.EncodeToString(s)
 
-	pm.Metadata.AuthorSign = ssign
+	pm.Metadata.MsgSign = s
 
 	vererr := message.AuthAuthor(pm)
 	assert.NoError(t, vererr)
@@ -133,9 +151,8 @@ func TestSwarm_authAuthor(t *testing.T) {
 
 	s, err = priv2.Sign(ppm)
 	assert.NoError(t, err, "cant sign ", err)
-	ssign = hex.EncodeToString(s)
 
-	pm.Metadata.AuthorSign = ssign
+	pm.Metadata.MsgSign = s
 
 	vererr = message.AuthAuthor(pm)
 	assert.Error(t, vererr)
@@ -144,8 +161,8 @@ func TestSwarm_authAuthor(t *testing.T) {
 func TestSwarm_SignAuth(t *testing.T) {
 	n, _ := node.GenerateTestNode(t)
 	pm := &pb.ProtocolMessage{
-		Metadata: message.NewProtocolMessageMetadata(n.PublicKey(), exampleProtocol, false),
-		Data:     &pb.ProtocolMessage_Payload{Payload: []byte(examplePayload)},
+		Metadata: message.NewProtocolMessageMetadata(n.PublicKey(), exampleProtocol),
+		Data:     &pb.ProtocolMessage_Payload{[]byte(examplePayload)},
 	}
 
 	err := message.SignMessage(n.PrivateKey(), pm)
@@ -296,57 +313,22 @@ func TestSwarm_onRemoteClientMessage(t *testing.T) {
 	id, err := node.NewNodeIdentity(cfg, "0.0.0.0:0000", false)
 	assert.NoError(t, err, "we cant make node ?")
 
-	p := p2pTestInstance(t, config.DefaultConfig())
-	nmock := &net.ConnectionMock{}
+	p := p2pTestNoStart(t, cfg)
+	nmock := new(net.ConnectionMock)
 	nmock.SetRemotePublicKey(id.PublicKey())
 
 	// Test bad format
-
-	msg := []byte("badbadformat")
-	imc := net.IncomingMessageEvent{nmock, msg}
+	imc := net.IncomingMessageEvent{nmock, nil}
 	err = p.onRemoteClientMessage(imc)
 	assert.Equal(t, err, ErrBadFormat1)
 
-	// Test out of sync
-
-	realmsg := &pb.CommonMessageData{
-		SessionId: []byte("test"),
-		Payload:   []byte("test"),
-		Timestamp: time.Now().Add(timesync.MaxAllowedMessageDrift + time.Minute).Unix(),
-	}
-	bin, _ := proto.Marshal(realmsg)
-
-	imc.Message = bin
-
-	err = p.onRemoteClientMessage(imc)
-	assert.Equal(t, err, ErrOutOfSync)
-
-	// Test no payload
-
-	cmd := &pb.CommonMessageData{
-		SessionId: []byte("test"),
-		Payload:   []byte(""),
-		Timestamp: time.Now().Unix(),
-	}
-
-	bin, _ = proto.Marshal(cmd)
-	imc.Message = bin
-	err = p.onRemoteClientMessage(imc)
-
-	assert.Equal(t, err, ErrNoPayload)
-
 	// Test No Session
-
-	cmd.Payload = []byte("test")
-
-	bin, _ = proto.Marshal(cmd)
-	imc.Message = bin
+	imc.Message = []byte("test")
 
 	err = p.onRemoteClientMessage(imc)
 	assert.Equal(t, err, ErrNoSession)
 
-	// Test bad session
-
+	// Test bad session
 	session := &net.SessionMock{}
 	session.SetDecrypt(nil, errors.New("fail"))
 	imc.Conn.SetSession(session)
@@ -354,39 +336,45 @@ func TestSwarm_onRemoteClientMessage(t *testing.T) {
 	err = p.onRemoteClientMessage(imc)
 	assert.Equal(t, err, ErrFailDecrypt)
 
-	// Test bad format again
-
+	// Test bad format again
 	session.SetDecrypt([]byte("wont_format_fo_protocol_message"), nil)
 	err = p.onRemoteClientMessage(imc)
 	assert.Equal(t, err, ErrBadFormat2)
 
 	// Test bad auth sign
-
 	goodmsg := &pb.ProtocolMessage{
-		Metadata: message.NewProtocolMessageMetadata(id.PublicKey(), exampleProtocol, false), // not signed
-		Data:     &pb.ProtocolMessage_Payload{Payload: []byte(examplePayload)},
+		Metadata: message.NewProtocolMessageMetadata(id.PublicKey(), exampleProtocol), // not signed
+		Data:     &pb.ProtocolMessage_Payload{[]byte(examplePayload)},
 	}
 
 	goodbin, _ := proto.Marshal(goodmsg)
 
-	cmd.Payload = goodbin
-	bin, _ = proto.Marshal(cmd)
-	imc.Message = bin
+	imc.Message = goodbin
 	session.SetDecrypt(goodbin, nil)
 
 	err = p.onRemoteClientMessage(imc)
-	assert.Equal(t, err, ErrAuthAuthor)
+	assert.Equal(t, ErrAuthAuthor, err)
+
+	goodmsg.Metadata.Timestamp = time.Now().Add(-time.Hour).Unix()
+	err = message.SignMessage(id.PrivateKey(), goodmsg)
+	assert.NoError(t, err)
+	nosynced, _ := proto.Marshal(goodmsg)
+	session.SetDecrypt(nosynced, nil)
+	// Test out of sync
+	imc.Message = nosynced
 
-	// Test no server
+	err = p.onRemoteClientMessage(imc)
+	assert.Equal(t, ErrOutOfSync, err)
 
+	// Test no protocol
+	goodmsg.Metadata.Timestamp = time.Now().Unix()
+	goodmsg.Metadata.MsgSign = nil
 	err = message.SignMessage(id.PrivateKey(), goodmsg)
 	assert.NoError(t, err, err)
 
 	goodbin, _ = proto.Marshal(goodmsg)
-	cmd.Payload = goodbin
-	bin, _ = proto.Marshal(cmd)
-	imc.Message = bin
+	imc.Message = goodbin
 	session.SetDecrypt(goodbin, nil)
 
 	err = p.onRemoteClientMessage(imc)
@@ -394,81 +382,327 @@ func TestSwarm_onRemoteClientMessage(t *testing.T) {
 	// Test no err
+	var wg sync.WaitGroup
 	c := p.RegisterProtocol(exampleProtocol)
-	go func() { <-c }()
+	go func() {
+		ti := time.After(1 * time.Second)
+		select {
+		case <-c:
+			wg.Done()
+			break
+		case <-ti:
+			t.Error("Didn't get message in time")
+		}
+	}()
+	wg.Add(1)
 	err = p.onRemoteClientMessage(imc)
-	assert.NoError(t, err)
+	wg.Wait()
 
-	// todo : test gossip codepaths.
 }
 
-//TODO : Test this without real network
-func TestBootstrap(t *testing.T) {
-	t.Skip()
-	bootnodes := []int{3}
-	nodes := []int{30}
-	rcon := []int{3}
+func assertNewPeerEvent(t *testing.T, peer crypto.PublicKey, connChan <-chan crypto.PublicKey) {
+	select {
+	case newPeer := <-connChan:
+		assert.Equal(t, peer.String(), newPeer.String())
+	default:
+		assert.Error(t, errors.New("no new peer event"))
+	}
+}
 
-	bootcfg := config.DefaultConfig()
-	bootcfg.SwarmConfig.Bootstrap = false
-	bootcfg.SwarmConfig.Gossip = false
+func assertNewPeerEvents(t *testing.T, expCount int, connChan <-chan crypto.PublicKey) {
+	//var actCount int
+	//loop:
+	//for {
+	//	select {
+	//	case _ = <-connChan:
+	//		actCount++
+	//	default:
+	//		break loop
+	//	}
+	//}
+	assert.Equal(t, expCount, len(connChan))
+}
 
+func assertNoNewPeerEvent(t *testing.T, eventChan <-chan crypto.PublicKey) {
+	select {
+	case newPeer := <-eventChan:
+		assert.Error(t, errors.New("unexpected new peer event, peer "+newPeer.String()))
+	default:
+		return
+	}
+}
 
-	rand.Seed(time.Now().UnixNano())
+func assertNewDisconnectedPeerEvent(t *testing.T, peer crypto.PublicKey, discChan <-chan crypto.PublicKey) {
+	select {
+	case newPeer := <-discChan:
+		assert.Equal(t, peer.String(), newPeer.String())
+	default:
+		assert.Error(t, errors.New("no new peer event"))
+	}
+}
 
-	for i := 0; i < len(nodes); i++ {
-		t.Run(fmt.Sprintf("Peers:%v/randconn:%v", nodes[i], rcon[i]), func(t *testing.T) {
-			bufchan := make(chan *swarm, nodes[i])
+func assertNoNewDisconnectedPeerEvent(t *testing.T, eventChan <-chan crypto.PublicKey) {
+	select {
+	case newPeer := <-eventChan:
+		assert.Error(t, errors.New("unexpected new peer event, peer "+newPeer.String()))
+	default:
+		return
+	}
+}
 
-			bnarr := []string{}
+func drainPeerEvents(eventChan <-chan crypto.PublicKey) {
+loop:
+	for {
+		select {
+		case <-eventChan:
+			continue loop
+		default:
+			break loop
+		}
+	}
+}
 
-			for k := 0; k < bootnodes[i]; k++ {
-				bn := p2pTestInstance(t, bootcfg)
-				bn.lNode.Info("This is a bootnode - %v", bn.lNode.Node.String())
-				bnarr = append(bnarr, node.StringFromNode(bn.lNode.Node))
-			}
+func Test_Swarm_getMorePeers(t *testing.T) {
+	// test normal flow
+	numpeers := 3
+	cfg := config.DefaultConfig()
+	cfg.SwarmConfig.Bootstrap = false
+	cfg.SwarmConfig.Gossip = false
+	cfg.SwarmConfig.RandomConnections = numpeers
+	n := p2pTestNoStart(t, cfg)
 
-			cfg := config.DefaultConfig()
-			cfg.SwarmConfig.Bootstrap = true
-			cfg.SwarmConfig.Gossip = false
-			cfg.SwarmConfig.RandomConnections = rcon[i]
-			cfg.SwarmConfig.BootstrapNodes = bnarr
-
-			var wg sync.WaitGroup
-
-			for j := 0; j < nodes[i]; j++ {
-				wg.Add(1)
-				go func() {
-					sw := p2pTestInstance(t, cfg)
-					sw.waitForBoot()
-					bufchan <- sw
-					wg.Done()
-				}()
-			}
+	conn, _ := n.SubscribePeerEvents()
 
-			wg.Wait()
-			close(bufchan)
-			swarms := []*swarm{}
-			for s := range bufchan {
-				swarms = append(swarms, s)
+	res := n.getMorePeers(0) // this shouldn't work
+	assert.Equal(t, res, 0)
+	assertNoNewPeerEvent(t, conn)
 
-			}
+	mdht := new(dht.MockDHT)
+	n.dht = mdht
+	// this will return 0 peers because SelectPeers returns an empty array when not set
 
-			randnode := swarms[rand.Int31n(int32(len(swarms)-1))]
-			randnode2 := swarms[rand.Int31n(int32(len(swarms)-1))]
+	res = n.getMorePeers(10)
+	assert.Equal(t, res, 0)
+	assertNoNewPeerEvent(t, conn)
 
-			for (randnode == nil || randnode2 == nil) || randnode.lNode.String() == randnode2.lNode.String() {
-				randnode = swarms[rand.Int31n(int32(len(swarms)-1))]
-				randnode2 = swarms[rand.Int31n(int32(len(swarms)-1))]
-			}
+	testNode := node.GenerateRandomNodeData()
+	mdht.SelectPeersFunc = func(qty int) []node.Node {
+		return []node.Node{testNode}
+	}
 
-			randnode.RegisterProtocol(exampleProtocol)
-			recv := randnode2.RegisterProtocol(exampleProtocol)
+	cpm := new(cpoolMock)
 
-			sendDirectMessage(t, randnode, randnode2.lNode.PublicKey().String(), recv, true)
-			time.Sleep(1* time.Second)
-		})
+	// test connection error
+	cpm.f = func(address string, pk crypto.PublicKey) (net.Connection, error) {
+		return nil, errors.New("can't make connection")
 	}
+
+	n.cPool = cpm
+	res = n.getMorePeers(1) // this shouldn't work
+	assert.Equal(t, res, 0)
+	cpm.f = nil // for the next tests
+	assertNoNewPeerEvent(t, conn)
+
+	res = n.getMorePeers(1)
+	assert.Equal(t, 1, res)
+	assert.Equal(t, len(n.outpeers), 1)
+	assert.True(t, n.hasOutgoingPeer(testNode.PublicKey()))
+	assertNewPeerEvents(t, 1, conn)
+	assertNewPeerEvent(t, testNode.PublicKey(), conn)
+
+	drainPeerEvents(conn)
+
+	//todo remove the peer instead of counting plus one
+	//
+	//
+	mdht.SelectPeersFunc = func(qty int) []node.Node {
+		return node.GenerateRandomNodesData(qty)
+	}
+
+	res = n.getMorePeers(numpeers)
+	assert.Equal(t, res, numpeers)
+	assert.Equal(t, len(n.outpeers), numpeers+1) // there's already one inside
+	assertNewPeerEvents(t, numpeers, conn)
+	drainPeerEvents(conn) // so they won't interrupt the next test
+
+	// test incoming peer
+	nd := node.GenerateRandomNodeData()
+	n.addIncomingPeer(nd.PublicKey())
+
+	assert.True(t, n.hasIncomingPeer(nd.PublicKey()))
+	assertNewPeerEvents(t, 1, conn)
+	assertNewPeerEvent(t, nd.PublicKey(), conn)
+
+	// test replacing an incoming peer
+	//
+	mdht.SelectPeersFunc = func(count int) []node.Node {
+		some := node.GenerateRandomNodesData(count - 1)
+		some = append(some, nd)
+		return some
+	}
+
+	res = n.getMorePeers(numpeers)
+	assert.Equal(t, res, numpeers-1)
+	assert.False(t, n.hasOutgoingPeer(nd.PublicKey()))
+	assert.True(t, n.hasIncomingPeer(nd.PublicKey()))
 }
+
+func TestNeighborhood_Initial(t *testing.T) {
+	cfg := config.DefaultConfig()
+	cfg.SwarmConfig.RandomConnections = 3
+	cfg.SwarmConfig.Gossip = true
+	cfg.SwarmConfig.Bootstrap = false
+
+	p := p2pTestNoStart(t, cfg)
+	mdht := new(dht.MockDHT)
+	mdht.SelectPeersFunc = func(qty int) []node.Node {
+		return node.GenerateRandomNodesData(qty)
+	}
+
+	p.dht = mdht
+
+	err := p.Start()
+	assert.NoError(t, err)
+	ti := time.After(time.Millisecond)
+	select {
+	case <-p.initial:
+		t.Error("shouldn't have reached initial peers")
+	case <-ti:
+		break
+	}
+
+	p.Shutdown()
+
+	p = p2pTestNoStart(t, cfg)
+	p.dht = mdht
+	cpm := new(cpoolMock)
+	cpm.f = func(address string, pk crypto.PublicKey) (net.Connection, error) {
+		return net.NewConnectionMock(pk), nil
+	}
+	p.cPool = cpm
+
+	err = p.Start()
+	assert.NoError(t, err)
+	ti = time.After(time.Second * 1)
+	select {
+	case <-p.initial:
+		break
+	case <-ti:
+		t.Error("didn't reach initial peers in time")
+	}
+}
+
+func TestNeighborhood_Disconnect(t *testing.T) {
+	n := p2pTestNoStart(t, config.DefaultConfig())
+	_, disc := n.SubscribePeerEvents()
+	rnd := node.GenerateRandomNodeData()
+	n.addIncomingPeer(rnd.PublicKey())
+
+	assert.True(t, n.hasIncomingPeer(rnd.PublicKey()))
+	n.Disconnect(rnd.PublicKey())
+	assertNewDisconnectedPeerEvent(t, rnd.PublicKey(), disc)
+	ti := time.After(time.Millisecond)
+	select {
+	case <-n.morePeersReq:
+		t.Error("got more peers on inbound")
+	case <-ti:
+		break
+	}
+	assert.False(t, n.hasIncomingPeer(rnd.PublicKey()))
+
+	// manually add an outgoing peer
+	rnd2 := node.GenerateRandomNodeData()
+	n.outpeers[rnd2.PublicKey().String()] = rnd2.PublicKey() // no need to lock, nothing's happening
+	go n.Disconnect(rnd2.PublicKey())
+	ti = time.After(time.Millisecond)
+	select {
+	case <-n.morePeersReq:
+		break
+	case <-ti:
+		t.Error("didn't get morePeersReq")
+	}
+	assertNewDisconnectedPeerEvent(t, rnd2.PublicKey(), disc)
+}
+
+func TestSwarm_AddIncomingPeer(t *testing.T) {
+	p := p2pTestInstance(t, config.DefaultConfig())
+	rnd := node.GenerateRandomNodeData()
+	p.addIncomingPeer(rnd.PublicKey())
+
+	p.inpeersMutex.RLock()
+	peer, ok := p.inpeers[rnd.PublicKey().String()]
+	p.inpeersMutex.RUnlock()
+
+	assert.True(t, ok)
+	assert.NotNil(t, peer)
+}
+
+//
+////TODO : Test this without real network
+//func TestBootstrap(t *testing.T) {
+//	bootnodes := []int{3}
+//	nodes := []int{30}
+//	rcon := []int{3}
+//
+//	bootcfg := config.DefaultConfig()
+//	bootcfg.SwarmConfig.Bootstrap = false
+//	bootcfg.SwarmConfig.Gossip = false
+//
+//
+//	rand.Seed(time.Now().UnixNano())
+//
+//	for i := 0; i < len(nodes); i++ {
+//		t.Run(fmt.Sprintf("Peers:%v/randconn:%v", nodes[i], rcon[i]), func(t *testing.T) {
+//			bufchan := make(chan *swarm, nodes[i])
+//
+//			bnarr := []string{}
+//
+//			for k := 0; k < bootnodes[i]; k++ {
+//				bn := p2pTestInstance(t, bootcfg)
+//				bn.lNode.Info("This is a bootnode - %v", bn.lNode.Node.String())
+//				bnarr = append(bnarr, node.StringFromNode(bn.lNode.Node))
+//			}
+//
+//			cfg := config.DefaultConfig()
+//			cfg.SwarmConfig.Bootstrap = true
//			cfg.SwarmConfig.Gossip = false
+//			cfg.SwarmConfig.RandomConnections = rcon[i]
+//			cfg.SwarmConfig.BootstrapNodes = bnarr
+//
+//			var wg sync.WaitGroup
+//
+//			for j := 0; j < nodes[i]; j++ {
+//				wg.Add(1)
+//				go func() {
+//					sw := p2pTestInstance(t, cfg)
+//					sw.waitForBoot()
+//					bufchan <- sw
+//					wg.Done()
+//				}()
+//			}
+//
+//			wg.Wait()
+//			close(bufchan)
+//			swarms := []*swarm{}
+//			for s := range bufchan {
+//				swarms = append(swarms, s)
+//
+//			}
+//
+//			randnode := swarms[rand.Int31n(int32(len(swarms)-1))]
+//			randnode2 := swarms[rand.Int31n(int32(len(swarms)-1))]
+//
+//			for (randnode == nil || randnode2 == nil) || randnode.lNode.String() == randnode2.lNode.String() {
+//				randnode = swarms[rand.Int31n(int32(len(swarms)-1))]
+//				randnode2 = swarms[rand.Int31n(int32(len(swarms)-1))]
+//			}
+//
+//			randnode.RegisterProtocol(exampleProtocol)
+//			recv := randnode2.RegisterProtocol(exampleProtocol)
+//
+//			sendDirectMessage(t, randnode, randnode2.lNode.publicKey().String(), recv, true)
+//			time.Sleep(1* time.Second)
+//		})
+//	}
+//}
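A note on the peer-event channels this patch introduces: publishNewPeer and publishDelPeer deliver events through a select with a default branch, so a slow subscriber drops events rather than stalling the swarm's internal loops. Below is a minimal self-contained sketch of that pattern; the broker type and string events are illustrative stand-ins, not the swarm's API.

package main

import (
	"fmt"
	"sync"
)

// broker fans events out to subscribers without ever blocking the
// publisher: if a subscriber's buffer is full, the event is dropped
// for that subscriber, mirroring the select/default in publishNewPeer.
type broker struct {
	mu   sync.RWMutex
	subs []chan string
}

func (b *broker) subscribe(buf int) <-chan string {
	ch := make(chan string, buf)
	b.mu.Lock()
	b.subs = append(b.subs, ch)
	b.mu.Unlock()
	return ch
}

func (b *broker) publish(ev string) {
	b.mu.RLock()
	for _, ch := range b.subs {
		select {
		case ch <- ev: // delivered
		default: // subscriber too slow; drop rather than block
		}
	}
	b.mu.RUnlock()
}

func main() {
	b := &broker{}
	sub := b.subscribe(1)
	b.publish("peer-A")
	b.publish("peer-B") // dropped: the buffer of 1 is already full
	fmt.Println(<-sub)  // peer-A
}

The same trade-off applies to SubscribePeerEvents: the channel buffer (sized to RandomConnections) bounds how far a subscriber may lag before it starts missing peer events.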
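getMorePeers fans out one dial goroutine per sampled peer and then counts replies against a timer, so a single unresponsive peer cannot wedge the whole round. The following sketch shows just that accounting under the assumption of a fake dial that fails randomly; dialAll, dialResult and the candidate list are hypothetical names, not the swarm's types.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// dialResult mirrors the cnErr pair collected by getMorePeers.
type dialResult struct {
	addr string
	err  error
}

// dialAll fires one goroutine per candidate and collects results until
// every dial has reported back or the timeout fires, returning the
// number of successful connections, the same total/bad bookkeeping
// getMorePeers performs.
func dialAll(candidates []string, timeout time.Duration) int {
	res := make(chan dialResult, len(candidates))
	for _, c := range candidates {
		go func(addr string) {
			if rand.Intn(2) == 0 { // stand-in dial: fails about half the time
				res <- dialResult{addr, errors.New("refused")}
				return
			}
			res <- dialResult{addr, nil}
		}(c)
	}

	total, bad := 0, 0
	tm := time.NewTimer(timeout)
	for total < len(candidates) {
		select {
		case r := <-res:
			total++ // count every result so we know when all dials reported back
			if r.err != nil {
				bad++
			}
		case <-tm.C:
			return total - bad // give up on stragglers
		}
	}
	return total - bad
}

func main() {
	fmt.Println("connected:", dialAll([]string{"a", "b", "c"}, time.Second))
}

Buffering res to len(candidates) matters here, as it does in getMorePeers: dials that finish after the timeout can still send their result without leaking a blocked goroutine.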
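The tests above swap the swarm's cPool and dht fields for cpoolMock and dht.MockDHT through narrow interfaces instead of standing up a real network. A compact sketch of that injection seam follows; connector, realPool, stub and swarmish are hypothetical names used only to illustrate the technique.

package main

import "fmt"

// connector is the narrow seam, analogous to the cPool interface,
// that lets a test swap the dialing implementation.
type connector interface {
	connect(addr string) error
}

type realPool struct{}

func (realPool) connect(addr string) error {
	fmt.Println("dialing", addr)
	return nil
}

// stub lets a test script failures without touching the network,
// the same way cpoolMock.f overrides GetConnection.
type stub struct{ err error }

func (s stub) connect(addr string) error { return s.err }

type swarmish struct{ pool connector }

func main() {
	s := swarmish{pool: realPool{}}
	_ = s.pool.connect("127.0.0.1:7513")

	s.pool = stub{err: fmt.Errorf("refused")} // injected in a test
	fmt.Println(s.pool.connect("127.0.0.1:7513"))
}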