From 97619dcfcabca0454411eafbcf557c5fcf732314 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Fern=C3=A1ndez=20Barrera?= Date: Tue, 23 Aug 2016 10:02:32 +0200 Subject: [PATCH] :lipstick: Reorganize tests and benchmarks --- Makefile | 6 +- benchmarks_test.go | 178 +++++++++++++++++++++++ integration_test.go | 318 ++++++++++++++++++++++++++++++++++++++++++ pipeline_test.go | 1 + reportHandler_test.go | 1 + 5 files changed, 501 insertions(+), 3 deletions(-) create mode 100644 benchmarks_test.go create mode 100644 integration_test.go create mode 100644 pipeline_test.go create mode 100644 reportHandler_test.go diff --git a/Makefile b/Makefile index 95830e2..27de9bc 100644 --- a/Makefile +++ b/Makefile @@ -20,12 +20,12 @@ errcheck: errcheck -ignoretests -verbose ./... vet: - @printf "$(MKL_YELLOW)Runing go vet$(MKL_CLR_RESET)\n" + @printf "$(MKL_YELLOW)Running go vet$(MKL_CLR_RESET)\n" go vet ./... test: - @printf "$(MKL_YELLOW)Runing tests$(MKL_CLR_RESET)\n" - go test -race ./... + @printf "$(MKL_YELLOW)Running tests$(MKL_CLR_RESET)\n" + go test -race ./... -tags=integration @printf "$(MKL_GREEN)Test passed$(MKL_CLR_RESET)\n" coverage: diff --git a/benchmarks_test.go b/benchmarks_test.go new file mode 100644 index 0000000..af4e91f --- /dev/null +++ b/benchmarks_test.go @@ -0,0 +1,178 @@ +package rbforwarder + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/redBorder/rbforwarder/components/batch" + "github.com/redBorder/rbforwarder/components/httpsender" +) + +func NewTestClient(code int, cb func(*http.Request)) *http.Client { + server := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(code) + cb(r) + })) + + transport := &http.Transport{ + Proxy: func(req *http.Request) (*url.URL, error) { + return url.Parse(server.URL) + }, + } + + return &http.Client{Transport: transport} +} + +func BenchmarkNoBatch(b *testing.B) { + var components []interface{} + var workers []int + + f := NewRBForwarder(Config{ + Retries: 3, + Backoff: 5, + QueueSize: 1, + }) + + batch := &batcher.Batcher{ + Config: batcher.Config{ + TimeoutMillis: 1000, + Limit: 10000, + }, + } + components = append(components, batch) + workers = append(workers, 1) + + sender := &httpsender.HTTPSender{ + URL: "http://localhost:8888", + Client: NewTestClient(200, func(r *http.Request) {}), + } + components = append(components, sender) + workers = append(workers, 1) + + f.PushComponents(components, workers) + f.Run() + + opts := map[string]interface{}{ + "http_endpoint": "librb-http", + "batch_group": "librb-http", + } + + for i := 0; i < b.N; i++ { + data := fmt.Sprintf("{\"message\": %d}", i) + f.Produce([]byte(data), opts, i) + } + + for report := range f.GetReports() { + r := report.(Report) + if r.Code > 0 { + b.FailNow() + } + if r.Opaque.(int) == b.N-1 { + break + } + } +} + +func BenchmarkLittleBatch(b *testing.B) { + var components []interface{} + var workers []int + + f := NewRBForwarder(Config{ + Retries: 3, + Backoff: 5, + QueueSize: b.N / 100, + }) + + batch := &batcher.Batcher{ + Config: batcher.Config{ + TimeoutMillis: 1000, + Limit: 10000, + }, + } + components = append(components, batch) + workers = append(workers, 1) + + sender := &httpsender.HTTPSender{ + URL: "http://localhost:8888", + Client: NewTestClient(200, func(r *http.Request) {}), + } + components = append(components, sender) + workers = append(workers, 1) + + f.PushComponents(components, workers) + f.Run() + + opts := map[string]interface{}{ + "http_endpoint": "librb-http", + "batch_group": "librb-http", + } + + for i := 0; i < b.N; i++ { + data := fmt.Sprintf("{\"message\": %d}", i) + f.Produce([]byte(data), opts, i) + } + + for report := range f.GetReports() { + r := report.(Report) + if r.Code > 0 { + b.FailNow() + } + if r.Opaque.(int) == b.N-1 { + break + } + } +} + +func BenchmarkBigBatch(b *testing.B) { + var components []interface{} + var workers []int + + f := NewRBForwarder(Config{ + Retries: 3, + Backoff: 5, + QueueSize: b.N / 10, + }) + + batch := &batcher.Batcher{ + Config: batcher.Config{ + TimeoutMillis: 1000, + Limit: 10000, + }, + } + components = append(components, batch) + workers = append(workers, 1) + + sender := &httpsender.HTTPSender{ + URL: "http://localhost:8888", + Client: NewTestClient(200, func(r *http.Request) {}), + } + components = append(components, sender) + workers = append(workers, 1) + + f.PushComponents(components, workers) + f.Run() + + opts := map[string]interface{}{ + "http_endpoint": "librb-http", + "batch_group": "librb-http", + } + + for i := 0; i < b.N; i++ { + data := fmt.Sprintf("{\"message\": %d}", i) + f.Produce([]byte(data), opts, i) + } + + for report := range f.GetReports() { + r := report.(Report) + if r.Code > 0 { + b.FailNow() + } + if r.Opaque.(int) == b.N-1 { + break + } + } +} diff --git a/integration_test.go b/integration_test.go new file mode 100644 index 0000000..654987b --- /dev/null +++ b/integration_test.go @@ -0,0 +1,318 @@ +// +build integration + +package rbforwarder + +import ( + "testing" + + "github.com/redBorder/rbforwarder/utils" + . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/mock" +) + +type MockMiddleComponent struct { + mock.Mock +} + +func (c *MockMiddleComponent) Spawn(id int) utils.Composer { + args := c.Called() + return args.Get(0).(utils.Composer) +} + +func (c *MockMiddleComponent) OnMessage(m *utils.Message, done utils.Done) { + c.Called(m) + if data, err := m.PopPayload(); err == nil { + processedData := "-> [" + string(data) + "] <-" + m.PushPayload([]byte(processedData)) + } + + done(m, 0, "") +} + +type MockComponent struct { + mock.Mock + + channel chan string + + status string + statusCode int +} + +func (c *MockComponent) Spawn(id int) utils.Composer { + args := c.Called() + return args.Get(0).(utils.Composer) +} + +func (c *MockComponent) OnMessage(m *utils.Message, done utils.Done) { + c.Called(m) + if data, err := m.PopPayload(); err == nil { + c.channel <- string(data) + } else { + c.channel <- err.Error() + } + + done(m, c.statusCode, c.status) +} + +func TestRBForwarder(t *testing.T) { + Convey("Given a single component working pipeline", t, func() { + numMessages := 1000 + numWorkers := 100 + numRetries := 3 + + component := &MockComponent{ + channel: make(chan string, 10000), + } + + rbforwarder := NewRBForwarder(Config{ + Retries: numRetries, + QueueSize: numMessages, + }) + + component.On("Spawn").Return(component).Times(numWorkers) + + var components []interface{} + var instances []int + components = append(components, component) + instances = append(instances, numWorkers) + + rbforwarder.PushComponents(components, instances) + rbforwarder.Run() + + //////////////////////////////////////////////////////////////////////////// + + Convey("When a \"Hello World\" message is produced", func() { + component.status = "OK" + component.statusCode = 0 + + component.On("OnMessage", mock.AnythingOfType("*utils.Message")) + + err := rbforwarder.Produce( + []byte("Hello World"), + map[string]interface{}{"message_id": "test123"}, + nil, + ) + + Convey("Then \"Hello World\" message should be get by the worker", func() { + var lastReport Report + var reports int + for r := range rbforwarder.GetReports() { + reports++ + lastReport = r.(Report) + rbforwarder.Close() + } + + So(lastReport, ShouldNotBeNil) + So(reports, ShouldEqual, 1) + So(lastReport.Code, ShouldEqual, 0) + So(lastReport.Status, ShouldEqual, "OK") + So(err, ShouldBeNil) + + component.AssertExpectations(t) + }) + }) + + //////////////////////////////////////////////////////////////////////////// + + Convey("When a message is produced after close forwarder", func() { + rbforwarder.Close() + + err := rbforwarder.Produce( + []byte("Hello World"), + map[string]interface{}{"message_id": "test123"}, + nil, + ) + + Convey("Should error", func() { + So(err.Error(), ShouldEqual, "Forwarder has been closed") + }) + }) + + //////////////////////////////////////////////////////////////////////////// + + Convey("When calling OnMessage() with opaque", func() { + component.On("OnMessage", mock.AnythingOfType("*utils.Message")) + + err := rbforwarder.Produce( + []byte("Hello World"), + nil, + "This is an opaque", + ) + + Convey("Should be possible to read the opaque", func() { + So(err, ShouldBeNil) + + var reports int + var lastReport Report + for r := range rbforwarder.GetReports() { + reports++ + lastReport = r.(Report) + rbforwarder.Close() + } + + opaque := lastReport.Opaque.(string) + So(opaque, ShouldEqual, "This is an opaque") + }) + }) + + //////////////////////////////////////////////////////////////////////////// + + Convey("When a message fails to send", func() { + component.status = "Fake Error" + component.statusCode = 99 + + component.On("OnMessage", mock.AnythingOfType("*utils.Message")).Times(4) + + err := rbforwarder.Produce( + []byte("Hello World"), + map[string]interface{}{"message_id": "test123"}, + nil, + ) + + Convey("The message should be retried", func() { + So(err, ShouldBeNil) + + var reports int + var lastReport Report + for r := range rbforwarder.GetReports() { + reports++ + lastReport = r.(Report) + rbforwarder.Close() + } + + So(lastReport, ShouldNotBeNil) + So(reports, ShouldEqual, 1) + So(lastReport.Status, ShouldEqual, "Fake Error") + So(lastReport.Code, ShouldEqual, 99) + So(lastReport.retries, ShouldEqual, numRetries) + + component.AssertExpectations(t) + }) + }) + + //////////////////////////////////////////////////////////////////////////// + + Convey("When multiple messages are produced", func() { + var numErr int + + component.On("OnMessage", mock.AnythingOfType("*utils.Message")). + Return(nil). + Times(numMessages) + + for i := 0; i < numMessages; i++ { + if err := rbforwarder.Produce([]byte("Hello World"), + nil, + i, + ); err != nil { + numErr++ + } + } + + Convey("Then reports should be received", func() { + var reports int + for range rbforwarder.GetReports() { + reports++ + if reports >= numMessages { + rbforwarder.Close() + } + } + + So(numErr, ShouldBeZeroValue) + So(reports, ShouldEqual, numMessages) + + component.AssertExpectations(t) + }) + + Convey("Then reports should be received in order", func() { + ordered := true + var reports int + + for rep := range rbforwarder.GetOrderedReports() { + if rep.(Report).Opaque.(int) != reports { + ordered = false + } + reports++ + if reports >= numMessages { + rbforwarder.Close() + } + } + + So(numErr, ShouldBeZeroValue) + So(ordered, ShouldBeTrue) + So(reports, ShouldEqual, numMessages) + + component.AssertExpectations(t) + }) + }) + }) + + Convey("Given a multi-component working pipeline", t, func() { + numMessages := 100 + numWorkers := 3 + numRetries := 3 + + component1 := &MockMiddleComponent{} + component2 := &MockComponent{ + channel: make(chan string, 10000), + } + + rbforwarder := NewRBForwarder(Config{ + Retries: numRetries, + QueueSize: numMessages, + }) + + for i := 0; i < numWorkers; i++ { + component1.On("Spawn").Return(component1) + component2.On("Spawn").Return(component2) + } + + var components []interface{} + var instances []int + + components = append(components, component1) + components = append(components, component2) + + instances = append(instances, numWorkers) + instances = append(instances, numWorkers) + + rbforwarder.PushComponents(components, instances) + rbforwarder.Run() + + Convey("When a \"Hello World\" message is produced", func() { + component2.status = "OK" + component2.statusCode = 0 + + component1.On("OnMessage", mock.AnythingOfType("*utils.Message")) + component2.On("OnMessage", mock.AnythingOfType("*utils.Message")) + + err := rbforwarder.Produce( + []byte("Hello World"), + map[string]interface{}{"message_id": "test123"}, + nil, + ) + + rbforwarder.Close() + + Convey("\"Hello World\" message should be processed by the pipeline", func() { + reports := 0 + for rep := range rbforwarder.GetReports() { + reports++ + + code := rep.(Report).Code + status := rep.(Report).Status + So(code, ShouldEqual, 0) + So(status, ShouldEqual, "OK") + } + + m := <-component2.channel + + So(err, ShouldBeNil) + So(reports, ShouldEqual, 1) + So(m, ShouldEqual, "-> [Hello World] <-") + + component1.AssertExpectations(t) + component2.AssertExpectations(t) + }) + }) + }) +} diff --git a/pipeline_test.go b/pipeline_test.go new file mode 100644 index 0000000..92407a5 --- /dev/null +++ b/pipeline_test.go @@ -0,0 +1 @@ +package rbforwarder diff --git a/reportHandler_test.go b/reportHandler_test.go new file mode 100644 index 0000000..92407a5 --- /dev/null +++ b/reportHandler_test.go @@ -0,0 +1 @@ +package rbforwarder