-
Notifications
You must be signed in to change notification settings - Fork 0
/
elasticsearch.go
108 lines (99 loc) · 2.72 KB
/
elasticsearch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package elastic
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"io"
"net/http"
"time"
elasticsearch "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/pkg/errors"
"github.com/ranjit-git/proxify/pkg/types"
)
// Options contains necessary options required for elasticsearch communicaiton
type Options struct {
// Address for elasticsearch instance
Addr string `yaml:"addr"`
// SSL enables ssl for elasticsearch connection
SSL bool `yaml:"ssl"`
// SSLVerification disables SSL verification for elasticsearch
SSLVerification bool `yaml:"ssl-verification"`
// Username for the elasticsearch instance
Username string `yaml:"username"`
// Password is the password for elasticsearch instance
Password string `yaml:"password"`
// IndexName is the name of the elasticsearch index
IndexName string `yaml:"index-name"`
}
// Client type for elasticsearch
type Client struct {
index string
options *Options
esClient *elasticsearch.Client
}
// New creates and returns a new client for elasticsearch
func New(option *Options) (*Client, error) {
scheme := "http://"
if option.SSL {
scheme = "https://"
}
elasticsearchClient, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{scheme + option.Addr},
Username: option.Username,
Password: option.Password,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: option.SSLVerification,
},
},
})
if err != nil {
return nil, errors.Wrap(err, "error creating elasticsearch client")
}
client := &Client{
esClient: elasticsearchClient,
index: option.IndexName,
options: option,
}
return client, nil
}
// Store saves a passed log event in elasticsearch
func (c *Client) Save(data types.OutputData) error {
var doc map[string]interface{}
if data.Userdata.HasResponse {
doc = map[string]interface{}{
"response": data.DataString,
"timestamp": time.Now().Format(time.RFC3339),
}
} else {
doc = map[string]interface{}{
"request": data.DataString,
"timestamp": time.Now().Format(time.RFC3339),
}
}
body, err := json.Marshal(&map[string]interface{}{
"doc": doc,
"doc_as_upsert": true,
})
if err != nil {
return err
}
updateRequest := esapi.UpdateRequest{
Index: c.index,
DocumentID: data.Name,
Body: bytes.NewReader(body),
}
res, err := updateRequest.Do(context.Background(), c.esClient)
if err != nil || res == nil {
return errors.New("error thrown by elasticsearch: " + err.Error())
}
if res.StatusCode >= 300 {
return errors.New("elasticsearch responded with an error: " + string(res.String()))
}
// Drain response to reuse connection
_, er := io.Copy(io.Discard, res.Body)
res.Body.Close()
return er
}