Skip to content

Commit

Permalink
Revert using fasthttp library to net/http
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Jan 11, 2017
1 parent 9f46a27 commit 8a22a12
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 65 deletions.
2 changes: 0 additions & 2 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ github.com/shirou/gopsutil 1516eb9ddc5e61ba58874047a98f8b44b5e585e8
github.com/soniah/gosnmp 3fe3beb30fa9700988893c56a63b1df8e1b68c26
github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744
github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c
github.com/valyala/bytebufferpool e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7
github.com/valyala/fasthttp 2f4876aaf2b591786efc9b49f34b86ad44c25074
github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2
github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866
github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8
Expand Down
137 changes: 74 additions & 63 deletions plugins/outputs/influxdb/client/http.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package client

import (
"bytes"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"

"github.com/valyala/fasthttp"
)

var (
Expand Down Expand Up @@ -40,13 +40,15 @@ func NewHTTP(config HTTPConfig, defaultWP WriteParams) (Client, error) {
return nil, fmt.Errorf("config.URL scheme must be http(s), got %s", u.Scheme)
}

wu := writeURL(u, defaultWP)
return &httpClient{
writeURL: []byte(wu),
writeURL: writeURL(u, defaultWP),
config: config,
url: u,
client: &fasthttp.Client{
TLSConfig: config.TLSConfig,
client: &http.Client{
Timeout: config.Timeout,
Transport: &http.Transport{
TLSClientConfig: config.TLSConfig,
},
},
}, nil
}
Expand All @@ -58,8 +60,13 @@ type HTTPConfig struct {
// UserAgent sets the User-Agent header.
UserAgent string

// Timeout is the time to wait for a response to each HTTP request (writes
// and queries).
// Timeout specifies a time limit for requests made by this
// Client. The timeout includes connection time, any
// redirects, and reading the response body. The timer remains
// running after Get, Head, Post, or Do return and will
// interrupt reading of the Response.Body.
//
// A Timeout of zero means no timeout.
Timeout time.Duration

// Username is the basic auth username for the server.
Expand Down Expand Up @@ -92,46 +99,53 @@ func (r *Response) Error() error {
}

type httpClient struct {
writeURL []byte
writeURL string
config HTTPConfig
client *fasthttp.Client
client *http.Client
url *url.URL
}

func (c *httpClient) Query(command string) error {
req := c.makeRequest()
req.Header.SetRequestURI(queryURL(c.url, command))

return c.doRequest(req, fasthttp.StatusOK)
req, err := c.makeRequest(queryURL(c.url, command), bytes.NewReader([]byte("")))
if err != nil {
return err
}
return c.doRequest(req, http.StatusOK)
}

func (c *httpClient) Write(b []byte) (int, error) {
req := c.makeWriteRequest(len(b), c.writeURL)
req.SetBody(b)
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), c.writeURL)
if err != nil {
return 0, nil
}

err := c.doRequest(req, fasthttp.StatusNoContent)
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}

func (c *httpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) {
req := c.makeWriteRequest(len(b), []byte(writeURL(c.url, wp)))
req.SetBody(b)
req, err := c.makeWriteRequest(bytes.NewReader(b), len(b), writeURL(c.url, wp))
if err != nil {
return 0, nil
}

err := c.doRequest(req, fasthttp.StatusNoContent)
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return len(b), nil
}
return 0, err
}

func (c *httpClient) WriteStream(r io.Reader, contentLength int) (int, error) {
req := c.makeWriteRequest(contentLength, c.writeURL)
req.SetBodyStream(r, contentLength)
req, err := c.makeWriteRequest(r, contentLength, c.writeURL)
if err != nil {
return 0, nil
}

err := c.doRequest(req, fasthttp.StatusNoContent)
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return contentLength, nil
}
Expand All @@ -143,35 +157,40 @@ func (c *httpClient) WriteStreamWithParams(
contentLength int,
wp WriteParams,
) (int, error) {
req := c.makeWriteRequest(contentLength, []byte(writeURL(c.url, wp)))
req.SetBodyStream(r, contentLength)
req, err := c.makeWriteRequest(r, contentLength, writeURL(c.url, wp))
if err != nil {
return 0, nil
}

err := c.doRequest(req, fasthttp.StatusNoContent)
err = c.doRequest(req, http.StatusNoContent)
if err == nil {
return contentLength, nil
}
return 0, err
}

func (c *httpClient) doRequest(
req *fasthttp.Request,
req *http.Request,
expectedCode int,
) error {
resp := fasthttp.AcquireResponse()

err := c.client.DoTimeout(req, resp, c.config.Timeout)
resp, err := c.client.Do(req)
if err != nil {
return err
}

code := resp.StatusCode()
code := resp.StatusCode
// If it's a "no content" response, then release and return nil
if code == fasthttp.StatusNoContent {
fasthttp.ReleaseResponse(resp)
fasthttp.ReleaseRequest(req)
if code == http.StatusNoContent {
return nil
}

// not a "no content" response, so parse the result:
var response Response
decErr := json.Unmarshal(resp.Body(), &response)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("Fatal error reading body: %s", err)
}
decErr := json.Unmarshal(body, &response)

// If we got a JSON decode error, send that back
if decErr != nil {
Expand All @@ -184,35 +203,37 @@ func (c *httpClient) doRequest(
code, expectedCode, response.Error())
}

fasthttp.ReleaseResponse(resp)
fasthttp.ReleaseRequest(req)

return err
}

func (c *httpClient) makeWriteRequest(
body io.Reader,
contentLength int,
writeURL []byte,
) *fasthttp.Request {
req := c.makeRequest()
req.Header.SetContentLength(contentLength)
req.Header.SetRequestURIBytes(writeURL)
writeURL string,
) (*http.Request, error) {
req, err := c.makeRequest(writeURL, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Length", fmt.Sprint(contentLength))
// TODO
// if gzip {
// req.Header.SetBytesKV([]byte("Content-Encoding"), []byte("gzip"))
// req.Header.Set("Content-Encoding", "gzip")
// }
return req
return req, nil
}

func (c *httpClient) makeRequest() *fasthttp.Request {
req := fasthttp.AcquireRequest()
req.Header.SetContentTypeBytes([]byte("text/plain"))
req.Header.SetMethodBytes([]byte("POST"))
req.Header.SetUserAgent(c.config.UserAgent)
func (c *httpClient) makeRequest(uri string, body io.Reader) (*http.Request, error) {
req, err := http.NewRequest("POST", uri, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "text/plain")
req.Header.Set("User-Agent", c.config.UserAgent)
if c.config.Username != "" && c.config.Password != "" {
req.Header.Set("Authorization", "Basic "+basicAuth(c.config.Username, c.config.Password))
req.SetBasicAuth(c.config.Username, c.config.Password)
}
return req
return req, nil
}

func (c *httpClient) Close() error {
Expand Down Expand Up @@ -246,13 +267,3 @@ func queryURL(u *url.URL, command string) string {
u.Path = "query"
return u.String()
}

// See 2 (end of page 4) http://www.ietf.org/rfc/rfc2617.txt
// "To receive authorization, the httpClient sends the userid and password,
// separated by a single colon (":") character, within a base64
// encoded string in the credentials."
// It is not meant to be urlencoded.
func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}

0 comments on commit 8a22a12

Please sign in to comment.