Skip to content

Commit

Permalink
✨ Implemented multiple reports per message (#15)
Browse files Browse the repository at this point in the history
Closes #11
  • Loading branch information
Bigomby committed Aug 2, 2016
1 parent 809055a commit 32af15d
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 74 deletions.
47 changes: 26 additions & 21 deletions message.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,52 @@
package rbforwarder

import "errors"
import (
"errors"

"github.com/oleiade/lane"
)

// message is used to send data through the pipeline
type message struct {
bufferStack [][]byte

payload *lane.Stack
opts *lane.Stack
seq uint64 // Unique ID for the report, used to maintain sequence
status string // Result of the sending
code int // Result of the sending
retries int
opts map[string]interface{}
channel chan *message
}

// PushData store data on an LIFO queue so the nexts handlers can use it
func (m *message) PushData(v []byte) {
m.bufferStack = append(m.bufferStack, v)
if m.payload == nil {
m.payload = lane.NewStack()
}

m.payload.Push(v)
}

// PopData get the data stored by the previous handler
func (m *message) PopData() (ret []byte, err error) {
if len(m.bufferStack) < 1 {
err = errors.New("No data on the stack")
if m.payload.Empty() {
err = errors.New("Empty stack")
return
}

ret = m.bufferStack[len(m.bufferStack)-1]
m.bufferStack = m.bufferStack[0 : len(m.bufferStack)-1]
ret = m.payload.Pop().([]byte)

return
}

// GetOpt returns an option
func (m message) GetOpt(name string) interface{} {
return m.opts[name]
}
func (m message) GetReports() []Report {
var reports []Report

func (m message) GetReport() Report {
return Report{
code: m.code,
status: m.status,
retries: m.retries,
opts: m.opts,
for !m.opts.Empty() {
reports = append(reports, Report{
code: m.code,
status: m.status,
retries: m.retries,
opts: m.opts.Pop().(map[string]interface{}),
})
}

return reports
}
5 changes: 4 additions & 1 deletion rbforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync/atomic"

"github.com/Sirupsen/logrus"
"github.com/oleiade/lane"
"github.com/redBorder/rbforwarder/types"
)

Expand Down Expand Up @@ -91,10 +92,12 @@ func (f *RBForwarder) Produce(buf []byte, options map[string]interface{}) error

message := &message{
seq: seq,
opts: options,
opts: lane.NewStack(),
}

message.PushData(buf)
message.opts.Push(options)

f.p.input <- message

return nil
Expand Down
40 changes: 7 additions & 33 deletions rbforwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,8 @@ func TestRBForwarder(t *testing.T) {
Convey("When a \"Hello World\" message is produced", func() {
component.status = "OK"
component.statusCode = 0
closed := false

component.On("OnMessage", mock.MatchedBy(func(m *message) bool {
opt := m.GetOpt("message_id")

if !closed {
rbforwarder.Close()
closed = true
}

return opt.(string) == "test123"
})).Times(1)
component.On("OnMessage", mock.AnythingOfType("*rbforwarder.message")).Times(1)

err := rbforwarder.Produce(
[]byte("Hello World"),
Expand All @@ -109,14 +99,14 @@ func TestRBForwarder(t *testing.T) {
for report := range rbforwarder.GetReports() {
reports++
lastReport = report
rbforwarder.Close()
}

So(lastReport, ShouldNotBeNil)
So(reports, ShouldEqual, 1)
So(lastReport.opts["message_id"], ShouldEqual, "test123")
So(lastReport.code, ShouldEqual, 0)
So(lastReport.status, ShouldEqual, "OK")

So(err, ShouldBeNil)

component.AssertExpectations(t)
Expand Down Expand Up @@ -199,32 +189,23 @@ func TestRBForwarder(t *testing.T) {
Convey("When a message fails to send", func() {
component.status = "Fake Error"
component.statusCode = 99
closed := false

component.On("OnMessage", mock.MatchedBy(func(m *message) bool {
opt := m.GetOpt("message_id")

if m.retries >= 3 && !closed {
closed = true
rbforwarder.Close()
}

return opt.(string) == "test123"
})).Times(4)
component.On("OnMessage", mock.AnythingOfType("*rbforwarder.message")).Times(4)

err := rbforwarder.Produce(
[]byte("Hello World"),
map[string]interface{}{"message_id": "test123"},
)

Convey("The message should be retried\n", func() {
Convey("The message should be retried", func() {
So(err, ShouldBeNil)

var reports int
var lastReport Report
for report := range rbforwarder.GetReports() {
reports++
lastReport = report
rbforwarder.Close()
}

So(lastReport, ShouldNotBeNil)
Expand Down Expand Up @@ -328,15 +309,8 @@ func TestRBForwarder(t *testing.T) {
component2.status = "OK"
component2.statusCode = 0

component1.On("OnMessage", mock.MatchedBy(func(m *message) bool {
opt := m.GetOpt("message_id")
return opt.(string) == "test123"
}))

component2.On("OnMessage", mock.MatchedBy(func(m *message) bool {
opt := m.GetOpt("message_id")
return opt.(string) == "test123"
}))
component1.On("OnMessage", mock.AnythingOfType("*rbforwarder.message"))
component2.On("OnMessage", mock.AnythingOfType("*rbforwarder.message"))

err := rbforwarder.Produce(
[]byte("Hello World"),
Expand Down
38 changes: 20 additions & 18 deletions reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func (r *reporter) GetReports() chan Report {

go func() {
for message := range r.out {
reports <- message.GetReport()
for _, report := range message.GetReports() {
reports <- report
}
}

close(reports)
Expand All @@ -94,25 +96,25 @@ func (r *reporter) GetOrderedReports() chan Report {

go func() {
for message := range r.out {
report := message.GetReport()

if message.seq == r.currentReport {
// The message is the expected. Send it.
reports <- report
r.currentReport++
} else {
// This message is not the expected. Store it.
r.queued[message.seq] = report
}

// Check if there are stored messages and send them.
for {
if currentReport, ok := r.queued[r.currentReport]; ok {
reports <- currentReport
delete(r.queued, r.currentReport)
for _, report := range message.GetReports() {
if message.seq == r.currentReport {
// The message is the expected. Send it.
reports <- report
r.currentReport++
} else {
break
// This message is not the expected. Store it.
r.queued[message.seq] = report
}

// Check if there are stored messages and send them.
for {
if currentReport, ok := r.queued[r.currentReport]; ok {
reports <- currentReport
delete(r.queued, r.currentReport)
r.currentReport++
} else {
break
}
}
}
}
Expand Down
1 change: 0 additions & 1 deletion types/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ package types
type Messenger interface {
PopData() ([]byte, error)
PushData(data []byte)
GetOpt(name string) interface{}
}

0 comments on commit 32af15d

Please sign in to comment.