
Commit

Merge 3990371 into a90f28b
Bigomby committed Aug 10, 2016
2 parents a90f28b + 3990371 commit ca83632
Showing 19 changed files with 605 additions and 698 deletions.
64 changes: 64 additions & 0 deletions components/batch/batch.go
@@ -0,0 +1,64 @@
package batcher

import (
"bytes"
"time"

"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/types"
)

// Batch groups multiple messages
type Batch struct {
Group string
Message *types.Message
MessageCount uint // Current number of messages in the buffer
Next types.Next // Call to pass the message to the next handler
}

// NewBatch creates a new instance of Batch
func NewBatch(m *types.Message, group string, next types.Next, clk clock.Clock,
timeoutMillis uint, ready chan *Batch) *Batch {
b := &Batch{
Group: group,
Next: next,
Message: m,
MessageCount: 1,
}

if timeoutMillis != 0 {
timer := clk.Timer(time.Duration(timeoutMillis) * time.Millisecond)

go func() {
<-timer.C
if b.MessageCount > 0 {
ready <- b
}
}()
}

return b
}

// Send the batch of messages to the next handler in the pipeline
func (b *Batch) Send(cb func()) {
cb()
b.Next(b.Message)
}

// Add merges a new message into the batch buffer
func (b *Batch) Add(m *types.Message) {
newPayload := m.Payload.Pop().([]byte)
newOptions := m.Opts.Pop().(map[string]interface{})
newReport := m.Reports.Pop()

currentPayload := b.Message.Payload.Pop().([]byte)
buff := bytes.NewBuffer(currentPayload)
buff.Write(newPayload)

b.Message.Payload.Push(buff.Bytes())
b.Message.Opts.Push(newOptions)
b.Message.Reports.Push(newReport)

b.MessageCount++
}
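
As an aside, here is a minimal sketch of how a Batch could be built and grown with Add from inside this package. It assumes the lane-backed types.Message layout used in the tests further down (Payload, Opts and Reports are lane stacks) and the imports shown; the helper name exampleBatchMerge is hypothetical and not part of the commit.

package batcher

import (
	"github.com/benbjohnson/clock"
	"github.com/oleiade/lane"
	"github.com/redBorder/rbforwarder/types"
)

// exampleBatchMerge is a hypothetical helper showing how Add concatenates payloads.
func exampleBatchMerge() *Batch {
	m1 := &types.Message{Payload: lane.NewStack(), Opts: lane.NewStack(), Reports: lane.NewStack()}
	m1.Payload.Push([]byte("foo"))

	ready := make(chan *Batch, 1)
	// timeoutMillis == 0, so NewBatch starts no timer goroutine.
	b := NewBatch(m1, "group1", nil, clock.New(), 0, ready)

	m2 := &types.Message{Payload: lane.NewStack(), Opts: lane.NewStack(), Reports: lane.NewStack()}
	m2.Payload.Push([]byte("bar"))
	m2.Opts.Push(map[string]interface{}{"batch_group": "group1"})
	m2.Reports.Push("report")

	// Add pops the payload, options and report from m2 and merges them into b.Message.
	b.Add(m2)
	// b.Message.Payload now holds a single []byte("foobar") and b.MessageCount == 2.
	return b
}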
55 changes: 55 additions & 0 deletions components/batch/batcher.go
@@ -0,0 +1,55 @@
package batcher

import (
"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/types"
)

// Batcher allows merging multiple messages into a single one
type Batcher struct {
id int // Worker ID
batches map[string]*Batch // Collection of pending batches
readyBatches chan *Batch
clk clock.Clock

config Config // Batcher configuration
}

// Init starts a goroutine that can receive:
// - New messages that will be added to an existing or new batch of messages
// - A batch of messages that is ready to send (i.e. batch timeout has expired)
func (b *Batcher) Init(id int) {
b.id = id
b.batches = make(map[string]*Batch)
b.readyBatches = make(chan *Batch)
b.clk = clock.New()

go func() {
for batch := range b.readyBatches {
batch.Send(func() {
delete(b.batches, batch.Group)
})
}
}()
}

// OnMessage is called when a new message is received. It adds the new message
// to a batch.
func (b *Batcher) OnMessage(m *types.Message, next types.Next, done types.Done) {
if opts, ok := m.Opts.Head().(map[string]interface{}); ok {
if group, exists := opts["batch_group"].(string); exists {
if batch, exists := b.batches[group]; exists {
batch.Add(m)
if batch.MessageCount >= b.config.Limit {
b.readyBatches <- batch
}
} else {
b.batches[group] = NewBatch(m, group, next, b.clk, b.config.TimeoutMillis, b.readyBatches)
}

return
}
}

next(m)
}
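
For context, a hypothetical in-package usage sketch of the Batcher (config is unexported, so this mirrors what the tests below do). The Config fields and the func(*types.Message) callback shape come from this diff; the function name and everything else are illustrative only.

package batcher

import (
	"fmt"

	"github.com/oleiade/lane"
	"github.com/redBorder/rbforwarder/types"
)

// exampleBatcherFlow is a hypothetical sketch: two messages with the same
// batch_group are merged and flushed once config.Limit is reached.
func exampleBatcherFlow() {
	b := &Batcher{config: Config{Limit: 2, TimeoutMillis: 0}}
	b.Init(0)

	next := func(m *types.Message) {
		// The flush runs on the goroutine started by Init, so it is asynchronous.
		fmt.Printf("flushed batch: %s\n", m.Payload.Pop().([]byte)) // "helloworld"
	}

	for _, data := range []string{"hello", "world"} {
		m := &types.Message{Payload: lane.NewStack(), Opts: lane.NewStack(), Reports: lane.NewStack()}
		m.Payload.Push([]byte(data))
		m.Opts.Push(map[string]interface{}{"batch_group": "group1"})
		m.Reports.Push("report")
		b.OnMessage(m, next, nil)
	}
}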
238 changes: 238 additions & 0 deletions components/batch/batcher_test.go
@@ -0,0 +1,238 @@
package batcher

import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/oleiade/lane"
"github.com/redBorder/rbforwarder/types"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/mock"
)

type NexterDoner struct {
mock.Mock
nextCalled chan *types.Message
}

func (nd *NexterDoner) Next(m *types.Message) {
nd.Called(m)
nd.nextCalled <- m
}

func TestBatcher(t *testing.T) {
Convey("Given a batcher", t, func() {
batcher := &Batcher{
config: Config{
TimeoutMillis: 1000,
Limit: 10,
MaxPendingBatches: 10,
},
}

batcher.Init(0)
batcher.clk = clock.NewMock()

Convey("When a message is received with no batch group", func() {
m := &types.Message{
Payload: lane.NewStack(),
Opts: lane.NewStack(),
Reports: lane.NewStack(),
}
m.Payload.Push([]byte("Hello World"))

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message, 1)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(1)

batcher.OnMessage(m, nd.Next, nil)

Convey("Message should be present on the batch", func() {
nd.AssertExpectations(t)
m := <-nd.nextCalled
So(len(batcher.batches), ShouldEqual, 0)
So(string(m.Payload.Pop().([]byte)), ShouldEqual, "Hello World")
})
})

Convey("When a message is received, but not yet sent", func() {
m := &types.Message{
Payload: lane.NewStack(),
Opts: lane.NewStack(),
Reports: lane.NewStack(),
}
m.Payload.Push([]byte("Hello World"))
m.Opts.Push(map[string]interface{}{
"batch_group": "group1",
})
m.Reports.Push("Report")

batcher.OnMessage(m, nil, nil)

Convey("Message should be present on the batch", func() {
var batch *Batch
var exists bool
batch, exists = batcher.batches["group1"]

So(exists, ShouldBeTrue)
data := batch.Message.Payload.Pop().([]byte)
opts := batch.Message.Opts.Pop().(map[string]interface{})
report := batch.Message.Reports.Pop().(string)
So(string(data), ShouldEqual, "Hello World")
So(opts["batch_group"], ShouldEqual, "group1")
So(report, ShouldEqual, "Report")
So(len(batcher.batches), ShouldEqual, 1)
})
})

Convey("When the max number of messages is reached", func() {
var messages []*types.Message

for i := 0; i < int(batcher.config.Limit); i++ {
m := &types.Message{
Payload: lane.NewStack(),
Opts: lane.NewStack(),
Reports: lane.NewStack(),
}
m.Payload.Push([]byte("ABC"))
m.Opts.Push(map[string]interface{}{
"batch_group": "group1",
})
m.Reports.Push("Report")

messages = append(messages, m)
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(1)

for i := 0; i < int(batcher.config.Limit); i++ {
batcher.OnMessage(messages[i], nd.Next, nil)
}

Convey("The batch should be sent", func() {
m := <-nd.nextCalled
nd.AssertExpectations(t)
data := m.Payload.Pop().([]byte)
optsSize := m.Opts.Size()
reportsSize := m.Reports.Size()
So(string(data), ShouldEqual, "ABCABCABCABCABCABCABCABCABCABC")
So(m.Payload.Empty(), ShouldBeTrue)
So(optsSize, ShouldEqual, batcher.config.Limit)
So(reportsSize, ShouldEqual, batcher.config.Limit)
So(batcher.batches["group1"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
})
})

Convey("When the timeout expires", func() {
var messages []*types.Message

for i := 0; i < 5; i++ {
m := &types.Message{
Payload: lane.NewStack(),
Opts: lane.NewStack(),
Reports: lane.NewStack(),
}
m.Payload.Push([]byte("ABC"))
m.Opts.Push(map[string]interface{}{
"batch_group": "group1",
})
m.Reports.Push("Report")

messages = append(messages, m)
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message, 1)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(1)

for i := 0; i < 5; i++ {
batcher.OnMessage(messages[i], nd.Next, nil)
}

clk := batcher.clk.(*clock.Mock)

Convey("The batch should be sent", func() {
clk.Add(500 * time.Millisecond)
So(batcher.batches["group1"], ShouldNotBeNil)
clk.Add(500 * time.Millisecond)
<-nd.nextCalled
So(batcher.batches["group1"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
nd.AssertExpectations(t)
})
})

Convey("When multiple messages are received with differente groups", func() {
m1 := &types.Message{
Payload: lane.NewStack(),
Opts: lane.NewStack(),
Reports: lane.NewStack(),
}
m1.Payload.Push([]byte("MESSAGE 1"))
m1.Opts.Push(map[string]interface{}{
"batch_group": "group1",
})

m2 := &types.Message{
Payload: lane.NewStack(),
Opts: lane.NewStack(),
Reports: lane.NewStack(),
}
m2.Payload.Push([]byte("MESSAGE 2"))
m2.Opts.Push(map[string]interface{}{
"batch_group": "group2",
})

m3 := &types.Message{
Payload: lane.NewStack(),
Opts: lane.NewStack(),
Reports: lane.NewStack(),
}
m3.Payload.Push([]byte("MESSAGE 3"))
m3.Opts.Push(map[string]interface{}{
"batch_group": "group2",
})

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message, 2)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(2)

batcher.OnMessage(m1, nd.Next, nil)
batcher.OnMessage(m2, nd.Next, nil)
batcher.OnMessage(m3, nd.Next, nil)

Convey("Each message should be in its group", func() {
group1 := batcher.batches["group1"].Message.Payload.Pop().([]byte)
group2 := batcher.batches["group2"].Message.Payload.Pop().([]byte)

So(string(group1), ShouldEqual, "MESSAGE 1")
So(string(group2), ShouldEqual, "MESSAGE 2MESSAGE 3")
So(len(batcher.batches), ShouldEqual, 2)
})

Convey("After a timeout the messages should be sent", func() {
clk := batcher.clk.(*clock.Mock)
So(len(batcher.batches), ShouldEqual, 2)
clk.Add(time.Duration(batcher.config.TimeoutMillis) * time.Millisecond)
group1 := <-nd.nextCalled
group1Data := group1.Payload.Pop().([]byte)
So(string(group1Data), ShouldEqual, "MESSAGE 1")
group2 := <-nd.nextCalled
group2Data := group2.Payload.Pop().([]byte)
So(string(group2Data), ShouldEqual, "MESSAGE 2MESSAGE 3")
So(batcher.batches["group1"], ShouldBeNil)
So(batcher.batches["group2"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
nd.AssertExpectations(t)
})
})
})
}
File renamed without changes.