From c2049e42422e4cd0542d9f2190154a175149a91b Mon Sep 17 00:00:00 2001 From: Bruno Bachmann Date: Sun, 16 Jul 2017 18:09:39 -0700 Subject: [PATCH] Move exchangeListenAddrs to peerstore.go --- peer/peer.go | 73 -------------------------------------------- peer/peerstore.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 77 insertions(+), 74 deletions(-) diff --git a/peer/peer.go b/peer/peer.go index ff798fb..835a732 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -2,7 +2,6 @@ package peer import ( "errors" - "fmt" "io" "net" "strconv" @@ -225,78 +224,6 @@ func (p *Peer) getResponseHandler(id string) ResponseHandler { return p.responseHandlers[id] } -// Performs a handshake over the given connection allowing us to send our -// listen address to the remote peer and to receive its litsten address. -// On success returns remote peer listen address, on failure returns error. If -// the given duration passes and we havn't received a listen address we return -// an error. -func exchangeListenAddrs(c net.Conn, d time.Duration, listenAddr string) (string, error) { - addrChan := make(chan string) - errChan := make(chan error) - - req := msg.Request{ - ID: uuid.New().String(), - ResourceType: msg.ResourcePeerInfo, - } - err := req.Write(c) - if err != nil { - return "", err - } - - // Wait for peer to request our listen address and send us its listen address. - go func() { - receivedAddr := false - sentAddr := false - var addr string - - for !receivedAddr || !sentAddr { - message, err := msg.Read(c) - if err == io.EOF { - continue - } else if err != nil { - errChan <- err - } - - switch message.(type) { - case *msg.Response: - // We got the listen address back - addr = message.(*msg.Response).Resource.(string) - if validAddress(addr) || addr != listenAddr { - receivedAddr = true - } - case *msg.Request: - if message.(*msg.Request).ResourceType != msg.ResourcePeerInfo { - continue - } - // We got a listen address request. - // Send the remote peer our listen address - res := msg.Response{ - ID: uuid.New().String(), - Resource: listenAddr, - } - err = res.Write(c) - if err != nil { - errChan <- err - } - sentAddr = true - default: - } - } - - addrChan <- addr - }() - - select { - case addr := <-addrChan: - return addr, nil - case err := <-errChan: - return "", err - case <-time.After(d): - return "", fmt.Errorf("Failed to exchange listen addresses with %s", - c.RemoteAddr().String()) - } -} - // validAddress checks if the given TCP/IP address is valid func validAddress(addr string) bool { parts := strings.Split(addr, ":") diff --git a/peer/peerstore.go b/peer/peerstore.go index a319d51..3ae4c5c 100644 --- a/peer/peerstore.go +++ b/peer/peerstore.go @@ -2,10 +2,14 @@ package peer import ( "encoding/json" + "fmt" + "io" "net" "sync" + "time" log "github.com/Sirupsen/logrus" + "github.com/google/uuid" "github.com/ubclaunchpad/cumulus/msg" ) @@ -33,7 +37,7 @@ func NewPeerStore(la string) *PeerStore { // retrieving messages over the new connection and sending them to App. func (ps *PeerStore) ConnectionHandler(c net.Conn) { // Before we can continue we must exchange listen addresses - addr, err := exchangeListenAddrs(c, PeerSearchWaitTime, ps.ListenAddr) + addr, err := ps.exchangeListenAddrs(c, PeerSearchWaitTime) if err != nil { log.WithError(err).Error("Failed to retrieve peer listen address") return @@ -56,6 +60,78 @@ func (ps *PeerStore) ConnectionHandler(c net.Conn) { log.Infof("Connected to %s", p.ListenAddr) } +// Performs a handshake over the given connection allowing us to send our +// listen address to the remote peer and to receive its litsten address. +// On success returns remote peer listen address, on failure returns error. If +// the given duration passes and we havn't received a listen address we return +// an error. +func (ps *PeerStore) exchangeListenAddrs(c net.Conn, d time.Duration) (string, error) { + addrChan := make(chan string) + errChan := make(chan error) + + req := msg.Request{ + ID: uuid.New().String(), + ResourceType: msg.ResourcePeerInfo, + } + err := req.Write(c) + if err != nil { + return "", err + } + + // Wait for peer to request our listen address and send us its listen address. + go func() { + receivedAddr := false + sentAddr := false + var addr string + + for !receivedAddr || !sentAddr { + message, err := msg.Read(c) + if err == io.EOF { + continue + } else if err != nil { + errChan <- err + } + + switch message.(type) { + case *msg.Response: + // We got the listen address back + addr = message.(*msg.Response).Resource.(string) + if validAddress(addr) || addr != ps.ListenAddr { + receivedAddr = true + } + case *msg.Request: + if message.(*msg.Request).ResourceType != msg.ResourcePeerInfo { + continue + } + // We got a listen address request. + // Send the remote peer our listen address + res := &msg.Response{ + ID: uuid.New().String(), + Resource: ps.ListenAddr, + } + err = res.Write(c) + if err != nil { + errChan <- err + } + sentAddr = true + default: + } + } + + addrChan <- addr + }() + + select { + case addr := <-addrChan: + return addr, nil + case err := <-errChan: + return "", err + case <-time.After(d): + return "", fmt.Errorf("Failed to exchange listen addresses with %s", + c.RemoteAddr().String()) + } +} + // Add synchronously adds the given peer to the peerstore func (ps *PeerStore) Add(p *Peer) { ps.lock.Lock()