diff --git a/components/batcher/batch.go b/components/batcher/batch.go
new file mode 100644
index 0000000..ef50eb8
--- /dev/null
+++ b/components/batcher/batch.go
@@ -0,0 +1,75 @@
+package batcher
+
+import (
+	"bytes"
+	"errors"
+	"time"
+
+	"github.com/benbjohnson/clock"
+	"github.com/oleiade/lane"
+	"github.com/redBorder/rbforwarder/types"
+)
+
+// BatchMessage contains multiple messages
+type BatchMessage struct {
+	Group        string        // Name used to group messages
+	Buff         *bytes.Buffer // Buffer used to merge multiple messages
+	MessageCount uint          // Current number of messages in the buffer
+	BytesCount   uint          // Current number of bytes in the buffer
+	Next         types.Next    // Callback to pass the message to the next handler
+	Opts         *lane.Stack   // Options from the original messages
+}
+
+// StartTimeout initializes a timeout that sends the messages when it expires,
+// no matter how many messages are in the buffer.
+func (b *BatchMessage) StartTimeout(clk clock.Clock, timeoutMillis uint, ready chan *BatchMessage) *BatchMessage {
+	if clk == nil {
+		clk = clock.New()
+	}
+
+	if timeoutMillis != 0 {
+		timer := clk.Timer(time.Duration(timeoutMillis) * time.Millisecond)
+
+		go func() {
+			<-timer.C
+			if b.MessageCount > 0 {
+				ready <- b
+			}
+		}()
+	}
+	return b
+}
+
+// Send passes the batch of messages to the next handler in the pipeline
+func (b *BatchMessage) Send(cb func()) {
+	cb()
+	b.Next(b)
+}
+
+// Write appends a new message to the buffer
+func (b *BatchMessage) Write(data []byte) {
+	b.Buff.Write(data)
+	b.MessageCount++
+}
+
+// PopData returns the buffer with all the messages merged
+func (b *BatchMessage) PopData() (ret []byte, err error) {
+	return b.Buff.Bytes(), nil
+}
+
+// PopOpts returns the options stored by the previous handler
+func (b *BatchMessage) PopOpts() (ret map[string]interface{}, err error) {
+	if b.Opts.Empty() {
+		err = errors.New("empty stack")
+		return
+	}
+
+	ret = b.Opts.Pop().(map[string]interface{})
+
+	return
+}
+
+// Reports does nothing and always returns nil
+func (b *BatchMessage) Reports() []types.Reporter {
+	return nil
+}
diff --git a/components/batcher/batcher.go b/components/batcher/batcher.go
new file mode 100644
index 0000000..1303d41
--- /dev/null
+++ b/components/batcher/batcher.go
@@ -0,0 +1,102 @@
+package batcher
+
+import (
+	"bytes"
+	"sync"
+
+	"github.com/benbjohnson/clock"
+	"github.com/oleiade/lane"
+	"github.com/redBorder/rbforwarder/types"
+)
+
+// Batcher merges multiple messages into a single one
+type Batcher struct {
+	id             int                      // Worker ID
+	batches        map[string]*BatchMessage // Pending batches, indexed by group name
+	newBatches     chan *BatchMessage       // Messages for groups with no batch yet
+	pendingBatches chan *BatchMessage       // Messages to merge into an existing batch
+	wg             sync.WaitGroup
+	clk            clock.Clock
+
+	config Config // Batcher configuration
+}
+
+// Init starts a goroutine that can receive:
+// - New messages that will be added to an existing or new batch of messages
+// - A batch of messages that is ready to send (i.e. the batch timeout has expired)
+func (b *Batcher) Init(id int) {
+	b.id = id
+	b.batches = make(map[string]*BatchMessage)
+	b.newBatches = make(chan *BatchMessage)
+	b.pendingBatches = make(chan *BatchMessage)
+
+	readyBatches := make(chan *BatchMessage)
+
+	if b.clk == nil {
+		b.clk = clock.New()
+	}
+
+	go func() {
+		for {
+			select {
+			case batchMessage := <-readyBatches:
+				batchMessage.Send(func() {
+					delete(b.batches, batchMessage.Group)
+				})
+
+			case batchMessage := <-b.newBatches:
+				batchMessage.StartTimeout(b.clk, b.config.TimeoutMillis, readyBatches)
+				b.batches[batchMessage.Group] = batchMessage
+				b.wg.Done()
+
+			case batchMessage := <-b.pendingBatches:
+				opts, err := batchMessage.PopOpts()
+				if err != nil {
+					break
+				}
+
+				b.batches[batchMessage.Group].Opts.Push(opts)
+				b.batches[batchMessage.Group].Write(batchMessage.Buff.Bytes())
+
+				if b.batches[batchMessage.Group].MessageCount >= b.config.Limit {
+					b.batches[batchMessage.Group].Send(func() {
+						delete(b.batches, batchMessage.Group)
+					})
+				}
+
+				b.wg.Done()
+			}
+		}
+	}()
+}
+
+// OnMessage is called when a new message is received. It adds the message
+// to a batch.
+func (b *Batcher) OnMessage(m types.Messenger, next types.Next, done types.Done) {
+	opts, _ := m.PopOpts()
+	group, ok := opts["batch_group"].(string)
+	if !ok {
+		next(m)
+		return
+	}
+
+	data, _ := m.PopData()
+	batchMessage := &BatchMessage{
+		Buff:         bytes.NewBuffer(data),
+		Opts:         lane.NewStack(),
+		MessageCount: 1,
+		Next:         next,
+		Group:        group,
+	}
+
+	batchMessage.Opts.Push(opts)
+
+	b.wg.Add(1)
+	if _, exists := b.batches[group]; exists {
+		b.pendingBatches <- batchMessage
+	} else {
+		b.newBatches <- batchMessage
+	}
+
+	b.wg.Wait()
+}
"github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/mock" +) + +type NexterDoner struct { + mock.Mock + nextCalled chan struct{} +} + +func (nd *NexterDoner) Next(m types.Messenger) { + nd.nextCalled <- struct{}{} + nd.Called(m) +} + +func (nd *NexterDoner) Done(m types.Messenger, code int, status string) { + nd.Called(m, code, status) +} + +type TestMessage struct { + mock.Mock +} + +func (m *TestMessage) PopData() (data []byte, err error) { + args := m.Called() + + return args.Get(0).([]byte), args.Error(1) +} + +func (m *TestMessage) PopOpts() (opts map[string]interface{}, err error) { + args := m.Called() + + return args.Get(0).(map[string]interface{}), args.Error(1) +} + +func (m *TestMessage) Reports() []types.Reporter { + return nil +} + +func TestRBForwarder(t *testing.T) { + Convey("Given a batcher", t, func() { + batcher := &Batcher{config: Config{ + TimeoutMillis: 1000, + Limit: 10, + MaxPendingBatches: 10, + }, + clk: clock.NewMock(), + } + + batcher.Init(0) + + Convey("When a message is received, but not yet sent", func() { + m := new(TestMessage) + m.On("PopData").Return([]byte("Hello World"), nil) + m.On("PopOpts").Return(map[string]interface{}{ + "batch_group": "group1", + }, nil) + + batcher.OnMessage(m, nil, nil) + + Convey("Message should be present on the batch", func() { + var data *BatchMessage + var exists bool + data, exists = batcher.batches["group1"] + + So(exists, ShouldBeTrue) + So(string(data.Buff.Bytes()), ShouldEqual, "Hello World") + So(len(batcher.batches), ShouldEqual, 1) + + m.AssertExpectations(t) + }) + }) + + Convey("When the max number of messages is reached", func() { + m := new(TestMessage) + m.On("PopData").Return([]byte("ABC"), nil) + m.On("PopOpts").Return(map[string]interface{}{ + "batch_group": "group1", + }, nil) + + nd := new(NexterDoner) + nd.nextCalled = make(chan struct{}, 1) + nd.On("Next", mock.MatchedBy(func(m *BatchMessage) bool { + data, _ := m.PopData() + return string(data) == "ABCABCABCABCABCABCABCABCABCABC" + })).Times(1) + + for i := 0; i < int(batcher.config.Limit); i++ { + batcher.OnMessage(m, nd.Next, nil) + } + + Convey("The batch should be sent", func() { + nd.AssertExpectations(t) + <-nd.nextCalled + So(batcher.batches["group1"], ShouldBeNil) + So(len(batcher.batches), ShouldEqual, 0) + }) + }) + + Convey("When the timeout expires", func() { + m := new(TestMessage) + m.On("PopData").Return([]byte("ABC"), nil) + m.On("PopOpts").Return(map[string]interface{}{ + "batch_group": "group1", + }, nil) + + nd := new(NexterDoner) + nd.nextCalled = make(chan struct{}, 1) + nd.On("Next", mock.MatchedBy(func(m *BatchMessage) bool { + data, _ := m.PopData() + return string(data) == "ABCABCABCABCABC" + })).Times(1) + + for i := 0; i < 5; i++ { + batcher.OnMessage(m, nd.Next, nil) + } + + clk := batcher.clk.(*clock.Mock) + + Convey("The batch should be sent", func() { + clk.Add(500 * time.Millisecond) + So(batcher.batches["group1"], ShouldNotBeNil) + clk.Add(500 * time.Millisecond) + <-nd.nextCalled + So(batcher.batches["group1"], ShouldBeNil) + So(len(batcher.batches), ShouldEqual, 0) + nd.AssertExpectations(t) + }) + }) + + Convey("When multiple messages are received with differente groups", func() { + m1 := new(TestMessage) + m1.On("PopData").Return([]byte("MESSAGE 1"), nil) + m1.On("PopOpts").Return(map[string]interface{}{ + "batch_group": "group1", + }, nil) + + m2 := new(TestMessage) + m2.On("PopData").Return([]byte("MESSAGE 2"), nil) + m2.On("PopOpts").Return(map[string]interface{}{ + "batch_group": "group2", + 
+			}, nil)
+
+			m3 := new(TestMessage)
+			m3.On("PopData").Return([]byte("MESSAGE 3"), nil)
+			m3.On("PopOpts").Return(map[string]interface{}{
+				"batch_group": "group2",
+			}, nil)
+
+			nd := new(NexterDoner)
+			nd.nextCalled = make(chan struct{}, 2)
+			nd.On("Next", mock.AnythingOfType("*batcher.BatchMessage")).Times(2)
+
+			for i := 0; i < 3; i++ {
+				batcher.OnMessage(m1, nd.Next, nil)
+			}
+			for i := 0; i < 3; i++ {
+				batcher.OnMessage(m2, nd.Next, nil)
+			}
+			batcher.OnMessage(m3, nd.Next, nil)
+
+			Convey("Each message should be in its group", func() {
+				var err error
+				group1, err := batcher.batches["group1"].PopData()
+				So(err, ShouldBeNil)
+				group2, err := batcher.batches["group2"].PopData()
+				So(err, ShouldBeNil)
+				So(string(group1), ShouldEqual, "MESSAGE 1MESSAGE 1MESSAGE 1")
+				So(string(group2), ShouldEqual, "MESSAGE 2MESSAGE 2MESSAGE 2MESSAGE 3")
+				So(len(batcher.batches), ShouldEqual, 2)
+				m1.AssertExpectations(t)
+				m2.AssertExpectations(t)
+				m3.AssertExpectations(t)
+			})
+
+			Convey("After a timeout the messages should be sent", func() {
+				clk := batcher.clk.(*clock.Mock)
+				So(len(batcher.batches), ShouldEqual, 2)
+				clk.Add(1000 * time.Millisecond)
+				<-nd.nextCalled
+				<-nd.nextCalled
+				So(batcher.batches["group1"], ShouldBeNil)
+				So(batcher.batches["group2"], ShouldBeNil)
+				So(len(batcher.batches), ShouldEqual, 0)
+				nd.AssertExpectations(t)
+			})
+		})
+	})
+}
diff --git a/components/batcher/config.go b/components/batcher/config.go
new file mode 100644
index 0000000..f992080
--- /dev/null
+++ b/components/batcher/config.go
@@ -0,0 +1,8 @@
+package batcher
+
+// Config stores the configuration for a Batcher
+type Config struct {
+	TimeoutMillis     uint // Milliseconds after which a batch is sent, even if incomplete
+	Limit             uint // Number of messages that triggers sending a batch
+	MaxPendingBatches uint
+}
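
Usage sketch (reviewer note, not part of the patch): a minimal example of how the new component fits together, written only against the code added above. It assumes types.Messenger is satisfied by the PopData/PopOpts/Reports methods and that types.Next accepts a func(types.Messenger), as the test mocks suggest; rawMessage, exampleUsage and the payloads are hypothetical and exist only for illustration. Because config and clk are unexported, the sketch has to live inside the batcher package, just like the tests do.

package batcher

import (
	"fmt"

	"github.com/redBorder/rbforwarder/types"
)

// rawMessage is a hypothetical message type used only in this sketch.
type rawMessage struct {
	data []byte
	opts map[string]interface{}
}

func (m *rawMessage) PopData() ([]byte, error)                 { return m.data, nil }
func (m *rawMessage) PopOpts() (map[string]interface{}, error) { return m.opts, nil }
func (m *rawMessage) Reports() []types.Reporter                { return nil }

func exampleUsage() {
	// Flush a batch after 3 messages or 100 ms, whichever comes first.
	b := &Batcher{config: Config{
		TimeoutMillis: 100,
		Limit:         3,
	}}
	b.Init(0)

	// next receives the merged BatchMessage once the batch is flushed.
	next := func(m types.Messenger) {
		data, _ := m.PopData()
		fmt.Println(string(data)) // prints "onetwothree" when the limit is reached
	}

	for _, payload := range []string{"one", "two", "three"} {
		b.OnMessage(&rawMessage{
			data: []byte(payload),
			opts: map[string]interface{}{"batch_group": "group1"},
		}, next, nil)
	}
}

The third OnMessage call reaches Limit, so the merged buffer is handed to next synchronously before OnMessage returns; messages whose options lack a "batch_group" entry bypass batching and are passed straight to next.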