Skip to content

Commit

Permalink
Merge b2de50a into a90f28b
Browse files Browse the repository at this point in the history
  • Loading branch information
Bigomby committed Aug 10, 2016
2 parents a90f28b + b2de50a commit af5a550
Show file tree
Hide file tree
Showing 19 changed files with 517 additions and 701 deletions.
62 changes: 62 additions & 0 deletions components/batch/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package batcher

import (
"bytes"
"time"

"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/types"
)

// Batch groups multiple messages
type Batch struct {
Group string
Message *types.Message
Buff *bytes.Buffer
MessageCount uint // Current number of messages in the buffer
Next types.Next // Call to pass the message to the next handler
}

// NewBatch creates a new instance of Batch
func NewBatch(m *types.Message, group string, next types.Next, clk clock.Clock,
timeoutMillis uint, ready chan *Batch) *Batch {
payload, _ := m.PopPayload()
b := &Batch{
Group: group,
Next: next,
Message: m,
MessageCount: 1,
Buff: bytes.NewBuffer(payload),
}

if timeoutMillis != 0 {
timer := clk.Timer(time.Duration(timeoutMillis) * time.Millisecond)

go func() {
<-timer.C
if b.MessageCount > 0 {
ready <- b
}
}()
}

return b
}

// Send the batch of messages to the next handler in the pipeline
func (b *Batch) Send(cb func()) {
b.Message.PushPayload(b.Buff.Bytes())
cb()
b.Next(b.Message)
}

// Add merges a new message in the buffer
func (b *Batch) Add(m *types.Message) {
newReport := m.Reports.Pop()
b.Message.Reports.Push(newReport)

newPayload, _ := m.PopPayload()
b.Buff.Write(newPayload)

b.MessageCount++
}
53 changes: 53 additions & 0 deletions components/batch/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package batcher

import (
"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/types"
)

// Batcher allows to merge multiple messages in a single one
type Batcher struct {
id int // Worker ID
batches map[string]*Batch // Collection of batches pending
readyBatches chan *Batch
clk clock.Clock

config Config // Batcher configuration
}

// Init starts a gorutine that can receive:
// - New messages that will be added to a existing or new batch of messages
// - A batch of messages that is ready to send (i.e. batch timeout has expired)
func (b *Batcher) Init(id int) {
b.id = id
b.batches = make(map[string]*Batch)
b.readyBatches = make(chan *Batch)
b.clk = clock.New()

go func() {
for batch := range b.readyBatches {
batch.Send(func() {
delete(b.batches, batch.Group)
})
}
}()
}

// OnMessage is called when a new message is receive. Add the new message to
// a batch
func (b *Batcher) OnMessage(m *types.Message, next types.Next, done types.Done) {
if group, exists := m.Opts["batch_group"].(string); exists {
if batch, exists := b.batches[group]; exists {
batch.Add(m)
if batch.MessageCount >= b.config.Limit {
b.readyBatches <- batch
}
} else {
b.batches[group] = NewBatch(m, group, next, b.clk, b.config.TimeoutMillis, b.readyBatches)
}

return
}

next(m)
}
212 changes: 212 additions & 0 deletions components/batch/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package batcher

import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/types"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/mock"
)

type NexterDoner struct {
mock.Mock
nextCalled chan *types.Message
}

func (nd *NexterDoner) Next(m *types.Message) {
nd.Called(m)
nd.nextCalled <- m
}

func TestBatcher(t *testing.T) {
Convey("Given a batcher", t, func() {
batcher := &Batcher{
config: Config{
TimeoutMillis: 1000,
Limit: 10,
MaxPendingBatches: 10,
},
}

batcher.Init(0)
batcher.clk = clock.NewMock()

Convey("When a message is received with no batch group", func() {
m := types.NewMessage()
m.PushPayload([]byte("Hello World"))

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message, 1)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(1)

batcher.OnMessage(m, nd.Next, nil)

Convey("Message should be present on the batch", func() {
nd.AssertExpectations(t)
m := <-nd.nextCalled
So(len(batcher.batches), ShouldEqual, 0)
payload, err := m.PopPayload()
So(err, ShouldBeNil)
So(string(payload), ShouldEqual, "Hello World")
})
})

Convey("When a message is received, but not yet sent", func() {
m := types.NewMessage()
m.PushPayload([]byte("Hello World"))
m.Opts = map[string]interface{}{
"batch_group": "group1",
}
m.Reports.Push("Report")

batcher.OnMessage(m, nil, nil)

Convey("Message should be present on the batch", func() {
batch, exists := batcher.batches["group1"]
So(exists, ShouldBeTrue)

data := batch.Buff.Bytes()
So(string(data), ShouldEqual, "Hello World")

opts := batch.Message.Opts
So(opts["batch_group"], ShouldEqual, "group1")

report := batch.Message.Reports.Pop().(string)
So(report, ShouldEqual, "Report")

So(len(batcher.batches), ShouldEqual, 1)
})
})

Convey("When the max number of messages is reached", func() {
var messages []*types.Message

for i := 0; i < int(batcher.config.Limit); i++ {
m := types.NewMessage()
m.PushPayload([]byte("ABC"))
m.Opts = map[string]interface{}{
"batch_group": "group1",
}
m.Reports.Push("Report")

messages = append(messages, m)
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(1)

for i := 0; i < int(batcher.config.Limit); i++ {
batcher.OnMessage(messages[i], nd.Next, nil)
}

Convey("The batch should be sent", func() {
m := <-nd.nextCalled
nd.AssertExpectations(t)
data, err := m.PopPayload()

So(err, ShouldBeNil)
So(string(data), ShouldEqual, "ABCABCABCABCABCABCABCABCABCABC")
So(m.Reports.Size(), ShouldEqual, batcher.config.Limit)
So(batcher.batches["group1"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
})
})

Convey("When the timeout expires", func() {
var messages []*types.Message

for i := 0; i < 5; i++ {
m := types.NewMessage()
m.PushPayload([]byte("Hello World"))
m.Opts = map[string]interface{}{
"batch_group": "group1",
}
m.Reports.Push("Report")

messages = append(messages, m)
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message, 1)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(1)

for i := 0; i < 5; i++ {
batcher.OnMessage(messages[i], nd.Next, nil)
}

clk := batcher.clk.(*clock.Mock)

Convey("The batch should be sent", func() {
clk.Add(500 * time.Millisecond)
So(batcher.batches["group1"], ShouldNotBeNil)
clk.Add(500 * time.Millisecond)
<-nd.nextCalled
So(batcher.batches["group1"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
nd.AssertExpectations(t)
})
})

Convey("When multiple messages are received with differente groups", func() {
m1 := types.NewMessage()
m1.PushPayload([]byte("MESSAGE 1"))
m1.Opts = map[string]interface{}{
"batch_group": "group1",
}
m2 := types.NewMessage()
m2.PushPayload([]byte("MESSAGE 2"))
m2.Opts = map[string]interface{}{
"batch_group": "group2",
}
m3 := types.NewMessage()
m3.PushPayload([]byte("MESSAGE 3"))
m3.Opts = map[string]interface{}{
"batch_group": "group2",
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *types.Message, 2)
nd.On("Next", mock.AnythingOfType("*types.Message")).Times(2)

batcher.OnMessage(m1, nd.Next, nil)
batcher.OnMessage(m2, nd.Next, nil)
batcher.OnMessage(m3, nd.Next, nil)

Convey("Each message should be in its group", func() {
group1 := batcher.batches["group1"].Buff.Bytes()
So(string(group1), ShouldEqual, "MESSAGE 1")

group2 := batcher.batches["group2"].Buff.Bytes()
So(string(group2), ShouldEqual, "MESSAGE 2MESSAGE 3")

So(len(batcher.batches), ShouldEqual, 2)
})

Convey("After a timeout the messages should be sent", func() {
clk := batcher.clk.(*clock.Mock)
So(len(batcher.batches), ShouldEqual, 2)

clk.Add(time.Duration(batcher.config.TimeoutMillis) * time.Millisecond)

group1 := <-nd.nextCalled
group1Data, err := group1.PopPayload()
So(err, ShouldBeNil)

group2 := <-nd.nextCalled
group2Data, err := group2.PopPayload()
So(err, ShouldBeNil)

So(string(group1Data), ShouldEqual, "MESSAGE 1")
So(string(group2Data), ShouldEqual, "MESSAGE 2MESSAGE 3")
So(batcher.batches["group1"], ShouldBeNil)
So(batcher.batches["group2"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)

nd.AssertExpectations(t)
})
})
})
}
File renamed without changes.
Loading

0 comments on commit af5a550

Please sign in to comment.