forked from influxdata/influxdb
/
http.go
155 lines (138 loc) · 3.82 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package client
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"mime"
"net"
"net/http"
"net/url"
"strconv"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/repl"
iclient "github.com/influxdata/influxdb/client"
"github.com/pkg/errors"
)
const (
fluxPath = "/api/v2/query"
)
// Shared transports for all clients to prevent leaking connections
var (
skipVerifyTransport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
defaultTransport = &http.Transport{}
)
// HTTP implements a Flux query client that makes requests to the /api/v2/query
// API endpoint.
type HTTP struct {
Addr string
Username string
Password string
InsecureSkipVerify bool
url *url.URL
}
// NewHTTP creates a HTTP client
func NewHTTP(host string, port int, ssl bool) (*HTTP, error) {
addr := net.JoinHostPort(host, strconv.Itoa(port))
u, e := iclient.ParseConnectionString(addr, ssl)
if e != nil {
return nil, e
}
u.Path = fluxPath
return &HTTP{url: &u}, nil
}
// Query runs a flux query against a influx server and decodes the result
func (s *HTTP) Query(ctx context.Context, r *ProxyRequest) (flux.ResultIterator, error) {
qreq, err := QueryRequestFromProxyRequest(r)
if err != nil {
return nil, err
}
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(qreq); err != nil {
return nil, err
}
hreq, err := http.NewRequest("POST", s.url.String(), &body)
if err != nil {
return nil, err
}
if s.Username != "" {
hreq.SetBasicAuth(s.Username, s.Password)
}
hreq.Header.Set("Content-Type", "application/json")
hreq.Header.Set("Accept", "text/csv")
hreq = hreq.WithContext(ctx)
hc := newClient(s.url.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(hreq)
if err != nil {
return nil, err
}
if err := checkError(resp); err != nil {
return nil, err
}
decoder := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
return decoder.Decode(resp.Body)
}
func newClient(scheme string, insecure bool) *http.Client {
hc := &http.Client{
Transport: defaultTransport,
}
if scheme == "https" && insecure {
hc.Transport = skipVerifyTransport
}
return hc
}
// CheckError reads the http.Response and returns an error if one exists.
// It will automatically recognize the errors returned by Influx services
// and decode the error into an internal error type. If the error cannot
// be determined in that way, it will create a generic error message.
//
// If there is no error, then this returns nil.
func checkError(resp *http.Response) error {
switch resp.StatusCode / 100 {
case 4:
// We will attempt to parse this error outside of this block.
msg := "client error"
data, _ := ioutil.ReadAll(resp.Body)
mt, _, err := mime.ParseMediaType(resp.Header.Get("content-type"))
if err == nil && mt == "text/plain" && len(msg) > 0 {
msg = string(data)
}
return errors.Wrap(errors.New(resp.Status), msg)
case 1, 2:
return nil
default:
msg := "unknown server error"
return errors.Wrap(errors.New(resp.Status), msg)
}
}
func QueryRequestFromProxyRequest(req *ProxyRequest) (*QueryRequest, error) {
qr := new(QueryRequest)
switch c := req.Compiler.(type) {
case lang.FluxCompiler:
qr.Type = "flux"
qr.Query = c.Query
case repl.Compiler:
qr.Type = "flux"
qr.Spec = c.Spec
default:
return nil, fmt.Errorf("unsupported compiler %T", c)
}
switch d := req.Dialect.(type) {
case *csv.Dialect:
var header = !d.ResultEncoderConfig.NoHeader
qr.Dialect.Header = &header
qr.Dialect.Delimiter = string(d.ResultEncoderConfig.Delimiter)
qr.Dialect.CommentPrefix = "#"
qr.Dialect.DateTimeFormat = "RFC3339"
qr.Dialect.Annotations = d.ResultEncoderConfig.Annotations
default:
return nil, fmt.Errorf("unsupported dialect %T", d)
}
return qr, nil
}