Skip to content

Commit

Permalink
Merge 87779e4 into a4f7cb6
Browse files Browse the repository at this point in the history
  • Loading branch information
Bigomby committed Aug 16, 2016
2 parents a4f7cb6 + 87779e4 commit 3600319
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 43 deletions.
61 changes: 61 additions & 0 deletions examples/http_send.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"fmt"

"github.com/redBorder/rbforwarder"
"github.com/redBorder/rbforwarder/components/batch"
"github.com/redBorder/rbforwarder/components/httpsender"
)

func main() {
var components []interface{}
var workers []int
const numMessages = 100000

f := rbforwarder.NewRBForwarder(rbforwarder.Config{
Retries: 3,
Backoff: 5,
QueueSize: 10000,
})

batch := &batcher.Batcher{
Config: batcher.Config{
TimeoutMillis: 1000,
Limit: 1000,
},
}
components = append(components, batch)
workers = append(workers, 1)

sender := &httpsender.HTTPSender{
URL: "http://localhost:8888",
}
components = append(components, sender)
workers = append(workers, 1)

f.PushComponents(components, workers)

opts := map[string]interface{}{
"http_endpoint": "librb-http",
"batch_group": "librb-http",
}

for i := 0; i < numMessages; i++ {
data := fmt.Sprintf("{\"message\": %d}", i)
f.Produce([]byte(data), opts, i)
}

var errors int
for report := range f.GetReports() {
r := report.(rbforwarder.Report)
if r.Code > 0 {
errors += r.Code
}
if r.Opaque.(int) == numMessages-1 {
break
}
}

fmt.Printf("Sent %d messages with %d errors\n", numMessages, errors)
}
7 changes: 4 additions & 3 deletions pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ func (p *pipeline) PushComponent(composser utils.Composer, w int) {
}
}, func(m *utils.Message, code int, status string) {
reports := lane.NewStack()

for !m.Reports.Empty() {
rep := m.Reports.Pop().(report)
rep.code = code
rep.status = status
rep := m.Reports.Pop().(Report)
rep.Code = code
rep.Status = status
reports.Push(rep)
}

Expand Down
10 changes: 4 additions & 6 deletions rbforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync/atomic"

"github.com/Sirupsen/logrus"
"github.com/oleiade/lane"
"github.com/redBorder/rbforwarder/utils"
)

Expand Down Expand Up @@ -63,9 +62,9 @@ func (f *RBForwarder) Close() {
}

// PushComponents adds a new component to the pipeline
func (f *RBForwarder) PushComponents(components []utils.Composer, w []int) {
func (f *RBForwarder) PushComponents(components []interface{}, w []int) {
for i, component := range components {
f.p.PushComponent(component, w[i])
f.p.PushComponent(component.(utils.Composer), w[i])
}
}

Expand All @@ -90,15 +89,14 @@ func (f *RBForwarder) Produce(data []byte, opts map[string]interface{}, opaque i
seq := f.currentProducedID
f.currentProducedID++
m := utils.NewMessage()
r := report{
r := Report{
seq: seq,
opaque: lane.NewStack(),
Opaque: opaque,
}

m.PushPayload(data)
m.Opts = opts
m.Reports.Push(r)
r.opaque.Push(opaque)

f.p.input <- m

Expand Down
43 changes: 21 additions & 22 deletions rbforwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ type MockMiddleComponent struct {
mock.Mock
}

func (c *MockMiddleComponent) Init(id int) error {
args := c.Called()
return args.Error(0)
func (c *MockMiddleComponent) Init(id int) {
c.Called()
return
}

func (c *MockMiddleComponent) OnMessage(
Expand All @@ -40,9 +40,8 @@ type MockComponent struct {
statusCode int
}

func (c *MockComponent) Init(id int) error {
args := c.Called()
return args.Error(0)
func (c *MockComponent) Init(id int) {
c.Called()
}

func (c *MockComponent) OnMessage(
Expand Down Expand Up @@ -77,7 +76,7 @@ func TestRBForwarder(t *testing.T) {

component.On("Init").Return(nil).Times(numWorkers)

var components []utils.Composer
var components []interface{}
var instances []int
components = append(components, component)
instances = append(instances, numWorkers)
Expand All @@ -99,18 +98,18 @@ func TestRBForwarder(t *testing.T) {
)

Convey("\"Hello World\" message should be get by the worker", func() {
var lastReport report
var lastReport Report
var reports int
for r := range rbforwarder.GetReports() {
reports++
lastReport = r.(report)
lastReport = r.(Report)
rbforwarder.Close()
}

So(lastReport, ShouldNotBeNil)
So(reports, ShouldEqual, 1)
So(lastReport.code, ShouldEqual, 0)
So(lastReport.status, ShouldEqual, "OK")
So(lastReport.Code, ShouldEqual, 0)
So(lastReport.Status, ShouldEqual, "OK")
So(err, ShouldBeNil)

component.AssertExpectations(t)
Expand Down Expand Up @@ -148,14 +147,14 @@ func TestRBForwarder(t *testing.T) {
So(err, ShouldBeNil)

var reports int
var lastReport report
var lastReport Report
for r := range rbforwarder.GetReports() {
reports++
lastReport = r.(report)
lastReport = r.(Report)
rbforwarder.Close()
}

opaque := lastReport.opaque.Pop().(string)
opaque := lastReport.Opaque.(string)
So(opaque, ShouldEqual, "This is an opaque")
})
})
Expand All @@ -178,17 +177,17 @@ func TestRBForwarder(t *testing.T) {
So(err, ShouldBeNil)

var reports int
var lastReport report
var lastReport Report
for r := range rbforwarder.GetReports() {
reports++
lastReport = r.(report)
lastReport = r.(Report)
rbforwarder.Close()
}

So(lastReport, ShouldNotBeNil)
So(reports, ShouldEqual, 1)
So(lastReport.status, ShouldEqual, "Fake Error")
So(lastReport.code, ShouldEqual, 99)
So(lastReport.Status, ShouldEqual, "Fake Error")
So(lastReport.Code, ShouldEqual, 99)
So(lastReport.retries, ShouldEqual, numRetries)

component.AssertExpectations(t)
Expand Down Expand Up @@ -233,7 +232,7 @@ func TestRBForwarder(t *testing.T) {
var reports int

for rep := range rbforwarder.GetOrderedReports() {
if rep.(report).opaque.Pop().(int) != reports {
if rep.(Report).Opaque.(int) != reports {
ordered = false
}
reports++
Expand Down Expand Up @@ -271,7 +270,7 @@ func TestRBForwarder(t *testing.T) {
component2.On("Init").Return(nil)
}

var components []utils.Composer
var components []interface{}
var instances []int

components = append(components, component1)
Expand Down Expand Up @@ -302,8 +301,8 @@ func TestRBForwarder(t *testing.T) {
for rep := range rbforwarder.GetReports() {
reports++

code := rep.(report).code
status := rep.(report).status
code := rep.(Report).Code
status := rep.(Report).Status
So(code, ShouldEqual, 0)
So(status, ShouldEqual, "OK")
}
Expand Down
11 changes: 5 additions & 6 deletions report.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package rbforwarder

import "github.com/oleiade/lane"
// Report contains information abot a delivered message
type Report struct {
Code int
Status string
Opaque interface{}

type report struct {
seq uint64
code int
status string
retries int

opaque *lane.Stack
}
10 changes: 5 additions & 5 deletions reportHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func newReporter(
// Get reports from the handler channel
for m := range r.input {
// If the message has status code 0 (success) send the report to the user
rep := m.Reports.Head().(report)
if rep.code == 0 || r.maxRetries == 0 {
rep := m.Reports.Head().(Report)
if rep.Code == 0 || r.maxRetries == 0 {
r.out <- m
continue
}
Expand All @@ -62,7 +62,7 @@ func newReporter(
r.wg.Add(1)
go func(m *utils.Message) {
defer r.wg.Done()
rep := m.Reports.Pop().(report)
rep := m.Reports.Pop().(Report)
rep.retries++
m.Reports.Push(rep)
<-time.After(time.Duration(r.backoff) * time.Second)
Expand All @@ -86,7 +86,7 @@ func (r *reportHandler) GetReports() chan interface{} {
go func() {
for message := range r.out {
for !message.Reports.Empty() {
rep := message.Reports.Pop().(report)
rep := message.Reports.Pop().(Report)
reports <- rep
}
}
Expand All @@ -103,7 +103,7 @@ func (r *reportHandler) GetOrderedReports() chan interface{} {
go func() {
for message := range r.out {
for !message.Reports.Empty() {
rep := message.Reports.Pop().(report)
rep := message.Reports.Pop().(Report)
if rep.seq == r.currentReport {
// The message is the expected. Send it.
reports <- rep
Expand Down
2 changes: 1 addition & 1 deletion utils/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ type Done func(*Message, int, string)
// Composer represents a component in the pipeline that performs a work on
// a message
type Composer interface {
Init(int) error
Init(int)
OnMessage(*Message, Next, Done)
}

0 comments on commit 3600319

Please sign in to comment.