-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Closes #3
- Loading branch information
Showing
3 changed files
with
351 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
package httpsender | ||
|
||
import ( | ||
"bytes" | ||
"errors" | ||
"io" | ||
"io/ioutil" | ||
"net/http" | ||
|
||
"github.com/asaskevich/govalidator" | ||
"github.com/redBorder/rbforwarder/types" | ||
) | ||
|
||
// HTTPSender is a component for the rbforwarder pipeline that sends messages | ||
// to an HTTP endpoint. It's a final component, so it will call Done() instead | ||
// of Next() and further components shuld not be added after this component. | ||
type HTTPSender struct { | ||
id int | ||
err error | ||
URL string | ||
client *http.Client | ||
} | ||
|
||
// Init initializes the HTTP component | ||
func (s *HTTPSender) Init(id int) { | ||
s.id = id | ||
|
||
if govalidator.IsURL(s.URL) { | ||
s.client = &http.Client{} | ||
} else { | ||
s.err = errors.New("Invalid URL") | ||
} | ||
} | ||
|
||
// OnMessage is called when a new message should be sent via HTTP | ||
func (s *HTTPSender) OnMessage(m *types.Message, next types.Next, done types.Done) { | ||
var u string | ||
|
||
if s.err != nil { | ||
done(m, 2, s.err.Error()) | ||
return | ||
} | ||
|
||
data, err := m.PopPayload() | ||
if err != nil { | ||
done(m, 3, "Can't get payload of message: "+err.Error()) | ||
return | ||
} | ||
|
||
if endpoint, exists := m.Opts["http_endpoint"]; exists { | ||
u = s.URL + "/" + endpoint.(string) | ||
} else { | ||
u = s.URL | ||
} | ||
|
||
buf := bytes.NewBuffer(data) | ||
res, err := s.client.Post(u, "", buf) | ||
if err != nil { | ||
done(m, 1, "HTTPSender error: "+err.Error()) | ||
return | ||
} | ||
io.Copy(ioutil.Discard, res.Body) | ||
res.Body.Close() | ||
|
||
if res.StatusCode >= 400 { | ||
done(m, res.StatusCode, "HTTPSender error: "+res.Status) | ||
return | ||
} | ||
|
||
done(m, 0, res.Status) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,251 @@ | ||
package httpsender | ||
|
||
import ( | ||
"net/http" | ||
"net/http/httptest" | ||
"net/url" | ||
"testing" | ||
|
||
"github.com/redBorder/rbforwarder/types" | ||
. "github.com/smartystreets/goconvey/convey" | ||
"github.com/stretchr/testify/mock" | ||
) | ||
|
||
type Doner struct { | ||
mock.Mock | ||
doneCalled chan struct { | ||
code int | ||
status string | ||
} | ||
} | ||
|
||
func (d *Doner) Done(m *types.Message, code int, status string) { | ||
d.Called(m, code, status) | ||
d.doneCalled <- struct { | ||
code int | ||
status string | ||
}{ | ||
code, | ||
status, | ||
} | ||
} | ||
|
||
func NewTestClient(code int, cb func(*http.Request)) *http.Client { | ||
server := httptest.NewServer(http.HandlerFunc( | ||
func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(code) | ||
cb(r) | ||
})) | ||
|
||
transport := &http.Transport{ | ||
Proxy: func(req *http.Request) (*url.URL, error) { | ||
return url.Parse(server.URL) | ||
}, | ||
} | ||
|
||
return &http.Client{Transport: transport} | ||
} | ||
|
||
func TestHTTPSender(t *testing.T) { | ||
Convey("Given an HTTP sender with defined URL", t, func() { | ||
sender := &HTTPSender{ | ||
URL: "http://example.com", | ||
} | ||
|
||
Convey("When is initialized", func() { | ||
sender.Init(0) | ||
|
||
Convey("Then the config should be ok", func() { | ||
So(sender.client, ShouldNotBeNil) | ||
}) | ||
}) | ||
|
||
Convey("When a message is sent and the response code is >= 400", func() { | ||
var url string | ||
sender.Init(0) | ||
|
||
m := types.NewMessage() | ||
m.PushPayload([]byte("Hello World")) | ||
sender.client = NewTestClient(401, func(req *http.Request) { | ||
url = req.URL.String() | ||
}) | ||
|
||
d := &Doner{ | ||
doneCalled: make(chan struct { | ||
code int | ||
status string | ||
}, 1), | ||
} | ||
|
||
d.On("Done", mock.AnythingOfType("*types.Message"), | ||
mock.AnythingOfType("int"), mock.AnythingOfType("string")) | ||
|
||
sender.OnMessage(m, nil, d.Done) | ||
|
||
Convey("Then the reporth should contain info about the error", func() { | ||
result := <-d.doneCalled | ||
So(result.status, ShouldEqual, "HTTPSender error: 401 Unauthorized") | ||
So(result.code, ShouldEqual, 401) | ||
So(url, ShouldEqual, "http://example.com/") | ||
|
||
d.AssertExpectations(t) | ||
}) | ||
}) | ||
|
||
Convey("When a message is received without endpoint option", func() { | ||
var url string | ||
sender.Init(0) | ||
|
||
m := types.NewMessage() | ||
m.PushPayload([]byte("Hello World")) | ||
|
||
sender.client = NewTestClient(200, func(req *http.Request) { | ||
url = req.URL.String() | ||
}) | ||
|
||
d := &Doner{ | ||
doneCalled: make(chan struct { | ||
code int | ||
status string | ||
}, 1), | ||
} | ||
d.On("Done", mock.AnythingOfType("*types.Message"), | ||
mock.AnythingOfType("int"), mock.AnythingOfType("string")) | ||
|
||
sender.OnMessage(m, nil, d.Done) | ||
|
||
Convey("Then the message should be sent via HTTP to the URL", func() { | ||
result := <-d.doneCalled | ||
So(result.status, ShouldEqual, "200 OK") | ||
So(result.code, ShouldBeZeroValue) | ||
So(url, ShouldEqual, "http://example.com/") | ||
|
||
d.AssertExpectations(t) | ||
}) | ||
}) | ||
|
||
Convey("When a message is received with endpoint option", func() { | ||
var url string | ||
sender.Init(0) | ||
|
||
m := types.NewMessage() | ||
m.PushPayload([]byte("Hello World")) | ||
m.Opts["http_endpoint"] = "endpoint1" | ||
|
||
sender.client = NewTestClient(200, func(req *http.Request) { | ||
url = req.URL.String() | ||
}) | ||
|
||
d := &Doner{ | ||
doneCalled: make(chan struct { | ||
code int | ||
status string | ||
}, 1), | ||
} | ||
d.On("Done", mock.AnythingOfType("*types.Message"), | ||
mock.AnythingOfType("int"), mock.AnythingOfType("string")) | ||
|
||
sender.OnMessage(m, nil, d.Done) | ||
|
||
Convey("Then the message should be sent to the URL with endpoint as suffix", func() { | ||
result := <-d.doneCalled | ||
So(result.status, ShouldEqual, "200 OK") | ||
So(result.code, ShouldBeZeroValue) | ||
So(url, ShouldEqual, "http://example.com/endpoint1") | ||
|
||
d.AssertExpectations(t) | ||
}) | ||
}) | ||
|
||
Convey("When a message without payload is received", func() { | ||
var url string | ||
sender.Init(0) | ||
|
||
m := types.NewMessage() | ||
|
||
sender.client = NewTestClient(200, func(req *http.Request) { | ||
url = req.URL.String() | ||
}) | ||
|
||
d := &Doner{ | ||
doneCalled: make(chan struct { | ||
code int | ||
status string | ||
}, 1), | ||
} | ||
d.On("Done", mock.AnythingOfType("*types.Message"), | ||
mock.AnythingOfType("int"), mock.AnythingOfType("string")) | ||
|
||
sender.OnMessage(m, nil, d.Done) | ||
|
||
Convey("Then the message should not be sent", func() { | ||
result := <-d.doneCalled | ||
So(result.status, ShouldEqual, "Can't get payload of message: No payload available") | ||
So(result.code, ShouldBeGreaterThan, 0) | ||
So(url, ShouldBeEmpty) | ||
|
||
d.AssertExpectations(t) | ||
}) | ||
}) | ||
|
||
Convey("When a the HTTP client fails", func() { | ||
sender.Init(0) | ||
|
||
m := types.NewMessage() | ||
m.PushPayload([]byte("Hello World")) | ||
|
||
sender.client = NewTestClient(200, func(req *http.Request) { | ||
req.Write(nil) | ||
}) | ||
|
||
d := &Doner{ | ||
doneCalled: make(chan struct { | ||
code int | ||
status string | ||
}, 1), | ||
} | ||
d.On("Done", mock.AnythingOfType("*types.Message"), | ||
mock.AnythingOfType("int"), mock.AnythingOfType("string")) | ||
|
||
sender.OnMessage(m, nil, d.Done) | ||
|
||
Convey("Then the message should not be sent", func() { | ||
result := <-d.doneCalled | ||
So(result.status, ShouldEqual, "HTTPSender error: Post http://example.com: EOF") | ||
So(result.code, ShouldBeGreaterThan, 0) | ||
|
||
d.AssertExpectations(t) | ||
}) | ||
}) | ||
}) | ||
|
||
Convey("Given an HTTP sender with invalid URL", t, func() { | ||
sender := &HTTPSender{} | ||
sender.Init(0) | ||
|
||
Convey("When try to send messages", func() { | ||
m := types.NewMessage() | ||
m.PushPayload([]byte("Hello World")) | ||
m.Opts["http_endpoint"] = "endpoint1" | ||
|
||
d := &Doner{ | ||
doneCalled: make(chan struct { | ||
code int | ||
status string | ||
}, 1), | ||
} | ||
|
||
d.On("Done", mock.AnythingOfType("*types.Message"), | ||
mock.AnythingOfType("int"), mock.AnythingOfType("string")) | ||
|
||
sender.OnMessage(m, nil, d.Done) | ||
|
||
Convey("Then should fail to send messages", func() { | ||
So(sender.err, ShouldNotBeNil) | ||
result := <-d.doneCalled | ||
So(result.status, ShouldEqual, "Invalid URL") | ||
So(result.code, ShouldBeGreaterThan, 0) | ||
}) | ||
}) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
Feature: http sender | ||
In order to send data to an endpoint | ||
As a pipeline component | ||
I need to implement an HTTP client to send HTTP messages | ||
|
||
Scenario: Initialization | ||
Given an HTTP sender with defined URL | ||
When is initialized | ||
Then the config should be ok | ||
|
||
Scenario: Message without endpoint | ||
Given an HTTP sender with defined URL | ||
When a message is received without endpoint option | ||
Then the message should be sent via HTTP to the URL | ||
|
||
Scenario: Message with endpoint | ||
Given an HTTP sender with defined URL | ||
When a message is received with endpoint option | ||
Then the message should be sent to the URL with endpoint as suffix | ||
|
||
Scenario: Message with no data | ||
Given an HTTP sender with defined URL | ||
When a message without data is received | ||
Then the message should not be sent | ||
|
||
Scenario: HTTP Sender with no configuration | ||
Given an HTTP sender without URL | ||
When is initialized | ||
Then should fail |