Skip to content

Commit

Permalink
Merge cbbb3ff into d86acde
Browse files Browse the repository at this point in the history
  • Loading branch information
Bigomby committed Aug 4, 2016
2 parents d86acde + cbbb3ff commit 615c606
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 38 deletions.
21 changes: 16 additions & 5 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"

"github.com/oleiade/lane"
"github.com/redBorder/rbforwarder/types"
)

// message is used to send data through the pipeline
Expand All @@ -27,31 +28,41 @@ func (m *message) PushData(v []byte) {

// PopData get the data stored by the previous handler
func (m *message) PopData() (ret []byte, err error) {
if m.payload == nil {
err = errors.New("Uninitialized payload")
return
}

if m.payload.Empty() {
err = errors.New("Empty stack")
return
}
ret = m.payload.Pop().([]byte)

ret = m.payload.Pop().([]byte)
return
}

// PopData get the data stored by the previous handler
func (m *message) PopOpts() (ret map[string]interface{}, err error) {
if m.opts == nil {
err = errors.New("Uninitialized options")
return
}

if m.opts.Empty() {
err = errors.New("Empty stack")
return
}
ret = m.opts.Pop().(map[string]interface{})

ret = m.opts.Pop().(map[string]interface{})
return
}

func (m message) GetReports() []Report {
var reports []Report
func (m message) Reports() []types.Reporter {
var reports []types.Reporter

for !m.opts.Empty() {
reports = append(reports, Report{
reports = append(reports, report{
code: m.code,
status: m.status,
retries: m.retries,
Expand Down
6 changes: 3 additions & 3 deletions rbforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var Logger = logrus.NewEntry(log)
// workers
type RBForwarder struct {
p *pipeline
r *reporter
r *reportHandler

currentProducedID uint64
working uint32
Expand Down Expand Up @@ -71,13 +71,13 @@ func (f *RBForwarder) PushComponents(components []types.Composer, w []int) {

// GetReports is used by the source to get a report for a sent message.
// Reports are delivered on the same order that was sent
func (f *RBForwarder) GetReports() <-chan Report {
func (f *RBForwarder) GetReports() <-chan types.Reporter {
return f.r.GetReports()
}

// GetOrderedReports is the same as GetReports() but the reports are delivered
// in order
func (f *RBForwarder) GetOrderedReports() <-chan Report {
func (f *RBForwarder) GetOrderedReports() <-chan types.Reporter {
return f.r.GetOrderedReports()
}

Expand Down
31 changes: 16 additions & 15 deletions rbforwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ func TestRBForwarder(t *testing.T) {
)

Convey("\"Hello World\" message should be get by the worker", func() {
var lastReport Report
var lastReport report
var reports int
for report := range rbforwarder.GetReports() {
for r := range rbforwarder.GetReports() {
reports++
lastReport = report
lastReport = r.(report)
rbforwarder.Close()
}

Expand Down Expand Up @@ -143,8 +143,8 @@ func TestRBForwarder(t *testing.T) {

Convey("Should be possible to read an option", func() {
for report := range rbforwarder.GetReports() {
So(report.opts, ShouldNotBeNil)
So(report.opts["option"], ShouldEqual, "example_option")
So(report.GetOpts(), ShouldNotBeNil)
So(report.GetOpts()["option"], ShouldEqual, "example_option")
}

So(err, ShouldBeNil)
Expand All @@ -153,8 +153,8 @@ func TestRBForwarder(t *testing.T) {

Convey("Should not be possible to read an nonexistent option", func() {
for report := range rbforwarder.GetReports() {
So(report.opts, ShouldNotBeNil)
So(report.opts["invalid"], ShouldBeEmpty)
So(report.GetOpts(), ShouldNotBeNil)
So(report.GetOpts()["invalid"], ShouldBeEmpty)
}

So(err, ShouldBeNil)
Expand All @@ -177,7 +177,7 @@ func TestRBForwarder(t *testing.T) {
Convey("Should not be possible to read the option", func() {
for report := range rbforwarder.GetReports() {
So(err, ShouldBeNil)
So(report.opts, ShouldBeNil)
So(report.GetOpts(), ShouldBeNil)
}

So(err, ShouldBeNil)
Expand All @@ -202,10 +202,10 @@ func TestRBForwarder(t *testing.T) {
So(err, ShouldBeNil)

var reports int
var lastReport Report
for report := range rbforwarder.GetReports() {
var lastReport report
for r := range rbforwarder.GetReports() {
reports++
lastReport = report
lastReport = r.(report)
rbforwarder.Close()
}

Expand Down Expand Up @@ -257,7 +257,7 @@ func TestRBForwarder(t *testing.T) {
var reports int

for report := range rbforwarder.GetOrderedReports() {
if report.opts["message_id"] != reports {
if report.GetOpts()["message_id"] != reports {
ordered = false
}
reports++
Expand Down Expand Up @@ -325,9 +325,10 @@ func TestRBForwarder(t *testing.T) {
for report := range rbforwarder.GetReports() {
reports++

So(report.opts["message_id"], ShouldEqual, "test123")
So(report.code, ShouldEqual, 0)
So(report.status, ShouldEqual, "OK")
So(report.GetOpts()["message_id"], ShouldEqual, "test123")
code, status, _ := report.Status()
So(code, ShouldEqual, 0)
So(status, ShouldEqual, "OK")
}

m := <-component2.channel
Expand Down
11 changes: 9 additions & 2 deletions report.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package rbforwarder

// Report contains information about a produced message
type Report struct {
type report struct {
code int
status string
retries int
opts map[string]interface{}
}

func (r report) Status() (code int, status string, retries int) {
return r.code, r.status, r.retries
}

func (r report) GetOpts() map[string]interface{} {
return r.opts
}
28 changes: 15 additions & 13 deletions reporter.go → reportHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package rbforwarder
import (
"sync"
"time"

"github.com/redBorder/rbforwarder/types"
)

// reporter is used to handle the reports produced by the last element
// reportHandler is used to handle the reports produced by the last element
// of the pipeline. The first element of the pipeline can know the status
// of the produced message using GetReports() or GetOrderedReports()
type reporter struct {
type reportHandler struct {
input chan *message // Receive messages from pipeline
retries chan *message // Send messages back to the pipeline
out chan *message // Send reports to the user

queued map[uint64]Report // Store pending reports
currentReport uint64 // Last delivered report
queued map[uint64]types.Reporter // Store pending reports
currentReport uint64 // Last delivered report

maxRetries int
backoff int
Expand All @@ -26,14 +28,14 @@ type reporter struct {
func newReporter(
maxRetries, backoff int,
input, retries chan *message,
) *reporter {
) *reportHandler {

r := &reporter{
r := &reportHandler{
input: input,
retries: retries,
out: make(chan *message, 100), // NOTE Temp channel size

queued: make(map[uint64]Report),
queued: make(map[uint64]types.Reporter),

maxRetries: maxRetries,
backoff: backoff,
Expand Down Expand Up @@ -75,12 +77,12 @@ func newReporter(
return r
}

func (r *reporter) GetReports() chan Report {
reports := make(chan Report)
func (r *reportHandler) GetReports() chan types.Reporter {
reports := make(chan types.Reporter)

go func() {
for message := range r.out {
for _, report := range message.GetReports() {
for _, report := range message.Reports() {
reports <- report
}
}
Expand All @@ -91,12 +93,12 @@ func (r *reporter) GetReports() chan Report {
return reports
}

func (r *reporter) GetOrderedReports() chan Report {
reports := make(chan Report)
func (r *reportHandler) GetOrderedReports() chan types.Reporter {
reports := make(chan types.Reporter)

go func() {
for message := range r.out {
for _, report := range message.GetReports() {
for _, report := range message.Reports() {
if message.seq == r.currentReport {
// The message is the expected. Send it.
reports <- report
Expand Down
1 change: 1 addition & 0 deletions types/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package types
type Messenger interface {
PopData() ([]byte, error)
PopOpts() (map[string]interface{}, error)
Reports() []Reporter
}
9 changes: 9 additions & 0 deletions types/reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package types

// Reporter returns information about a processed message. The GetOpts method
// should return, at least, the same info that was pushed to the original
// message from the user.
type Reporter interface {
Status() (code int, status string, retries int)
GetOpts() map[string]interface{}
}

0 comments on commit 615c606

Please sign in to comment.