Skip to content

Commit

Permalink
Merge pull request #112 from trivago/bugfix/104_prod_httprequest_url_…
Browse files Browse the repository at this point in the history
…parsing

Bugfix/104: Improve parsing of Address parameter
  • Loading branch information
arnecls committed May 5, 2017
2 parents 1e2ea95 + 9227c2b commit b2e47af
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 45 deletions.
34 changes: 23 additions & 11 deletions config/http_in_out.conf
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
# HTTP server consumer
# Feed this with:
# Feed the consumer with with:
#
# curl -vv -X PUT http://localhost:8001/somepath -d "TESTDATA"
#
"HttpIn01":
Type: "consumer.Http"
Streams: "http_01"
Address: ":8001"
WithHeaders: false

# HTTP client producer
#
# To run a debugging HTTP server for this, do:
# To run a debugging HTTP server for the producer, do:
#
# curl -sLO https://gist.githubusercontent.com/ppar/29c75e557671e6ba0ff5894a824be1df/raw/3afd6fe6b70c30ace99cfaf71d5d341dcc64a2d6/http_sink.py
#
Expand All @@ -20,9 +12,29 @@
#
# For 500 error results:
# python http_sink.py 8099 500


# To test the "RawData: true" setting in the producer, feed the consumer with:
#
# echo -n -e "POST /foo/bar HTTP/1.0\nContent-type: text/plain\nContent-length: 24\n\nDummy test\nRequest data\n" > testrequest.txt
#
# curl -vv -X PUT http://localhost:8001/somepath --data-binary @testrequest.txt

# HTTP server consumer
"HttpIn01":
Type: "consumer.Http"
Streams: "http_01"
Address: ":8001"
WithHeaders: false

# HTTP client producer
"HttpOut01":
Type: "producer.HTTPRequest"
Streams: "http_01"
Address: "http://localhost:8099/test"
RawData: false
#Address: "localhost:8099"
RawData: true

#"StdOut01":
# Type: "producer.Console"
# Streams: "http_01"
87 changes: 53 additions & 34 deletions producer/httprequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,58 @@ import (
"github.com/trivago/tgo/thealthcheck"
"io/ioutil"
"strconv"
"net/url"
"strings"
)

// HTTPRequest producer plugin
// The HTTPRequest producers sends messages as HTTP packet to a given webserver.
// This producer uses a fuse breaker when a request fails with an error
// code > 400 or the connection is down.
//
// The HTTPRequest producer sends messages as HTTP requests to a given webserver.
//
// Configuration example
//
// - "producer.HTTPRequest":
// RawData: true
// Encoding: "text/plain; charset=utf-8"
// Address: "localhost:80"
// Address: "http://localhost:80"
//
// Address defines the URL to send http requests to. Set by default
// to "http://localhost:80". If the value doesn't contain "://",
// it is prepended with "http://", so short forms like "localhost:8088"
// are accepted.
//
// Address defines the webserver to send http requests to. Set to "localhost:80"
// by default.
// If RawData = true, the incoming messages are expected to contain complete
// HTTP requests in "wire format", such as:
//
// RawData switches between creating POST data from the incoming message (false)
// and passing the message as HTTP request without changes (true).
// This setting is enabled by default.
// POST /foo/bar HTTP/1.0\n
// Content-type: text/plain\n
// Content-length: 24
// \n
// Dummy test\n
// Request data\n
//
// In this mode, the message's contents is parsed as an HTTP request and
// sent to the destination server (virtually) unchanged. If the message
// cannot be parsed as an HTTP request, an error is logged. Only the scheme,
// host and port components of the "Address" URL are used; any path and query
// parameters are ignored. The "Encoding" parameter is ignored.
//
// If RawData = false, a POST request is made to the destination server
// for each incoming message, using the complete URL in "Address". The
// incoming message's contents are delivered in the POST request's body
// and Content-type is set to the value of "Encoding"
//
// Encoding defines the payload encoding when RawData is set to false.
// Set to "text/plain; charset=utf-8" by default.
//
type HTTPRequest struct {
core.BufferedProducer
host string
port string
protocol string
address string
encoding string
rawPackets bool
listen *tnet.StopListener
lastError error

destinationUrl *url.URL
encoding string
rawPackets bool
listen *tnet.StopListener
lastError error
}

func init() {
Expand All @@ -70,15 +90,13 @@ func (prod *HTTPRequest) Configure(conf core.PluginConfigReader) error {
prod.SetStopCallback(prod.close)
prod.SetCheckFuseCallback(prod.isHostUp)

address := conf.GetString("Address", "localhost:80")
prod.protocol, prod.host, prod.port, err = tnet.SplitAddress(address, "http")
conf.Errors.Push(err)

if prod.host == "" {
prod.host = "localhost"
address := conf.GetString("Address", "http://localhost:80")
if strings.Index(address, "://") == -1 {
address = "http://" + address
}
prod.destinationUrl, err = url.Parse(address)
conf.Errors.Push(err)

prod.address = fmt.Sprintf("%s://%s:%s", prod.protocol, prod.host, prod.port)
prod.encoding = conf.GetString("Encoding", "text/plain; charset=utf-8")
prod.rawPackets = conf.GetBool("RawData", true)

Expand All @@ -100,7 +118,7 @@ func (prod *HTTPRequest) Configure(conf core.PluginConfigReader) error {
}

func (prod *HTTPRequest) healthcheckPingBackend() (int, string) {
code, body, err := httpRequestWrapper(http.Get(prod.address))
code, body, err := httpRequestWrapper(http.Get(prod.destinationUrl.String()))
if err != nil {
return code, strconv.Quote(err.Error())
}
Expand Down Expand Up @@ -135,10 +153,11 @@ func httpRequestWrapper(resp *http.Response, err error) (int, string, error) {
}

func (prod *HTTPRequest) isHostUp() bool {
resp, err := http.Get(prod.address)
resp, err := http.Get(prod.destinationUrl.String())
return err != nil && resp != nil && resp.StatusCode < 400
}

// The onMessage callback
func (prod *HTTPRequest) sendReq(msg *core.Message) {
var (
req *http.Request
Expand All @@ -149,24 +168,24 @@ func (prod *HTTPRequest) sendReq(msg *core.Message) {
requestData := bytes.NewBuffer(msg.Data())

if prod.rawPackets {
// Pass raw request
// Assume the message already contains an HTTP request in wire format.
// Create a Request object, override host, port and scheme, and send it out.
req, err = http.ReadRequest(bufio.NewReader(requestData))

if req != nil {
req.URL.Host = prod.address
req.URL.Host = prod.destinationUrl.Host
req.URL.Scheme = prod.destinationUrl.Scheme
req.RequestURI = ""
req.URL.Scheme = prod.protocol
}
} else {
// Convert to POST request
req, err = http.NewRequest("POST", prod.address, requestData)
// Encapsulate the message in a POST request
req, err = http.NewRequest("POST", prod.destinationUrl.String(), requestData)
if req != nil {
req.Header.Add("content-type", prod.encoding)
req.Header.Add("Content-type", prod.encoding)
}
}

if err != nil {
prod.Log.Error.Print("Invalid request", err)
prod.Log.Error.Print("Invalid request: ", err)
prod.Drop(originalMsg)
prod.lastError = err
return // ### return, malformed request ###
Expand Down

0 comments on commit b2e47af

Please sign in to comment.