-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Closes #10
- Loading branch information
Showing
4 changed files
with
370 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package batcher | ||
|
||
import ( | ||
"bytes" | ||
"errors" | ||
"time" | ||
|
||
"github.com/benbjohnson/clock" | ||
"github.com/oleiade/lane" | ||
"github.com/redBorder/rbforwarder/types" | ||
) | ||
|
||
// BatchMessage contains multiple messages | ||
type BatchMessage struct { | ||
Group string // Name used to group messages | ||
Buff *bytes.Buffer // Buffer for merge multiple messages | ||
MessageCount uint // Current number of messages in the buffer | ||
BytesCount uint // Current number of bytes in the buffer | ||
Next types.Next // Call to pass the message to the next handler | ||
Opts *lane.Stack // Original messages options | ||
} | ||
|
||
// StartTimeout initializes a timeout used to send the messages when expires. No | ||
// matters how many messages are in the buffer. | ||
func (b *BatchMessage) StartTimeout(clk clock.Clock, timeoutMillis uint, ready chan *BatchMessage) *BatchMessage { | ||
if clk == nil { | ||
clk = clock.New() | ||
} | ||
|
||
if timeoutMillis != 0 { | ||
timer := clk.Timer(time.Duration(timeoutMillis) * time.Millisecond) | ||
|
||
go func() { | ||
<-timer.C | ||
if b.MessageCount > 0 { | ||
ready <- b | ||
} | ||
}() | ||
} | ||
return b | ||
} | ||
|
||
// Send the batch of messages to the next handler in the pipeline | ||
func (b *BatchMessage) Send(cb func()) { | ||
cb() | ||
b.Next(b) | ||
} | ||
|
||
// Write merges a new message in the buffer | ||
func (b *BatchMessage) Write(data []byte) { | ||
b.Buff.Write(data) | ||
b.MessageCount++ | ||
} | ||
|
||
// PopData returns the buffer with all the messages merged | ||
func (b *BatchMessage) PopData() (ret []byte, err error) { | ||
return b.Buff.Bytes(), nil | ||
} | ||
|
||
// PopOpts get the data stored by the previous handler | ||
func (b *BatchMessage) PopOpts() (ret map[string]interface{}, err error) { | ||
if b.Opts.Empty() { | ||
err = errors.New("Empty stack") | ||
return | ||
} | ||
|
||
ret = b.Opts.Pop().(map[string]interface{}) | ||
|
||
return | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
package batcher | ||
|
||
import ( | ||
"bytes" | ||
"sync" | ||
|
||
"github.com/benbjohnson/clock" | ||
"github.com/oleiade/lane" | ||
"github.com/redBorder/rbforwarder/types" | ||
) | ||
|
||
// Batcher allows to merge multiple messages in a single one | ||
type Batcher struct { | ||
id int // Worker ID | ||
batches map[string]*BatchMessage // Collection of batches pending | ||
newBatches chan *BatchMessage // Send messages to sender gorutine | ||
pendingBatches chan *BatchMessage // Send messages to sender gorutine | ||
wg sync.WaitGroup | ||
clk clock.Clock | ||
|
||
config Config // Batcher configuration | ||
} | ||
|
||
// Init starts a gorutine that can receive: | ||
// - New messages that will be added to a existing or new batch of messages | ||
// - A batch of messages that is ready to send (i.e. batch timeout has expired) | ||
func (b *Batcher) Init(id int) { | ||
b.batches = make(map[string]*BatchMessage) | ||
b.newBatches = make(chan *BatchMessage) | ||
b.pendingBatches = make(chan *BatchMessage) | ||
|
||
readyBatches := make(chan *BatchMessage) | ||
|
||
if b.clk == nil { | ||
b.clk = clock.New() | ||
} | ||
|
||
go func() { | ||
for { | ||
select { | ||
case batchMessage := <-readyBatches: | ||
batchMessage.Send(func() { | ||
delete(b.batches, batchMessage.Group) | ||
}) | ||
|
||
case batchMessage := <-b.newBatches: | ||
batchMessage.StartTimeout(b.clk, b.config.TimeoutMillis, readyBatches) | ||
b.batches[batchMessage.Group] = batchMessage | ||
b.wg.Done() | ||
|
||
case batchMessage := <-b.pendingBatches: | ||
opts, err := batchMessage.PopOpts() | ||
if err != nil { | ||
break | ||
} | ||
|
||
b.batches[batchMessage.Group].Opts.Push(opts) | ||
b.batches[batchMessage.Group].Write(batchMessage.Buff.Bytes()) | ||
|
||
if b.batches[batchMessage.Group].MessageCount >= b.config.Limit { | ||
b.batches[batchMessage.Group].Send(func() { | ||
delete(b.batches, batchMessage.Group) | ||
}) | ||
} | ||
|
||
b.wg.Done() | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// OnMessage is called when a new message is receive. Add the new message to | ||
// a batch | ||
func (b *Batcher) OnMessage(m types.Messenger, next types.Next, done types.Done) { | ||
opts, _ := m.PopOpts() | ||
group, ok := opts["batch_group"].(string) | ||
if !ok { | ||
next(m) | ||
} | ||
|
||
data, _ := m.PopData() | ||
batchMessage := &BatchMessage{ | ||
Buff: bytes.NewBuffer(data), | ||
Opts: lane.NewStack(), | ||
MessageCount: 1, | ||
Next: next, | ||
Group: group, | ||
} | ||
|
||
batchMessage.Opts.Push(opts) | ||
|
||
b.wg.Add(1) | ||
if _, exists := b.batches[group]; exists { | ||
b.pendingBatches <- batchMessage | ||
} else { | ||
b.newBatches <- batchMessage | ||
} | ||
|
||
b.wg.Wait() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
package batcher | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/benbjohnson/clock" | ||
"github.com/redBorder/rbforwarder/types" | ||
. "github.com/smartystreets/goconvey/convey" | ||
"github.com/stretchr/testify/mock" | ||
) | ||
|
||
type NexterDoner struct { | ||
mock.Mock | ||
nextCalled chan struct{} | ||
} | ||
|
||
func (nd *NexterDoner) Next(m types.Messenger) { | ||
nd.nextCalled <- struct{}{} | ||
nd.Called(m) | ||
} | ||
|
||
func (nd *NexterDoner) Done(m types.Messenger, code int, status string) { | ||
nd.Called(m, code, status) | ||
} | ||
|
||
type TestMessage struct { | ||
mock.Mock | ||
} | ||
|
||
func (m *TestMessage) PopData() (data []byte, err error) { | ||
args := m.Called() | ||
|
||
return args.Get(0).([]byte), args.Error(1) | ||
} | ||
|
||
func (m *TestMessage) PopOpts() (opts map[string]interface{}, err error) { | ||
args := m.Called() | ||
|
||
return args.Get(0).(map[string]interface{}), args.Error(1) | ||
} | ||
|
||
func TestRBForwarder(t *testing.T) { | ||
Convey("Given a batcher", t, func() { | ||
batcher := &Batcher{config: Config{ | ||
TimeoutMillis: 1000, | ||
Limit: 10, | ||
MaxPendingBatches: 10, | ||
}, | ||
clk: clock.NewMock(), | ||
} | ||
|
||
batcher.Init(0) | ||
|
||
Convey("When a message is received, but not yet sent", func() { | ||
m := new(TestMessage) | ||
m.On("PopData").Return([]byte("Hello World"), nil) | ||
m.On("PopOpts").Return(map[string]interface{}{ | ||
"batch_group": "group1", | ||
}, nil) | ||
|
||
batcher.OnMessage(m, nil, nil) | ||
|
||
Convey("Message should be present on the batch", func() { | ||
var data *BatchMessage | ||
var exists bool | ||
data, exists = batcher.batches["group1"] | ||
|
||
So(exists, ShouldBeTrue) | ||
So(string(data.Buff.Bytes()), ShouldEqual, "Hello World") | ||
So(len(batcher.batches), ShouldEqual, 1) | ||
|
||
m.AssertExpectations(t) | ||
}) | ||
}) | ||
|
||
Convey("When the max number of messages is reached", func() { | ||
m := new(TestMessage) | ||
m.On("PopData").Return([]byte("ABC"), nil) | ||
m.On("PopOpts").Return(map[string]interface{}{ | ||
"batch_group": "group1", | ||
}, nil) | ||
|
||
nd := new(NexterDoner) | ||
nd.nextCalled = make(chan struct{}, 1) | ||
nd.On("Next", mock.MatchedBy(func(m *BatchMessage) bool { | ||
data, _ := m.PopData() | ||
return string(data) == "ABCABCABCABCABCABCABCABCABCABC" | ||
})).Times(1) | ||
|
||
for i := 0; i < int(batcher.config.Limit); i++ { | ||
batcher.OnMessage(m, nd.Next, nil) | ||
} | ||
|
||
Convey("The batch should be sent", func() { | ||
nd.AssertExpectations(t) | ||
<-nd.nextCalled | ||
So(batcher.batches["group1"], ShouldBeNil) | ||
So(len(batcher.batches), ShouldEqual, 0) | ||
}) | ||
}) | ||
|
||
Convey("When the timeout expires", func() { | ||
m := new(TestMessage) | ||
m.On("PopData").Return([]byte("ABC"), nil) | ||
m.On("PopOpts").Return(map[string]interface{}{ | ||
"batch_group": "group1", | ||
}, nil) | ||
|
||
nd := new(NexterDoner) | ||
nd.nextCalled = make(chan struct{}, 1) | ||
nd.On("Next", mock.MatchedBy(func(m *BatchMessage) bool { | ||
data, _ := m.PopData() | ||
return string(data) == "ABCABCABCABCABC" | ||
})).Times(1) | ||
|
||
for i := 0; i < 5; i++ { | ||
batcher.OnMessage(m, nd.Next, nil) | ||
} | ||
|
||
clk := batcher.clk.(*clock.Mock) | ||
|
||
Convey("The batch should be sent", func() { | ||
clk.Add(500 * time.Millisecond) | ||
So(batcher.batches["group1"], ShouldNotBeNil) | ||
clk.Add(500 * time.Millisecond) | ||
<-nd.nextCalled | ||
So(batcher.batches["group1"], ShouldBeNil) | ||
So(len(batcher.batches), ShouldEqual, 0) | ||
nd.AssertExpectations(t) | ||
}) | ||
}) | ||
|
||
Convey("When multiple messages are received with differente groups", func() { | ||
m1 := new(TestMessage) | ||
m1.On("PopData").Return([]byte("MESSAGE 1"), nil) | ||
m1.On("PopOpts").Return(map[string]interface{}{ | ||
"batch_group": "group1", | ||
}, nil) | ||
|
||
m2 := new(TestMessage) | ||
m2.On("PopData").Return([]byte("MESSAGE 2"), nil) | ||
m2.On("PopOpts").Return(map[string]interface{}{ | ||
"batch_group": "group2", | ||
}, nil) | ||
|
||
m3 := new(TestMessage) | ||
m3.On("PopData").Return([]byte("MESSAGE 3"), nil) | ||
m3.On("PopOpts").Return(map[string]interface{}{ | ||
"batch_group": "group2", | ||
}, nil) | ||
|
||
nd := new(NexterDoner) | ||
nd.nextCalled = make(chan struct{}, 2) | ||
nd.On("Next", mock.AnythingOfType("*batcher.BatchMessage")).Times(2) | ||
|
||
for i := 0; i < 3; i++ { | ||
batcher.OnMessage(m1, nd.Next, nil) | ||
} | ||
for i := 0; i < 3; i++ { | ||
batcher.OnMessage(m2, nd.Next, nil) | ||
} | ||
batcher.OnMessage(m3, nd.Next, nil) | ||
|
||
Convey("Each message should be in its group", func() { | ||
var err error | ||
group1, err := batcher.batches["group1"].PopData() | ||
So(err, ShouldBeNil) | ||
group2, err := batcher.batches["group2"].PopData() | ||
So(err, ShouldBeNil) | ||
So(string(group1), ShouldEqual, "MESSAGE 1MESSAGE 1MESSAGE 1") | ||
So(string(group2), ShouldEqual, "MESSAGE 2MESSAGE 2MESSAGE 2MESSAGE 3") | ||
So(len(batcher.batches), ShouldEqual, 2) | ||
m1.AssertExpectations(t) | ||
m2.AssertExpectations(t) | ||
m3.AssertExpectations(t) | ||
}) | ||
|
||
Convey("After a timeout the messages should be sent", func() { | ||
clk := batcher.clk.(*clock.Mock) | ||
So(len(batcher.batches), ShouldEqual, 2) | ||
clk.Add(1000 * time.Second) | ||
<-nd.nextCalled | ||
<-nd.nextCalled | ||
So(batcher.batches["group1"], ShouldBeNil) | ||
So(batcher.batches["group2"], ShouldBeNil) | ||
So(len(batcher.batches), ShouldEqual, 0) | ||
nd.AssertExpectations(t) | ||
}) | ||
}) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package batcher | ||
|
||
// Config stores the config for a Batcher | ||
type Config struct { | ||
TimeoutMillis uint | ||
Limit uint | ||
MaxPendingBatches uint | ||
} |