-
Notifications
You must be signed in to change notification settings - Fork 568
/
client.go
106 lines (89 loc) · 2.71 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package client
import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"path"
"time"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/promutil"
)
// ** Why this is here **
// We use a stripped-down version of the Loki client because importing
// the main client locks us to old module dependencies such as k8s.io/client-go.
var (
// queryRangePath is the URL path of Loki's range-query endpoint. QueryRange
// passes it down to be joined onto the client's base address.
queryRangePath = "/loki/api/v1/query_range"
)
// Client holds the configuration needed to send queries to a Loki server.
type Client struct {
// Address is the base URL of the Loki server. It is parsed as a URL and the
// endpoint path is joined onto its path component for each request.
Address string
}
// lokiClient is the package-level HTTP client that doRequest uses to talk to
// Loki. Its transport is http.DefaultTransport wrapped by
// promutil.InstrumentRoundTripper under the name "loki" (presumably to record
// per-request metrics; see promutil to confirm). No Timeout is configured on
// the client itself; doRequest attaches the caller's context to each request.
var lokiClient = &http.Client{
Transport: promutil.InstrumentRoundTripper("loki", http.DefaultTransport),
}
// QueryRange runs a range query against Loki's query_range endpoint and
// returns the decoded response.
//
// The "query" and "direction" parameters are always sent. The remaining
// parameters are optional and are omitted so that the API defaults apply:
// "limit" is sent only when positive, "start" and "end" only when they are
// non-zero times (encoded as Unix nanoseconds), and "step" and "interval" only
// when non-zero (encoded as seconds). quiet is handed to doQuery unchanged.
func (c *Client) QueryRange(ctx context.Context, queryStr string, limit int, start, end time.Time, direction string, step, interval time.Duration, quiet bool) (*QueryResponse, error) {
	qs := newQueryStringBuilder()
	qs.SetString("query", queryStr)
	if limit > 0 {
		qs.SetInt32("limit", limit)
	}

	// Time bounds: skip any that were left as the zero time.
	for _, bound := range []struct {
		key string
		at  time.Time
	}{
		{"start", start},
		{"end", end},
	} {
		if !bound.at.IsZero() {
			qs.SetInt(bound.key, bound.at.UnixNano())
		}
	}

	qs.SetString("direction", direction)

	// Durations: a zero value means "not provided", so let Loki pick its default.
	for _, span := range []struct {
		key string
		dur time.Duration
	}{
		{"step", step},
		{"interval", interval},
	} {
		if span.dur != 0 {
			qs.SetFloat(span.key, span.dur.Seconds())
		}
	}

	encoded := qs.Encode()
	return c.doQuery(ctx, queryRangePath, encoded, quiet)
}
// doQuery delegates to doRequest for the given path and encoded query string,
// decoding the reply into a newly allocated QueryResponse. If doRequest fails,
// its error is returned unchanged together with a nil response.
func (c *Client) doQuery(ctx context.Context, path string, query string, quiet bool) (*QueryResponse, error) {
	result := new(QueryResponse)
	if err := c.doRequest(ctx, path, query, quiet, result); err != nil {
		return nil, err
	}
	return result, nil
}
// doRequest sends a GET request to the URL built from c.Address, path and
// query, and JSON-decodes a successful response body into out.
//
// A response whose status code is 400 or above is converted into an error
// containing the status line and the response body. At most maxErrBody bytes
// of that body are read, so an oversized error payload is neither buffered in
// full nor copied wholesale into the error message. The quiet parameter is not
// used by this function.
func (c *Client) doRequest(ctx context.Context, path, query string, quiet bool, out interface{}) error {
	// maxErrBody caps how much of an error response body is retained.
	const maxErrBody = 64 << 10

	us, err := buildURL(c.Address, path, query)
	if err != nil {
		return err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, us, nil)
	if err != nil {
		return errors.EnsureStack(err)
	}
	resp, err := lokiClient.Do(req)
	if err != nil {
		return errors.EnsureStack(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode >= http.StatusBadRequest {
		// Read only a bounded prefix of the body; it is used purely for diagnostics.
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrBody))
		if readErr != nil {
			// Report whatever was read before the failure alongside the read error.
			return errors.EnsureStack(errors.Errorf("error response from loki: %v (body: %q); additionally, reading body: %v", resp.Status, body, readErr))
		}
		return errors.EnsureStack(errors.Errorf("error response from loki: %v (body: %q)", resp.Status, body))
	}
	return errors.EnsureStack(json.NewDecoder(resp.Body).Decode(out))
}
// buildURL assembles a request URL from a base address u, an endpoint path p
// and an already-encoded query string q. The base address is parsed, p is
// joined onto its existing path with path.Join, and q replaces its raw query.
// A parse failure is returned wrapped by errors.EnsureStack with an empty
// string.
func buildURL(u, p, q string) (string, error) {
	// Named "base" rather than "url" so the net/url package stays visible.
	base, err := url.Parse(u)
	if err != nil {
		return "", errors.EnsureStack(err)
	}
	base.Path, base.RawQuery = path.Join(base.Path, p), q
	return base.String(), nil
}