Skip to content

Commit

Permalink
Move exchangeListenAddrs to peerstore.go
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed Jul 17, 2017
1 parent 2d6715b commit c2049e4
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 74 deletions.
73 changes: 0 additions & 73 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package peer

import (
"errors"
"fmt"
"io"
"net"
"strconv"
Expand Down Expand Up @@ -225,78 +224,6 @@ func (p *Peer) getResponseHandler(id string) ResponseHandler {
return p.responseHandlers[id]
}

// Performs a handshake over the given connection allowing us to send our
// listen address to the remote peer and to receive its litsten address.
// On success returns remote peer listen address, on failure returns error. If
// the given duration passes and we havn't received a listen address we return
// an error.
func exchangeListenAddrs(c net.Conn, d time.Duration, listenAddr string) (string, error) {
addrChan := make(chan string)
errChan := make(chan error)

req := msg.Request{
ID: uuid.New().String(),
ResourceType: msg.ResourcePeerInfo,
}
err := req.Write(c)
if err != nil {
return "", err
}

// Wait for peer to request our listen address and send us its listen address.
go func() {
receivedAddr := false
sentAddr := false
var addr string

for !receivedAddr || !sentAddr {
message, err := msg.Read(c)
if err == io.EOF {
continue
} else if err != nil {
errChan <- err
}

switch message.(type) {
case *msg.Response:
// We got the listen address back
addr = message.(*msg.Response).Resource.(string)
if validAddress(addr) || addr != listenAddr {
receivedAddr = true
}
case *msg.Request:
if message.(*msg.Request).ResourceType != msg.ResourcePeerInfo {
continue
}
// We got a listen address request.
// Send the remote peer our listen address
res := msg.Response{
ID: uuid.New().String(),
Resource: listenAddr,
}
err = res.Write(c)
if err != nil {
errChan <- err
}
sentAddr = true
default:
}
}

addrChan <- addr
}()

select {
case addr := <-addrChan:
return addr, nil
case err := <-errChan:
return "", err
case <-time.After(d):
return "", fmt.Errorf("Failed to exchange listen addresses with %s",
c.RemoteAddr().String())
}
}

// validAddress checks if the given TCP/IP address is valid
func validAddress(addr string) bool {
parts := strings.Split(addr, ":")
Expand Down
78 changes: 77 additions & 1 deletion peer/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package peer

import (
"encoding/json"
"fmt"
"io"
"net"
"sync"
"time"

log "github.com/Sirupsen/logrus"
"github.com/google/uuid"
"github.com/ubclaunchpad/cumulus/msg"
)

Expand Down Expand Up @@ -33,7 +37,7 @@ func NewPeerStore(la string) *PeerStore {
// retrieving messages over the new connection and sending them to App.
func (ps *PeerStore) ConnectionHandler(c net.Conn) {
// Before we can continue we must exchange listen addresses
addr, err := exchangeListenAddrs(c, PeerSearchWaitTime, ps.ListenAddr)
addr, err := ps.exchangeListenAddrs(c, PeerSearchWaitTime)
if err != nil {
log.WithError(err).Error("Failed to retrieve peer listen address")
return
Expand All @@ -56,6 +60,78 @@ func (ps *PeerStore) ConnectionHandler(c net.Conn) {
log.Infof("Connected to %s", p.ListenAddr)
}

// Performs a handshake over the given connection allowing us to send our
// listen address to the remote peer and to receive its litsten address.
// On success returns remote peer listen address, on failure returns error. If
// the given duration passes and we havn't received a listen address we return
// an error.
func (ps *PeerStore) exchangeListenAddrs(c net.Conn, d time.Duration) (string, error) {
addrChan := make(chan string)
errChan := make(chan error)

req := msg.Request{
ID: uuid.New().String(),
ResourceType: msg.ResourcePeerInfo,
}
err := req.Write(c)
if err != nil {
return "", err
}

// Wait for peer to request our listen address and send us its listen address.
go func() {
receivedAddr := false
sentAddr := false
var addr string

for !receivedAddr || !sentAddr {
message, err := msg.Read(c)
if err == io.EOF {
continue
} else if err != nil {
errChan <- err
}

switch message.(type) {
case *msg.Response:
// We got the listen address back
addr = message.(*msg.Response).Resource.(string)
if validAddress(addr) || addr != ps.ListenAddr {
receivedAddr = true
}
case *msg.Request:
if message.(*msg.Request).ResourceType != msg.ResourcePeerInfo {
continue
}
// We got a listen address request.
// Send the remote peer our listen address
res := &msg.Response{
ID: uuid.New().String(),
Resource: ps.ListenAddr,
}
err = res.Write(c)
if err != nil {
errChan <- err
}
sentAddr = true
default:
}
}

addrChan <- addr
}()

select {
case addr := <-addrChan:
return addr, nil
case err := <-errChan:
return "", err
case <-time.After(d):
return "", fmt.Errorf("Failed to exchange listen addresses with %s",
c.RemoteAddr().String())
}
}

// Add synchronously adds the given peer to the peerstore
func (ps *PeerStore) Add(p *Peer) {
ps.lock.Lock()
Expand Down

0 comments on commit c2049e4

Please sign in to comment.