diff --git a/components/httpsender/httpSender.go b/components/httpsender/httpSender.go new file mode 100644 index 0000000..6505806 --- /dev/null +++ b/components/httpsender/httpSender.go @@ -0,0 +1,71 @@ +package httpsender + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "net/http" + + "github.com/asaskevich/govalidator" + "github.com/redBorder/rbforwarder/types" +) + +// HTTPSender is a component for the rbforwarder pipeline that sends messages +// to an HTTP endpoint. It's a final component, so it will call Done() instead +// of Next() and further components shuld not be added after this component. +type HTTPSender struct { + id int + err error + URL string + client *http.Client +} + +// Init initializes the HTTP component +func (s *HTTPSender) Init(id int) { + s.id = id + + if govalidator.IsURL(s.URL) { + s.client = &http.Client{} + } else { + s.err = errors.New("Invalid URL") + } +} + +// OnMessage is called when a new message should be sent via HTTP +func (s *HTTPSender) OnMessage(m *types.Message, next types.Next, done types.Done) { + var u string + + if s.err != nil { + done(m, 2, s.err.Error()) + return + } + + data, err := m.PopPayload() + if err != nil { + done(m, 3, "Can't get payload of message: "+err.Error()) + return + } + + if endpoint, exists := m.Opts["http_endpoint"]; exists { + u = s.URL + "/" + endpoint.(string) + } else { + u = s.URL + } + + buf := bytes.NewBuffer(data) + res, err := s.client.Post(u, "", buf) + if err != nil { + done(m, 1, "HTTPSender error: "+err.Error()) + return + } + io.Copy(ioutil.Discard, res.Body) + res.Body.Close() + + if res.StatusCode >= 400 { + done(m, res.StatusCode, "HTTPSender error: "+res.Status) + return + } + + done(m, 0, res.Status) +} diff --git a/components/httpsender/httpSender_test.go b/components/httpsender/httpSender_test.go new file mode 100644 index 0000000..e1a4bb1 --- /dev/null +++ b/components/httpsender/httpSender_test.go @@ -0,0 +1,251 @@ +package httpsender + +import ( + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/redBorder/rbforwarder/types" + . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/mock" +) + +type Doner struct { + mock.Mock + doneCalled chan struct { + code int + status string + } +} + +func (d *Doner) Done(m *types.Message, code int, status string) { + d.Called(m, code, status) + d.doneCalled <- struct { + code int + status string + }{ + code, + status, + } +} + +func NewTestClient(code int, cb func(*http.Request)) *http.Client { + server := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(code) + cb(r) + })) + + transport := &http.Transport{ + Proxy: func(req *http.Request) (*url.URL, error) { + return url.Parse(server.URL) + }, + } + + return &http.Client{Transport: transport} +} + +func TestHTTPSender(t *testing.T) { + Convey("Given an HTTP sender with defined URL", t, func() { + sender := &HTTPSender{ + URL: "http://example.com", + } + + Convey("When is initialized", func() { + sender.Init(0) + + Convey("Then the config should be ok", func() { + So(sender.client, ShouldNotBeNil) + }) + }) + + Convey("When a message is sent and the response code is >= 400", func() { + var url string + sender.Init(0) + + m := types.NewMessage() + m.PushPayload([]byte("Hello World")) + sender.client = NewTestClient(401, func(req *http.Request) { + url = req.URL.String() + }) + + d := &Doner{ + doneCalled: make(chan struct { + code int + status string + }, 1), + } + + d.On("Done", mock.AnythingOfType("*types.Message"), + mock.AnythingOfType("int"), mock.AnythingOfType("string")) + + sender.OnMessage(m, nil, d.Done) + + Convey("Then the reporth should contain info about the error", func() { + result := <-d.doneCalled + So(result.status, ShouldEqual, "HTTPSender error: 401 Unauthorized") + So(result.code, ShouldEqual, 401) + So(url, ShouldEqual, "http://example.com/") + + d.AssertExpectations(t) + }) + }) + + Convey("When a message is received without endpoint option", func() { + var url string + sender.Init(0) + + m := types.NewMessage() + m.PushPayload([]byte("Hello World")) + + sender.client = NewTestClient(200, func(req *http.Request) { + url = req.URL.String() + }) + + d := &Doner{ + doneCalled: make(chan struct { + code int + status string + }, 1), + } + d.On("Done", mock.AnythingOfType("*types.Message"), + mock.AnythingOfType("int"), mock.AnythingOfType("string")) + + sender.OnMessage(m, nil, d.Done) + + Convey("Then the message should be sent via HTTP to the URL", func() { + result := <-d.doneCalled + So(result.status, ShouldEqual, "200 OK") + So(result.code, ShouldBeZeroValue) + So(url, ShouldEqual, "http://example.com/") + + d.AssertExpectations(t) + }) + }) + + Convey("When a message is received with endpoint option", func() { + var url string + sender.Init(0) + + m := types.NewMessage() + m.PushPayload([]byte("Hello World")) + m.Opts["http_endpoint"] = "endpoint1" + + sender.client = NewTestClient(200, func(req *http.Request) { + url = req.URL.String() + }) + + d := &Doner{ + doneCalled: make(chan struct { + code int + status string + }, 1), + } + d.On("Done", mock.AnythingOfType("*types.Message"), + mock.AnythingOfType("int"), mock.AnythingOfType("string")) + + sender.OnMessage(m, nil, d.Done) + + Convey("Then the message should be sent to the URL with endpoint as suffix", func() { + result := <-d.doneCalled + So(result.status, ShouldEqual, "200 OK") + So(result.code, ShouldBeZeroValue) + So(url, ShouldEqual, "http://example.com/endpoint1") + + d.AssertExpectations(t) + }) + }) + + Convey("When a message without payload is received", func() { + var url string + sender.Init(0) + + m := types.NewMessage() + + sender.client = NewTestClient(200, func(req *http.Request) { + url = req.URL.String() + }) + + d := &Doner{ + doneCalled: make(chan struct { + code int + status string + }, 1), + } + d.On("Done", mock.AnythingOfType("*types.Message"), + mock.AnythingOfType("int"), mock.AnythingOfType("string")) + + sender.OnMessage(m, nil, d.Done) + + Convey("Then the message should not be sent", func() { + result := <-d.doneCalled + So(result.status, ShouldEqual, "Can't get payload of message: No payload available") + So(result.code, ShouldBeGreaterThan, 0) + So(url, ShouldBeEmpty) + + d.AssertExpectations(t) + }) + }) + + Convey("When a the HTTP client fails", func() { + sender.Init(0) + + m := types.NewMessage() + m.PushPayload([]byte("Hello World")) + + sender.client = NewTestClient(200, func(req *http.Request) { + req.Write(nil) + }) + + d := &Doner{ + doneCalled: make(chan struct { + code int + status string + }, 1), + } + d.On("Done", mock.AnythingOfType("*types.Message"), + mock.AnythingOfType("int"), mock.AnythingOfType("string")) + + sender.OnMessage(m, nil, d.Done) + + Convey("Then the message should not be sent", func() { + result := <-d.doneCalled + So(result.status, ShouldEqual, "HTTPSender error: Post http://example.com: EOF") + So(result.code, ShouldBeGreaterThan, 0) + + d.AssertExpectations(t) + }) + }) + }) + + Convey("Given an HTTP sender with invalid URL", t, func() { + sender := &HTTPSender{} + sender.Init(0) + + Convey("When try to send messages", func() { + m := types.NewMessage() + m.PushPayload([]byte("Hello World")) + m.Opts["http_endpoint"] = "endpoint1" + + d := &Doner{ + doneCalled: make(chan struct { + code int + status string + }, 1), + } + + d.On("Done", mock.AnythingOfType("*types.Message"), + mock.AnythingOfType("int"), mock.AnythingOfType("string")) + + sender.OnMessage(m, nil, d.Done) + + Convey("Then should fail to send messages", func() { + So(sender.err, ShouldNotBeNil) + result := <-d.doneCalled + So(result.status, ShouldEqual, "Invalid URL") + So(result.code, ShouldBeGreaterThan, 0) + }) + }) + }) +} diff --git a/features/httpSender.feature b/features/httpSender.feature new file mode 100644 index 0000000..25abd82 --- /dev/null +++ b/features/httpSender.feature @@ -0,0 +1,29 @@ +Feature: http sender + In order to send data to an endpoint + As a pipeline component + I need to implement an HTTP client to send HTTP messages + + Scenario: Initialization + Given an HTTP sender with defined URL + When is initialized + Then the config should be ok + + Scenario: Message without endpoint + Given an HTTP sender with defined URL + When a message is received without endpoint option + Then the message should be sent via HTTP to the URL + + Scenario: Message with endpoint + Given an HTTP sender with defined URL + When a message is received with endpoint option + Then the message should be sent to the URL with endpoint as suffix + + Scenario: Message with no data + Given an HTTP sender with defined URL + When a message without data is received + Then the message should not be sent + + Scenario: HTTP Sender with no configuration + Given an HTTP sender without URL + When is initialized + Then should fail