Skip to content

Commit

Permalink
Merge f323464 into d86acde
Browse files Browse the repository at this point in the history
  • Loading branch information
Bigomby committed Aug 3, 2016
2 parents d86acde + f323464 commit 43b4464
Show file tree
Hide file tree
Showing 4 changed files with 370 additions and 0 deletions.
70 changes: 70 additions & 0 deletions components/batcher/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package batcher

import (
"bytes"
"errors"
"time"

"github.com/benbjohnson/clock"
"github.com/oleiade/lane"
"github.com/redBorder/rbforwarder/types"
)

// BatchMessage accumulates the payloads of several messages that share the
// same group so that they can be forwarded as one message.
type BatchMessage struct {
	// Group is the name used to group messages.
	Group string
	// Buff is the buffer where the payloads of the messages are merged.
	Buff *bytes.Buffer
	// MessageCount is the current number of messages in the buffer.
	MessageCount uint
	// BytesCount is meant to track the current number of bytes in the buffer.
	BytesCount uint
	// Next is called to pass the batch to the next handler.
	Next types.Next
	// Opts stacks the options of the original messages.
	Opts *lane.Stack
}

// StartTimeout arms a timer for the batch. When the timer expires the batch is
// pushed to the ready channel, no matter how many messages it holds, as long
// as it holds at least one.
//
// A nil clk is replaced by a real clock. A timeoutMillis of 0 disables the
// timer entirely. The receiver is returned so that calls can be chained.
func (b *BatchMessage) StartTimeout(clk clock.Clock, timeoutMillis uint, ready chan *BatchMessage) *BatchMessage {
	if clk == nil {
		clk = clock.New()
	}

	if timeoutMillis == 0 {
		return b
	}

	timeout := time.Duration(timeoutMillis) * time.Millisecond
	timer := clk.Timer(timeout)

	// The goroutine lives until the timer fires; it is never cancelled.
	go func() {
		<-timer.C
		if b.MessageCount == 0 {
			return
		}
		ready <- b
	}()

	return b
}

// Send hands the batch over to the next handler in the pipeline. The cb
// callback runs first, before the batch is forwarded.
func (b *BatchMessage) Send(cb func()) {
	cb()

	forward := b.Next
	forward(b)
}

// Write merges a new message into the buffer by appending data to it, and
// updates the counters: MessageCount grows by one and BytesCount is set to the
// number of bytes currently held by the buffer.
func (b *BatchMessage) Write(data []byte) {
	b.Buff.Write(data)
	b.MessageCount++
	// Derive the byte count from the buffer itself so it cannot drift from
	// the real content, whatever the buffer held before this call.
	b.BytesCount = uint(b.Buff.Len())
}

// PopData returns the content of the buffer, that is, all the merged
// messages. The buffer is not emptied and the returned error is always nil.
func (b *BatchMessage) PopData() (ret []byte, err error) {
	ret = b.Buff.Bytes()
	return ret, nil
}

// PopOpts pops the options stored on top of the stack by the previous
// handler.
//
// It returns an error when there is no stack or the stack is empty, and also
// when the popped element is not a map[string]interface{}. In the latter case
// the element has already been removed from the stack.
func (b *BatchMessage) PopOpts() (ret map[string]interface{}, err error) {
	// A BatchMessage built without an Opts stack must not panic here.
	if b.Opts == nil || b.Opts.Empty() {
		return nil, errors.New("Empty stack")
	}

	// Opts is an exported field, so anything may have been pushed to it:
	// check the type instead of letting the assertion panic.
	opts, ok := b.Opts.Pop().(map[string]interface{})
	if !ok {
		return nil, errors.New("unexpected options type in stack")
	}

	return opts, nil
}
100 changes: 100 additions & 0 deletions components/batcher/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package batcher

import (
"bytes"
"sync"

"github.com/benbjohnson/clock"
"github.com/oleiade/lane"
"github.com/redBorder/rbforwarder/types"
)

// Batcher merges multiple messages into a single one, grouping them by name.
type Batcher struct {
	// id is the worker ID.
	id int
	// batches is the collection of pending batches, indexed by group name.
	batches map[string]*BatchMessage
	// newBatches carries messages whose group had no pending batch when
	// OnMessage looked it up.
	newBatches chan *BatchMessage
	// pendingBatches carries messages whose group already had a pending
	// batch when OnMessage looked it up.
	pendingBatches chan *BatchMessage
	// wg lets OnMessage wait until the goroutine started by Init has
	// processed the message.
	wg sync.WaitGroup
	// clk is the clock used for the batch timeouts.
	clk clock.Clock

	// config is the batcher configuration.
	config Config
}

// Init stores the worker id, creates the internal channels and starts the
// goroutine that owns the batches map. That goroutine never terminates and
// can receive:
//   - A message coming from OnMessage (on newBatches or pendingBatches), that
//     is merged into the pending batch of its group or becomes a new batch.
//   - A batch of messages that is ready to be sent because its timeout has
//     expired.
//
// Every message received from OnMessage is acknowledged with exactly one
// wg.Done(), including the ones that are discarded, so OnMessage never blocks
// forever.
func (b *Batcher) Init(id int) {
	b.id = id
	b.batches = make(map[string]*BatchMessage)
	b.newBatches = make(chan *BatchMessage)
	b.pendingBatches = make(chan *BatchMessage)

	readyBatches := make(chan *BatchMessage)

	if b.clk == nil {
		b.clk = clock.New()
	}

	// add merges an incoming message into the pending batch of its group.
	// OnMessage looks the group up outside of this goroutine, so the
	// channel a message arrives on may be out of date: the lookup is
	// repeated here, where the map is owned.
	add := func(incoming *BatchMessage) {
		current, exists := b.batches[incoming.Group]
		if !exists {
			// No pending batch (never created, or already flushed): the
			// incoming message becomes the batch of its group.
			incoming.StartTimeout(b.clk, b.config.TimeoutMillis, readyBatches)
			b.batches[incoming.Group] = incoming
			return
		}

		opts, err := incoming.PopOpts()
		if err != nil {
			// A message without options is discarded.
			return
		}

		current.Opts.Push(opts)
		current.Write(incoming.Buff.Bytes())

		if current.MessageCount >= b.config.Limit {
			current.Send(func() {
				delete(b.batches, current.Group)
			})
		}
	}

	go func() {
		for {
			select {
			case batchMessage := <-readyBatches:
				// The timer of a batch that was already sent because it
				// reached the limit still fires. Ignore it, otherwise the
				// old batch would be sent twice and a newer batch of the
				// same group would be removed from the map.
				if b.batches[batchMessage.Group] != batchMessage {
					continue
				}

				batchMessage.Send(func() {
					delete(b.batches, batchMessage.Group)
				})

			case batchMessage := <-b.newBatches:
				add(batchMessage)
				b.wg.Done()

			case batchMessage := <-b.pendingBatches:
				add(batchMessage)
				b.wg.Done()
			}
		}
	}()
}

// OnMessage is called when a new message is received. A message whose options
// carry a string "batch_group" is added to the batch of that group, and the
// call blocks until the goroutine started by Init has processed it. Any other
// message is passed untouched to the next handler and is not batched.
//
// The done callback is not used by this handler.
func (b *Batcher) OnMessage(m types.Messenger, next types.Next, done types.Done) {
	// The error is not checked: if opts comes back nil the lookup below
	// fails and the message takes the pass-through path.
	// NOTE(review): the options have already been popped from m when it is
	// forwarded; confirm the next handler does not need them.
	opts, _ := m.PopOpts()
	group, ok := opts["batch_group"].(string)
	if !ok {
		next(m)
		// Without this return the message would also be batched under the
		// empty group after having been forwarded.
		return
	}

	// NOTE(review): a PopData error is ignored and the message is batched
	// with whatever data was returned; confirm this is intended.
	data, _ := m.PopData()
	batchMessage := &BatchMessage{
		Buff:         bytes.NewBuffer(data),
		Opts:         lane.NewStack(),
		MessageCount: 1,
		BytesCount:   uint(len(data)),
		Next:         next,
		Group:        group,
	}

	batchMessage.Opts.Push(opts)

	b.wg.Add(1)
	// This lookup runs outside of the goroutine that owns the map, so it
	// only selects the channel used to deliver the message.
	if _, exists := b.batches[group]; exists {
		b.pendingBatches <- batchMessage
	} else {
		b.newBatches <- batchMessage
	}

	b.wg.Wait()
}
192 changes: 192 additions & 0 deletions components/batcher/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package batcher

import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/types"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/mock"
)

// NexterDoner is a mock that provides the Next and Done callbacks handed to
// the batcher in the tests.
type NexterDoner struct {
	mock.Mock

	// nextCalled receives one value for every call to Next.
	nextCalled chan struct{}
}

// Next records the call on the mock and then signals it on nextCalled.
//
// The call is recorded before the signal is sent so that a test that receives
// from nextCalled and then asserts the expectations always observes the call.
func (nd *NexterDoner) Next(m types.Messenger) {
	nd.Called(m)
	nd.nextCalled <- struct{}{}
}

// Done records the call and its arguments on the mock.
func (nd *NexterDoner) Done(m types.Messenger, code int, status string) {
	_ = nd.Called(m, code, status)
}

// TestMessage is a mock message whose PopData and PopOpts results are
// programmed by each test.
type TestMessage struct {
	mock.Mock
}

// PopData returns the data and the error programmed on the mock for this
// call.
func (m *TestMessage) PopData() (data []byte, err error) {
	results := m.Called()

	data = results.Get(0).([]byte)
	err = results.Error(1)

	return data, err
}

// PopOpts returns the options and the error programmed on the mock for this
// call.
func (m *TestMessage) PopOpts() (opts map[string]interface{}, err error) {
	results := m.Called()

	opts = results.Get(0).(map[string]interface{})
	err = results.Error(1)

	return opts, err
}

// TestRBForwarder exercises the Batcher: a single message, a batch sent
// because it reached the message limit, a batch sent because its timeout
// expired, and messages that belong to different groups.
func TestRBForwarder(t *testing.T) {
Convey("Given a batcher", t, func() {
// The batcher uses a mock clock, so the timeouts are driven by the clk.Add
// calls below.
batcher := &Batcher{config: Config{
TimeoutMillis: 1000,
Limit: 10,
MaxPendingBatches: 10,
},
clk: clock.NewMock(),
}

batcher.Init(0)

Convey("When a message is received, but not yet sent", func() {
m := new(TestMessage)
m.On("PopData").Return([]byte("Hello World"), nil)
m.On("PopOpts").Return(map[string]interface{}{
"batch_group": "group1",
}, nil)

// No next handler is given: a single message neither reaches the limit
// nor gets its timeout fired, so the batch is never sent here.
batcher.OnMessage(m, nil, nil)

Convey("Message should be present on the batch", func() {
var data *BatchMessage
var exists bool
data, exists = batcher.batches["group1"]

So(exists, ShouldBeTrue)
So(string(data.Buff.Bytes()), ShouldEqual, "Hello World")
So(len(batcher.batches), ShouldEqual, 1)

m.AssertExpectations(t)
})
})

Convey("When the max number of messages is reached", func() {
m := new(TestMessage)
m.On("PopData").Return([]byte("ABC"), nil)
m.On("PopOpts").Return(map[string]interface{}{
"batch_group": "group1",
}, nil)

// Next is expected once, with the ten "ABC" payloads merged.
nd := new(NexterDoner)
nd.nextCalled = make(chan struct{}, 1)
nd.On("Next", mock.MatchedBy(func(m *BatchMessage) bool {
data, _ := m.PopData()
return string(data) == "ABCABCABCABCABCABCABCABCABCABC"
})).Times(1)

// Send exactly Limit messages of the same group.
for i := 0; i < int(batcher.config.Limit); i++ {
batcher.OnMessage(m, nd.Next, nil)
}

Convey("The batch should be sent", func() {
nd.AssertExpectations(t)
<-nd.nextCalled
So(batcher.batches["group1"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
})
})

Convey("When the timeout expires", func() {
m := new(TestMessage)
m.On("PopData").Return([]byte("ABC"), nil)
m.On("PopOpts").Return(map[string]interface{}{
"batch_group": "group1",
}, nil)

// Next is expected once, with the five "ABC" payloads merged.
nd := new(NexterDoner)
nd.nextCalled = make(chan struct{}, 1)
nd.On("Next", mock.MatchedBy(func(m *BatchMessage) bool {
data, _ := m.PopData()
return string(data) == "ABCABCABCABCABC"
})).Times(1)

// Five messages stay below the limit of ten.
for i := 0; i < 5; i++ {
batcher.OnMessage(m, nd.Next, nil)
}

clk := batcher.clk.(*clock.Mock)

Convey("The batch should be sent", func() {
// Half of the timeout: the batch must still be pending.
clk.Add(500 * time.Millisecond)
So(batcher.batches["group1"], ShouldNotBeNil)
// The rest of the timeout: wait for Next before checking the map.
clk.Add(500 * time.Millisecond)
<-nd.nextCalled
So(batcher.batches["group1"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
nd.AssertExpectations(t)
})
})

Convey("When multiple messages are received with differente groups", func() {
m1 := new(TestMessage)
m1.On("PopData").Return([]byte("MESSAGE 1"), nil)
m1.On("PopOpts").Return(map[string]interface{}{
"batch_group": "group1",
}, nil)

m2 := new(TestMessage)
m2.On("PopData").Return([]byte("MESSAGE 2"), nil)
m2.On("PopOpts").Return(map[string]interface{}{
"batch_group": "group2",
}, nil)

// m3 has its own payload but shares the group of m2.
m3 := new(TestMessage)
m3.On("PopData").Return([]byte("MESSAGE 3"), nil)
m3.On("PopOpts").Return(map[string]interface{}{
"batch_group": "group2",
}, nil)

// One call to Next is expected per group.
nd := new(NexterDoner)
nd.nextCalled = make(chan struct{}, 2)
nd.On("Next", mock.AnythingOfType("*batcher.BatchMessage")).Times(2)

for i := 0; i < 3; i++ {
batcher.OnMessage(m1, nd.Next, nil)
}
for i := 0; i < 3; i++ {
batcher.OnMessage(m2, nd.Next, nil)
}
batcher.OnMessage(m3, nd.Next, nil)

Convey("Each message should be in its group", func() {
var err error
group1, err := batcher.batches["group1"].PopData()
So(err, ShouldBeNil)
group2, err := batcher.batches["group2"].PopData()
So(err, ShouldBeNil)
So(string(group1), ShouldEqual, "MESSAGE 1MESSAGE 1MESSAGE 1")
So(string(group2), ShouldEqual, "MESSAGE 2MESSAGE 2MESSAGE 2MESSAGE 3")
So(len(batcher.batches), ShouldEqual, 2)
m1.AssertExpectations(t)
m2.AssertExpectations(t)
m3.AssertExpectations(t)
})

Convey("After a timeout the messages should be sent", func() {
clk := batcher.clk.(*clock.Mock)
So(len(batcher.batches), ShouldEqual, 2)
// Advance the mock clock well past the configured timeout and wait for
// one Next call per group.
clk.Add(1000 * time.Second)
<-nd.nextCalled
<-nd.nextCalled
So(batcher.batches["group1"], ShouldBeNil)
So(batcher.batches["group2"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
nd.AssertExpectations(t)
})
})
})
}
8 changes: 8 additions & 0 deletions components/batcher/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package batcher

// Config holds the settings of a Batcher.
type Config struct {
	// TimeoutMillis is the time, in milliseconds, after which a batch is
	// sent regardless of how many messages it holds. Zero disables the
	// timeout.
	TimeoutMillis uint

	// Limit is the number of messages that makes a batch be sent.
	Limit uint

	// MaxPendingBatches is presumably the maximum number of pending
	// batches; NOTE(review): confirm where it is enforced.
	MaxPendingBatches uint
}

0 comments on commit 43b4464

Please sign in to comment.