Skip to content

Commit

Permalink
rename message -> msg
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanschalm committed Jun 8, 2017
1 parent b2bb416 commit b8d39cc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 33 deletions.
2 changes: 1 addition & 1 deletion message/message.go → msg/message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package message
package msg

import (
"encoding/gob"
Expand Down
2 changes: 1 addition & 1 deletion message/message_test.go → msg/message_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package message
package msg

import (
"bytes"
Expand Down
62 changes: 31 additions & 31 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/google/uuid"
"github.com/ubclaunchpad/cumulus/conn"
"github.com/ubclaunchpad/cumulus/message"
"github.com/ubclaunchpad/cumulus/msg"
)

const (
Expand Down Expand Up @@ -75,12 +75,12 @@ func (ps *PeerStore) Addrs() []string {

// ChanStore is a threadsafe container for response channels.
type ChanStore struct {
chans map[string]chan *message.Response
chans map[string]chan *msg.Response
lock sync.RWMutex
}

// Add synchronously adds a channel with the given id to the store.
func (cs *ChanStore) Add(id string, channel chan *message.Response) {
func (cs *ChanStore) Add(id string, channel chan *msg.Response) {
cs.lock.Lock()
defer cs.lock.Lock()
cs.chans[id] = channel
Expand All @@ -94,7 +94,7 @@ func (cs *ChanStore) Remove(id string) {
}

// Get retrieves the channel with the given ID.
func (cs *ChanStore) Get(id string) chan *message.Response {
func (cs *ChanStore) Get(id string) chan *msg.Response {
cs.lock.RLock()
defer cs.lock.RUnlock()
return cs.chans[id]
Expand All @@ -106,24 +106,24 @@ type Peer struct {
Connection net.Conn
Store *PeerStore
resChans *ChanStore
reqChan chan *message.Request
pushChan chan *message.Push
reqChan chan *msg.Request
pushChan chan *msg.Push
lock sync.RWMutex
}

// New returns a new Peer
func New(c net.Conn, ps *PeerStore) *Peer {
cs := &ChanStore{
chans: make(map[string]chan *message.Response),
chans: make(map[string]chan *msg.Response),
lock: sync.RWMutex{},
}
return &Peer{
ID: uuid.New(),
Connection: c,
Store: ps,
resChans: cs,
reqChan: make(chan *message.Request),
pushChan: make(chan *message.Push),
reqChan: make(chan *msg.Request),
pushChan: make(chan *msg.Push),
}
}

Expand All @@ -147,28 +147,28 @@ func (p *Peer) Dispatch() {
p.Connection.SetDeadline(time.Now().Add(Timeout))

for {
msg, err := message.Read(p.Connection)
message, err := msg.Read(p.Connection)
if err != nil {
log.WithError(err).Error("Dispatcher failed to read message")
continue
}

switch msg.Type() {
case message.MessageRequest:
p.reqChan <- msg.(*message.Request)
switch message.Type() {
case msg.MessageRequest:
p.reqChan <- message.(*msg.Request)
break
case message.MessageResponse:
res := msg.(*message.Response)
case msg.MessageResponse:
res := message.(*msg.Response)
resChan := p.resChans.Get(res.ID)
if resChan != nil {
resChan <- msg.(*message.Response)
resChan <- message.(*msg.Response)
} else {
log.Errorf("Dispatcher could not find channel for response %s", res.ID)
}
p.resChans.Remove(res.ID)
break
case message.MessagePush:
p.pushChan <- msg.(*message.Push)
case msg.MessagePush:
p.pushChan <- message.(*msg.Push)
break
default:
// Invalid messgae type. Ignore
Expand All @@ -180,7 +180,7 @@ func (p *Peer) Dispatch() {
// RequestHandler waits on this peer's request channel for incoming requests
// from the Dispatcher, responding to each request appropriately.
func (p *Peer) RequestHandler() {
var req *message.Request
var req *msg.Request
for {
select {
case req = <-p.reqChan:
Expand All @@ -189,18 +189,18 @@ func (p *Peer) RequestHandler() {
continue
}

res := message.Response{ID: req.ID}
res := msg.Response{ID: req.ID}

switch req.ResourceType {
case message.ResourcePeerInfo:
case msg.ResourcePeerInfo:
res.Resource = p.Store.Addrs()
break
case message.ResourceBlock, message.ResourceTransaction:
res.Error = message.NewProtocolError(message.NotImplemented,
case msg.ResourceBlock, msg.ResourceTransaction:
res.Error = msg.NewProtocolError(msg.NotImplemented,
"Block and Transaction requests are not yet implemented on this peer")
break
default:
res.Error = message.NewProtocolError(message.InvalidResourceType,
res.Error = msg.NewProtocolError(msg.InvalidResourceType,
"Invalid resource type")
}

Expand All @@ -214,7 +214,7 @@ func (p *Peer) RequestHandler() {
// PushHandler waits on this peer's request channel for incoming requests
// from the Dispatcher, responding to each request appropriately.
func (p *Peer) PushHandler() {
var push *message.Push
var push *msg.Push
for {
select {
case push = <-p.pushChan:
Expand All @@ -224,7 +224,7 @@ func (p *Peer) PushHandler() {
}

switch push.ResourceType {
case message.ResourcePeerInfo:
case msg.ResourcePeerInfo:
for _, addr := range push.Resource.([]string) {
c, err := conn.Dial(addr)
if err != nil {
Expand All @@ -234,8 +234,8 @@ func (p *Peer) PushHandler() {
}
}
break
case message.ResourceBlock:
case message.ResourceTransaction:
case msg.ResourceBlock:
case msg.ResourceTransaction:
default:
// Invalid resource type. Ignore
}
Expand All @@ -244,7 +244,7 @@ func (p *Peer) PushHandler() {

// AwaitResponse waits on a response channel for a response message sent by the
// Dispatcher. When a response arrives it is handled appropriately.
func (p *Peer) AwaitResponse(req message.Request, c chan *message.Response) {
func (p *Peer) AwaitResponse(req msg.Request, c chan *msg.Response) {
defer p.resChans.Remove(req.ID)
select {
case res := <-c:
Expand All @@ -258,8 +258,8 @@ func (p *Peer) AwaitResponse(req message.Request, c chan *message.Response) {
// Request sends the given request over this peer's Connection and spawns a
// response listener with AwaitResponse. Returns error if request could not be
// written.
func (p *Peer) Request(req message.Request) error {
resChan := make(chan *message.Response)
func (p *Peer) Request(req msg.Request) error {
resChan := make(chan *msg.Response)
p.resChans.Add(req.ID, resChan)
err := req.Write(p.Connection)
if err != nil {
Expand Down

0 comments on commit b8d39cc

Please sign in to comment.