Skip to content

Commit

Permalink
Merge pull request #30 from redBorder/feature/fix-batcher
Browse files Browse the repository at this point in the history
New batcher implementation
  • Loading branch information
Bigomby committed Aug 16, 2016
2 parents f704bdd + 730bd05 commit a4f7cb6
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 31 deletions.
6 changes: 3 additions & 3 deletions components/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Batch struct {
Buff *bytes.Buffer
MessageCount uint // Current number of messages in the buffer
Next utils.Next // Call to pass the message to the next handler
Timer *clock.Timer
}

// NewBatch creates a new instance of Batch
Expand All @@ -30,10 +31,10 @@ func NewBatch(m *utils.Message, group string, next utils.Next, clk clock.Clock,
}

if timeoutMillis != 0 {
timer := clk.Timer(time.Duration(timeoutMillis) * time.Millisecond)
b.Timer = clk.Timer(time.Duration(timeoutMillis) * time.Millisecond)

go func() {
<-timer.C
<-b.Timer.C
if b.MessageCount > 0 {
ready <- b
}
Expand All @@ -47,7 +48,6 @@ func NewBatch(m *utils.Message, group string, next utils.Next, clk clock.Clock,
func (b *Batch) Send(cb func()) {
b.Message.PushPayload(b.Buff.Bytes())
cb()
b.Next(b.Message)
}

// Add merges a new message in the buffer
Expand Down
74 changes: 53 additions & 21 deletions components/batch/batcher.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package batcher

import (
"sync"

"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/utils"
)

// Batcher allows to merge multiple messages in a single one
type Batcher struct {
id int // Worker ID
id int // Worker ID
wg sync.WaitGroup
batches map[string]*Batch // Collection of batches pending
readyBatches chan *Batch
clk clock.Clock
incoming chan struct {
m *utils.Message
next utils.Next
}

config Config // Batcher configuration
Config Config // Batcher configuration
}

// Init starts a gorutine that can receive:
Expand All @@ -22,32 +29,57 @@ func (b *Batcher) Init(id int) {
b.id = id
b.batches = make(map[string]*Batch)
b.readyBatches = make(chan *Batch)
b.clk = clock.New()
b.incoming = make(chan struct {
m *utils.Message
next utils.Next
})
if b.clk == nil {
b.clk = clock.New()
}

go func() {
for batch := range b.readyBatches {
batch.Send(func() {
delete(b.batches, batch.Group)
})
for {
select {
case message := <-b.incoming:
group, exists := message.m.Opts["batch_group"].(string)
if !exists {
message.next(message.m)
} else {
if batch, exists := b.batches[group]; exists {
batch.Add(message.m)

if batch.MessageCount >= b.Config.Limit {
batch.Send(func() {
delete(b.batches, group)
batch.Timer.Stop()
batch.Next(batch.Message)
})
}
} else {
b.batches[group] = NewBatch(message.m, group, message.next, b.clk,
b.Config.TimeoutMillis, b.readyBatches)
}
}

b.wg.Done()

case batch := <-b.readyBatches:
batch.Send(func() {
delete(b.batches, batch.Group)
batch.Next(batch.Message)
})
}
}
}()
}

// OnMessage is called when a new message is receive. Add the new message to
// a batch
func (b *Batcher) OnMessage(m *utils.Message, next utils.Next, done utils.Done) {
if group, exists := m.Opts["batch_group"].(string); exists {
if batch, exists := b.batches[group]; exists {
batch.Add(m)
if batch.MessageCount >= b.config.Limit {
b.readyBatches <- batch
}
} else {
b.batches[group] = NewBatch(m, group, next, b.clk, b.config.TimeoutMillis, b.readyBatches)
}

return
}

next(m)
b.wg.Add(1)
b.incoming <- struct {
m *utils.Message
next utils.Next
}{m, next}
b.wg.Wait()
}
24 changes: 17 additions & 7 deletions components/batch/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (nd *NexterDoner) Next(m *utils.Message) {
func TestBatcher(t *testing.T) {
Convey("Given a batcher", t, func() {
batcher := &Batcher{
config: Config{
Config: Config{
TimeoutMillis: 1000,
Limit: 10,
MaxPendingBatches: 10,
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestBatcher(t *testing.T) {
Convey("When the max number of messages is reached", func() {
var messages []*utils.Message

for i := 0; i < int(batcher.config.Limit); i++ {
for i := 0; i < int(batcher.Config.Limit); i++ {
m := utils.NewMessage()
m.PushPayload([]byte("ABC"))
m.Opts = map[string]interface{}{
Expand All @@ -95,10 +95,10 @@ func TestBatcher(t *testing.T) {
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *utils.Message)
nd.nextCalled = make(chan *utils.Message, 1)
nd.On("Next", mock.AnythingOfType("*utils.Message")).Times(1)

for i := 0; i < int(batcher.config.Limit); i++ {
for i := 0; i < int(batcher.Config.Limit); i++ {
batcher.OnMessage(messages[i], nd.Next, nil)
}

Expand All @@ -109,7 +109,7 @@ func TestBatcher(t *testing.T) {

So(err, ShouldBeNil)
So(string(data), ShouldEqual, "ABCABCABCABCABCABCABCABCABCABC")
So(m.Reports.Size(), ShouldEqual, batcher.config.Limit)
So(m.Reports.Size(), ShouldEqual, batcher.Config.Limit)
So(batcher.batches["group1"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
})
Expand Down Expand Up @@ -150,19 +150,23 @@ func TestBatcher(t *testing.T) {
})
})

Convey("When multiple messages are received with differente groups", func() {
Convey("When multiple messages are received with differents groups", func() {
m1 := utils.NewMessage()
m1.PushPayload([]byte("MESSAGE 1"))
m1.Reports.Push("Report 1")
m1.Opts = map[string]interface{}{
"batch_group": "group1",
}

m2 := utils.NewMessage()
m2.PushPayload([]byte("MESSAGE 2"))
m2.Reports.Push("Report 2")
m2.Opts = map[string]interface{}{
"batch_group": "group2",
}
m3 := utils.NewMessage()
m3.PushPayload([]byte("MESSAGE 3"))
m3.Reports.Push("Report 3")
m3.Opts = map[string]interface{}{
"batch_group": "group2",
}
Expand All @@ -189,15 +193,21 @@ func TestBatcher(t *testing.T) {
clk := batcher.clk.(*clock.Mock)
So(len(batcher.batches), ShouldEqual, 2)

clk.Add(time.Duration(batcher.config.TimeoutMillis) * time.Millisecond)
clk.Add(time.Duration(batcher.Config.TimeoutMillis) * time.Millisecond)

group1 := <-nd.nextCalled
group1Data, err := group1.PopPayload()
report1 := group1.Reports.Pop().(string)
So(err, ShouldBeNil)
So(report1, ShouldEqual, "Report 1")

group2 := <-nd.nextCalled
group2Data, err := group2.PopPayload()
So(err, ShouldBeNil)
report3 := group2.Reports.Pop().(string)
So(report3, ShouldEqual, "Report 3")
report2 := group2.Reports.Pop().(string)
So(report2, ShouldEqual, "Report 2")

So(string(group1Data), ShouldEqual, "MESSAGE 1")
So(string(group2Data), ShouldEqual, "MESSAGE 2MESSAGE 3")
Expand Down

0 comments on commit a4f7cb6

Please sign in to comment.