Skip to content

Commit

Permalink
Update subnet, peer, and message, refactor
Browse files Browse the repository at this point in the history
Note that I moved extractPeerInfo() down to near the bottom of peer.go (because it's not exported)
  • Loading branch information
bfbachmann committed May 19, 2017
1 parent 68b005d commit fdfde21
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 57 deletions.
2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ import:
subpackages:
- p2p/host/basic
- package: github.com/multiformats/go-multiaddr
- package: github.com/Sirupsen/logrus
- package: github.com/sirupsen/logrus
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"flag"
"io/ioutil"

log "github.com/Sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/ubclaunchpad/cumulus/peer"
)

Expand Down Expand Up @@ -60,5 +60,8 @@ func main() {

log.Debugf("Peer %s read reply: %s", host.ID(), string(reply))

log.Debug("Hanging...")
select {}

host.Close()
}
10 changes: 9 additions & 1 deletion message/message.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package message

import "errors"
import (
"errors"
"fmt"
)

// Message types
// NOTE: because of the way iota works, changing the order in which the
Expand Down Expand Up @@ -44,3 +47,8 @@ func New(c []byte, t int) (*Message, error) {
m := &Message{msgType: t, content: c}
return m, nil
}

// Bytes returns the given message in []byte format
func (m *Message) Bytes() []byte {
return []byte(fmt.Sprintf("%d:%s", m.msgType, string(m.content)))
}
132 changes: 82 additions & 50 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"

log "github.com/Sirupsen/logrus"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net"
Expand All @@ -15,6 +14,7 @@ import (
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
msg "github.com/ubclaunchpad/cumulus/message"
sn "github.com/ubclaunchpad/cumulus/subnet"
)

Expand Down Expand Up @@ -93,28 +93,29 @@ func New(ip string, port int) (*Peer, error) {

// Receive is the function that gets called when a remote peer
// opens a new stream with this peer.
// This should be passed as the second argument to SetStreamHandler().
// We may want to implement another type of StreamHandler in the future.
// This should be passed as the second argument to SetStreamHandler() after this
// peer is initialized.
func (p *Peer) Receive(s net.Stream) {
log.Debug("Setting basic stream handler.")
defer s.Close()

// Get remote peer's full multiaddress
remoteMA, err := makeMultiaddr(
s.Conn().RemoteMultiaddr(), s.Conn().RemotePeer())
if err != nil {
log.Fatal("Failed to obtain valid remote peer multiaddress")
}

// Add the remote peer to this peer's subnet
// TODO: discuss this behaviour (for now refuse connections if subnet full)
remoteMA := s.Conn().RemoteMultiaddr()
err := p.subnet.AddPeer(remoteMA, s)
err = p.subnet.AddPeer(remoteMA, s)
if err != nil {
log.Warnln(err)
// Subnet is full, advertise other available peers and then close
// the stream
log.Debug("Peer subnet full. Advertising peers...")
p.advertisePeers(s)
return
}
defer p.subnet.RemovePeer(remoteMA)

p.doCumulus(s)
}

// Communicate with peers.
// TODO: Update this to do something useful. For now it just reads from the
// stream and writes back what it read.
func (p *Peer) doCumulus(s net.Stream) {
buf := bufio.NewReader(s)
str, err := buf.ReadString('\n')
if err != nil {
Expand All @@ -130,40 +131,6 @@ func (p *Peer) doCumulus(s net.Stream) {
}
}

// ExtractPeerInfo extracts the peer ID and multiaddress from the
// given multiaddress.
// Returns peer ID (esentially 46 character hash created by the peer)
// and the peer's multiaddress in the form /ip4/<peer IP>/tcp/<CumulusPort>.
func extractPeerInfo(peerma string) (lpeer.ID, ma.Multiaddr, error) {
log.Debug("Extracting peer info from ", peerma)

ipfsaddr, err := ma.NewMultiaddr(peerma)
if err != nil {
return "-", nil, err
}

// Cannot throw error when passed P_IPFS
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
return "-", nil, err
}

// Cannot return error if no error was returned in ValueForProtocol
peerid, _ := lpeer.IDB58Decode(pid)

// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
targetPeerAddr, err := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", lpeer.IDB58Encode(peerid)))
if err != nil {
return "-", nil, err
}

trgtAddr := ipfsaddr.Decapsulate(targetPeerAddr)

return peerid, trgtAddr, nil
}

// Connect adds the given multiaddress to p's Peerstore and opens a stream
// with the peer at that multiaddress if the multiaddress is valid, otherwise
// returns error. On success the stream and corresponding multiaddress are
Expand All @@ -178,7 +145,7 @@ func (p *Peer) Connect(peerma string) (net.Stream, error) {
p.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)

log.Debug("Connected to Cumulus Peer:")
log.Debug("Peer ID:", peerid.Pretty())
log.Debugf("Peer ID: %s", peerid.Pretty())
log.Debug("Peer Address:", targetAddr)

// Open a stream with the peer
Expand All @@ -202,3 +169,68 @@ func (p *Peer) Connect(peerma string) (net.Stream, error) {

return stream, err
}

// ExtractPeerInfo extracts the peer ID and multiaddress from the
// given multiaddress.
// Returns peer ID (esentially 46 character hash created by the peer)
// and the peer's multiaddress in the form /ip4/<peer IP>/tcp/<CumulusPort>.
func extractPeerInfo(peerma string) (lpeer.ID, ma.Multiaddr, error) {
log.Debug("Extracting peer info from ", peerma)

ipfsaddr, err := ma.NewMultiaddr(peerma)
if err != nil {
return "-", nil, err
}

// Cannot throw error when passed P_IPFS
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
return "-", nil, err
}

// Cannot return error if no error was returned in ValueForProtocol
peerid, _ := lpeer.IDB58Decode(pid)

// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
targetPeerAddr, err := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", lpeer.IDB58Encode(peerid)))
if err != nil {
return "-", nil, err
}

trgtAddr := ipfsaddr.Decapsulate(targetPeerAddr)

return peerid, trgtAddr, nil
}

// advertisePeers writes messages into the given stream advertising the
// multiaddress of each peer in this peer's subnet.
func (p *Peer) advertisePeers(s net.Stream) {
mAddrs := p.subnet.Multiaddrs()
log.Debug("Peers on this subnet: ")
for mAddr := range mAddrs {
mAddrString := string(mAddr)
log.Debug("\t", mAddrString)
message, msgErr := msg.New([]byte(mAddrString), msg.PeerInfo)
if msgErr != nil {
log.Error("Failed to create message")
return
}
_, msgErr = s.Write(message.Bytes())
if msgErr != nil {
log.Errorf("Failed to send message to %s", string(mAddr))
}
}
}

// makeMultiaddr creates a Multiaddress from the given Multiaddress (of the form
// /ip4/<ip address>/tcp/<TCP port>) and the peer id (a hash) and turn them
// into one Multiaddress. Will return error if Multiaddress is invalid.
func makeMultiaddr(iAddr ma.Multiaddr, pid lpeer.ID) (ma.Multiaddr, error) {
strAddr := iAddr.String()
strID := pid.Pretty()
strMA := fmt.Sprintf("%s/ipfs/%s", strAddr, strID)
mAddr, err := ma.NewMultiaddr(strMA)
return mAddr, err
}
2 changes: 1 addition & 1 deletion peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"testing"

log "github.com/Sirupsen/logrus"
log "github.com/sirupsen/logrus"
)

func TestMain(t *testing.T) {
Expand Down
21 changes: 19 additions & 2 deletions subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

net "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
msg "github.com/ubclaunchpad/cumulus/message"
)

Expand Down Expand Up @@ -42,14 +43,21 @@ func (sn *Subnet) AddPeer(mAddr ma.Multiaddr, stream net.Stream) error {
return err
}

sn.peers[mAddr] = stream
sn.numPeers++
// Check if it's already in this subnet
if sn.peers[mAddr] != nil {
log.Debugf("Peer %s is already in subnet", mAddr.String())
} else {
log.Debugf("Adding peer %s to subnet", mAddr.String())
sn.peers[mAddr] = stream
sn.numPeers++
}
return nil
}

// RemovePeer removes the mapping with the key mAddr from the subnet if it
// exists.
func (sn *Subnet) RemovePeer(mAddr ma.Multiaddr) {
log.Debugf("Removing peer %s from subnet", mAddr.String())
delete(sn.peers, mAddr)
sn.numPeers--
}
Expand All @@ -60,6 +68,15 @@ func (sn *Subnet) isFull() bool {
return sn.numPeers >= sn.maxPeers
}

// Multiaddrs returns a list of all multiaddresses contined in this subnet
func (sn *Subnet) Multiaddrs() []ma.Multiaddr {
mAddrs := make([]ma.Multiaddr, 0, len(sn.peers))
for mAddr := range sn.peers {
mAddrs = append(mAddrs, mAddr)
}
return mAddrs
}

// Broadcast sends information to all peers we are currently connected to
func (sn *Subnet) Broadcast(m msg.Message) error {
return errors.New("Function not implemented")
Expand Down

0 comments on commit fdfde21

Please sign in to comment.