Skip to content

Commit

Permalink
Improve test and code testability
Browse files Browse the repository at this point in the history
  • Loading branch information
Bigomby committed Jul 26, 2016
1 parent 46d25eb commit 140eea0
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 37 deletions.
26 changes: 20 additions & 6 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"github.com/redBorder/rbforwarder/pipeline"
)

type backend struct {
// Backend orchestrates the pipeline
type Backend struct {
sender pipeline.Sender

// Pool of workers
Expand All @@ -21,7 +22,7 @@ type backend struct {
messagePool chan *pipeline.Message

workers int
queue int
queueSize int
maxMessages int
maxBytes int

Expand All @@ -32,24 +33,37 @@ type backend struct {
keepSending chan struct{}
}

func (b *backend) Init() {
// NewBackend creates a new Backend
func NewBackend(workers, queueSize, maxMessages, maxBytes int) *Backend {
b := &Backend{
workers: workers,
queueSize: queueSize,
maxMessages: maxMessages,
maxBytes: maxBytes,
}

b.senderPool = make(chan chan *pipeline.Message, b.workers)

b.messages = make(chan *pipeline.Message)
b.input = make(chan *pipeline.Message)
b.reports = make(chan *pipeline.Message)
b.messagePool = make(chan *pipeline.Message, b.queue)
b.messagePool = make(chan *pipeline.Message, b.queueSize)

b.keepSending = make(chan struct{})

for i := 0; i < b.queue; i++ {
for i := 0; i < b.queueSize; i++ {
b.messagePool <- &pipeline.Message{
Metadata: make(map[string]interface{}),
InputBuffer: new(bytes.Buffer),
OutputBuffer: new(bytes.Buffer),
}
}

return b
}

// Init initializes a backend
func (b *Backend) Init() {
for i := 0; i < b.workers; i++ {
b.startSender(i)
}
Expand Down Expand Up @@ -102,7 +116,7 @@ func (b *backend) Init() {
}

// Worker that sends the message
func (b *backend) startSender(i int) {
func (b *Backend) startSender(i int) {
sender := b.sender
sender.Init(i, b.reports)

Expand Down
13 changes: 3 additions & 10 deletions rbforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var Logger = logrus.NewEntry(log)
// send messages and get reports. It has a backend for routing messages between
// workers
type RBForwarder struct {
backend *backend
backend *Backend
reportHandler *reportHandler
reports chan pipeline.Report
counter uint64
Expand All @@ -30,15 +30,8 @@ type RBForwarder struct {

// NewRBForwarder creates a new Forwarder object
func NewRBForwarder(config Config) *RBForwarder {
backend := &backend{
workers: config.Workers,
queue: config.QueueSize,
maxMessages: config.MaxMessages,
maxBytes: config.MaxBytes,
}

forwarder := &RBForwarder{
backend: backend,
backend: NewBackend(config.Workers, config.QueueSize, config.MaxMessages, config.MaxBytes),
reports: make(chan pipeline.Report, config.QueueSize),
config: config,
}
Expand All @@ -47,7 +40,7 @@ func NewRBForwarder(config Config) *RBForwarder {
config.Retries,
config.Backoff,
config.QueueSize,
backend.input,
forwarder.backend.input,
)

fields := logrus.Fields{
Expand Down
52 changes: 31 additions & 21 deletions rbforwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,57 @@ import (

"github.com/redBorder/rbforwarder/pipeline"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/mock"
)

type TestSender struct {
type MockSender struct {
mock.Mock
channel chan string
reports chan *pipeline.Message
}

func (tsender *TestSender) Init(id int, reports chan *pipeline.Message) error {
tsender.reports = reports
return nil
func (s *MockSender) Init(id int, reports chan *pipeline.Message) error {
args := s.Called(id, reports)
s.reports = reports
return args.Error(0)
}

func (tsender *TestSender) OnMessage(m *pipeline.Message) error {
// time.Sleep((time.Millisecond * 10) * time.Duration(rand.Int31n(50)))
func (s *MockSender) OnMessage(m *pipeline.Message) error {
s.channel <- string(m.InputBuffer.Bytes())
s.reports <- m

tsender.channel <- string(m.InputBuffer.Bytes())
m.Report.StatusCode = 0
m.Report.Status = "OK"
tsender.reports <- m
args := s.Called(m)

return nil
return args.Error(0)
}

func TestBackend(t *testing.T) {
Convey("Given a working pipeline", t, func() {
numMessages := 10000
numWorkers := 10

sender := &TestSender{
sender := &MockSender{
channel: make(chan string, 10000),
}

rbforwarder := NewRBForwarder(Config{
Retries: 1,
Workers: 10,
Retries: 0,
Workers: numWorkers,
QueueSize: 10000,
})

for i := 0; i < numWorkers; i++ {
sender.On("Init", i, rbforwarder.backend.reports).Return(nil)
}

rbforwarder.SetSender(sender)
rbforwarder.Start()

Convey("When 10000 messages are produced", func() {
sender.On("OnMessage", mock.AnythingOfType("*pipeline.Message")).
Return(nil).
Times(numMessages)

go func() {
for i := 0; i < numMessages; i++ {
if err := rbforwarder.Produce([]byte(""), map[string]interface{}{
Expand All @@ -67,6 +77,7 @@ func TestBackend(t *testing.T) {
}

So(i, ShouldEqual, numMessages)
sender.AssertExpectations(t)
})

Convey("10000 reports should be received", func() {
Expand All @@ -78,6 +89,7 @@ func TestBackend(t *testing.T) {
}

So(i, ShouldEqual, numMessages)
sender.AssertExpectations(t)
})

Convey("10000 reports should be received in order", func() {
Expand All @@ -96,6 +108,7 @@ func TestBackend(t *testing.T) {

So(err, ShouldBeNil)
So(i, ShouldEqual, numMessages)
sender.AssertExpectations(t)
})
})

Expand All @@ -107,16 +120,13 @@ func TestBackend(t *testing.T) {
}

Convey("\"Hello World\" message should be get by the worker", func() {
message := <-sender.channel
So(message, ShouldEqual, "Hello World")
})
sender.On("OnMessage", mock.MatchedBy(func(m *pipeline.Message) bool {
return m.Metadata["message_id"] == "test123"
})).Return(nil)

Convey("A report of the \"Hello World\" message should be received", func() {
report := <-rbforwarder.GetReports()
So(report.StatusCode, ShouldEqual, 0)
So(report.Status, ShouldEqual, "OK")
So(report.ID, ShouldEqual, 0)
So(report.Metadata["message_id"], ShouldEqual, "test123")
sender.AssertExpectations(t)
})
})

Expand Down

0 comments on commit 140eea0

Please sign in to comment.