-
Notifications
You must be signed in to change notification settings - Fork 3
/
http_writer.go
86 lines (73 loc) · 2.45 KB
/
http_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package main
import (
"bytes"
"fmt"
"io/ioutil"
"time"
"github.com/klauspost/compress/gzip"
"github.com/valyala/fasthttp"
)
// LineProtocolWriter is the interface used to write OpenTSDB bulk data.
// NewHTTPWriter in this file returns an HTTP-backed implementation.
type LineProtocolWriter interface {
// WriteLineProtocol writes the given byte slice containing bulk data
// to an implementation-specific remote server.
//
// It returns the latency, in nanoseconds, of executing the write against
// the remote server, together with any applicable error.
// Implementers must return errors returned by the underlying transport but are free to return
// other, context-specific errors.
WriteLineProtocol([]byte) (latencyNs int64, err error)
}
// HTTPWriterConfig is the configuration used to create an HTTPWriter.
type HTTPWriterConfig struct {
// Host is the base URL of the target server, in the form "http://example.com:8086".
// NewHTTPWriter appends "/api/put" to this value verbatim, so a trailing
// slash here would produce a double slash in the request URI.
Host string
}
// HTTPWriter is a Writer that writes to an OpenTSDB HTTP server.
// Construct it with NewHTTPWriter so that url is populated.
type HTTPWriter struct {
// client is the fasthttp client used to execute every write request.
client fasthttp.Client
// c is the configuration the writer was created with.
c HTTPWriterConfig
// url is the request URI used for each write: c.Host followed by "/api/put",
// computed once at construction time.
url []byte
}
// NewHTTPWriter builds an HTTPWriter from the supplied HTTPWriterConfig and
// returns it as a LineProtocolWriter. The write endpoint is derived once, here,
// by appending "/api/put" to the configured Host.
func NewHTTPWriter(c HTTPWriterConfig) LineProtocolWriter {
	w := new(HTTPWriter)
	w.client.Name = "bulk_load_opentsdb"
	w.c = c
	w.url = []byte(c.Host + "/api/put")
	return w
}
// Byte-slice constants set on every outgoing request by WriteLineProtocol.
var (
// post is the HTTP method used for writes.
post = []byte("POST")
// applicationJsonHeader is the Content-Type value sent with each write.
applicationJsonHeader = []byte("application/json")
)
// WriteLineProtocol writes the given byte slice to the HTTP server described in the Writer's HTTPWriterConfig.
//
// The body is sent unmodified with a "Content-Encoding: gzip" header, so callers
// are expected to pass an already gzip-compressed payload.
//
// It returns the latency in nanoseconds of the HTTP round trip and any error received
// while sending the data over HTTP, or a new error if the response status is neither
// 200 OK nor 204 No Content. On failure, the decompressed payload and the error are
// printed to stdout as a debugging aid.
func (w *HTTPWriter) WriteLineProtocol(body []byte) (int64, error) {
	// Deferred calls run in reverse order, so the response is released
	// before the request.
	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	req.Header.SetContentTypeBytes(applicationJsonHeader)
	req.Header.Set("Content-Encoding", "gzip")
	req.Header.SetMethodBytes(post)
	req.Header.SetRequestURIBytes(w.url)
	req.SetBody(body)

	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(resp)

	// Only the round trip itself is timed.
	start := time.Now()
	err := w.client.Do(req, resp)
	lat := time.Since(start).Nanoseconds()

	if err == nil {
		sc := resp.StatusCode()
		if sc != fasthttp.StatusNoContent && sc != fasthttp.StatusOK {
			// The response body is formatted into the error now, before resp is released.
			err = fmt.Errorf("invalid write response (status %d): %s", sc, resp.Body())
		}
	}
	if err != nil {
		// Decompress lazily: the readable payload is only needed for diagnostics,
		// so successful writes never pay for a gunzip pass.
		fmt.Println(gunzipForLog(body), err)
	}
	return lat, err
}

// gunzipForLog returns the decompressed contents of a gzip payload for diagnostic
// output. It never panics on malformed input: if the payload cannot be opened as
// gzip, a placeholder describing the failure is returned, and if decoding fails
// part-way, whatever was decoded is returned with a note about the error.
func gunzipForLog(body []byte) string {
	zr, err := gzip.NewReader(bytes.NewReader(body))
	if err != nil {
		return fmt.Sprintf("<undecodable gzip body, %d bytes: %v>", len(body), err)
	}
	defer zr.Close()

	d, err := ioutil.ReadAll(zr)
	if err != nil {
		return fmt.Sprintf("%s<gzip decode stopped: %v>", d, err)
	}
	return string(d)
}