-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
instance.go
142 lines (115 loc) · 4.16 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package instance
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/kobsio/kobs/pkg/api/middleware/roundtripper"
"github.com/sirupsen/logrus"
)
// log is the package-level logger. Every entry written through it carries the field "package" with the value
// "elasticsearch".
var log = logrus.WithFields(logrus.Fields{"package": "elasticsearch"})
// Config describes one Elasticsearch instance as it is read from the configuration. The JSON key of each field is the
// lower camel case form of the field name.
type Config struct {
	// Name is copied into Instance.Name by New.
	Name string `json:"name"`
	// DisplayName and Description are not read anywhere in this file; presumably they are shown to the user by
	// other parts of the project.
	DisplayName string `json:"displayName"`
	Description string `json:"description"`
	// Address is the base URL of the Elasticsearch API. The request paths are appended to it.
	Address string `json:"address"`
	// Username and Password enable basic authentication. New only uses them when both are non-empty.
	Username string `json:"username"`
	Password string `json:"password"`
	// Token enables token based authentication when it is non-empty.
	Token string `json:"token"`
}
// Instance is the handle for one configured Elasticsearch instance. It is created by New from a Config.
type Instance struct {
	// Name is the name of the instance, taken from the configuration.
	Name string

	// address is the base URL of the Elasticsearch API. GetLogs appends the search paths to it.
	address string
	// client is the HTTP client used for all requests against the instance.
	client *http.Client
}
// GetLogs returns the raw log documents and the buckets for the distribution of the logs across the selected time
// range. We have to pass a query, start and end time to the function. The scrollID can be an empty string to start a
// new query. If a scrollID is provided it will be used for pagination and the query, timeStart and timeEnd arguments
// are ignored.
//
// The passed context is attached to the HTTP request, so that canceling it (or exceeding its deadline) aborts the
// call to Elasticsearch.
//
// When Elasticsearch answers with a status code outside of the 2xx range, the error type and reason from the response
// body are logged and returned as error. A response body which can not be decoded is returned as decoding error.
func (i *Instance) GetLogs(ctx context.Context, query, scrollID string, timeStart, timeEnd int64) (*Data, error) {
	var body []byte
	var url string

	// A trailing slash in the configured address would result in request paths like "//_search".
	address := strings.TrimRight(i.address, "/")

	if scrollID == "" {
		// The query is provided by the caller, so it must be encoded as a JSON string (quotes, backslashes and
		// control characters) before it is placed into the request body. Escaping only the quotes is not enough to
		// keep the body valid JSON.
		encodedQuery, err := json.Marshal(query)
		if err != nil {
			return nil, err
		}

		url = fmt.Sprintf("%s/_search?scroll=15m", address)
		// timeStart and timeEnd are multiplied by 1000 before they are used in the range filter (presumably to
		// convert Unix seconds into milliseconds).
		body = []byte(fmt.Sprintf(`{"size":100,"sort":[{"@timestamp":{"order":"desc"}}],"query":{"bool":{"must":[{"range":{"@timestamp":{"gte":"%d","lte":"%d"}}},{"query_string":{"query":%s}}]}},"aggs":{"logcount":{"auto_date_histogram":{"field":"@timestamp","buckets":30}}}}`, timeStart*1000, timeEnd*1000, encodedQuery))
	} else {
		// The scrollID is also encoded as JSON string, instead of being concatenated into the body as it is.
		encodedScrollID, err := json.Marshal(scrollID)
		if err != nil {
			return nil, err
		}

		url = fmt.Sprintf("%s/_search/scroll", address)
		body = []byte(fmt.Sprintf(`{"scroll":"15m","scroll_id":%s}`, encodedScrollID))
	}

	log.WithFields(logrus.Fields{"query": string(body)}).Debugf("Run Elasticsearch query")

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Add("Content-Type", "application/json")

	resp, err := i.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
		var res Response
		if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
			return nil, err
		}

		// When the response from the Elasticsearch API contains a list of buckets, we have to transform them into the
		// format needed by our UI. The slice stays nil when the response contains no buckets.
		var buckets []Bucket
		if esBuckets := res.Aggregations.LogCount.Buckets; len(esBuckets) > 0 {
			// The difference between the keys of the last and the first bucket is passed to formateTime for each
			// bucket.
			timeDiff := esBuckets[len(esBuckets)-1].Key - esBuckets[0].Key

			buckets = make([]Bucket, 0, len(esBuckets))
			for _, bucket := range esBuckets {
				buckets = append(buckets, Bucket{
					Time:      formateTime(bucket.Key, timeDiff),
					Documents: bucket.DocCount,
				})
			}
		}

		data := &Data{
			ScrollID:  res.ScrollID,
			Took:      res.Took,
			Hits:      res.Hits.Total.Value,
			Documents: res.Hits.Hits,
			Buckets:   buckets,
		}

		log.WithFields(logrus.Fields{"scrollID": data.ScrollID, "took": data.Took, "hits": data.Hits, "documents": len(data.Documents), "buckets": len(data.Buckets)}).Debugf("Elasticsearch query results")

		return data, nil
	}

	// Every other status code is handled as error. The body is expected to contain the error type and reason.
	var res ResponseError
	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
		return nil, err
	}

	log.WithFields(logrus.Fields{"type": res.Error.Type, "reason": res.Error.Reason}).Error("The query returned an error.")

	return nil, fmt.Errorf("%s: %s", res.Error.Type, res.Error.Reason)
}
// New creates the Instance for the given configuration. The HTTP client of the instance starts with the default round
// tripper of the roundtripper package. If a username and a password are set, it is wrapped in a basic auth transport.
// If a token is set, the result is additionally wrapped in a token auth transport, so that the token transport is the
// outermost one when both are configured. The returned error is always nil.
func New(config Config) (*Instance, error) {
	transport := roundtripper.DefaultRoundTripper

	if hasBasicAuth := config.Username != "" && config.Password != ""; hasBasicAuth {
		transport = roundtripper.BasicAuthTransport{
			Transport: transport,
			Username:  config.Username,
			Password:  config.Password,
		}
	}

	if len(config.Token) > 0 {
		transport = roundtripper.TokenAuthTransporter{
			Transport: transport,
			Token:     config.Token,
		}
	}

	httpClient := &http.Client{Transport: transport}

	instance := &Instance{
		Name:    config.Name,
		address: config.Address,
		client:  httpClient,
	}

	return instance, nil
}