Skip to content

Commit

Permalink
Add message parsing and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed May 20, 2017
1 parent fdfde21 commit 156dd85
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 27 deletions.
3 changes: 0 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,5 @@ func main() {

log.Debugf("Peer %s read reply: %s", host.ID(), string(reply))

log.Debug("Hanging...")
select {}

host.Close()
}
27 changes: 23 additions & 4 deletions message/message.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package message

import (
"encoding/json"
"errors"
"fmt"
"strings"
)

// Message types
Expand All @@ -12,6 +13,8 @@ import (
const (
// Send the multiaddress of a peer to another peer
PeerInfo = iota
// Request addressess of peers in the remote peer's subnet
RequestPeerInfo = iota
// Send information about a block that was just hashed
NewBlock = iota
// Request chunk of the blockchain from peer
Expand Down Expand Up @@ -48,7 +51,23 @@ func New(c []byte, t int) (*Message, error) {
return m, nil
}

// Bytes returns the given message in []byte format
func (m *Message) Bytes() []byte {
return []byte(fmt.Sprintf("%d:%s", m.msgType, string(m.content)))
// Bytes returns JSON representation of message as a byte array, or error if
// message cannot be marshalled.
func (m *Message) Bytes() ([]byte, error) {
return json.Marshal(m)
}

// FromString parses a message in the form of a string and returns a pointer
// to a new Message struct made from the contents of the string. Returns error
// if string is malformed.
func FromString(s string) (*Message, error) {
var msg Message
s = strings.TrimSpace(s)
err := json.Unmarshal([]byte(s), &msg)
return &msg, err
}

// Type returns msgType for message
func (m *Message) Type() int {
return m.msgType
}
42 changes: 34 additions & 8 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package peer
import (
"bufio"
"context"
"errors"
"fmt"

crypto "github.com/libp2p/go-libp2p-crypto"
Expand Down Expand Up @@ -117,18 +118,23 @@ func (p *Peer) Receive(s net.Stream) {
defer p.subnet.RemovePeer(remoteMA)

buf := bufio.NewReader(s)
str, err := buf.ReadString('\n')
strMsg, err := buf.ReadString('\n') // TODO: set timeout here
if err != nil {
log.Error(err)
return
}

log.Debugf("Peer %s read: %s", p.ID(), str)

_, err = s.Write([]byte(str))
// Turn the string into a message we can deal with
message, err := msg.FromString(strMsg)
if err != nil {
log.Error(err)
return
}

log.Debugf("Peer %s message:\n%s", p.ID(), strMsg)

// Respond to message
p.handleMessage(*message, s)
}

// Connect adds the given multiaddress to p's Peerstore and opens a stream
Expand Down Expand Up @@ -170,6 +176,11 @@ func (p *Peer) Connect(peerma string) (net.Stream, error) {
return stream, err
}

// Broadcast sends message to all peers this peer is currently connected to
func (p *Peer) Broadcast(m msg.Message) error {
return errors.New("Function not implemented")
}

// ExtractPeerInfo extracts the peer ID and multiaddress from the
// given multiaddress.
// Returns peer ID (esentially 46 character hash created by the peer)
Expand Down Expand Up @@ -212,13 +223,18 @@ func (p *Peer) advertisePeers(s net.Stream) {
for mAddr := range mAddrs {
mAddrString := string(mAddr)
log.Debug("\t", mAddrString)
message, msgErr := msg.New([]byte(mAddrString), msg.PeerInfo)
if msgErr != nil {
message, err := msg.New([]byte(mAddrString), msg.PeerInfo)
if err != nil {
log.Error("Failed to create message")
return
}
_, msgErr = s.Write(message.Bytes())
if msgErr != nil {
msgBytes, err := message.Bytes()
if err != nil {
log.Error("Failed to marshal message")
return
}
_, err = s.Write(msgBytes)
if err != nil {
log.Errorf("Failed to send message to %s", string(mAddr))
}
}
Expand All @@ -234,3 +250,13 @@ func makeMultiaddr(iAddr ma.Multiaddr, pid lpeer.ID) (ma.Multiaddr, error) {
mAddr, err := ma.NewMultiaddr(strMA)
return mAddr, err
}

func (p *Peer) handleMessage(m msg.Message, s net.Stream) {
switch m.Type() {
case msg.RequestPeerInfo:
p.advertisePeers(s)
break
default:
// Do nothing. WHEOOO!
}
}
12 changes: 0 additions & 12 deletions subnet/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
net "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
msg "github.com/ubclaunchpad/cumulus/message"
)

const (
Expand Down Expand Up @@ -76,14 +75,3 @@ func (sn *Subnet) Multiaddrs() []ma.Multiaddr {
}
return mAddrs
}

// Broadcast sends information to all peers we are currently connected to
func (sn *Subnet) Broadcast(m msg.Message) error {
return errors.New("Function not implemented")
}

// Listen listens to all peers we are currently connected to
// Call appropriate routines in response to new messages
func (sn *Subnet) Listen() {
panic("Function not implemented")
}

0 comments on commit 156dd85

Please sign in to comment.