/
client.go
217 lines (185 loc) · 5.98 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package log
import (
"context"
"fmt"
"os"
"sort"
"time"
"github.com/gorilla/websocket"
"github.com/grafana/dskit/backoff"
logclient "github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util/unmarshal"
"github.com/prometheus/common/config"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
)
// Client is the log API client of this package. It embeds the Loki logcli
// client interface, so that client's methods can be called on it directly.
type Client struct {
logclient.Client
// StdOut is the log output that NewClient sets up for printing to stdout.
StdOut output.LogOutput
}
// Query bundles the parameters of a single log query. QueryRange and
// QueryRangeWithRetry pass every field on to the Loki client; TailQuery only
// reads QueryString, Start, Limit and Quiet.
type Query struct {
	// QueryString is the query expression handed to the Loki client.
	QueryString string
	// Start and End delimit the queried time range. When tailing, Start is
	// the point from which entries are requested.
	Start, End time.Time
	// Limit is passed to the Loki client as the entry limit.
	Limit int
	// Step and Interval are passed through to the Loki client's range query.
	Step, Interval time.Duration
	// Quiet is passed through to the Loki client.
	Quiet bool
	// Direction is passed through to the Loki client's range query.
	Direction logproto.Direction
}
// NewClient builds a log API client that talks to the Loki instance at
// address, authenticating with the bearer token and sending orgID as the
// organisation. If insecure is true, TLS certificate verification is skipped.
// The returned client's StdOut is the "default" mode stdout output; an error
// is returned if that output cannot be created.
func NewClient(address, token, orgID string, insecure bool) (*Client, error) {
	stdout, err := StdOut("default")
	if err != nil {
		return nil, err
	}

	loki := &logclient.DefaultClient{
		Address:     address,
		BearerToken: token,
		OrgID:       orgID,
		TLSConfig:   config.TLSConfig{InsecureSkipVerify: insecure},
	}
	return &Client{Client: loki, StdOut: stdout}, nil
}
// Mode translates an output mode name into Loki's terminology. Only "json"
// needs translating (Loki calls that output mode "jsonl"); every other value
// is returned unchanged.
func Mode(m string) string {
	switch m {
	case "json":
		return "jsonl"
	default:
		return m
	}
}
// StdOut sets up a log output that writes to os.Stdout in the specified mode.
// The output is configured without labels, with colored output and with
// timestamps in the local timezone.
//
// If the output cannot be created, the underlying error is wrapped so callers
// can still inspect it with errors.Is / errors.As.
func StdOut(mode string) (output.LogOutput, error) {
	out, err := output.NewLogOutput(os.Stdout, mode, &output.LogOutputOptions{
		NoLabels:      true,
		ColoredOutput: true,
		Timezone:      time.Local,
	})
	if err != nil {
		return nil, fmt.Errorf("unable to create log output: %w", err)
	}
	return out, nil
}
// QueryRange runs q once as a range query against Loki and prints the result
// to out. It returns the client's error if the query fails, or the printing
// error if the result type is not supported.
//
// ctx is accepted for API symmetry but is not used: the underlying client
// call does not take a context.
func (c *Client) QueryRange(ctx context.Context, out output.LogOutput, q Query) error {
	res, err := c.Client.QueryRange(
		q.QueryString, q.Limit, q.Start, q.End, q.Direction, q.Step, q.Interval, q.Quiet,
	)
	if err == nil {
		err = printResult(res.Data.Result, out)
	}
	return err
}
// QueryRangeWithRetry queries logs within a specific time range and prints
// them to out, retrying when the query fails, when printing fails, or when
// the query succeeds but returns no log streams.
//
// Retries follow an exponential backoff (5 steps, starting at 200ms, factor
// 2, 10% jitter, capped at 10s). Once ctx is cancelled no further attempt is
// made and the error of the last attempt is returned. The query itself cannot
// be interrupted because the underlying client call does not take a context.
func (c *Client) QueryRangeWithRetry(ctx context.Context, out output.LogOutput, q Query) error {
	schedule := wait.Backoff{
		Steps:    5,
		Duration: 200 * time.Millisecond,
		Factor:   2.0,
		Jitter:   0.1,
		Cap:      10 * time.Second,
	}

	// Every kind of error is worth another attempt, but only for as long as
	// the caller is still waiting for the result.
	retriable := func(error) bool {
		return ctx.Err() == nil
	}

	attempt := func() error {
		resp, err := c.Client.QueryRange(q.QueryString, q.Limit, q.Start, q.End, q.Direction, q.Step, q.Interval, q.Quiet)
		if err != nil {
			return err
		}
		// An empty stream result is reported as an error so that it
		// triggers a retry; the logs might simply not be available yet.
		if streams, ok := resp.Data.Result.(loghttp.Streams); ok && len(streams) == 0 {
			return fmt.Errorf("received no log streams")
		}
		return printResult(resp.Data.Result, out)
	}

	return retry.OnError(schedule, retriable, attempt)
}
// printResult prints a query result to out. Only stream results are
// supported; any other result type yields an error naming that type.
func printResult(value loghttp.ResultValue, out output.LogOutput) error {
	if kind := value.Type(); kind != logqlmodel.ValueTypeStreams {
		return fmt.Errorf("unable to print unsupported type: %v", kind)
	}
	printStream(value.(loghttp.Streams), out)
	return nil
}
// printStream merges the entries of all streams, orders them by ascending
// timestamp and prints each line to out without labels.
//
// The sort is stable: entries that share a timestamp keep the relative order
// in which they were received instead of being shuffled arbitrarily.
func printStream(streams loghttp.Streams, out output.LogOutput) {
	total := 0
	for _, s := range streams {
		total += len(s.Entries)
	}

	entries := make([]loghttp.Entry, 0, total)
	for _, s := range streams {
		entries = append(entries, s.Entries...)
	}

	sort.SliceStable(entries, func(i, j int) bool {
		return entries[i].Timestamp.Before(entries[j].Timestamp)
	})

	for _, e := range entries {
		out.FormatAndPrintln(e.Timestamp, nil, 0, e.Line)
	}
}
// TailQuery tails logs using the loki websocket endpoint and prints every
// received entry, together with its stream labels, to out.
// This has been adapted from https://github.com/grafana/loki/blob/v2.8.2/pkg/logcli/query/tail.go#L22
// as it directly prints out messages using builtin log, which we don't want.
//
// Cancelling ctx sends a close message to the server; TailQuery returns nil
// once the connection has been closed normally. If the connection is closed
// abnormally, up to 5 reconnection attempts are made (1s to 10s backoff),
// resuming from the timestamp of the last printed entry. A cancellation
// during reconnection also ends the tail with a nil error. Any other read
// error, or a failed reconnection, is returned wrapped.
func (c *Client) TailQuery(ctx context.Context, delayFor time.Duration, out output.LogOutput, q Query) error {
	conn, err := c.LiveTailQueryConn(q.QueryString, delayFor, q.Limit, q.Start, q.Quiet)
	if err != nil {
		return fmt.Errorf("tailing logs failed: %w", err)
	}
	// conn is replaced on reconnect, so release whichever connection is
	// current at the time the function returns.
	defer func() { _ = conn.Close() }()

	// watchCancel starts a goroutine that sends a close message once ctx is
	// cancelled. It is bound to the connection that is current at call time,
	// so it never touches a connection created later. The returned stop
	// function releases the goroutine; it must be called exactly once.
	watchCancel := func() (stop func()) {
		ws := conn
		done := make(chan struct{})
		go func() {
			select {
			case <-ctx.Done():
				// if sending the close message fails there's not much we can do.
				// Printing the message would probably confuse the user more than
				// anything.
				_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
			case <-done:
			}
		}()
		return func() { close(done) }
	}
	stopWatch := watchCancel()
	// Wrapped in a closure so the stop function of the latest watcher runs.
	defer func() { stopWatch() }()

	tailResponse := new(loghttp.TailResponse)
	lastReceivedTimestamp := q.Start
	for {
		err := unmarshal.ReadTailResponseJSON(tailResponse, conn)
		if err != nil {
			// Check if the websocket connection closed unexpectedly. If so, retry.
			// The connection might close unexpectedly if the querier handling the tail request
			// in Loki stops running. The following error would be printed:
			// "websocket: close 1006 (abnormal closure): unexpected EOF"
			if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
				// The previous connection is already broken: stop its watcher
				// and release it. A failing Close is fine for the same reason.
				stopWatch()
				stopWatch = func() {}
				_ = conn.Close()

				// Try to re-establish the connection up to 5 times, giving up
				// early if the caller cancels ctx.
				retries := backoff.New(ctx, backoff.Config{
					MinBackoff: 1 * time.Second,
					MaxBackoff: 10 * time.Second,
					MaxRetries: 5,
				})
				reconnected := false
				for retries.Ongoing() {
					// A separate variable keeps conn valid for the deferred
					// Close when the attempt fails.
					next, dialErr := c.LiveTailQueryConn(q.QueryString, delayFor, q.Limit, lastReceivedTimestamp, q.Quiet)
					if dialErr == nil {
						conn = next
						reconnected = true
						break
					}
					retries.Wait()
				}
				if !reconnected {
					if ctx.Err() != nil {
						// The caller asked to stop tailing; that is not a failure.
						return nil
					}
					return fmt.Errorf("error recreating tailing connection: %w", retries.Err())
				}

				// If ctx was cancelled while reconnecting, the new watcher
				// fires right away and closes the fresh connection.
				stopWatch = watchCancel()
				continue
			}
			if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
				return nil
			}
			return fmt.Errorf("error reading stream: %w", err)
		}

		for _, stream := range tailResponse.Streams {
			for _, entry := range stream.Entries {
				out.FormatAndPrintln(entry.Timestamp, stream.Labels, 0, entry.Line)
				lastReceivedTimestamp = entry.Timestamp
			}
		}
	}
}