Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bogh committed Jun 15, 2016
1 parent ccd2774 commit 170eeb4
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 27 deletions.
54 changes: 34 additions & 20 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package server
import (
"errors"
"fmt"
"github.com/smancke/guble/protocol"
"runtime"
"sync"
"time"

"github.com/smancke/guble/protocol"

log "github.com/Sirupsen/logrus"
)

var (
Expand All @@ -21,11 +24,12 @@ type Route struct {
//
// The queue can have a settable size and if it reaches the capacity the
// route is closed
queue *queue
queue *queue
queueSize int

// Timeout to define how long to wait for the message to be read on the channel
// if timeout is reached the route is closed
timeout time.Duration // timeout before closing channel
timeout time.Duration
closeC chan struct{}

// Indicates if the consumer go routine is running
Expand All @@ -36,6 +40,7 @@ type Route struct {
Path protocol.Path
UserID string // UserID that subscribed or pushes messages to the router
ApplicationID string
logger *log.Entry
}

// NewRoute creates a new route pointer
Expand All @@ -46,11 +51,18 @@ func NewRoute(path, applicationID, userID string, c chan *MessageForRoute) *Rout
route := &Route{
messagesC: c,
queue: newQueue(queueSize),
queueSize: queueSize,
timeout: -1,
closeC: make(chan bool),
closeC: make(chan struct{}),
Path: protocol.Path(path),
UserID: userID,
ApplicationID: applicationID,

logger: logger.WithFields(log.Fields{
"path": path,
"applicationID": applicationID,
"userID": userID,
}),
}
return route
}
Expand Down Expand Up @@ -112,10 +124,11 @@ func (r *Route) MessagesC() chan *MessageForRoute {
// Deliver takes a messages and adds it to the queue to be delivered in to the
// channel
func (r *Route) Deliver(m *protocol.Message) error {
protocol.Debug("Delivering message %s", m)
logger := r.logger.WithField("message", m)
logger.Debug("Delivering message")

if r.isInvalid() {
protocol.Debug("Cannot deliver cause route is invalid")
logger.Debug("Cannot deliver cause route is invalid")
mTotalDeliverMessageErrors.Add(1)
return ErrInvalidRoute
}
Expand All @@ -126,18 +139,18 @@ func (r *Route) Deliver(m *protocol.Message) error {
if r.queueSize == 0 {
return r.sendDirect(m)
} else if r.queue.len() >= r.queueSize {
protocol.Debug("Closing route cause of full queue")
logger.Debug("Closing route cause of full queue")
r.Close()
mTotalDeliverMessageErrors.Add(1)
return ErrQueueFull
}
}

r.queue.push(m)
protocol.Debug("Queue size: %d", r.queue.len())
logger.Debug("Queue size: %d", r.queue.len())

if !r.isConsuming() {
protocol.Debug("Starting consuming")
logger.Debug("Starting consuming")
go r.consume()
}

Expand All @@ -148,7 +161,8 @@ func (r *Route) Deliver(m *protocol.Message) error {
// consume starts to consume the queue and pass the messages to route
// channel. Stops if there are no items in the queue. Should be started as a goroutine.
func (r *Route) consume() {
protocol.Debug("Consuming route %s queue", r)
r.logger.Debug("Consuming route queue")

r.setConsuming(true)
defer r.setConsuming(false)

Expand All @@ -159,7 +173,7 @@ func (r *Route) consume() {

for {
if r.isInvalid() {
protocol.Debug("Stopping consuming cause of invalid route.")
r.logger.Debug("Stopping consuming cause route is invalid.")
mTotalDeliverMessageErrors.Add(1)
return
}
Expand All @@ -168,16 +182,16 @@ func (r *Route) consume() {

if err != nil {
if err == errEmptyQueue {
protocol.Debug("Empty queue")
r.logger.Debug("Empty queue")
return
}
protocol.Err("Error fetching queue message %s", err)
r.logger.WithField("error", err).Error("Error fetching queue message")
continue
}

// send next message throught the channel
if err = r.send(m); err != nil {
protocol.Err("Error sending message %s through route %s", m, r)
r.logger.WithField("message", m).Error("Error sending message through route")
if err == errTimeout || err == ErrInvalidRoute {
// channel been closed, ending the consumer
return
Expand All @@ -190,12 +204,12 @@ func (r *Route) consume() {

func (r *Route) send(m *protocol.Message) error {
defer r.invalidRecover()
protocol.Debug("Sending message %s through route %s channel", m, r)
r.logger.WithField("message", m).Debug("Sending message through route channel")

// no timeout, means we dont close the channel
if r.timeout == -1 {
r.messagesC <- r.format(m)
protocol.Debug("Channel size: %d", len(r.messagesC))
r.logger.WithField("size", len(r.messagesC)).Debug("Channel size")
return nil
}

Expand All @@ -205,7 +219,7 @@ func (r *Route) send(m *protocol.Message) error {
case <-r.closeC:
return ErrInvalidRoute
case <-time.After(r.timeout):
protocol.Debug("Closing route cause of timeout")
r.logger.Debug("Closing route cause of timeout")
r.Close()
return errTimeout
}
Expand All @@ -214,7 +228,7 @@ func (r *Route) send(m *protocol.Message) error {
// recover function to use when sending on closed channel
func (r *Route) invalidRecover() error {
if rc := recover(); rc != nil && r.isInvalid() {
protocol.Debug("Recovered closed router err: %s", rc)
r.logger.WithField("error", rc).Debug("Recovered closed router")
return ErrInvalidRoute
}
return nil
Expand All @@ -233,7 +247,7 @@ func (r *Route) sendDirect(m *protocol.Message) error {
case r.messagesC <- r.format(m):
return nil
default:
protocol.Debug("Closing route cause of full channel")
r.logger.Debug("Closing route cause of full channel")
r.Close()
return ErrChannelFull
}
Expand All @@ -243,7 +257,7 @@ func (r *Route) sendDirect(m *protocol.Message) error {
func (r *Route) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
protocol.Debug("Closing route: %s", r)
r.logger.Debug("Closing route")

// route already closed
if r.invalid {
Expand Down
9 changes: 5 additions & 4 deletions server/route_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package server

import (
"github.com/smancke/guble/protocol"
"github.com/stretchr/testify/assert"
"testing"
"time"

"github.com/smancke/guble/protocol"
"github.com/stretchr/testify/assert"
)

var (
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestRouteDeliver_sendDirect(t *testing.T) {
case _, open := <-r.MessagesC():
a.False(open)
default:
protocol.Debug("len(r.C): %v", len(r.MessagesC()))
logger.Debug("len(r.C): %v", len(r.MessagesC()))
a.Fail("channel was not closed")
}

Expand Down Expand Up @@ -143,7 +144,7 @@ func TestRoute_CloseTwice(t *testing.T) {
}

func TestQueue_ShiftEmpty(t *testing.T) {
q := newQueue()
q := newQueue(5)
q.shift()
assert.Equal(t, 0, q.len())
}
Expand Down
7 changes: 4 additions & 3 deletions server/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package server

import (
"errors"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/smancke/guble/protocol"
"github.com/smancke/guble/server/auth"
"github.com/smancke/guble/store"
"github.com/smancke/guble/testutil"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

var aTestByteMessage = []byte("Hello World!")
Expand Down Expand Up @@ -253,7 +254,7 @@ func TestRoute_IsRemovedIfChannelIsFull(t *testing.T) {
case _, open := <-r.MessagesC():
a.False(open)
default:
protocol.Debug("len(r.C): %v", len(r.MessagesC()))
logger.Debug("len(r.C): %v", len(r.MessagesC()))
a.Fail("channel was not closed")
}
}
Expand Down

0 comments on commit 170eeb4

Please sign in to comment.