This repository has been archived by the owner on Nov 5, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 34
/
storage_es.go
123 lines (108 loc) · 2.62 KB
/
storage_es.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package tealogs
import (
"errors"
"fmt"
"github.com/TeaWeb/code/tealogs/accesslogs"
"github.com/TeaWeb/code/teautils"
"github.com/iwind/TeaGo/logs"
"github.com/pquerna/ffjson/ffjson"
"io/ioutil"
"net/http"
"regexp"
"strings"
"time"
)
// ESStorage is the ElasticSearch storage policy for access logs.
//
// It embeds Storage and adds the settings used to address the ElasticSearch
// "_bulk" endpoint. Start rejects a configuration in which Endpoint, Index or
// MappingType is empty.
type ESStorage struct {
// NOTE(review): the yaml tag reads ", inline" with a space after the comma;
// confirm that the YAML library in use accepts this spelling of the flag.
Storage `yaml:", inline"`
// Endpoint is the server address. Start prepends "http://" when it does not
// begin with "http://" or "https://"; Write appends "/_bulk" to it.
Endpoint string `yaml:"endpoint" json:"endpoint"`
// Index is passed through FormatVariables and sent as "_index" of each bulk action.
Index string `yaml:"index" json:"index"`
// MappingType is passed through FormatVariables and sent as "_type" of each bulk action.
MappingType string `yaml:"mappingType" json:"mappingType"`
}
// Start validates the configuration and normalizes the endpoint.
//
// Endpoint, Index and MappingType must all be non-empty; the first empty one,
// checked in that order, yields an error. An endpoint that does not begin with
// "http://" or "https://" (matched case-insensitively) gets "http://"
// prepended, and this happens before Index and MappingType are checked.
func (this *ESStorage) Start() error {
	if this.Endpoint == "" {
		return errors.New("'endpoint' should not be nil")
	}

	// Default to plain HTTP when the configured endpoint carries no scheme.
	hasScheme := regexp.MustCompile(`(?i)^(http|https)://`).MatchString(this.Endpoint)
	if !hasScheme {
		this.Endpoint = "http://" + this.Endpoint
	}

	switch {
	case this.Index == "":
		return errors.New("'index' should not be nil")
	case this.MappingType == "":
		return errors.New("'mappingType' should not be nil")
	}
	return nil
}
// Write sends the given access logs to ElasticSearch in one request to the
// "_bulk" endpoint.
//
// Each log becomes an "index" action followed by its document, one JSON value
// per line. "_index" and "_type" are Index and MappingType after
// FormatVariables has been applied; "_id" starts at the current time in
// nanoseconds and is incremented for every log in the batch. When Format is
// not StorageFormatJSON the formatted log is wrapped as {"log": "..."};
// otherwise the formatted bytes are used as the document as-is.
//
// Logs that fail to encode are reported through logs.Error and skipped. An
// empty batch, or one in which every log was skipped, sends nothing and
// returns nil. An error is returned when the request cannot be built or sent,
// or when the response status is not 200.
func (this *ESStorage) Write(accessLogs []*accesslogs.AccessLog) error {
	if len(accessLogs) == 0 {
		return nil
	}

	bulk := &strings.Builder{}
	id := time.Now().UnixNano()
	indexName := this.FormatVariables(this.Index)
	typeName := this.FormatVariables(this.MappingType)
	for _, accessLog := range accessLogs {
		id++

		// Action line describing where the following document goes.
		opData, err := ffjson.Marshal(map[string]interface{}{
			"index": map[string]interface{}{
				"_index": indexName,
				"_type":  typeName,
				"_id":    fmt.Sprintf("%d", id),
			},
		})
		if err != nil {
			logs.Error(err)
			continue
		}

		doc, err := this.FormatAccessLogBytes(accessLog)
		if err != nil {
			logs.Error(err)
			continue
		}

		// Non-JSON formats are not valid documents on their own, so carry the
		// formatted text inside a single "log" field.
		if this.Format != StorageFormatJSON {
			doc, err = ffjson.Marshal(map[string]interface{}{
				"log": teautils.BytesToString(doc),
			})
			if err != nil {
				logs.Error(err)
				continue
			}
		}

		bulk.Write(opData)
		bulk.WriteString("\n")
		bulk.Write(doc)
		bulk.WriteString("\n")
	}

	if bulk.Len() == 0 {
		return nil
	}

	// Trim trailing slashes so an endpoint such as "http://host:9200/" does not
	// produce a "//_bulk" path.
	url := strings.TrimRight(this.Endpoint, "/") + "/_bulk"
	req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(bulk.String()))
	if err != nil {
		return err
	}

	// ElasticSearch 6.0 and later reject request bodies that lack a supported
	// Content-Type; "application/json" is accepted by the bulk API.
	req.Header.Set("Content-Type", "application/json")

	// The request body is closed by the HTTP transport, so it needs no
	// explicit Close here.
	client := teautils.SharedHttpClient(10 * time.Second)
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// NOTE(review): the bulk API may answer 200 while individual items failed
	// (reported inside the response body); the body is not inspected here.
	if resp.StatusCode != http.StatusOK {
		// Best effort: the body only enriches the error message, so a read
		// failure is deliberately ignored.
		bodyData, _ := ioutil.ReadAll(resp.Body)
		return fmt.Errorf("ElasticSearch response status code: %d content: %s", resp.StatusCode, bodyData)
	}

	return nil
}
// Close shuts the storage down. It does nothing and always returns nil.
func (this *ESStorage) Close() error {
return nil
}