forked from grafana/grafana
/
api.go
268 lines (221 loc) · 6.77 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package loki
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"github.com/grafana/grafana-plugin-sdk-go/data"
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/util/converter"
)
type LokiAPI struct {
client *http.Client
url string
log log.Logger
}
type RawLokiResponse struct {
Body []byte
Status int
Encoding string
}
func newLokiAPI(client *http.Client, url string, log log.Logger) *LokiAPI {
return &LokiAPI{client: client, url: url, log: log}
}
func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*http.Request, error) {
qs := url.Values{}
qs.Set("query", query.Expr)
qs.Set("direction", string(query.Direction))
// MaxLines defaults to zero when not received,
// and Loki does not like limit=0, even when it is not needed
// (for example for metric queries), so we
// only send it when it's set
if query.MaxLines > 0 {
qs.Set("limit", fmt.Sprintf("%d", query.MaxLines))
}
lokiUrl, err := url.Parse(lokiDsUrl)
if err != nil {
return nil, err
}
switch query.QueryType {
case QueryTypeRange:
{
qs.Set("start", strconv.FormatInt(query.Start.UnixNano(), 10))
qs.Set("end", strconv.FormatInt(query.End.UnixNano(), 10))
// NOTE: technically for streams-producing queries `step`
// is ignored, so it would be nicer to not send it in such cases,
// but we cannot detect that situation, so we always send it.
// it should not break anything.
// NOTE2: we do this at millisecond precision for two reasons:
// a. Loki cannot do steps with better precision anyway,
// so the microsecond & nanosecond part can be ignored.
// b. having it always be number+'ms' makes it more robust and
// precise, as Loki does not support step with float number
// and time-specifier, like "1.5s"
qs.Set("step", fmt.Sprintf("%dms", query.Step.Milliseconds()))
lokiUrl.Path = path.Join(lokiUrl.Path, "/loki/api/v1/query_range")
}
case QueryTypeInstant:
{
qs.Set("time", strconv.FormatInt(query.End.UnixNano(), 10))
lokiUrl.Path = path.Join(lokiUrl.Path, "/loki/api/v1/query")
}
default:
return nil, fmt.Errorf("invalid QueryType: %v", query.QueryType)
}
lokiUrl.RawQuery = qs.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", lokiUrl.String(), nil)
if err != nil {
return nil, err
}
if query.SupportingQueryType != SupportingQueryNone {
value := getSupportingQueryHeaderValue(req, query.SupportingQueryType)
if value != "" {
req.Header.Set("X-Query-Tags", "Source="+value)
}
}
return req, nil
}
type lokiResponseError struct {
Message string `json:"message"`
TraceID string `json:"traceID,omitempty"`
}
type lokiError struct {
Message string
}
func makeLokiError(bytes []byte) error {
var data lokiError
err := json.Unmarshal(bytes, &data)
if err != nil {
// we were unable to convert the bytes to JSON, we return the whole text
return fmt.Errorf("%v", string(bytes))
}
if data.Message == "" {
// we got no usable error message, we return the whole text
return fmt.Errorf("%v", string(bytes))
}
return fmt.Errorf("%v", data.Message)
}
// we know there is an error,
// based on the http-response-body
// we have to make an informative error-object
func readLokiError(body io.ReadCloser) error {
var buf bytes.Buffer
_, err := buf.ReadFrom(body)
if err != nil {
return err
}
bytes := buf.Bytes()
// the error-message is probably a JSON structure,
// with a string-field named "message". we want the
// value of that field.
// but, the response might be just a simple string,
// this was used in older Loki versions.
// so our approach is this:
// - we try to convert the bytes to JSON
// - we take the value of the field "message"
// - if any of these steps fail, or if "message" is empty, we return the whole text
return makeLokiError(bytes)
}
func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts ResponseOpts) (data.Frames, error) {
req, err := makeDataRequest(ctx, api.url, query)
if err != nil {
return nil, err
}
resp, err := api.client.Do(req)
if err != nil {
return nil, err
}
defer func() {
if err := resp.Body.Close(); err != nil {
api.log.Warn("Failed to close response body", "err", err)
}
}()
if resp.StatusCode/100 != 2 {
return nil, readLokiError(resp.Body)
}
iter := jsoniter.Parse(jsoniter.ConfigDefault, resp.Body, 1024)
res := converter.ReadPrometheusStyleResult(iter, converter.Options{MatrixWideSeries: false, VectorWideSeries: false, Dataplane: responseOpts.metricDataplane})
if res.Error != nil {
return nil, res.Error
}
return res.Frames, nil
}
func makeRawRequest(ctx context.Context, lokiDsUrl string, resourcePath string) (*http.Request, error) {
lokiUrl, err := url.Parse(lokiDsUrl)
if err != nil {
return nil, err
}
resourceUrl, err := url.Parse(resourcePath)
if err != nil {
return nil, err
}
// we take the path and the query-string only
lokiUrl.RawQuery = resourceUrl.RawQuery
lokiUrl.Path = path.Join(lokiUrl.Path, resourceUrl.Path)
req, err := http.NewRequestWithContext(ctx, "GET", lokiUrl.String(), nil)
if err != nil {
return nil, err
}
return req, nil
}
func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiResponse, error) {
req, err := makeRawRequest(ctx, api.url, resourcePath)
if err != nil {
return RawLokiResponse{}, err
}
resp, err := api.client.Do(req)
if err != nil {
return RawLokiResponse{}, err
}
defer func() {
if err := resp.Body.Close(); err != nil {
api.log.Warn("Failed to close response body", "err", err)
}
}()
// server errors are handled by the plugin-proxy to hide the error message
if resp.StatusCode/100 == 5 {
return RawLokiResponse{}, readLokiError(resp.Body)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return RawLokiResponse{}, err
}
// client errors are passed as a json struct to the client
if resp.StatusCode/100 != 2 {
lokiResponseErr := lokiResponseError{Message: makeLokiError(body).Error()}
traceID := tracing.TraceIDFromContext(ctx, false)
if traceID != "" {
lokiResponseErr.TraceID = traceID
}
body, err = json.Marshal(lokiResponseErr)
if err != nil {
return RawLokiResponse{}, err
}
}
rawLokiResponse := RawLokiResponse{
Body: body,
Status: resp.StatusCode,
Encoding: resp.Header.Get("Content-Encoding"),
}
return rawLokiResponse, nil
}
func getSupportingQueryHeaderValue(req *http.Request, supportingQueryType SupportingQueryType) string {
value := ""
switch supportingQueryType {
case SupportingQueryLogsVolume:
value = "logvolhist"
case SupportingQueryLogsSample:
value = "logsample"
case SupportingQueryDataSample:
value = "datasample"
default: //ignore
}
return value
}