Skip to content

Commit

Permalink
Merge 7e56a14 into 0c6862b
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed Jul 22, 2017
2 parents 0c6862b + 7e56a14 commit df845f2
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 75 deletions.
3 changes: 3 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func ConnectAndDiscover(target string) {
}
peer.ConnectionHandler(c)
p := peer.PStore.Get(c.RemoteAddr().String())
if p == nil {
log.Fatal("Failed to exchange listen addresses with target peer")
}
p.Request(peerInfoRequest, peer.PeerInfoHandler)
}

Expand Down
126 changes: 89 additions & 37 deletions msg/message.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package msg

import (
"encoding/gob"
"encoding/json"
"fmt"
"io"

"github.com/ubclaunchpad/cumulus/blockchain"
log "github.com/Sirupsen/logrus"
)

type (
Expand Down Expand Up @@ -32,24 +33,12 @@ const (
// NotImplemented occurs when a message or request is received whos response
// requires functionality that does not yet exist.
NotImplemented = 501
// SubnetFull occurs when a stream is opened with a peer whose Subnet is
// already full.
SubnetFull = 503
)

// Initializes all the types we need to encode.
func init() {
gob.Register(&Request{})
gob.Register(&Response{})
gob.Register(&Push{})
gob.Register(&blockchain.Transaction{})
gob.Register(&blockchain.Block{})
}

// ProtocolError is an error that occured during a request.
type ProtocolError struct {
Code ErrorCode
Message string
Code ErrorCode `json:"Code"`
Message string `json:"Message"`
}

// NewProtocolError returns a new error struct.
Expand All @@ -62,9 +51,19 @@ func (e *ProtocolError) Error() string {
return e.Message
}

// Message is a container for messages, containing a type and either a Request,
// Response, or Push in the payload.
type Message interface {
// Message is a wrapper for requests, responses, and pushes.
// Type must be one of a "Request", "Response", or "Push"
// Payload must be a marshalled representation of a Request, Response, or Push
// when the message is sent.
type Message struct {
Type string `json:"Type"`
Payload []byte `json:"Payload"`
}

// MessagePayload is an interface that is implemented by Request, Response, and
// Push. It is used to generally refer to these 3 payload types so we can
// return only a single value from Read().
type MessagePayload interface {
Write(io.Writer) error
}

Expand All @@ -73,49 +72,102 @@ type Message interface {
// parameters. PeerInfo requests should send all info of all peers. Block requests
// should specify block number in parameters.
type Request struct {
ID string
ResourceType ResourceType
Params map[string]interface{}
ID string `json:"ID"`
ResourceType ResourceType `json:"ResourceType"`
Params map[string]interface{} `json:"Params"`
}

// Response is a container for a response payload, containing the unique request
// ID of the request prompting it, an Error (if one occurred), and the requested
// resource (if no error occurred).
type Response struct {
ID string
Error *ProtocolError
Resource interface{}
ID string `json:"ID"`
Error *ProtocolError `json:"Error"`
Resource interface{} `json:"Resource"`
}

// Push is a container for a push payload, containing a resource proactively sent
// to us by another peer.
type Push struct {
ResourceType ResourceType
Resource interface{}
ResourceType ResourceType `json:"ResourceType"`
Resource interface{} `json:"Resource"`
}

// Write encodes and writes the Message into the given Writer.
func (r *Request) Write(w io.Writer) error {
var m Message = r
return gob.NewEncoder(w).Encode(&m)
payload, err := json.Marshal(r)
if err != nil {
return err
}
msg := Message{
Type: "Request",
Payload: payload,
}
return json.NewEncoder(w).Encode(msg)
}

func (r *Response) Write(w io.Writer) error {
var m Message = r
return gob.NewEncoder(w).Encode(&m)
payload, err := json.Marshal(r)
if err != nil {
return err
}
msg := Message{
Type: "Response",
Payload: payload,
}
return json.NewEncoder(w).Encode(msg)
}

func (p *Push) Write(w io.Writer) error {
var m Message = p
return gob.NewEncoder(w).Encode(&m)
payload, err := json.Marshal(p)
if err != nil {
return err
}
msg := Message{
Type: "Push",
Payload: payload,
}
return json.NewEncoder(w).Encode(msg)
}

// Read decodes a message from a Reader and returns it.
func Read(r io.Reader) (Message, error) {
// Read decodes a message from a Reader and returns the message payload, or an
// error if the read fails. On success, the payload returned will be either a
// Request, Response, or Push.
func Read(r io.Reader) (MessagePayload, error) {
var m Message
err := gob.NewDecoder(r).Decode(&m)
err := json.NewDecoder(r).Decode(&m)
if err != nil {
return nil, err
}
return m, nil

var returnPayload MessagePayload

// Check the message type and use it to unmarshal the payload
switch m.Type {
case "Request":
var req Request
err = json.Unmarshal([]byte(m.Payload), &req)
if err == nil {
log.Debug("Read request ", req)
returnPayload = &req
}
case "Response":
var res Response
err = json.Unmarshal([]byte(m.Payload), &res)
if err == nil {
log.Debug("Read response ", res)
returnPayload = &res
}
case "Push":
var push Push
err = json.Unmarshal([]byte(m.Payload), &push)
if err == nil {
log.Debug("Read push ", push)
returnPayload = &push
}
default:
err = fmt.Errorf("Received message with invalid type %s", m.Type)
}

return returnPayload, err
}
32 changes: 12 additions & 20 deletions msg/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,14 @@ func TestRequest(t *testing.T) {
t.Fail()
}

out, err := Read(&buf)
payload, err := Read(&buf)
if err != nil {
t.Log(err.Error())
t.Fail()
}

outReq, ok := out.(*Request)
if !ok {
t.Fail()
}

if outReq.ID != id {
req, ok := payload.(*Request)
if !ok || req.ID != id {
t.Fail()
}
}
Expand All @@ -53,27 +49,23 @@ func TestResponse(t *testing.T) {
t.Fail()
}

out, err := Read(&buf)
payload, err := Read(&buf)
if err != nil {
t.Log(err.Error())
t.Fail()
}

outRes, ok := out.(*Response)
if !ok {
t.Fail()
}

if outRes.ID != id {
res, ok := payload.(*Response)
if !ok || res.ID != id {
t.Fail()
}

res, ok := outRes.Resource.(string)
resource, ok := res.Resource.(string)
if !ok {
t.Fail()
}

if res != "resource" {
if resource != "resource" {
t.Fail()
}
}
Expand All @@ -92,23 +84,23 @@ func TestPush(t *testing.T) {
t.Fail()
}

out, err := Read(&buf)
payload, err := Read(&buf)
if err != nil {
t.Log(err.Error())
t.Fail()
}

outPush, ok := out.(*Push)
push, ok := payload.(*Push)
if !ok {
t.Fail()
}

res, ok := outPush.Resource.(string)
resource, ok := push.Resource.(string)
if !ok {
t.Fail()
}

if res != "transaction" {
if resource != "transaction" {
t.Fail()
}
}
Expand Down
42 changes: 24 additions & 18 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (p *Peer) Dispatch() {
errCount := 0

for {
message, err := msg.Read(p.Connection)
payload, err := msg.Read(p.Connection)
if err != nil {
if err == io.EOF {
// This just means the peer hasn't sent anything
Expand All @@ -223,17 +223,18 @@ func (p *Peer) Dispatch() {
}
errCount = 0

switch message.(type) {
switch payload.(type) {
case *msg.Request:
req := payload.(*msg.Request)
if p.requestHandler == nil {
log.Errorf("Request received but no request handler set for peer %s",
p.Connection.RemoteAddr().String())
} else {
response := p.requestHandler(message.(*msg.Request))
response := p.requestHandler(req)
response.Write(p.Connection)
}
case *msg.Response:
res := message.(*msg.Response)
res := payload.(*msg.Response)
rh := p.getResponseHandler(res.ID)
if rh == nil {
log.Error("Dispatcher could not find response handler for response")
Expand All @@ -242,15 +243,13 @@ func (p *Peer) Dispatch() {
p.removeResponseHandler(res.ID)
}
case *msg.Push:
push := payload.(*msg.Push)
if p.pushHandler == nil {
log.Errorf("Push message received but no push handler set for peer %s",
p.Connection.RemoteAddr().String())
} else {
p.pushHandler(message.(*msg.Push))
p.pushHandler(push)
}
default:
// Invalid messgae type. Ignore
log.Debug("Dispatcher received message with invalid type")
}
}
}
Expand Down Expand Up @@ -316,15 +315,19 @@ func MaintainConnections() {
// PeerInfoHandler will handle the response to a PeerInfo request by attempting
// to establish connections with all new peers in the given response Resource.
func PeerInfoHandler(res *msg.Response) {
peers := res.Resource.([]string)
peers := res.Resource.([]interface{})
log.Debugf("Found peers %s", peers)
for i := 0; i < len(peers) && PStore.Size() < MaxPeers; i++ {
p := PStore.Get(peers[i])
if p != nil || peers[i] == ListenAddr {
peerAddr, ok := peers[i].(string)
if !ok {
continue
}
p := PStore.Get(peerAddr)
if p != nil || peerAddr == ListenAddr {
// We are already connected to this peer. Skip it.
continue
}
newConn, err := conn.Dial(peers[i])
newConn, err := conn.Dial(peerAddr)
if err != nil {
log.WithError(err).Errorf("Failed to dial peer %s", peers[i])
continue
Expand Down Expand Up @@ -370,24 +373,28 @@ func exchangeListenAddrs(c net.Conn, d time.Duration) (string, error) {
receivedAddr := false
sentAddr := false
var addr string
var ok bool

for !receivedAddr || !sentAddr {
message, err := msg.Read(c)
payload, err := msg.Read(c)
if err == io.EOF {
continue
} else if err != nil {
errChan <- err
return
}

switch message.(type) {
switch payload.(type) {
case *msg.Response:
// We got the listen address back
addr = message.(*msg.Response).Resource.(string)
if validAddress(addr) || addr != ListenAddr {
res := payload.(*msg.Response)
addr, ok = res.Resource.(string)
if ok && validAddress(addr) && addr != ListenAddr {
receivedAddr = true
}
case *msg.Request:
if message.(*msg.Request).ResourceType != msg.ResourcePeerInfo {
req := payload.(*msg.Request)
if req.ResourceType != msg.ResourcePeerInfo {
continue
}
// We got a listen address request.
Expand All @@ -401,7 +408,6 @@ func exchangeListenAddrs(c net.Conn, d time.Duration) (string, error) {
errChan <- err
}
sentAddr = true
default:
}
}

Expand Down

0 comments on commit df845f2

Please sign in to comment.