diff --git a/backend.go b/backend.go index 6d022de..265ee34 100644 --- a/backend.go +++ b/backend.go @@ -7,7 +7,8 @@ import ( "github.com/redBorder/rbforwarder/pipeline" ) -type backend struct { +// Backend orchestrates the pipeline +type Backend struct { sender pipeline.Sender // Pool of workers @@ -21,7 +22,7 @@ type backend struct { messagePool chan *pipeline.Message workers int - queue int + queueSize int maxMessages int maxBytes int @@ -32,17 +33,25 @@ type backend struct { keepSending chan struct{} } -func (b *backend) Init() { +// NewBackend creates a new Backend +func NewBackend(workers, queueSize, maxMessages, maxBytes int) *Backend { + b := &Backend{ + workers: workers, + queueSize: queueSize, + maxMessages: maxMessages, + maxBytes: maxBytes, + } + b.senderPool = make(chan chan *pipeline.Message, b.workers) b.messages = make(chan *pipeline.Message) b.input = make(chan *pipeline.Message) b.reports = make(chan *pipeline.Message) - b.messagePool = make(chan *pipeline.Message, b.queue) + b.messagePool = make(chan *pipeline.Message, b.queueSize) b.keepSending = make(chan struct{}) - for i := 0; i < b.queue; i++ { + for i := 0; i < b.queueSize; i++ { b.messagePool <- &pipeline.Message{ Metadata: make(map[string]interface{}), InputBuffer: new(bytes.Buffer), @@ -50,6 +59,11 @@ func (b *backend) Init() { } } + return b +} + +// Init initializes a backend +func (b *Backend) Init() { for i := 0; i < b.workers; i++ { b.startSender(i) } @@ -102,7 +116,7 @@ func (b *backend) Init() { } // Worker that sends the message -func (b *backend) startSender(i int) { +func (b *Backend) startSender(i int) { sender := b.sender sender.Init(i, b.reports) diff --git a/rbforwarder.go b/rbforwarder.go index 1739889..d7a2a41 100644 --- a/rbforwarder.go +++ b/rbforwarder.go @@ -20,7 +20,7 @@ var Logger = logrus.NewEntry(log) // send messages and get reports. It has a backend for routing messages between // workers type RBForwarder struct { - backend *backend + backend *Backend reportHandler *reportHandler reports chan pipeline.Report counter uint64 @@ -30,15 +30,8 @@ type RBForwarder struct { // NewRBForwarder creates a new Forwarder object func NewRBForwarder(config Config) *RBForwarder { - backend := &backend{ - workers: config.Workers, - queue: config.QueueSize, - maxMessages: config.MaxMessages, - maxBytes: config.MaxBytes, - } - forwarder := &RBForwarder{ - backend: backend, + backend: NewBackend(config.Workers, config.QueueSize, config.MaxMessages, config.MaxBytes), reports: make(chan pipeline.Report, config.QueueSize), config: config, } @@ -47,7 +40,7 @@ func NewRBForwarder(config Config) *RBForwarder { config.Retries, config.Backoff, config.QueueSize, - backend.input, + forwarder.backend.input, ) fields := logrus.Fields{ diff --git a/rbforwarder_test.go b/rbforwarder_test.go index 6bd79a5..ddb1a5f 100644 --- a/rbforwarder_test.go +++ b/rbforwarder_test.go @@ -7,47 +7,57 @@ import ( "github.com/redBorder/rbforwarder/pipeline" . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/mock" ) -type TestSender struct { +type MockSender struct { + mock.Mock channel chan string reports chan *pipeline.Message } -func (tsender *TestSender) Init(id int, reports chan *pipeline.Message) error { - tsender.reports = reports - return nil +func (s *MockSender) Init(id int, reports chan *pipeline.Message) error { + args := s.Called(id, reports) + s.reports = reports + return args.Error(0) } -func (tsender *TestSender) OnMessage(m *pipeline.Message) error { - // time.Sleep((time.Millisecond * 10) * time.Duration(rand.Int31n(50))) +func (s *MockSender) OnMessage(m *pipeline.Message) error { + s.channel <- string(m.InputBuffer.Bytes()) + s.reports <- m - tsender.channel <- string(m.InputBuffer.Bytes()) - m.Report.StatusCode = 0 - m.Report.Status = "OK" - tsender.reports <- m + args := s.Called(m) - return nil + return args.Error(0) } func TestBackend(t *testing.T) { Convey("Given a working pipeline", t, func() { numMessages := 10000 + numWorkers := 10 - sender := &TestSender{ + sender := &MockSender{ channel: make(chan string, 10000), } rbforwarder := NewRBForwarder(Config{ - Retries: 1, - Workers: 10, + Retries: 0, + Workers: numWorkers, QueueSize: 10000, }) + for i := 0; i < numWorkers; i++ { + sender.On("Init", i, rbforwarder.backend.reports).Return(nil) + } + rbforwarder.SetSender(sender) rbforwarder.Start() Convey("When 10000 messages are produced", func() { + sender.On("OnMessage", mock.AnythingOfType("*pipeline.Message")). + Return(nil). + Times(numMessages) + go func() { for i := 0; i < numMessages; i++ { if err := rbforwarder.Produce([]byte(""), map[string]interface{}{ @@ -67,6 +77,7 @@ func TestBackend(t *testing.T) { } So(i, ShouldEqual, numMessages) + sender.AssertExpectations(t) }) Convey("10000 reports should be received", func() { @@ -78,6 +89,7 @@ func TestBackend(t *testing.T) { } So(i, ShouldEqual, numMessages) + sender.AssertExpectations(t) }) Convey("10000 reports should be received in order", func() { @@ -96,6 +108,7 @@ func TestBackend(t *testing.T) { So(err, ShouldBeNil) So(i, ShouldEqual, numMessages) + sender.AssertExpectations(t) }) }) @@ -107,16 +120,13 @@ func TestBackend(t *testing.T) { } Convey("\"Hello World\" message should be get by the worker", func() { - message := <-sender.channel - So(message, ShouldEqual, "Hello World") - }) + sender.On("OnMessage", mock.MatchedBy(func(m *pipeline.Message) bool { + return m.Metadata["message_id"] == "test123" + })).Return(nil) - Convey("A report of the \"Hello World\" message should be received", func() { report := <-rbforwarder.GetReports() - So(report.StatusCode, ShouldEqual, 0) - So(report.Status, ShouldEqual, "OK") - So(report.ID, ShouldEqual, 0) So(report.Metadata["message_id"], ShouldEqual, "test123") + sender.AssertExpectations(t) }) })