Skip to content

Commit

Permalink
Refactor peer's stream
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed May 28, 2017
1 parent 5a831c7 commit 0d890ff
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 110 deletions.
42 changes: 0 additions & 42 deletions errors/error.go

This file was deleted.

2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func main() {
ID: uuid.New().String(),
ResourceType: message.ResourcePeerInfo,
}
_, err = host.Request(request, stream)
_, err = host.Request(request, *stream)
if err != nil {
log.WithError(err).Error("Error writing message to stream")
return
Expand Down
92 changes: 36 additions & 56 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

log "github.com/Sirupsen/logrus"
"github.com/libp2p/go-libp2p-crypto"
"github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-net"
net "github.com/libp2p/go-libp2p-net"
lpeer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-swarm"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/ubclaunchpad/cumulus/message"
"github.com/ubclaunchpad/cumulus/stream"
"github.com/ubclaunchpad/cumulus/subnet"
)

Expand All @@ -36,9 +36,7 @@ const (
// Peer is a cumulus Peer composed of a host
type Peer struct {
host.Host
subnet sn.Subnet
listeners map[string]chan *message.Response
listenersLock *sync.RWMutex
subnet subnet.Subnet
}

// New creates a Cumulus host with the given IP addr and TCP port.
Expand Down Expand Up @@ -82,10 +80,8 @@ func New(ip string, port int) (*Peer, error) {
// Actually create the host and peer with the network we just set up.
host := bhost.New(network)
peer := &Peer{
Host: host,
subnet: subnet,
listeners: make(map[string]chan *message.Response),
listenersLock: &sync.RWMutex{},
Host: host,
subnet: sn,
}

// Build host multiaddress
Expand All @@ -108,35 +104,39 @@ func New(ip string, port int) (*Peer, error) {
// peer is initialized.
func (p *Peer) Receive(s net.Stream) {
// Get remote peer's full multiaddress
ma, err := NewMultiaddr(
s.Conn().RemoteMultiaddr(), s.Conn().RemotePeer())
ma, err := NewMultiaddr(s.Conn().RemoteMultiaddr(), s.Conn().RemotePeer())
if err != nil {
log.WithError(err).Error("Failed to obtain valid remote peer multiaddress")
return
}

peerstream := *stream.New(s)

// Add the remote peer to this peer's subnet
err = p.subnet.AddPeer(ma.String(), s)
err = p.subnet.AddPeer(ma.String(), peerstream)
if err != nil {
// Subnet is full, advertise other available peers and then close
// the stream
log.WithError(err).Debug("Peer subnet full. Advertising peers...")
msg := message.Push{ResourceType: message.ResourcePeerInfo, Resource: p.subnet.StringMultiaddrs}
msg := message.Push{
ResourceType: message.ResourcePeerInfo,
Resource: p.subnet.Multiaddrs(),
}
msgErr := msg.Write(s)
if msgErr != nil {
log.WithError(err).Error("Failed to send ResourcePeerInfo")
}
s.Close()
return
}
go p.Listen(ma.String(), s)
go p.Listen(ma.String(), peerstream)
}

// Connect adds the given multiaddress to p's Peerstore and opens a stream
// with the peer at that multiaddress if the multiaddress is valid, otherwise
// returns error. On success the stream and corresponding multiaddress are
// added to this peer's subnet.
func (p *Peer) Connect(sma string) (net.Stream, error) {
func (p *Peer) Connect(sma string) (*stream.Stream, error) {
pid, targetAddr, err := extractPeerInfo(sma)
if err != nil {
return nil, err
Expand All @@ -146,22 +146,24 @@ func (p *Peer) Connect(sma string) (net.Stream, error) {
p.Peerstore().AddAddr(pid, targetAddr, peerstore.PermanentAddrTTL)

// Open a stream with the peer
stream, err := p.NewStream(context.Background(), pid, CumulusProtocol)
s, err := p.NewStream(context.Background(), pid, CumulusProtocol)
if err != nil {
return nil, err
}
err = stream.SetDeadline(time.Now().Add(Timeout))
err = s.SetDeadline(time.Now().Add(Timeout))
if err != nil {
log.WithError(err).Error("Failed to set read deadline on stream")
}

err = p.subnet.AddPeer(sma, stream)
// Make a stream.Stream out of a net.Stream (sorry)
peerstream := *stream.New(s)
err = p.subnet.AddPeer(sma, peerstream)
if err != nil {
stream.Close()
s.Close()
}

go p.Listen(sma, stream)
return stream, nil
go p.Listen(sma, peerstream)
return &peerstream, nil
}

// Broadcast sends message to all peers this peer is currently connected to
Expand All @@ -179,10 +181,10 @@ func (p *Peer) Broadcast(m message.Message) error {
// Request sends a request to a remote peer over the given stream.
// Returns response if a response was received, otherwise returns error.
// You should typically run this function in a goroutine
func (p *Peer) Request(req message.Request, s net.Stream) (*message.Response, error) {
func (p *Peer) Request(req message.Request, s stream.Stream) (*message.Response, error) {
// Set up a listen channel to listen for the response (remove when done)
lchan := p.newListener(req.ID)
defer p.removeListener(req.ID)
lchan := s.NewListener(req.ID)
defer s.RemoveListener(req.ID)

// Send request
err := req.Write(s)
Expand All @@ -200,28 +202,28 @@ func (p *Peer) Request(req message.Request, s net.Stream) (*message.Response, er
}

// Respond responds to a request from another peer
func (p *Peer) Respond(req *message.Request, s net.Stream) {
func (p *Peer) Respond(req *message.Request, s stream.Stream) {
response := message.Response{ID: req.ID}

switch req.ResourceType {
case message.ResourcePeerInfo:
response.Resource = p.subnet.Multiaddrs()
break
case message.ResourceBlock:
// response.Error = protoerr.New(protoerr.NotImplemented)
break
case message.ResourceTransaction:
// response.Error = protoerr.New(protoerr.NotImplemented)
response.Error = message.NewError(message.NotImplemented,
"Functionality not implemented on this peer")
break
default:
// response.Error = protoerr.New(protoerr.InvalidResourceType)
response.Error = message.NewError(message.InvalidResourceType,
"Invalid resource type")
}

err := response.Write(s)
if err != nil {
log.WithError(err).Error("Failed to send reponse")
} else {
msgJSON, _ := json.Marshal(msg)
msgJSON, _ := json.Marshal(response)
log.Infof("Sending response: \n%s", string(msgJSON))
}
}
Expand All @@ -238,7 +240,7 @@ func NewMultiaddr(iAddr multiaddr.Multiaddr, pid lpeer.ID) (multiaddr.Multiaddr,
}

// HandleMessage responds to a received message
func (p *Peer) handleMessage(m message.Message, s net.Stream) {
func (p *Peer) handleMessage(m message.Message, s stream.Stream) {
msgJSON, err := json.Marshal(m)
if err == nil {
log.Infof("Received message: \n%s", string(msgJSON))
Expand All @@ -252,7 +254,7 @@ func (p *Peer) handleMessage(m message.Message, s net.Stream) {
case message.MessageResponse:
// Pass the response to the goroutine that requested it
res := m.(*message.Response)
lchan := p.getListener(res.ID)
lchan := s.Listener(res.ID)
if lchan != nil {
log.Debug("Found listener channel for response")
lchan <- res
Expand All @@ -271,7 +273,7 @@ func (p *Peer) handleMessage(m message.Message, s net.Stream) {
// Listen listens for messages over the stream and responds to them, closing
// the given stream and removing the remote peer from this peer's subnet when
// done. This should be run as a goroutine.
func (p *Peer) Listen(sma string, s net.Stream) {
func (p *Peer) Listen(sma string, s stream.Stream) {
defer s.Close()
defer p.subnet.RemovePeer(sma)
for {
Expand All @@ -286,7 +288,7 @@ func (p *Peer) Listen(sma string, s net.Stream) {
return
}
log.Debug("Listener received message")
go p.HandleMessage(msg, s)
go p.handleMessage(msg, s)
}
}

Expand Down Expand Up @@ -322,25 +324,3 @@ func extractPeerInfo(sma string) (lpeer.ID, multiaddr.Multiaddr, error) {
trgtAddr := ipfsaddr.Decapsulate(targetPeerAddr)
return peerid, trgtAddr, nil
}

// newListener synchronously adds a listener to this peer's listeners map
func (p *Peer) newListener(id string) chan *message.Response {
p.listenersLock.Lock()
p.listeners[id] = make(chan *message.Response)
p.listenersLock.Unlock()
return p.listeners[id]
}

// removeListener synchronously removes a listener from this peer's listeners map
func (p *Peer) removeListener(id string) {
p.listenersLock.Lock()
delete(p.listeners, id)
p.listenersLock.Unlock()
}

func (p *Peer) getListener(id string) chan *message.Response {
p.listenersLock.RLock()
lchan := p.listeners[id]
p.listenersLock.RUnlock()
return lchan
}
6 changes: 2 additions & 4 deletions peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/google/uuid"
protoerr "github.com/ubclaunchpad/cumulus/errors"
"github.com/ubclaunchpad/cumulus/message"
sn "github.com/ubclaunchpad/cumulus/subnet"
)
Expand Down Expand Up @@ -202,12 +201,11 @@ func TestRequest(t *testing.T) {
ResourceType: message.ResourcePeerInfo,
Params: nil,
}
response, err := requester.Request(request, stream)
emptyErr := protoerr.ProtocolError{}
response, err := requester.Request(request, *stream)
if err != nil {
fmt.Printf("Failed to make request: %s", err)
t.FailNow()
} else if response.Error != emptyErr {
} else if response.Error != nil {
fmt.Printf("Remote peer returned response %s", response.Error)
t.FailNow()
}
Expand Down
49 changes: 49 additions & 0 deletions stream/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package stream

import (
"sync"

"github.com/libp2p/go-libp2p-net"
"github.com/ubclaunchpad/cumulus/message"
)

// Stream is a synchronized container for net.Stream and a mapping of listener
// ids to response channels
type Stream struct {
net.Stream
listeners map[string]chan *message.Response
lock *sync.RWMutex
}

// New returns a new Stream containing the given net.Stream
func New(s net.Stream) *Stream {
return &Stream{
s,
make(map[string]chan *message.Response),
&sync.RWMutex{},
}
}

// NewListener synchronously adds a listener to this peer's listeners map
func (s *Stream) NewListener(id string) chan *message.Response {
s.lock.Lock()
s.listeners[id] = make(chan *message.Response)
s.lock.Unlock()
return s.listeners[id]
}

// RemoveListener synchronously removes a listener from this peer's listeners map
func (s *Stream) RemoveListener(id string) {
s.lock.Lock()
delete(s.listeners, id)
s.lock.Unlock()
}

// Listener synchronously retrieves the channel the listener with the given
// request/response id is waiting on
func (s *Stream) Listener(id string) chan *message.Response {
s.lock.RLock()
lchan := s.listeners[id]
s.lock.RUnlock()
return lchan
}
Loading

0 comments on commit 0d890ff

Please sign in to comment.