Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New batcher implementation #30

Merged
merged 1 commit into from
Aug 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions components/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Batch struct {
Buff *bytes.Buffer
MessageCount uint // Current number of messages in the buffer
Next utils.Next // Call to pass the message to the next handler
Timer *clock.Timer
}

// NewBatch creates a new instance of Batch
Expand All @@ -30,10 +31,10 @@ func NewBatch(m *utils.Message, group string, next utils.Next, clk clock.Clock,
}

if timeoutMillis != 0 {
timer := clk.Timer(time.Duration(timeoutMillis) * time.Millisecond)
b.Timer = clk.Timer(time.Duration(timeoutMillis) * time.Millisecond)

go func() {
<-timer.C
<-b.Timer.C
if b.MessageCount > 0 {
ready <- b
}
Expand All @@ -47,7 +48,6 @@ func NewBatch(m *utils.Message, group string, next utils.Next, clk clock.Clock,
func (b *Batch) Send(cb func()) {
b.Message.PushPayload(b.Buff.Bytes())
cb()
b.Next(b.Message)
}

// Add merges a new message in the buffer
Expand Down
74 changes: 53 additions & 21 deletions components/batch/batcher.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package batcher

import (
"sync"

"github.com/benbjohnson/clock"
"github.com/redBorder/rbforwarder/utils"
)

// Batcher allows to merge multiple messages in a single one
type Batcher struct {
id int // Worker ID
id int // Worker ID
wg sync.WaitGroup
batches map[string]*Batch // Collection of batches pending
readyBatches chan *Batch
clk clock.Clock
incoming chan struct {
m *utils.Message
next utils.Next
}

config Config // Batcher configuration
Config Config // Batcher configuration
}

// Init starts a gorutine that can receive:
Expand All @@ -22,32 +29,57 @@ func (b *Batcher) Init(id int) {
b.id = id
b.batches = make(map[string]*Batch)
b.readyBatches = make(chan *Batch)
b.clk = clock.New()
b.incoming = make(chan struct {
m *utils.Message
next utils.Next
})
if b.clk == nil {
b.clk = clock.New()
}

go func() {
for batch := range b.readyBatches {
batch.Send(func() {
delete(b.batches, batch.Group)
})
for {
select {
case message := <-b.incoming:
group, exists := message.m.Opts["batch_group"].(string)
if !exists {
message.next(message.m)
} else {
if batch, exists := b.batches[group]; exists {
batch.Add(message.m)

if batch.MessageCount >= b.Config.Limit {
batch.Send(func() {
delete(b.batches, group)
batch.Timer.Stop()
batch.Next(batch.Message)
})
}
} else {
b.batches[group] = NewBatch(message.m, group, message.next, b.clk,
b.Config.TimeoutMillis, b.readyBatches)
}
}

b.wg.Done()

case batch := <-b.readyBatches:
batch.Send(func() {
delete(b.batches, batch.Group)
batch.Next(batch.Message)
})
}
}
}()
}

// OnMessage is called when a new message is receive. Add the new message to
// a batch
func (b *Batcher) OnMessage(m *utils.Message, next utils.Next, done utils.Done) {
if group, exists := m.Opts["batch_group"].(string); exists {
if batch, exists := b.batches[group]; exists {
batch.Add(m)
if batch.MessageCount >= b.config.Limit {
b.readyBatches <- batch
}
} else {
b.batches[group] = NewBatch(m, group, next, b.clk, b.config.TimeoutMillis, b.readyBatches)
}

return
}

next(m)
b.wg.Add(1)
b.incoming <- struct {
m *utils.Message
next utils.Next
}{m, next}
b.wg.Wait()
}
24 changes: 17 additions & 7 deletions components/batch/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (nd *NexterDoner) Next(m *utils.Message) {
func TestBatcher(t *testing.T) {
Convey("Given a batcher", t, func() {
batcher := &Batcher{
config: Config{
Config: Config{
TimeoutMillis: 1000,
Limit: 10,
MaxPendingBatches: 10,
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestBatcher(t *testing.T) {
Convey("When the max number of messages is reached", func() {
var messages []*utils.Message

for i := 0; i < int(batcher.config.Limit); i++ {
for i := 0; i < int(batcher.Config.Limit); i++ {
m := utils.NewMessage()
m.PushPayload([]byte("ABC"))
m.Opts = map[string]interface{}{
Expand All @@ -95,10 +95,10 @@ func TestBatcher(t *testing.T) {
}

nd := new(NexterDoner)
nd.nextCalled = make(chan *utils.Message)
nd.nextCalled = make(chan *utils.Message, 1)
nd.On("Next", mock.AnythingOfType("*utils.Message")).Times(1)

for i := 0; i < int(batcher.config.Limit); i++ {
for i := 0; i < int(batcher.Config.Limit); i++ {
batcher.OnMessage(messages[i], nd.Next, nil)
}

Expand All @@ -109,7 +109,7 @@ func TestBatcher(t *testing.T) {

So(err, ShouldBeNil)
So(string(data), ShouldEqual, "ABCABCABCABCABCABCABCABCABCABC")
So(m.Reports.Size(), ShouldEqual, batcher.config.Limit)
So(m.Reports.Size(), ShouldEqual, batcher.Config.Limit)
So(batcher.batches["group1"], ShouldBeNil)
So(len(batcher.batches), ShouldEqual, 0)
})
Expand Down Expand Up @@ -150,19 +150,23 @@ func TestBatcher(t *testing.T) {
})
})

Convey("When multiple messages are received with differente groups", func() {
Convey("When multiple messages are received with differents groups", func() {
m1 := utils.NewMessage()
m1.PushPayload([]byte("MESSAGE 1"))
m1.Reports.Push("Report 1")
m1.Opts = map[string]interface{}{
"batch_group": "group1",
}

m2 := utils.NewMessage()
m2.PushPayload([]byte("MESSAGE 2"))
m2.Reports.Push("Report 2")
m2.Opts = map[string]interface{}{
"batch_group": "group2",
}
m3 := utils.NewMessage()
m3.PushPayload([]byte("MESSAGE 3"))
m3.Reports.Push("Report 3")
m3.Opts = map[string]interface{}{
"batch_group": "group2",
}
Expand All @@ -189,15 +193,21 @@ func TestBatcher(t *testing.T) {
clk := batcher.clk.(*clock.Mock)
So(len(batcher.batches), ShouldEqual, 2)

clk.Add(time.Duration(batcher.config.TimeoutMillis) * time.Millisecond)
clk.Add(time.Duration(batcher.Config.TimeoutMillis) * time.Millisecond)

group1 := <-nd.nextCalled
group1Data, err := group1.PopPayload()
report1 := group1.Reports.Pop().(string)
So(err, ShouldBeNil)
So(report1, ShouldEqual, "Report 1")

group2 := <-nd.nextCalled
group2Data, err := group2.PopPayload()
So(err, ShouldBeNil)
report3 := group2.Reports.Pop().(string)
So(report3, ShouldEqual, "Report 3")
report2 := group2.Reports.Pop().(string)
So(report2, ShouldEqual, "Report 2")

So(string(group1Data), ShouldEqual, "MESSAGE 1")
So(string(group2Data), ShouldEqual, "MESSAGE 2MESSAGE 3")
Expand Down